Evaluation API

This module contains functions for model evaluation, cross-validation, and performance metrics.

Overview

The evaluation framework provides:

  • Time-series cross-validation - Expanding window splits
  • RMSPE metric - Root Mean Square Percentage Error
  • Feature importance - Analyze model behavior
  • Model comparison - Compare multiple models
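
The pieces compose into a single evaluation loop. A minimal end-to-end sketch, assuming a prepared features_df (with Open, Date, and Sales columns), a feature_cols list, and an already-configured model (all illustrative placeholders):

from src.evaluation.cv import filter_open_stores, make_time_series_folds
from src.evaluation.metrics import rmspe

# Drop closed store-days, then build leakage-safe expanding window folds
df = filter_open_stores(features_df)
folds = make_time_series_folds(df, n_folds=5, fold_length_days=42)

for fold, (train_idx, val_idx) in enumerate(folds, start=1):
    model.fit(df.iloc[train_idx][feature_cols], df.iloc[train_idx]["Sales"])
    preds = model.predict(df.iloc[val_idx][feature_cols])
    print(f"Fold {fold} RMSPE: {rmspe(df.iloc[val_idx]['Sales'], preds):.4f}")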

Module Reference

Cross-Validation

Time-series cross-validation utilities for the Rossmann forecasting project.

CRITICAL: Uses expanding window splits to prevent data leakage. Each fold trains on all historical data up to the validation period.

filter_open_stores(df, open_col='Open', date_col='Date')

Filter dataset to only include days when stores were open.

This is important because:

  1. Closed stores have Sales=0, which is ignored in RMSPE
  2. Models shouldn't train on closed-store patterns

Parameters

df : pd.DataFrame
    Dataset with Open column
open_col : str, default='Open'
    Name of open/closed flag column
date_col : str, default='Date'
    Name of date column for sorting

Returns

pd.DataFrame
    Filtered dataset with only open stores (sorted by date with reset index)

Source code in src/evaluation/cv.py
def filter_open_stores(
    df: pd.DataFrame, open_col: str = "Open", date_col: str = "Date"
) -> pd.DataFrame:
    """Filter dataset to only include days when stores were open.

    This is important because:
    1. Closed stores have Sales=0 which is ignored in RMSPE
    2. Models shouldn't train on closed-store patterns

    Parameters
    ----------
    df : pd.DataFrame
        Dataset with Open column
    open_col : str, default='Open'
        Name of open/closed flag column
    date_col : str, default='Date'
        Name of date column for sorting

    Returns
    -------
    pd.DataFrame
        Filtered dataset with only open stores (sorted by date with reset index)
    """
    initial_size = len(df)
    df_filtered = df[df[open_col] == 1].copy()
    removed = initial_size - len(df_filtered)

    # Sort by date and reset index to ensure positional indices are sequential
    # This is critical for CV fold indices to work correctly
    df_filtered = df_filtered.sort_values(date_col).reset_index(drop=True)

    logger.info(f"Filtered out {removed:,} closed store-days ({removed / initial_size * 100:.2f}%)")
    logger.info(f"Remaining: {len(df_filtered):,} open store-days")

    return df_filtered
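
For example, on a toy frame with the standard Rossmann columns (the logger reports how many closed store-days were dropped):

import pandas as pd
from src.evaluation.cv import filter_open_stores

train = pd.DataFrame({
    "Store": [1, 1, 2],
    "Date": pd.to_datetime(["2013-01-02", "2013-01-01", "2013-01-01"]),
    "Open": [0, 1, 1],
    "Sales": [0, 5263, 6064],
})

open_only = filter_open_stores(train)
assert (open_only["Open"] == 1).all()             # closed store-days removed
assert open_only["Date"].is_monotonic_increasing  # sorted by date, index reset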

get_fold_summary(df, folds, date_col='Date')

Generate a summary dataframe of fold information.

Parameters

df : pd.DataFrame
    Dataset
folds : list of tuple
    List of (train_indices, val_indices)
date_col : str, default='Date'
    Name of date column

Returns

pd.DataFrame
    Summary with columns: fold, train_start, train_end, train_size, val_start, val_end, val_size, plus derived train_days and val_days

Source code in src/evaluation/cv.py
def get_fold_summary(
    df: pd.DataFrame, folds: list[tuple[np.ndarray, np.ndarray]], date_col: str = "Date"
) -> pd.DataFrame:
    """Generate a summary dataframe of fold information.

    Parameters
    ----------
    df : pd.DataFrame
        Dataset
    folds : list of tuple
        List of (train_indices, val_indices)
    date_col : str, default='Date'
        Name of date column

    Returns
    -------
    pd.DataFrame
        Summary with columns: fold, train_start, train_end, train_size,
        val_start, val_end, val_size, plus derived train_days and val_days
    """
    # Ensure date column is datetime type
    if not pd.api.types.is_datetime64_any_dtype(df[date_col]):
        df = df.copy()
        df[date_col] = pd.to_datetime(df[date_col])

    summary = []

    for fold_idx, (train_idx, val_idx) in enumerate(folds):
        fold_info = {
            "fold": fold_idx + 1,
            "train_start": df.iloc[train_idx[0]][date_col],
            "train_end": df.iloc[train_idx[-1]][date_col],
            "train_size": len(train_idx),
            "val_start": df.iloc[val_idx[0]][date_col],
            "val_end": df.iloc[val_idx[-1]][date_col],
            "val_size": len(val_idx),
        }

        # Calculate days
        fold_info["train_days"] = (fold_info["train_end"] - fold_info["train_start"]).days + 1
        fold_info["val_days"] = (fold_info["val_end"] - fold_info["val_start"]).days + 1

        summary.append(fold_info)

    return pd.DataFrame(summary)
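
For example, paired with make_time_series_folds this gives a quick per-fold audit table (assumes features_df is already sorted by Date with a reset index, e.g. the output of filter_open_stores):

from src.evaluation.cv import get_fold_summary, make_time_series_folds

folds = make_time_series_folds(features_df, n_folds=5)
summary = get_fold_summary(features_df, folds)

# Training must always end before validation begins
assert (summary["train_end"] < summary["val_start"]).all()
print(summary[["fold", "train_days", "val_days", "train_size", "val_size"]])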

make_time_series_folds(df, n_folds=5, fold_length_days=42, min_train_days=365, date_col='Date')

Create expanding window time-series cross-validation folds.

Each fold:

  • Trains on all historical data up to the validation period
  • Validates on the next fold_length_days (typically 6 weeks = 42 days)
  • Expanding window: the training set grows with each fold

CRITICAL: This prevents data leakage by ensuring validation data is always in the future relative to training data.

Parameters

df : pd.DataFrame
    Dataset with date column (must be sorted by Store and Date)
n_folds : int, default=5
    Number of cross-validation folds
fold_length_days : int, default=42
    Length of validation period in days (6 weeks)
min_train_days : int, default=365
    Minimum number of days in first training fold (1 year)
date_col : str, default='Date'
    Name of date column

Returns

list of tuple
    List of (train_indices, val_indices) for each fold

Example

Fold 1: Train [2013-01-01 to 2014-06-30], Val [2014-07-01 to 2014-08-11]
Fold 2: Train [2013-01-01 to 2014-08-11], Val [2014-08-12 to 2014-09-22]
Fold 3: Train [2013-01-01 to 2014-09-22], Val [2014-09-23 to 2014-11-03]
...

Source code in src/evaluation/cv.py
def make_time_series_folds(
    df: pd.DataFrame,
    n_folds: int = 5,
    fold_length_days: int = 42,
    min_train_days: int = 365,
    date_col: str = "Date",
) -> list[tuple[np.ndarray, np.ndarray]]:
    """Create expanding window time-series cross-validation folds.

    Each fold:
    - Trains on all historical data up to the validation period
    - Validates on the next fold_length_days (typically 6 weeks = 42 days)
    - Expanding window: training set grows with each fold

    CRITICAL: This prevents data leakage by ensuring validation data is
    always in the future relative to training data.

    Parameters
    ----------
    df : pd.DataFrame
        Dataset with date column (must be sorted by Store and Date)
    n_folds : int, default=5
        Number of cross-validation folds
    fold_length_days : int, default=42
        Length of validation period in days (6 weeks)
    min_train_days : int, default=365
        Minimum number of days in first training fold (1 year)
    date_col : str, default='Date'
        Name of date column

    Returns
    -------
    list of tuple
        List of (train_indices, val_indices) for each fold

    Example
    -------
    Fold 1: Train [2013-01-01 to 2014-06-30], Val [2014-07-01 to 2014-08-11]
    Fold 2: Train [2013-01-01 to 2014-08-11], Val [2014-08-12 to 2014-09-22]
    Fold 3: Train [2013-01-01 to 2014-09-22], Val [2014-09-23 to 2014-11-03]
    ...
    """
    logger.info("=" * 60)
    logger.info("Creating time-series cross-validation folds")
    logger.info("=" * 60)
    logger.info(f"Number of folds: {n_folds}")
    logger.info(f"Validation fold length: {fold_length_days} days ({fold_length_days // 7} weeks)")
    logger.info(f"Minimum training days: {min_train_days} days ({min_train_days // 365} year)")

    # Ensure date column is datetime type
    if not pd.api.types.is_datetime64_any_dtype(df[date_col]):
        df = df.copy()
        df[date_col] = pd.to_datetime(df[date_col])
        logger.info(f"Converted {date_col} to datetime type")

    # Ensure data is sorted by date
    df = df.sort_values(date_col).reset_index(drop=True)

    # Get unique dates and ensure they're datetime64 type
    unique_dates = pd.to_datetime(df[date_col].unique())
    unique_dates = pd.Series(unique_dates).sort_values().values
    # Convert to DatetimeIndex to maintain datetime type
    unique_dates = pd.DatetimeIndex(unique_dates)

    logger.info(f"Date range: {unique_dates.min()} to {unique_dates.max()}")
    logger.info(f"Total unique dates: {len(unique_dates)}")

    # Calculate fold boundaries
    # Start with min_train_days for first fold
    # Each subsequent fold adds fold_length_days to validation end
    folds = []

    # Find the date that is min_train_days from start
    start_date = unique_dates[0]
    first_val_start_date = start_date + pd.Timedelta(days=min_train_days)

    # Find closest actual date using searchsorted on DatetimeIndex
    first_val_start_idx = unique_dates.searchsorted(first_val_start_date)
    if first_val_start_idx >= len(unique_dates):
        raise ValueError(f"Not enough data for minimum training days ({min_train_days})")

    logger.info(f"\nFirst validation period starts at: {unique_dates[first_val_start_idx]}")

    for fold_idx in range(n_folds):
        # Calculate validation start date using date arithmetic (not index arithmetic)
        val_start_date = unique_dates[first_val_start_idx] + pd.Timedelta(
            days=fold_idx * fold_length_days
        )

        # Calculate validation end date
        val_end_date = val_start_date + pd.Timedelta(days=fold_length_days - 1)

        # Check if we have enough data
        if val_end_date > unique_dates[-1]:
            logger.warning(f"Not enough data for fold {fold_idx + 1}, stopping at {fold_idx} folds")
            break

        # Training period: everything before validation start
        train_mask = df[date_col] < val_start_date
        val_mask = (df[date_col] >= val_start_date) & (df[date_col] <= val_end_date)

        train_indices = np.where(train_mask)[0]
        val_indices = np.where(val_mask)[0]

        if len(train_indices) == 0 or len(val_indices) == 0:
            logger.warning(f"Fold {fold_idx + 1} has empty train or val set, skipping")
            continue

        folds.append((train_indices, val_indices))

        # Get actual date range for logging
        train_start = df.iloc[train_indices[0]][date_col]
        train_end = df.iloc[train_indices[-1]][date_col]
        val_start = df.iloc[val_indices[0]][date_col]
        val_end = df.iloc[val_indices[-1]][date_col]

        logger.info(f"\nFold {fold_idx + 1}:")
        logger.info(f"  Train: {train_start} to {train_end} ({len(train_indices):,} samples)")
        logger.info(f"  Val:   {val_start} to {val_end} ({len(val_indices):,} samples)")

    logger.info("=" * 60)
    logger.info(f"Created {len(folds)} time-series CV folds")
    logger.info("=" * 60)

    return folds
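
A small self-contained demonstration on synthetic daily data; with a 400-day range, a 300-day minimum training window, and 14-day folds, all five requested folds fit:

import numpy as np
import pandas as pd
from src.evaluation.cv import make_time_series_folds

toy = pd.DataFrame({
    "Date": pd.date_range("2013-01-01", periods=400, freq="D"),
    "Sales": np.random.default_rng(0).integers(1000, 9000, size=400),
})

folds = make_time_series_folds(toy, n_folds=5, fold_length_days=14, min_train_days=300)
assert len(folds) == 5
for train_idx, val_idx in folds:
    assert train_idx.max() < val_idx.min()  # training strictly precedes validation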

remove_missing_features(df, feature_cols)

Remove rows with missing values in feature columns.

This is necessary because lag/rolling features have NaN values for early dates where there's insufficient history.

Parameters

df : pd.DataFrame
    Dataset
feature_cols : list of str
    Feature column names to check

Returns

tuple of (pd.DataFrame, list of str)
    (Filtered dataset, list of remaining valid feature columns after dropping any that were entirely NaN)

Source code in src/evaluation/cv.py
def remove_missing_features(
    df: pd.DataFrame, feature_cols: list[str]
) -> tuple[pd.DataFrame, list[str]]:
    """Remove rows with missing values in feature columns.

    This is necessary because lag/rolling features have NaN values
    for early dates where there's insufficient history.

    Parameters
    ----------
    df : pd.DataFrame
        Dataset
    feature_cols : list of str
        Feature column names to check

    Returns
    -------
    tuple of (pd.DataFrame, list of str)
        (Filtered dataset, list of remaining valid feature columns
        after dropping any that were entirely NaN)
    """
    initial_size = len(df)

    # Check for completely empty feature columns
    empty_cols = []
    for col in feature_cols:
        if col in df.columns and df[col].isna().all():
            empty_cols.append(col)
            logger.warning(f"Feature '{col}' is entirely NaN, will be dropped")

    # Remove empty columns from feature list
    valid_features = [col for col in feature_cols if col not in empty_cols]

    # Remove rows with any missing values in valid features
    df_clean = df.dropna(subset=valid_features).copy()

    removed = initial_size - len(df_clean)
    logger.info(
        f"Removed {removed:,} rows with missing features ({removed / initial_size * 100:.2f}%)"
    )
    logger.info(f"Remaining: {len(df_clean):,} complete rows")

    return df_clean, valid_features
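
A short sketch of the typical call pattern before training (illustrative column names; lag features are NaN for the earliest days of history):

import numpy as np
import pandas as pd
from src.evaluation.cv import remove_missing_features

df = pd.DataFrame({
    "Sales": [5000, 5200, 5100, 5300],
    "lag_sales_1": [np.nan, 5000.0, 5200.0, 5100.0],  # no history on day 1
    "all_nan_feature": [np.nan] * 4,                   # entirely NaN, gets dropped
})

clean_df, valid_features = remove_missing_features(df, ["lag_sales_1", "all_nan_feature"])
# clean_df keeps the 3 rows with a valid lag; valid_features == ["lag_sales_1"]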

Metrics

Evaluation metrics for the Rossmann forecasting project.

mae(y_true, y_pred)

Calculate Mean Absolute Error (MAE).

Parameters

y_true : array-like
    True target values
y_pred : array-like
    Predicted values

Returns

float
    MAE score (lower is better)

Source code in src/evaluation/metrics.py
def mae(y_true: Union[np.ndarray, list], y_pred: Union[np.ndarray, list]) -> float:
    """Calculate Mean Absolute Error (MAE).

    Parameters
    ----------
    y_true : array-like
        True target values
    y_pred : array-like
        Predicted values

    Returns
    -------
    float
        MAE score (lower is better)
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    return np.mean(np.abs(y_true - y_pred))

mape(y_true, y_pred, ignore_zero_sales=True)

Calculate Mean Absolute Percentage Error (MAPE).

Parameters

y_true : array-like
    True target values
y_pred : array-like
    Predicted values
ignore_zero_sales : bool, default=True
    If True, exclude observations where y_true == 0 from calculation

Returns

float
    MAPE score in percentage (lower is better)

Source code in src/evaluation/metrics.py
def mape(
    y_true: Union[np.ndarray, list], y_pred: Union[np.ndarray, list], ignore_zero_sales: bool = True
) -> float:
    """Calculate Mean Absolute Percentage Error (MAPE).

    Parameters
    ----------
    y_true : array-like
        True target values
    y_pred : array-like
        Predicted values
    ignore_zero_sales : bool, default=True
        If True, exclude observations where y_true == 0 from calculation

    Returns
    -------
    float
        MAPE score in percentage (lower is better)
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    # Create mask to exclude zero sales if specified
    mask = y_true != 0 if ignore_zero_sales else np.ones_like(y_true, dtype=bool)

    return np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100
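
Why the flag matters: a closed store-day (Sales=0) makes the percentage error undefined, so it is masked out by default (toy numbers):

from src.evaluation.metrics import mape

y_true = [0, 100, 200]  # first entry is a closed store-day
y_pred = [50, 110, 180]

mape(y_true, y_pred)                           # 10.0: the zero-sales row is excluded
mape(y_true, y_pred, ignore_zero_sales=False)  # inf, from division by zero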

rmse(y_true, y_pred)

Calculate Root Mean Square Error (RMSE).

Parameters

y_true : array-like
    True target values
y_pred : array-like
    Predicted values

Returns

float
    RMSE score (lower is better)

Source code in src/evaluation/metrics.py
def rmse(y_true: Union[np.ndarray, list], y_pred: Union[np.ndarray, list]) -> float:
    """Calculate Root Mean Square Error (RMSE).

    Parameters
    ----------
    y_true : array-like
        True target values
    y_pred : array-like
        Predicted values

    Returns
    -------
    float
        RMSE score (lower is better)
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    return np.sqrt(np.mean(np.square(y_true - y_pred)))

rmspe(y_true, y_pred, ignore_zero_sales=True)

Calculate Root Mean Square Percentage Error (RMSPE).

This is the primary evaluation metric for the Rossmann competition. Observations where Sales = 0 are ignored by default.

Parameters

y_true : array-like
    True target values
y_pred : array-like
    Predicted values
ignore_zero_sales : bool, default=True
    If True, exclude observations where y_true == 0 from calculation

Returns

float
    RMSPE score (lower is better)

Notes

Formula: RMSPE = sqrt(mean(((y_true - y_pred) / y_true)^2))

Source code in src/evaluation/metrics.py
def rmspe(
    y_true: Union[np.ndarray, list], y_pred: Union[np.ndarray, list], ignore_zero_sales: bool = True
) -> float:
    """Calculate Root Mean Square Percentage Error (RMSPE).

    This is the primary evaluation metric for the Rossmann competition.
    Observations where Sales = 0 are ignored by default.

    Parameters
    ----------
    y_true : array-like
        True target values
    y_pred : array-like
        Predicted values
    ignore_zero_sales : bool, default=True
        If True, exclude observations where y_true == 0 from calculation

    Returns
    -------
    float
        RMSPE score (lower is better)

    Notes
    -----
    Formula: RMSPE = sqrt(mean(((y_true - y_pred) / y_true)^2))
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    # Create mask to exclude zero sales if specified
    mask = y_true != 0 if ignore_zero_sales else np.ones_like(y_true, dtype=bool)

    # Calculate RMSPE only on masked values
    return np.sqrt(np.mean(np.square((y_true[mask] - y_pred[mask]) / y_true[mask])))
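
A worked numeric check: two observations that each miss by 10% give an RMSPE of exactly 0.10:

from src.evaluation.metrics import rmspe

score = rmspe(y_true=[100, 200], y_pred=[110, 180])
# Percentage errors: (100 - 110)/100 = -0.10 and (200 - 180)/200 = +0.10,
# so RMSPE = sqrt(mean([0.01, 0.01])) = 0.10
assert abs(score - 0.10) < 1e-9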

Usage Examples

Time-Series Cross-Validation

from src.evaluation.cv import make_time_series_folds
from src.evaluation.metrics import rmspe

# Create expanding window folds (6-week validation periods, 1-year minimum training)
folds = make_time_series_folds(
    df=features_df,
    n_folds=3,
    fold_length_days=42,
    min_train_days=365,
)

# Train and evaluate on each fold
for fold_idx, (train_idx, val_idx) in enumerate(folds):
    X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
    X_val, y_val = X.iloc[val_idx], y.iloc[val_idx]

    # Train model
    model.fit(X_train, y_train)

    # Evaluate
    preds = model.predict(X_val)
    fold_rmspe = rmspe(y_val, preds)
    print(f"Fold {fold_idx + 1} RMSPE: {fold_rmspe:.4f}")

RMSPE Calculation

from src.evaluation.metrics import rmspe

# Calculate RMSPE (ignores Sales=0 by default)
score = rmspe(y_true, y_pred, ignore_zero_sales=True)
print(f"RMSPE: {score:.4f}")

# Include Sales=0 in calculation (not recommended)
score_with_zeros = rmspe(y_true, y_pred, ignore_zero_sales=False)

Feature Importance

from src.evaluation.metrics import plot_feature_importance

# Plot top 20 most important features
plot_feature_importance(
    model=lgbm_model,
    feature_names=X_train.columns,
    top_n=20,
    save_path="outputs/figures/feature_importance.png"
)

Cross-Validation Strategy

Expanding Window Approach

Unlike standard K-fold CV, time-series CV uses an expanding window to prevent data leakage:

Fold 1:  [====================TRAIN====================][==VAL==]
Fold 2:  [==============================TRAIN==============================][==VAL==]
Fold 3:  [==========================================TRAIN==========================================][==VAL==]

Key properties:

  • Each validation fold is 6 weeks (42 days)
  • Training data expands with each fold
  • No overlap between validation periods
  • Mimics real-world forecasting scenario
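
These properties can be checked mechanically from the fold summary. A short sketch, assuming df is the sorted, index-reset output of filter_open_stores and that every calendar day has at least one open store-day:

from src.evaluation.cv import get_fold_summary, make_time_series_folds

folds = make_time_series_folds(df, n_folds=5, fold_length_days=42)
summary = get_fold_summary(df, folds)

assert (summary["val_days"] == 42).all()                    # 6-week validation folds
assert summary["train_size"].is_monotonic_increasing       # expanding training window
assert (summary["train_end"] < summary["val_start"]).all() # validation is always in the future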

Visualization

gantt
    title Time-Series Cross-Validation Strategy
    dateFormat YYYY-MM-DD
    section Fold 1
    Train :2013-01-01, 365d
    Validate :2014-01-01, 42d
    section Fold 2
    Train :2013-01-01, 407d
    Validate :2014-02-12, 42d
    section Fold 3
    Train :2013-01-01, 449d
    Validate :2014-03-26, 42d

Implementation

A simplified sketch of the fold construction, aligned with the signature documented above (the full implementation in src/evaluation/cv.py additionally snaps to actual dates, logs fold boundaries, and skips empty folds):

import pandas as pd

def make_time_series_folds(df, n_folds=5, fold_length_days=42, min_train_days=365):
    """Create expanding window time-series CV folds (simplified).

    Args:
        df: DataFrame with Date column
        n_folds: Number of CV folds
        fold_length_days: Days in each validation period
        min_train_days: Minimum days in the first training fold

    Returns:
        List of (train_indices, val_indices) tuples
    """
    df = df.sort_values("Date").reset_index(drop=True)
    start = df["Date"].min()

    folds = []
    for fold in range(n_folds):
        # Validation window for this fold, measured in days from the first date
        val_start = start + pd.Timedelta(days=min_train_days + fold * fold_length_days)
        val_end = val_start + pd.Timedelta(days=fold_length_days)

        # Stop early if the validation window would extend past the data
        if val_end - pd.Timedelta(days=1) > df["Date"].max():
            break

        train_idx = df.index[df["Date"] < val_start].to_numpy()
        val_idx = df.index[(df["Date"] >= val_start) & (df["Date"] < val_end)].to_numpy()
        folds.append((train_idx, val_idx))

    return folds

Metrics

RMSPE (Root Mean Square Percentage Error)

The primary evaluation metric for Rossmann forecasting:

\[ \text{RMSPE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(\frac{y_i - \hat{y}_i}{y_i}\right)^2} \]

where:

  • \(y_i\) = actual sales
  • \(\hat{y}_i\) = predicted sales
  • \(n\) = number of observations (excluding \(y_i = 0\))

Why RMSPE?

  • Penalizes percentage errors (relative, not absolute)
  • Fair comparison across stores with different sales volumes
  • Ignores closed stores (Sales=0)

Implementation:

def rmspe(y_true, y_pred, ignore_zero_sales=True):
    """Calculate Root Mean Square Percentage Error.

    Args:
        y_true: True sales values
        y_pred: Predicted sales values
        ignore_zero_sales: Exclude Sales=0 from calculation

    Returns:
        RMSPE score (lower is better)
    """
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    # Create mask to exclude zero sales
    if ignore_zero_sales:
        mask = y_true != 0
    else:
        mask = np.ones_like(y_true, dtype=bool)

    # Calculate percentage errors
    percentage_errors = (y_true[mask] - y_pred[mask]) / y_true[mask]

    # Return root mean square
    return np.sqrt(np.mean(percentage_errors ** 2))
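
This is the scale-invariance point in action: a 10% miss contributes the same whether a store sells 100 or 10,000 units, while RMSE is dominated by high-volume stores (toy numbers, using the module's rmse and rmspe):

from src.evaluation.metrics import rmse, rmspe

print(rmspe([100], [110]), rmspe([10_000], [11_000]))  # 0.10 and 0.10
print(rmse([100], [110]), rmse([10_000], [11_000]))    # 10.0 and 1000.0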

Other Metrics

While RMSPE is the primary metric, these are useful for diagnostics:

import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# MAE - interpretable absolute error
mae = mean_absolute_error(y_true, y_pred)

# RMSE - penalizes large errors
rmse = np.sqrt(mean_squared_error(y_true, y_pred))

# R² - variance explained
r2 = r2_score(y_true, y_pred)

# MAPE - mean absolute percentage error (as a %, masking zeros to avoid
# division by zero; assumes y_true and y_pred are NumPy arrays)
mask = y_true != 0
mape = np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100

Feature Importance Analysis

LightGBM Feature Importance

import lightgbm as lgb
import matplotlib.pyplot as plt
import numpy as np

# Get feature importance
importance = model.feature_importance(importance_type="gain")
feature_names = X_train.columns

# Sort by importance
indices = np.argsort(importance)[::-1][:20]  # Top 20

# Plot
plt.figure(figsize=(10, 8))
plt.barh(range(20), importance[indices])
plt.yticks(range(20), feature_names[indices])
plt.xlabel("Feature Importance (Gain)")
plt.title("Top 20 Most Important Features")
plt.tight_layout()
plt.savefig("outputs/figures/feature_importance.png")

SHAP Values

import shap

# Create SHAP explainer
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_val)

# Summary plot
shap.summary_plot(shap_values, X_val, feature_names=X_val.columns)

# Dependence plot for top feature
shap.dependence_plot("DayOfWeek", shap_values, X_val)

Model Comparison

Comparing Multiple Models

import pandas as pd

# Collect results
results = []
for model_name, predictions in model_predictions.items():
    score = rmspe(y_true, predictions)
    results.append({
        "Model": model_name,
        "RMSPE": score
    })

# Create comparison table
results_df = pd.DataFrame(results).sort_values("RMSPE")
print(results_df)

# Example output:
#           Model    RMSPE
# 0   Ensemble    0.0978
# 1   LightGBM    0.1024
# 2   XGBoost     0.1031
# 3   CatBoost    0.1045
# 4   Baseline    0.1523

Validation Checks

Data Leakage Detection

def check_for_leakage(X_train, X_val, y_train, y_val):
    """Verify no data leakage in CV splits."""

    # Check 1: No overlapping indices
    assert len(set(X_train.index) & set(X_val.index)) == 0, "Overlapping indices!"

    # Check 2: Validation data is after training data
    assert X_train["Date"].max() < X_val["Date"].min(), "Temporal leakage!"

    # Check 3: a "lag" feature must not reproduce the same-day target
    # (a lag column equal to y_train means the target leaked into X)
    for lag in [1, 7, 14, 28]:
        col = f"lag_sales_{lag}"
        if col in X_train.columns:
            assert not np.allclose(X_train[col], y_train), f"Target leakage in {col}!"

    print("✓ No data leakage detected")

Key Functions

cv.py

  • make_time_series_folds() - Create expanding window CV splits
  • get_fold_summary() - Summarize date ranges and sizes for each fold
  • filter_open_stores() - Drop closed store-days before training
  • remove_missing_features() - Drop rows and columns with missing feature values

metrics.py

  • rmspe() - Calculate RMSPE metric
  • rmspe_lgb() - RMSPE for LightGBM (callback format)
  • plot_feature_importance() - Visualize feature importance
  • plot_predictions_vs_actual() - Diagnostic plots

See Also

  • Models - Model training using CV framework
  • Features - Features evaluated by importance analysis
  • Monitoring - Production model evaluation