
Models API

This module contains model training functions for baseline and advanced models.

Overview

The modeling pipeline includes:

  • Baseline models - Naive forecasts and simple LightGBM
  • Advanced models - Tuned LightGBM, XGBoost, CatBoost
  • Ensemble models - Weighted blending and stacking

All models are evaluated using RMSPE (Root Mean Square Percentage Error) with time-series cross-validation.
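The RMSPE metric itself is not part of the API shown on this page; a minimal sketch of it, assuming zero-sales rows are masked out (the usual convention for this dataset, since closed-store days would divide by zero):

```python
import numpy as np

def rmspe(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Root Mean Square Percentage Error, skipping zero targets."""
    mask = y_true != 0  # avoid division by zero on zero-sales days
    pct_error = (y_true[mask] - y_pred[mask]) / y_true[mask]
    return float(np.sqrt(np.mean(pct_error**2)))
```

For `y_true = [100, 200]` and `y_pred = [110, 180]` the percentage errors are ±10%, so RMSPE is 0.10.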

Module Reference

Ensemble Model

Ensemble model for Rossmann sales forecasting.

This module provides a custom MLflow PyFunc wrapper that encapsulates LightGBM, XGBoost, and CatBoost models into a weighted ensemble for production deployment.

RossmannEnsemble

Bases: PythonModel

Custom MLflow PyFunc model that wraps LightGBM, XGBoost, and CatBoost models into a weighted ensemble for production deployment.

This allows the entire ensemble to be registered as a single deployable artifact in MLflow Model Registry, encapsulating all ensemble logic.

Parameters

lgb_model : lightgbm.Booster
    Trained LightGBM model
xgb_model : xgboost.Booster
    Trained XGBoost model
cb_model : catboost.CatBoost
    Trained CatBoost model
weights : dict
    Ensemble weights for each model, e.g., {'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1}
cat_features : list
    List of categorical feature names

Examples

>>> ensemble = RossmannEnsemble(
...     lgb_model=lgb_model,
...     xgb_model=xgb_model,
...     cb_model=cb_model,
...     weights={'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1},
...     cat_features=['StoreType', 'Assortment', 'StateHoliday', 'PromoInterval']
... )
>>> predictions = ensemble.predict(None, X_test)

Source code in src/models/ensemble.py
class RossmannEnsemble(mlflow.pyfunc.PythonModel):
    """Custom MLflow PyFunc model that wraps LightGBM, XGBoost, and CatBoost models into a weighted
    ensemble for production deployment.

    This allows the entire ensemble to be registered as a single deployable
    artifact in MLflow Model Registry, encapsulating all ensemble logic.

    Parameters
    ----------
    lgb_model : lightgbm.Booster
        Trained LightGBM model
    xgb_model : xgboost.Booster
        Trained XGBoost model
    cb_model : catboost.CatBoost
        Trained CatBoost model
    weights : dict
        Ensemble weights for each model, e.g., {'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1}
    cat_features : list
        List of categorical feature names

    Examples
    --------
    >>> ensemble = RossmannEnsemble(
    ...     lgb_model=lgb_model,
    ...     xgb_model=xgb_model,
    ...     cb_model=cb_model,
    ...     weights={'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1},
    ...     cat_features=['StoreType', 'Assortment', 'StateHoliday', 'PromoInterval']
    ... )
    >>> predictions = ensemble.predict(None, X_test)
    """

    def __init__(self, lgb_model, xgb_model, cb_model, weights, cat_features):
        """Initialize the ensemble model with trained base models and weights.

        Parameters
        ----------
        lgb_model : lightgbm.Booster
            Trained LightGBM model
        xgb_model : xgboost.Booster
            Trained XGBoost model
        cb_model : catboost.CatBoost
            Trained CatBoost model
        weights : dict
            Ensemble weights, must sum to 1.0
        cat_features : list
            Categorical feature names
        """
        self.lgb_model = lgb_model
        self.xgb_model = xgb_model
        self.cb_model = cb_model
        self.weights = weights
        self.cat_features = cat_features

        # Validate weights sum to 1.0
        total_weight = sum(weights.values())
        if not np.isclose(total_weight, 1.0):
            raise ValueError(
                f"Ensemble weights must sum to 1.0, got {total_weight}. " f"Weights: {weights}"
            )

    def predict(self, context, model_input):
        """Generate ensemble predictions by combining predictions from all three models.

        Parameters
        ----------
        context : mlflow.pyfunc.PythonModelContext
            MLflow model context (unused, required by interface)
        model_input : pd.DataFrame
            Input features for prediction

        Returns
        -------
        np.ndarray
            Weighted ensemble predictions
        """
        import catboost as cb
        import xgboost as xgb

        # LightGBM predictions
        lgb_preds = self.lgb_model.predict(model_input)

        # XGBoost predictions (needs categorical encoding)
        X_xgb = model_input.copy()
        for col in X_xgb.columns:
            if X_xgb[col].dtype.name == "category":
                X_xgb[col] = X_xgb[col].cat.codes
        dmatrix = xgb.DMatrix(X_xgb)
        xgb_preds = self.xgb_model.predict(dmatrix)

        # CatBoost predictions
        pool = cb.Pool(model_input, cat_features=self.cat_features)
        cb_preds = self.cb_model.predict(pool)

        # Weighted ensemble
        ensemble_preds = (
            self.weights["lightgbm"] * lgb_preds
            + self.weights["xgboost"] * xgb_preds
            + self.weights["catboost"] * cb_preds
        )

        return ensemble_preds
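The final line of predict is a plain convex combination of the three prediction vectors. A toy numeric check with hypothetical per-model predictions and the default weights:

```python
import numpy as np

weights = {"lightgbm": 0.3, "xgboost": 0.6, "catboost": 0.1}
lgb_preds = np.array([100.0, 200.0])
xgb_preds = np.array([110.0, 190.0])
cb_preds = np.array([90.0, 210.0])

blend = (
    weights["lightgbm"] * lgb_preds
    + weights["xgboost"] * xgb_preds
    + weights["catboost"] * cb_preds
)
# First element: 0.3*100 + 0.6*110 + 0.1*90 ≈ 105
```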
__init__(lgb_model, xgb_model, cb_model, weights, cat_features)

Initialize the ensemble model with trained base models and weights.

Parameters

lgb_model : lightgbm.Booster
    Trained LightGBM model
xgb_model : xgboost.Booster
    Trained XGBoost model
cb_model : catboost.CatBoost
    Trained CatBoost model
weights : dict
    Ensemble weights, must sum to 1.0
cat_features : list
    Categorical feature names

Source code in src/models/ensemble.py
def __init__(self, lgb_model, xgb_model, cb_model, weights, cat_features):
    """Initialize the ensemble model with trained base models and weights.

    Parameters
    ----------
    lgb_model : lightgbm.Booster
        Trained LightGBM model
    xgb_model : xgboost.Booster
        Trained XGBoost model
    cb_model : catboost.CatBoost
        Trained CatBoost model
    weights : dict
        Ensemble weights, must sum to 1.0
    cat_features : list
        Categorical feature names
    """
    self.lgb_model = lgb_model
    self.xgb_model = xgb_model
    self.cb_model = cb_model
    self.weights = weights
    self.cat_features = cat_features

    # Validate weights sum to 1.0
    total_weight = sum(weights.values())
    if not np.isclose(total_weight, 1.0):
        raise ValueError(
            f"Ensemble weights must sum to 1.0, got {total_weight}. " f"Weights: {weights}"
        )
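The np.isclose check (rather than exact equality) matters because the default weights do not sum to exactly 1.0 in floating point; a minimal demonstration:

```python
import numpy as np

weights = {"lightgbm": 0.3, "xgboost": 0.6, "catboost": 0.1}
total = sum(weights.values())

# 0.3 + 0.6 + 0.1 == 0.9999999999999999 in IEEE-754 doubles,
# so an exact `total == 1.0` test would wrongly reject valid weights
print(total == 1.0)            # False
print(np.isclose(total, 1.0))  # True
```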
predict(context, model_input)

Generate ensemble predictions by combining predictions from all three models.

Parameters

context : mlflow.pyfunc.PythonModelContext
    MLflow model context (unused, required by interface)
model_input : pd.DataFrame
    Input features for prediction

Returns

np.ndarray
    Weighted ensemble predictions

Source code in src/models/ensemble.py
def predict(self, context, model_input):
    """Generate ensemble predictions by combining predictions from all three models.

    Parameters
    ----------
    context : mlflow.pyfunc.PythonModelContext
        MLflow model context (unused, required by interface)
    model_input : pd.DataFrame
        Input features for prediction

    Returns
    -------
    np.ndarray
        Weighted ensemble predictions
    """
    import catboost as cb
    import xgboost as xgb

    # LightGBM predictions
    lgb_preds = self.lgb_model.predict(model_input)

    # XGBoost predictions (needs categorical encoding)
    X_xgb = model_input.copy()
    for col in X_xgb.columns:
        if X_xgb[col].dtype.name == "category":
            X_xgb[col] = X_xgb[col].cat.codes
    dmatrix = xgb.DMatrix(X_xgb)
    xgb_preds = self.xgb_model.predict(dmatrix)

    # CatBoost predictions
    pool = cb.Pool(model_input, cat_features=self.cat_features)
    cb_preds = self.cb_model.predict(pool)

    # Weighted ensemble
    ensemble_preds = (
        self.weights["lightgbm"] * lgb_preds
        + self.weights["xgboost"] * xgb_preds
        + self.weights["catboost"] * cb_preds
    )

    return ensemble_preds
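An XGBoost Booster trained this way expects integer-encoded inputs, hence the .cat.codes conversion in predict. A small demonstration of what that loop produces; note the codes follow pandas' category order, so training and serving must see the same category definitions for the encoding to be consistent:

```python
import pandas as pd

X = pd.DataFrame({"StoreType": pd.Categorical(["a", "b", "a", "c"])})
X_enc = X.copy()
for col in X_enc.columns:
    if X_enc[col].dtype.name == "category":
        X_enc[col] = X_enc[col].cat.codes  # 'a' -> 0, 'b' -> 1, 'c' -> 2
```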

create_ensemble(lgb_model, xgb_model, cb_model, weights=None, cat_features=None)

Factory function to create a RossmannEnsemble instance.

Parameters

lgb_model : lightgbm.Booster
    Trained LightGBM model
xgb_model : xgboost.Booster
    Trained XGBoost model
cb_model : catboost.CatBoost
    Trained CatBoost model
weights : dict, optional
    Ensemble weights. Default: {'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1}
cat_features : list, optional
    Categorical feature names. Default: ['StoreType', 'Assortment', 'StateHoliday', 'PromoInterval']

Returns

RossmannEnsemble
    Initialized ensemble model

Examples

>>> ensemble = create_ensemble(lgb_model, xgb_model, cb_model)
>>> predictions = ensemble.predict(None, X_test)

Source code in src/models/ensemble.py
def create_ensemble(lgb_model, xgb_model, cb_model, weights=None, cat_features=None):
    """Factory function to create a RossmannEnsemble instance.

    Parameters
    ----------
    lgb_model : lightgbm.Booster
        Trained LightGBM model
    xgb_model : xgboost.Booster
        Trained XGBoost model
    cb_model : catboost.CatBoost
        Trained CatBoost model
    weights : dict, optional
        Ensemble weights. Default: {'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1}
    cat_features : list, optional
        Categorical feature names. Default: ['StoreType', 'Assortment', 'StateHoliday', 'PromoInterval']

    Returns
    -------
    RossmannEnsemble
        Initialized ensemble model

    Examples
    --------
    >>> ensemble = create_ensemble(lgb_model, xgb_model, cb_model)
    >>> predictions = ensemble.predict(None, X_test)
    """
    if weights is None:
        weights = {"lightgbm": 0.3, "xgboost": 0.6, "catboost": 0.1}

    if cat_features is None:
        cat_features = ["StoreType", "Assortment", "StateHoliday", "PromoInterval"]

    return RossmannEnsemble(
        lgb_model=lgb_model,
        xgb_model=xgb_model,
        cb_model=cb_model,
        weights=weights,
        cat_features=cat_features,
    )

Model Registry

MLflow Model Registry utilities for Rossmann forecasting.

This module provides helper functions for managing models in MLflow Model Registry, including registration, promotion, and loading models.

get_mlflow_client()

Get an MLflow tracking client.

Returns

MlflowClient
    Initialized MLflow client

Source code in src/models/model_registry.py
def get_mlflow_client() -> MlflowClient:
    """Get an MLflow tracking client.

    Returns
    -------
    MlflowClient
        Initialized MLflow client
    """
    return MlflowClient()

get_model_info(model_name, version=None)

Get detailed information about a registered model.

Parameters

model_name : str
    Name of the registered model
version : str, optional
    Model version. If None, gets info for all versions

Returns

dict
    Model information including version, stage, metrics, etc.

Examples

>>> info = get_model_info('rossmann-ensemble', version='1')
>>> print(f"Model stage: {info['current_stage']}")
>>> print(f"Run ID: {info['run_id']}")

Source code in src/models/model_registry.py
def get_model_info(model_name: str, version: Optional[str] = None) -> dict[str, Any]:
    """Get detailed information about a registered model.

    Parameters
    ----------
    model_name : str
        Name of the registered model
    version : str, optional
        Model version. If None, gets info for all versions

    Returns
    -------
    dict
        Model information including version, stage, metrics, etc.

    Examples
    --------
    >>> info = get_model_info('rossmann-ensemble', version='1')
    >>> print(f"Model stage: {info['current_stage']}")
    >>> print(f"Run ID: {info['run_id']}")
    """
    client = get_mlflow_client()

    if version:
        mv = client.get_model_version(name=model_name, version=version)
        return {
            "name": mv.name,
            "version": mv.version,
            "current_stage": mv.current_stage,
            "description": mv.description,
            "run_id": mv.run_id,
            "status": mv.status,
            "creation_timestamp": mv.creation_timestamp,
            "last_updated_timestamp": mv.last_updated_timestamp,
        }
    else:
        # Get all versions
        model = client.get_registered_model(model_name)
        versions = client.search_model_versions(f"name='{model_name}'")

        return {
            "name": model.name,
            "description": model.description,
            "creation_timestamp": model.creation_timestamp,
            "last_updated_timestamp": model.last_updated_timestamp,
            "versions": [
                {
                    "version": mv.version,
                    "stage": mv.current_stage,
                    "run_id": mv.run_id,
                    "status": mv.status,
                }
                for mv in versions
            ],
        }

get_model_version(model_name, stage='Production')

Get the version number of a model in a specific stage.

Parameters

model_name : str
    Name of the registered model
stage : str, default='Production'
    Model stage ('Staging', 'Production', or 'None')

Returns

str or None
    Model version number, or None if no model in stage

Examples

>>> version = get_model_version('rossmann-ensemble', stage='Production')
>>> print(f"Production version: {version}")

Source code in src/models/model_registry.py
def get_model_version(model_name: str, stage: str = "Production") -> Optional[str]:
    """Get the version number of a model in a specific stage.

    Parameters
    ----------
    model_name : str
        Name of the registered model
    stage : str, default='Production'
        Model stage ('Staging', 'Production', or 'None')

    Returns
    -------
    str or None
        Model version number, or None if no model in stage

    Examples
    --------
    >>> version = get_model_version('rossmann-ensemble', stage='Production')
    >>> print(f"Production version: {version}")
    """
    client = get_mlflow_client()
    versions = client.get_latest_versions(model_name, stages=[stage])

    if not versions:
        logger.warning(f"No model found for {model_name} in {stage} stage")
        return None

    version = versions[0].version
    logger.info(f"{model_name} {stage} version: {version}")
    return version

list_registered_models()

List all registered models in MLflow Model Registry.

Returns

list
    List of registered model names

Examples

>>> models = list_registered_models()
>>> for model_name in models:
...     print(model_name)

Source code in src/models/model_registry.py
def list_registered_models() -> list:
    """List all registered models in MLflow Model Registry.

    Returns
    -------
    list
        List of registered model names

    Examples
    --------
    >>> models = list_registered_models()
    >>> for model_name in models:
    ...     print(model_name)
    """
    client = get_mlflow_client()
    registered_models = client.search_registered_models()

    model_names = [rm.name for rm in registered_models]
    logger.info(f"Found {len(model_names)} registered models")

    return model_names

load_model(model_name, stage='Production')

Load a model from MLflow Model Registry.

Parameters

model_name : str
    Name of the registered model
stage : str, default='Production'
    Model stage to load ('Staging', 'Production', or version number)

Returns

mlflow.pyfunc.PyFuncModel
    Loaded model ready for predictions

Examples

>>> # Load production model
>>> model = load_model('rossmann-ensemble', stage='Production')
>>> predictions = model.predict(X_test)

>>> # Load specific version
>>> model = load_model('rossmann-ensemble', stage='1')
>>> predictions = model.predict(X_test)

Source code in src/models/model_registry.py
def load_model(model_name: str, stage: str = "Production"):
    """Load a model from MLflow Model Registry.

    Parameters
    ----------
    model_name : str
        Name of the registered model
    stage : str, default='Production'
        Model stage to load ('Staging', 'Production', or version number)

    Returns
    -------
    mlflow.pyfunc.PyFuncModel
        Loaded model ready for predictions

    Examples
    --------
    >>> # Load production model
    >>> model = load_model('rossmann-ensemble', stage='Production')
    >>> predictions = model.predict(X_test)
    >>>
    >>> # Load specific version
    >>> model = load_model('rossmann-ensemble', stage='1')
    >>> predictions = model.predict(X_test)
    """
    # Stage names ('Staging', 'Production', 'Archived') and explicit version
    # numbers share the same URI scheme, so one format string covers both
    model_uri = f"models:/{model_name}/{stage}"

    logger.info(f"Loading model from: {model_uri}")
    model = mlflow.pyfunc.load_model(model_uri)
    logger.info("✓ Model loaded successfully")

    return model
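Stage aliases and pinned version numbers both resolve through the same models:/ URI scheme, which is why load_model builds one string for either case:

```python
model_name = "rossmann-ensemble"

stage_uri = f"models:/{model_name}/Production"  # latest version in Production
version_uri = f"models:/{model_name}/1"         # pinned to version 1
```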

promote_model(model_name, version, stage, archive_existing=True)

Promote a model version to a specific stage.

Parameters

model_name : str
    Name of the registered model
version : str
    Model version to promote
stage : str
    Target stage ('Staging', 'Production', or 'Archived')
archive_existing : bool, default=True
    If True, archive existing models in the target stage

Examples

>>> # Promote to Staging
>>> promote_model('rossmann-ensemble', version='1', stage='Staging')

>>> # Promote to Production
>>> promote_model('rossmann-ensemble', version='2', stage='Production')

Source code in src/models/model_registry.py
def promote_model(
    model_name: str,
    version: str,
    stage: str,
    archive_existing: bool = True,
) -> None:
    """Promote a model version to a specific stage.

    Parameters
    ----------
    model_name : str
        Name of the registered model
    version : str
        Model version to promote
    stage : str
        Target stage ('Staging', 'Production', or 'Archived')
    archive_existing : bool, default=True
        If True, archive existing models in the target stage

    Examples
    --------
    >>> # Promote to Staging
    >>> promote_model('rossmann-ensemble', version='1', stage='Staging')
    >>> # Promote to Production
    >>> promote_model('rossmann-ensemble', version='2', stage='Production')
    """
    client = get_mlflow_client()

    # Archive existing models in the target stage if requested
    if archive_existing and stage != "Archived":
        existing_versions = client.get_latest_versions(model_name, stages=[stage])
        for mv in existing_versions:
            logger.info(f"Archiving {model_name} version {mv.version} (was in {stage})")
            client.transition_model_version_stage(
                name=model_name, version=mv.version, stage="Archived"
            )

    # Promote the specified version
    logger.info(f"Promoting {model_name} version {version} to {stage}")
    client.transition_model_version_stage(name=model_name, version=version, stage=stage)
    logger.info(f"✓ {model_name} version {version} is now in {stage}")

register_ensemble_model(ensemble_model, model_name, run_id=None, conda_env=None, signature=None, input_example=None, description=None)

Register an ensemble model to MLflow Model Registry.

Parameters

ensemble_model : RossmannEnsemble
    Trained ensemble model instance
model_name : str
    Name for the registered model (e.g., 'rossmann-ensemble')
run_id : str, optional
    MLflow run ID. If None, uses active run
conda_env : dict, optional
    Conda environment specification
signature : mlflow.models.ModelSignature, optional
    Model signature
input_example : pd.DataFrame, optional
    Example input for model testing
description : str, optional
    Model description

Returns

str
    Model version number

Examples

>>> from models.ensemble import create_ensemble
>>> ensemble = create_ensemble(lgb_model, xgb_model, cb_model)
>>> version = register_ensemble_model(
...     ensemble_model=ensemble,
...     model_name='rossmann-ensemble',
...     signature=signature,
...     input_example=X_sample
... )
>>> print(f"Registered model version: {version}")

Source code in src/models/model_registry.py
def register_ensemble_model(
    ensemble_model,
    model_name: str,
    run_id: Optional[str] = None,
    conda_env: Optional[dict[str, Any]] = None,
    signature=None,
    input_example=None,
    description: Optional[str] = None,
) -> str:
    """Register an ensemble model to MLflow Model Registry.

    Parameters
    ----------
    ensemble_model : RossmannEnsemble
        Trained ensemble model instance
    model_name : str
        Name for the registered model (e.g., 'rossmann-ensemble')
    run_id : str, optional
        MLflow run ID. If None, uses active run
    conda_env : dict, optional
        Conda environment specification
    signature : mlflow.models.ModelSignature, optional
        Model signature
    input_example : pd.DataFrame, optional
        Example input for model testing
    description : str, optional
        Model description

    Returns
    -------
    str
        Model version number

    Examples
    --------
    >>> from models.ensemble import create_ensemble
    >>> ensemble = create_ensemble(lgb_model, xgb_model, cb_model)
    >>> version = register_ensemble_model(
    ...     ensemble_model=ensemble,
    ...     model_name='rossmann-ensemble',
    ...     signature=signature,
    ...     input_example=X_sample
    ... )
    >>> print(f"Registered model version: {version}")
    """
    logger.info(f"Registering ensemble model: {model_name}")

    # Log the model
    model_info = mlflow.pyfunc.log_model(
        artifact_path="ensemble_model",
        python_model=ensemble_model,
        registered_model_name=model_name,
        conda_env=conda_env,
        signature=signature,
        input_example=input_example,
    )

    # Get version from model info
    version = model_info.registered_model_version

    # Update description if provided
    if description:
        client = get_mlflow_client()
        client.update_model_version(name=model_name, version=version, description=description)

    logger.info(f"✓ Registered {model_name} version {version}")
    return version

Training Pipeline

Production ensemble model training pipeline for Rossmann forecasting.

This module provides the main training workflow for retraining the ensemble model with the latest data and best hyperparameters from Optuna tuning.

load_best_hyperparameters(config_path='config/best_hyperparameters.json')

Load best hyperparameters from Optuna tuning.

Parameters

config_path : str
    Path to best hyperparameters JSON file

Returns

dict
    Hyperparameters for each model type

Source code in src/models/train_ensemble.py
def load_best_hyperparameters(
    config_path: str = "config/best_hyperparameters.json",
) -> dict:
    """Load best hyperparameters from Optuna tuning.

    Parameters
    ----------
    config_path : str
        Path to best hyperparameters JSON file

    Returns
    -------
    dict
        Hyperparameters for each model type
    """
    logger.info(f"Loading hyperparameters from: {config_path}")
    with open(config_path) as f:
        best_params = json.load(f)

    logger.info(f"Best model: {best_params['metadata']['best_model']}")
    logger.info(f"Best CV RMSPE: {best_params['metadata']['best_rmspe']:.6f}")

    return best_params
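The loader assumes a JSON layout with a metadata block plus per-model hyperparameters sections. A hypothetical minimal file, with keys inferred only from the accesses visible in this module (the real file may contain more):

```python
import json
import tempfile
from pathlib import Path

# Hypothetical contents mirroring the keys this module reads
best_params = {
    "metadata": {"best_model": "xgboost", "best_rmspe": 0.1234},
    "catboost": {
        "hyperparameters": {
            "depth": 8,
            "learning_rate": 0.05,
            "l2_leaf_reg": 3.0,
            "random_strength": 1.0,
            "bagging_temperature": 0.5,
            "border_count": 128,
        }
    },
}

path = Path(tempfile.mkdtemp()) / "best_hyperparameters.json"
path.write_text(json.dumps(best_params, indent=2))

with open(path) as f:
    loaded = json.load(f)
```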

load_training_data(data_path='data/processed/train_features.parquet', holdout_days=42)

Load and split training data into train and holdout sets.

Parameters

data_path : str
    Path to the processed features parquet file
holdout_days : int, default=42
    Number of days to use for holdout validation (6 weeks)

Returns

tuple of pd.DataFrame
    (train_df, holdout_df)

Source code in src/models/train_ensemble.py
def load_training_data(
    data_path: str = "data/processed/train_features.parquet",
    holdout_days: int = 42,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Load and split training data into train and holdout sets.

    Parameters
    ----------
    data_path : str
        Path to the processed features parquet file
    holdout_days : int, default=42
        Number of days to use for holdout validation (6 weeks)

    Returns
    -------
    tuple of pd.DataFrame
        (train_df, holdout_df)
    """
    logger.info(f"Loading data from: {data_path}")
    df = read_parquet(data_path)

    logger.info(f"Loaded data shape: {df.shape}")
    logger.info(f"Date range: {df['Date'].min()} to {df['Date'].max()}")

    # Create train/holdout split
    max_date = df["Date"].max()
    holdout_start = max_date - pd.Timedelta(days=holdout_days - 1)

    train_df = df[df["Date"] < holdout_start].copy()
    holdout_df = df[df["Date"] >= holdout_start].copy()

    logger.info(f"Train set: {len(train_df):,} rows")
    logger.info(f"  Date range: {train_df['Date'].min()} to {train_df['Date'].max()}")
    logger.info(f"Holdout set: {len(holdout_df):,} rows")
    logger.info(f"  Date range: {holdout_df['Date'].min()} to {holdout_df['Date'].max()}")

    return train_df, holdout_df
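The split reserves the last holdout_days calendar days for validation, preserving temporal order (no shuffling). A self-contained check of the date arithmetic on synthetic data:

```python
import pandas as pd

df = pd.DataFrame({
    "Date": pd.date_range("2015-04-01", periods=100, freq="D"),
    "Sales": range(100),
})

holdout_days = 42
max_date = df["Date"].max()
# `days - 1` so the holdout window contains exactly `holdout_days` dates,
# inclusive of max_date
holdout_start = max_date - pd.Timedelta(days=holdout_days - 1)

train_df = df[df["Date"] < holdout_start]
holdout_df = df[df["Date"] >= holdout_start]
```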

main(data_path='data/processed/train_features.parquet', config_path='config/best_hyperparameters.json', model_name='rossmann-ensemble', ensemble_weights=None, run_name='production_ensemble_training')

Main training pipeline for ensemble model.

Parameters

data_path : str
    Path to processed features
config_path : str
    Path to best hyperparameters JSON
model_name : str
    Name for registered model in MLflow
ensemble_weights : dict, optional
    Ensemble weights. Default: {'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1}
run_name : str
    Name for MLflow run

Returns

str
    Model version number

Source code in src/models/train_ensemble.py
def main(
    data_path: str = "data/processed/train_features.parquet",
    config_path: str = "config/best_hyperparameters.json",
    model_name: str = "rossmann-ensemble",
    ensemble_weights: Optional[dict[str, float]] = None,
    run_name: str = "production_ensemble_training",
):
    """Main training pipeline for ensemble model.

    Parameters
    ----------
    data_path : str
        Path to processed features
    config_path : str
        Path to best hyperparameters JSON
    model_name : str
        Name for registered model in MLflow
    ensemble_weights : dict, optional
        Ensemble weights. Default: {'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1}
    run_name : str
        Name for MLflow run

    Returns
    -------
    str
        Model version number
    """
    # Default ensemble weights
    if ensemble_weights is None:
        ensemble_weights = {"lightgbm": 0.30, "xgboost": 0.60, "catboost": 0.10}

    logger.info("=" * 70)
    logger.info("PRODUCTION ENSEMBLE MODEL TRAINING")
    logger.info("=" * 70)

    # Setup MLflow
    experiment_id = setup_mlflow()
    logger.info(f"MLflow experiment ID: {experiment_id}")

    # Start MLflow run
    with mlflow.start_run(run_name=run_name) as run:
        logger.info(f"MLflow run ID: {run.info.run_id}")

        # Log DVC data version
        log_dvc_data_version(data_path)

        # Load data
        train_df, holdout_df = load_training_data(data_path)

        # Load best hyperparameters
        best_params = load_best_hyperparameters(config_path)

        # Prepare training data
        X_train, y_train, feature_cols, cat_features = prepare_training_data(train_df)

        # Log dataset info
        mlflow.log_param("train_size", len(train_df))
        mlflow.log_param("holdout_size", len(holdout_df))
        mlflow.log_param("n_features", len(feature_cols))

        # Log ensemble weights
        for model_type, weight in ensemble_weights.items():
            mlflow.log_param(f"{model_type}_weight", weight)

        # Train individual models
        logger.info("\nTraining individual models...")
        lgb_model = train_lightgbm(X_train, y_train, best_params, cat_features)
        xgb_model = train_xgboost(X_train, y_train, best_params)
        cb_model = train_catboost(X_train, y_train, best_params, cat_features)

        logger.info("✓ All models trained successfully!")

        # Create ensemble
        logger.info("\nCreating ensemble model...")
        ensemble = create_ensemble(
            lgb_model=lgb_model,
            xgb_model=xgb_model,
            cb_model=cb_model,
            weights=ensemble_weights,
            cat_features=cat_features,
        )

        # Prepare holdout data for input example
        from evaluation.cv import remove_missing_features

        holdout_data = holdout_df[holdout_df["Open"] == 1].copy()
        holdout_data, _ = remove_missing_features(holdout_data, feature_cols)
        X_holdout = holdout_data[feature_cols]

        # Note: We don't create a signature here due to categorical dtype issues
        # MLflow will infer it automatically when the model is saved

        # Define conda environment
        conda_env = {
            "channels": ["conda-forge", "defaults"],
            "dependencies": [
                f"python={sys.version_info.major}.{sys.version_info.minor}",
                "pip",
                {
                    "pip": [
                        f"lightgbm=={lgb.__version__}",
                        f"xgboost=={xgb.__version__}",
                        f"catboost=={cb.__version__}",
                        "pandas",
                        "numpy",
                        "scikit-learn",
                    ]
                },
            ],
            "name": "rossmann_ensemble_env",
        }

        # Register ensemble model
        logger.info(f"\nRegistering ensemble model: {model_name}")
        version = register_ensemble_model(
            ensemble_model=ensemble,
            model_name=model_name,
            conda_env=conda_env,
            signature=None,  # Let MLflow infer automatically
            input_example=X_holdout.head(5),
            description=f"Rossmann ensemble (weights: {ensemble_weights})",
        )

        logger.info("=" * 70)
        logger.info(f"✓ Training complete! Model version: {version}")
        logger.info("=" * 70)

        return version
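The inline conda_env pins the exact training-time library versions so the serving environment matches training. A sketch of the same construction using hypothetical placeholder versions in place of the live lgb/xgb/cb __version__ attributes:

```python
import sys

# Placeholder versions standing in for lgb/xgb/cb.__version__ at train time
pinned = {"lightgbm": "4.3.0", "xgboost": "2.0.3", "catboost": "1.2.5"}

conda_env = {
    "channels": ["conda-forge", "defaults"],
    "dependencies": [
        f"python={sys.version_info.major}.{sys.version_info.minor}",
        "pip",
        {
            "pip": [f"{name}=={ver}" for name, ver in pinned.items()]
            + ["pandas", "numpy", "scikit-learn"]
        },
    ],
    "name": "rossmann_ensemble_env",
}
```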

prepare_training_data(train_df)

Prepare training data by filtering open stores and extracting features.

Parameters

train_df : pd.DataFrame
    Raw training data

Returns

tuple
    (X_train, y_train, feature_cols, cat_features)

Source code in src/models/train_ensemble.py
def prepare_training_data(
    train_df: pd.DataFrame,
) -> tuple[pd.DataFrame, np.ndarray, list, list]:
    """Prepare training data by filtering open stores and extracting features.

    Parameters
    ----------
    train_df : pd.DataFrame
        Raw training data

    Returns
    -------
    tuple
        (X_train, y_train, feature_cols, cat_features)
    """
    # Filter to open stores only
    train_data = train_df[train_df["Open"] == 1].copy()

    # Define feature columns
    exclude_cols = ["Sales", "Date", "Store", "Customers"]
    feature_cols = [col for col in train_df.columns if col not in exclude_cols]

    # Remove rows with missing features
    train_data = train_data.dropna(subset=feature_cols)

    X_train = train_data[feature_cols]
    y_train = train_data["Sales"].values

    # Identify categorical features
    cat_features = [col for col in feature_cols if X_train[col].dtype.name == "category"]

    logger.info(f"Training data: {len(train_data):,} rows (open stores only)")
    logger.info(f"Features: {len(feature_cols)}")
    logger.info(f"Categorical features: {len(cat_features)}")

    return X_train, y_train, feature_cols, cat_features
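The feature list is everything except the target, identifiers, and leakage-prone columns, and categorical features are detected purely by dtype. A toy frame illustrating both steps:

```python
import pandas as pd

train_df = pd.DataFrame({
    "Sales": [100, 200, 0],
    "Date": pd.to_datetime(["2015-01-01", "2015-01-02", "2015-01-03"]),
    "Store": [1, 1, 1],
    "Customers": [10, 20, 0],
    "Open": [1, 1, 0],
    "Promo": [0, 1, 0],
    "StoreType": pd.Categorical(["a", "a", "b"]),
})

# Same exclusions as prepare_training_data: target, date, id, leaky column
exclude_cols = ["Sales", "Date", "Store", "Customers"]
feature_cols = [c for c in train_df.columns if c not in exclude_cols]
cat_features = [c for c in feature_cols if train_df[c].dtype.name == "category"]
```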

train_catboost(X_train, y_train, best_params, cat_features)

Train CatBoost model with best hyperparameters.

Parameters

X_train : pd.DataFrame
    Training features
y_train : np.ndarray
    Training targets
best_params : dict
    Best hyperparameters from Optuna
cat_features : list
    Categorical feature names

Returns

cb.CatBoost
    Trained CatBoost model

Source code in src/models/train_ensemble.py
def train_catboost(
    X_train: pd.DataFrame,
    y_train: np.ndarray,
    best_params: dict,
    cat_features: list,
) -> cb.CatBoost:
    """Train CatBoost model with best hyperparameters.

    Parameters
    ----------
    X_train : pd.DataFrame
        Training features
    y_train : np.ndarray
        Training targets
    best_params : dict
        Best hyperparameters from Optuna
    cat_features : list
        Categorical feature names

    Returns
    -------
    cb.CatBoost
        Trained CatBoost model
    """
    logger.info("Training CatBoost...")
    start_time = time.time()

    cb_params = {
        "loss_function": "RMSE",
        "eval_metric": "RMSE",
        "depth": int(best_params["catboost"]["hyperparameters"]["depth"]),
        "learning_rate": best_params["catboost"]["hyperparameters"]["learning_rate"],
        "l2_leaf_reg": best_params["catboost"]["hyperparameters"]["l2_leaf_reg"],
        "random_strength": best_params["catboost"]["hyperparameters"]["random_strength"],
        "bagging_temperature": best_params["catboost"]["hyperparameters"]["bagging_temperature"],
        "border_count": int(best_params["catboost"]["hyperparameters"]["border_count"]),
        "iterations": 1500,
        "verbose": False,
        "random_seed": 42,
    }

    # Log hyperparameters to MLflow
    for key, value in cb_params.items():
        mlflow.log_param(f"cb_{key}", value)

    train_pool = cb.Pool(X_train, label=y_train, cat_features=cat_features)
    cb_model = cb.CatBoost(cb_params)
    cb_model.fit(train_pool)

    cb_time = time.time() - start_time
    logger.info(f"CatBoost training complete in {cb_time:.2f}s")
    mlflow.log_metric("cb_train_time_seconds", cb_time)

    return cb_model
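train_catboost (and the sibling functions below) index into a nested `best_params` dict keyed by model name. The exact layout comes from the Optuna tuning step; a minimal sketch of the shape this code reads, with hypothetical values, is:

```python
# Hypothetical shape of the Optuna output consumed by train_catboost;
# only the keys accessed above are shown, and the values are illustrative
best_params = {
    "catboost": {
        "hyperparameters": {
            "depth": 8,
            "learning_rate": 0.05,
            "l2_leaf_reg": 3.0,
            "random_strength": 1.0,
            "bagging_temperature": 0.5,
            "border_count": 128,
        }
    }
}

# Integer-valued hyperparameters are cast explicitly, as in the source above
depth = int(best_params["catboost"]["hyperparameters"]["depth"])
```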

train_lightgbm(X_train, y_train, best_params, cat_features)

Train LightGBM model with best hyperparameters.

Parameters

X_train : pd.DataFrame
    Training features
y_train : np.ndarray
    Training targets
best_params : dict
    Best hyperparameters from Optuna
cat_features : list
    Categorical feature names

Returns

lgb.Booster
    Trained LightGBM model

Source code in src/models/train_ensemble.py
def train_lightgbm(
    X_train: pd.DataFrame,
    y_train: np.ndarray,
    best_params: dict,
    cat_features: list,
) -> lgb.Booster:
    """Train LightGBM model with best hyperparameters.

    Parameters
    ----------
    X_train : pd.DataFrame
        Training features
    y_train : np.ndarray
        Training targets
    best_params : dict
        Best hyperparameters from Optuna
    cat_features : list
        Categorical feature names

    Returns
    -------
    lgb.Booster
        Trained LightGBM model
    """
    logger.info("Training LightGBM...")
    start_time = time.time()

    lgb_params = {
        "objective": "regression",
        "metric": "rmse",
        "boosting_type": "gbdt",
        "num_leaves": int(best_params["lightgbm"]["hyperparameters"]["num_leaves"]),
        "learning_rate": best_params["lightgbm"]["hyperparameters"]["learning_rate"],
        "feature_fraction": best_params["lightgbm"]["hyperparameters"]["feature_fraction"],
        "bagging_fraction": best_params["lightgbm"]["hyperparameters"]["bagging_fraction"],
        "bagging_freq": int(best_params["lightgbm"]["hyperparameters"]["bagging_freq"]),
        "max_depth": int(best_params["lightgbm"]["hyperparameters"]["max_depth"]),
        "min_child_samples": int(best_params["lightgbm"]["hyperparameters"]["min_child_samples"]),
        "reg_alpha": best_params["lightgbm"]["hyperparameters"]["reg_alpha"],
        "reg_lambda": best_params["lightgbm"]["hyperparameters"]["reg_lambda"],
        "verbose": -1,
        "seed": 42,
    }

    # Log hyperparameters to MLflow
    for key, value in lgb_params.items():
        mlflow.log_param(f"lgb_{key}", value)

    lgb_train = lgb.Dataset(X_train, label=y_train, categorical_feature=cat_features)
    lgb_model = lgb.train(
        lgb_params, lgb_train, num_boost_round=1600, callbacks=[lgb.log_evaluation(period=0)]
    )

    lgb_time = time.time() - start_time
    logger.info(f"LightGBM training complete in {lgb_time:.2f}s")
    mlflow.log_metric("lgb_train_time_seconds", lgb_time)

    return lgb_model

train_xgboost(X_train, y_train, best_params)

Train XGBoost model with best hyperparameters.

Parameters

X_train : pd.DataFrame
    Training features
y_train : np.ndarray
    Training targets
best_params : dict
    Best hyperparameters from Optuna

Returns

xgb.Booster
    Trained XGBoost model

Source code in src/models/train_ensemble.py
def train_xgboost(X_train: pd.DataFrame, y_train: np.ndarray, best_params: dict) -> xgb.Booster:
    """Train XGBoost model with best hyperparameters.

    Parameters
    ----------
    X_train : pd.DataFrame
        Training features
    y_train : np.ndarray
        Training targets
    best_params : dict
        Best hyperparameters from Optuna

    Returns
    -------
    xgb.Booster
        Trained XGBoost model
    """
    logger.info("Training XGBoost...")
    start_time = time.time()

    # XGBoost needs categorical features as codes
    X_train_xgb = X_train.copy()
    for col in X_train_xgb.columns:
        if X_train_xgb[col].dtype.name == "category":
            X_train_xgb[col] = X_train_xgb[col].cat.codes

    xgb_params = {
        "objective": "reg:squarederror",
        "eval_metric": "rmse",
        "max_depth": int(best_params["xgboost"]["hyperparameters"]["max_depth"]),
        "learning_rate": best_params["xgboost"]["hyperparameters"]["learning_rate"],
        "subsample": best_params["xgboost"]["hyperparameters"]["subsample"],
        "colsample_bytree": best_params["xgboost"]["hyperparameters"]["colsample_bytree"],
        "min_child_weight": int(best_params["xgboost"]["hyperparameters"]["min_child_weight"]),
        "reg_alpha": best_params["xgboost"]["hyperparameters"]["reg_alpha"],
        "reg_lambda": best_params["xgboost"]["hyperparameters"]["reg_lambda"],
        "gamma": best_params["xgboost"]["hyperparameters"]["gamma"],
        "seed": 42,
        "verbosity": 0,
    }

    # Log hyperparameters to MLflow
    for key, value in xgb_params.items():
        mlflow.log_param(f"xgb_{key}", value)

    dtrain = xgb.DMatrix(X_train_xgb, label=y_train)
    xgb_model = xgb.train(xgb_params, dtrain, num_boost_round=1600, verbose_eval=False)

    xgb_time = time.time() - start_time
    logger.info(f"XGBoost training complete in {xgb_time:.2f}s")
    mlflow.log_metric("xgb_train_time_seconds", xgb_time)

    return xgb_model
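Unlike LightGBM and CatBoost, XGBoost receives categorical columns as integer codes here. The conversion loop above, seen in isolation on a toy frame:

```python
import pandas as pd

# Toy frame with one pandas categorical column and one numeric column
X = pd.DataFrame({
    "StoreType": pd.Categorical(["a", "b", "a", "c"]),
    "Promo": [1, 0, 1, 1],
})

# Replace each categorical column with its integer codes, as train_xgboost does;
# non-categorical columns pass through unchanged
X_xgb = X.copy()
for col in X_xgb.columns:
    if X_xgb[col].dtype.name == "category":
        X_xgb[col] = X_xgb[col].cat.codes
```

Codes follow the category order (`a` → 0, `b` → 1, `c` → 2), so the mapping is stable only if train and inference data share the same category dtype.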

Model Validation

Model validation and promotion for Rossmann forecasting.

This module validates a candidate model against performance thresholds and promotes it to Staging if it passes all checks.

check_performance_threshold(metrics, threshold_rmspe=0.1, strict=False)

Check if model meets performance thresholds.

Parameters

metrics : dict
    Model evaluation metrics
threshold_rmspe : float, default=0.10
    Maximum acceptable RMSPE (10% error threshold)
strict : bool, default=False
    If True, use stricter threshold (0.09856 - top 50 leaderboard)

Returns

bool
    True if model passes threshold checks

Source code in src/models/validate_model.py
def check_performance_threshold(
    metrics: dict[str, float],
    threshold_rmspe: float = 0.10,
    strict: bool = False,
) -> bool:
    """Check if model meets performance thresholds.

    Parameters
    ----------
    metrics : dict
        Model evaluation metrics
    threshold_rmspe : float, default=0.10
        Maximum acceptable RMSPE (10% error threshold)
    strict : bool, default=False
        If True, use stricter threshold (0.09856 - top 50 leaderboard)

    Returns
    -------
    bool
        True if model passes threshold checks
    """
    if strict:
        threshold_rmspe = 0.09856  # Top 50 Kaggle leaderboard

    logger.info("=" * 70)
    logger.info("PERFORMANCE THRESHOLD CHECKS")
    logger.info("=" * 70)

    # Check RMSPE
    rmspe_pass = metrics["rmspe"] <= threshold_rmspe
    logger.info(
        f"RMSPE: {metrics['rmspe']:.6f} <= {threshold_rmspe:.6f} "
        f"{'✓ PASS' if rmspe_pass else '✗ FAIL'}"
    )

    # Check for reasonable predictions (not extreme values)
    predictions_reasonable = metrics["mae"] < 2000  # Sanity check: mean absolute error under 2000 sales units
    logger.info(
        f"MAE: {metrics['mae']:.2f} < 2000 " f"{'✓ PASS' if predictions_reasonable else '✗ FAIL'}"
    )

    all_pass = rmspe_pass and predictions_reasonable

    logger.info("=" * 70)
    if all_pass:
        logger.info("✓ ALL CHECKS PASSED")
    else:
        logger.info("✗ VALIDATION FAILED")
    logger.info("=" * 70)

    return all_pass

evaluate_model(model, X_test, y_test)

Evaluate model on test data.

Parameters

model : mlflow.pyfunc.PyFuncModel
    Loaded model
X_test : pd.DataFrame
    Test features
y_test : np.ndarray
    Test targets

Returns

dict
    Evaluation metrics

Source code in src/models/validate_model.py
def evaluate_model(model, X_test: pd.DataFrame, y_test: np.ndarray) -> dict[str, float]:
    """Evaluate model on test data.

    Parameters
    ----------
    model : mlflow.pyfunc.PyFuncModel
        Loaded model
    X_test : pd.DataFrame
        Test features
    y_test : np.ndarray
        Test targets

    Returns
    -------
    dict
        Evaluation metrics
    """
    logger.info("Generating predictions...")
    predictions = model.predict(X_test)

    logger.info("Calculating metrics...")
    metrics = {
        "rmspe": float(rmspe(y_test, predictions)),
        "rmse": float(np.sqrt(np.mean((y_test - predictions) ** 2))),
        "mae": float(np.mean(np.abs(y_test - predictions))),
        "mape": float(np.mean(np.abs((y_test - predictions) / y_test)) * 100),
    }

    logger.info("Model Performance:")
    logger.info(f"  RMSPE: {metrics['rmspe']:.6f}")
    logger.info(f"  RMSE:  {metrics['rmse']:.2f}")
    logger.info(f"  MAE:   {metrics['mae']:.2f}")
    logger.info(f"  MAPE:  {metrics['mape']:.2f}%")

    return metrics
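evaluate_model relies on an `rmspe` helper imported from the package's evaluation module. Assuming the standard Kaggle Rossmann definition (which matches the RMSE/MAE/MAPE formulas alongside it), it reduces to:

```python
import numpy as np

def rmspe(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Root Mean Square Percentage Error.

    Assumes y_true > 0, which holds here because the data is
    filtered to open stores with recorded sales.
    """
    return float(np.sqrt(np.mean(((y_true - y_pred) / y_true) ** 2)))

# Both predictions are off by 10%, so the RMSPE is 0.10
score = rmspe(np.array([100.0, 200.0]), np.array([110.0, 180.0]))
```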

load_holdout_data(data_path='data/processed/train_features.parquet', holdout_days=42)

Load holdout validation data.

Parameters

data_path : str
    Path to processed features
holdout_days : int
    Number of days for holdout set

Returns

tuple
    (X_holdout, y_holdout, feature_cols)

Source code in src/models/validate_model.py
def load_holdout_data(
    data_path: str = "data/processed/train_features.parquet",
    holdout_days: int = 42,
) -> tuple:
    """Load holdout validation data.

    Parameters
    ----------
    data_path : str
        Path to processed features
    holdout_days : int
        Number of days for holdout set

    Returns
    -------
    tuple
        (X_holdout, y_holdout, feature_cols)
    """
    logger.info(f"Loading holdout data from: {data_path}")
    df = read_parquet(data_path)

    # Create holdout split
    max_date = df["Date"].max()
    holdout_start = max_date - pd.Timedelta(days=holdout_days - 1)
    holdout_df = df[df["Date"] >= holdout_start].copy()

    # Filter to open stores
    holdout_data = holdout_df[holdout_df["Open"] == 1].copy()

    # Define features
    exclude_cols = ["Sales", "Date", "Store", "Customers"]
    feature_cols = [col for col in df.columns if col not in exclude_cols]

    # Remove missing features
    holdout_data, _ = remove_missing_features(holdout_data, feature_cols)

    X_holdout = holdout_data[feature_cols]
    y_holdout = holdout_data["Sales"].values

    logger.info(f"Holdout data: {len(holdout_data):,} rows (open stores)")
    logger.info(f"Date range: {holdout_data['Date'].min()} to {holdout_data['Date'].max()}")

    return X_holdout, y_holdout, feature_cols
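The 42-day holdout window is carved off the end of the available date range. The date arithmetic on its own, with a toy calendar:

```python
import pandas as pd

# Toy calendar: 61 consecutive days ending 2015-07-31
df = pd.DataFrame({"Date": pd.date_range("2015-06-01", "2015-07-31", freq="D")})

# Same split as load_holdout_data: the last `holdout_days` days,
# inclusive of both endpoints (hence the `- 1`)
holdout_days = 42
max_date = df["Date"].max()
holdout_start = max_date - pd.Timedelta(days=holdout_days - 1)
holdout_df = df[df["Date"] >= holdout_start]
```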

main()

Main validation workflow.

Source code in src/models/validate_model.py
def main():
    """Main validation workflow."""
    import argparse

    parser = argparse.ArgumentParser(description="Validate and promote Rossmann ensemble model")
    parser.add_argument("--model-name", default="rossmann-ensemble", help="Model name in registry")
    parser.add_argument("--version", type=str, help="Model version to validate")
    parser.add_argument(
        "--threshold",
        type=float,
        default=0.10,
        help="RMSPE threshold (default: 0.10)",
    )
    parser.add_argument(
        "--strict",
        action="store_true",
        help="Use strict threshold (0.09856 - top 50)",
    )
    parser.add_argument(
        "--no-auto-promote",
        action="store_true",
        help="Disable automatic promotion to Staging",
    )
    parser.add_argument(
        "--promote-to-production",
        action="store_true",
        help="Promote Staging model to Production (manual step)",
    )

    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    if args.promote_to_production:
        promote_to_production(model_name=args.model_name, version=args.version)
    else:
        results = validate_and_promote(
            model_name=args.model_name,
            version=args.version,
            threshold_rmspe=args.threshold,
            strict=args.strict,
            auto_promote=not args.no_auto_promote,
        )

        # Print summary
        print("\n" + "=" * 70)
        print("VALIDATION RESULTS")
        print("=" * 70)
        print(f"Model: {results['model_name']} v{results['version']}")
        print(f"RMSPE: {results['metrics']['rmspe']:.6f}")
        print(f"Threshold: {results['threshold_rmspe']:.6f}")
        print(f"Passed: {'✓ YES' if results['passed'] else '✗ NO'}")
        print(f"Stage: {results['stage']}")
        print("=" * 70)

        if results["passed"] and results["promoted"]:
            print("\n✓ Model validated and promoted to Staging!")
            print("\nNext steps:")
            print("  1. Test Staging model in production-like environment")
            print("  2. If satisfied, promote to Production:")
            print(
                f"     python src/models/validate_model.py --promote-to-production --version {results['version']}"
            )
        elif results["passed"]:
            print("\n✓ Model passed validation!")
            print("\nTo promote to Staging:")
            print(f"  python src/models/validate_model.py --version {results['version']}")
        else:
            print("\n✗ Model did not pass validation")
            print("\nConsider:")
            print("  - Retraining with more data")
            print("  - Tuning hyperparameters further")
            print("  - Adjusting ensemble weights")

promote_to_production(model_name='rossmann-ensemble', version=None)

Promote a Staging model to Production.

This should typically be done manually after testing the Staging model.

Parameters

model_name : str
    Name of registered model
version : str, optional
    Model version to promote. If None, uses current Staging version

Source code in src/models/validate_model.py
def promote_to_production(
    model_name: str = "rossmann-ensemble",
    version: Optional[str] = None,
) -> None:
    """Promote a Staging model to Production.

    This should typically be done manually after testing the Staging model.

    Parameters
    ----------
    model_name : str
        Name of registered model
    version : str, optional
        Model version to promote. If None, uses current Staging version
    """
    logger.info("=" * 70)
    logger.info("PROMOTE TO PRODUCTION")
    logger.info("=" * 70)

    # Get Staging version if not specified
    if version is None:
        version = get_model_version(model_name, stage="Staging")
        if version is None:
            raise ValueError(f"No Staging version found for {model_name}")
        logger.info(f"Promoting Staging version: {version}")

    # Promote to Production
    logger.info(f"\nPromoting {model_name} v{version} to Production...")
    promote_model(model_name, version=version, stage="Production")

    logger.info("=" * 70)
    logger.info(f"✓ {model_name} v{version} is now in Production")
    logger.info("=" * 70)

validate_and_promote(model_name='rossmann-ensemble', version=None, data_path='data/processed/train_features.parquet', threshold_rmspe=0.1, strict=False, auto_promote=True)

Validate a model and promote to Staging if passing.

Parameters

model_name : str
    Name of registered model
version : str, optional
    Model version to validate. If None, uses latest version
data_path : str
    Path to processed data
threshold_rmspe : float
    Performance threshold
strict : bool
    Use strict threshold (0.09856)
auto_promote : bool
    Automatically promote to Staging if passing

Returns

dict
    Validation results including metrics and promotion status

Source code in src/models/validate_model.py
def validate_and_promote(
    model_name: str = "rossmann-ensemble",
    version: Optional[str] = None,
    data_path: str = "data/processed/train_features.parquet",
    threshold_rmspe: float = 0.10,
    strict: bool = False,
    auto_promote: bool = True,
) -> dict[str, Any]:
    """Validate a model and promote to Staging if passing.

    Parameters
    ----------
    model_name : str
        Name of registered model
    version : str, optional
        Model version to validate. If None, uses latest version
    data_path : str
        Path to processed data
    threshold_rmspe : float
        Performance threshold
    strict : bool
        Use strict threshold (0.09856)
    auto_promote : bool
        Automatically promote to Staging if passing

    Returns
    -------
    dict
        Validation results including metrics and promotion status
    """
    logger.info("=" * 70)
    logger.info("MODEL VALIDATION & PROMOTION")
    logger.info("=" * 70)

    # Get version if not specified
    if version is None:
        # Find the latest version (any stage)
        from mlflow.tracking import MlflowClient

        client = MlflowClient()
        versions = client.search_model_versions(f"name='{model_name}'")
        if not versions:
            raise ValueError(f"No versions found for model: {model_name}")
        version = max([int(mv.version) for mv in versions])
        logger.info(f"Using latest version: {version}")
    else:
        logger.info(f"Validating version: {version}")

    # Load model
    logger.info(f"\nLoading model: {model_name} version {version}")
    model = load_model(model_name, stage=str(version))

    # Load holdout data
    logger.info("\nLoading holdout validation data...")
    X_holdout, y_holdout, feature_cols = load_holdout_data(data_path)

    # Evaluate model
    logger.info("\nEvaluating model performance...")
    metrics = evaluate_model(model, X_holdout, y_holdout)

    # Check thresholds
    logger.info("\nChecking performance thresholds...")
    passed = check_performance_threshold(metrics, threshold_rmspe, strict)

    # Promotion decision
    promoted = False
    if passed and auto_promote:
        logger.info("\n✓ Model passed validation, promoting to Staging...")
        promote_model(model_name, version=str(version), stage="Staging")
        promoted = True
        logger.info(f"✓ Model {model_name} v{version} promoted to Staging")
    elif passed:
        logger.info("\n✓ Model passed validation (auto_promote=False, manual promotion required)")
    else:
        logger.info("\n✗ Model did not pass validation, not promoting")

    results = {
        "model_name": model_name,
        "version": version,
        "metrics": metrics,
        "threshold_rmspe": threshold_rmspe if not strict else 0.09856,
        "passed": passed,
        "promoted": promoted,
        "stage": "Staging" if promoted else "None",
    }

    logger.info("=" * 70)
    logger.info("VALIDATION SUMMARY")
    logger.info("=" * 70)
    logger.info(f"Model: {model_name} v{version}")
    logger.info(f"RMSPE: {metrics['rmspe']:.6f}")
    logger.info(f"Passed: {passed}")
    logger.info(f"Promoted to Staging: {promoted}")
    logger.info("=" * 70)

    return results

Inference Pipeline

Production inference pipeline for Rossmann forecasting.

This module provides functionality to load the production model from MLflow and generate predictions on new data.

generate_predictions(model, X_features, data_df)

Generate predictions using loaded model.

Parameters

model : mlflow.pyfunc.PyFuncModel
    Loaded production model
X_features : pd.DataFrame
    Feature matrix for predictions
data_df : pd.DataFrame
    Original data with Store, Date, etc.

Returns

pd.DataFrame
    Predictions with metadata (Store, Date, Predicted_Sales)

Source code in src/models/predict.py
def generate_predictions(
    model,
    X_features: pd.DataFrame,
    data_df: pd.DataFrame,
) -> pd.DataFrame:
    """Generate predictions using loaded model.

    Parameters
    ----------
    model : mlflow.pyfunc.PyFuncModel
        Loaded production model
    X_features : pd.DataFrame
        Feature matrix for predictions
    data_df : pd.DataFrame
        Original data with Store, Date, etc.

    Returns
    -------
    pd.DataFrame
        Predictions with metadata (Store, Date, Predicted_Sales)
    """
    logger.info(f"Generating predictions for {len(X_features):,} rows...")

    # Generate predictions
    predictions = model.predict(X_features)

    # Create predictions dataframe
    predictions_df = pd.DataFrame(
        {
            "Store": data_df["Store"].values,
            "Date": data_df["Date"].values,
            "DayOfWeek": data_df["DayOfWeek"].values if "DayOfWeek" in data_df.columns else None,
            "Predicted_Sales": predictions,
        }
    )

    # Add actual sales if available (for comparison)
    if "Sales" in data_df.columns:
        predictions_df["Actual_Sales"] = data_df["Sales"].values
        predictions_df["Prediction_Error"] = (
            predictions_df["Actual_Sales"] - predictions_df["Predicted_Sales"]
        )
        predictions_df["Absolute_Percentage_Error"] = (
            np.abs(predictions_df["Prediction_Error"]) / predictions_df["Actual_Sales"]
        ) * 100

    logger.info(f"✓ Generated {len(predictions):,} predictions")
    logger.info(f"  Prediction range: ${predictions.min():.2f} to ${predictions.max():.2f}")

    if "Actual_Sales" in predictions_df.columns:
        from evaluation.metrics import rmspe

        rmspe_score = rmspe(
            predictions_df["Actual_Sales"].values,
            predictions_df["Predicted_Sales"].values,
        )
        logger.info(f"  RMSPE: {rmspe_score:.6f}")

    return predictions_df
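The derived error columns follow directly from actuals and predictions; a small reproduction with toy numbers (not real model output):

```python
import numpy as np
import pandas as pd

# Toy actuals and predictions
preds = pd.DataFrame({
    "Actual_Sales": [1000.0, 2000.0],
    "Predicted_Sales": [900.0, 2200.0],
})

# Same derived columns as generate_predictions
preds["Prediction_Error"] = preds["Actual_Sales"] - preds["Predicted_Sales"]
preds["Absolute_Percentage_Error"] = (
    np.abs(preds["Prediction_Error"]) / preds["Actual_Sales"]
) * 100
```

A positive error means the model under-predicted; both toy rows are off by 10%.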

load_inference_data(data_path='data/processed/train_features.parquet', start_date=None, end_date=None)

Load data for inference.

Parameters

data_path : str
    Path to processed features parquet file
start_date : str, optional
    Start date for predictions (YYYY-MM-DD). If None, uses all data
end_date : str, optional
    End date for predictions (YYYY-MM-DD). If None, uses all data

Returns

tuple
    (data_df, X_features, feature_cols)

Source code in src/models/predict.py
def load_inference_data(
    data_path: str = "data/processed/train_features.parquet",
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
) -> tuple:
    """Load data for inference.

    Parameters
    ----------
    data_path : str
        Path to processed features parquet file
    start_date : str, optional
        Start date for predictions (YYYY-MM-DD). If None, uses all data
    end_date : str, optional
        End date for predictions (YYYY-MM-DD). If None, uses all data

    Returns
    -------
    tuple
        (data_df, X_features, feature_cols)
    """
    logger.info(f"Loading data from: {data_path}")
    df = read_parquet(data_path)

    # Filter by date range if specified
    if start_date:
        df = df[df["Date"] >= pd.to_datetime(start_date)]
        logger.info(f"Filtered to start_date >= {start_date}")

    if end_date:
        df = df[df["Date"] <= pd.to_datetime(end_date)]
        logger.info(f"Filtered to end_date <= {end_date}")

    # Filter to open stores only (predictions only needed for open stores)
    df_open = df[df["Open"] == 1].copy()

    logger.info(f"Loaded {len(df_open):,} rows (open stores)")
    logger.info(f"Date range: {df_open['Date'].min()} to {df_open['Date'].max()}")

    # Define feature columns
    exclude_cols = ["Sales", "Date", "Store", "Customers"]
    feature_cols = [col for col in df.columns if col not in exclude_cols]

    # Handle missing features (if any)
    df_open = df_open.dropna(subset=feature_cols)

    X_features = df_open[feature_cols]

    logger.info(f"Features: {len(feature_cols)}")

    return df_open, X_features, feature_cols

main()

Main inference workflow with CLI.

Source code in src/models/predict.py
def main():
    """Main inference workflow with CLI."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Generate predictions using production Rossmann model"
    )
    parser.add_argument("--model-name", default="rossmann-ensemble", help="Model name in registry")
    parser.add_argument(
        "--stage",
        default="Production",
        help="Model stage ('Production', 'Staging', or version number)",
    )
    parser.add_argument(
        "--data-path",
        default="data/processed/train_features.parquet",
        help="Input data path",
    )
    parser.add_argument(
        "--output-path",
        default="outputs/predictions/production_predictions.csv",
        help="Output predictions path",
    )
    parser.add_argument("--start-date", help="Start date for predictions (YYYY-MM-DD)")
    parser.add_argument("--end-date", help="End date for predictions (YYYY-MM-DD)")

    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    predictions_df = predict(
        model_name=args.model_name,
        stage=args.stage,
        data_path=args.data_path,
        output_path=args.output_path,
        start_date=args.start_date,
        end_date=args.end_date,
    )

    # Print summary statistics
    print("\n" + "=" * 70)
    print("PREDICTION SUMMARY")
    print("=" * 70)
    print(f"Total predictions: {len(predictions_df):,}")
    print(f"Date range: {predictions_df['Date'].min()} to {predictions_df['Date'].max()}")
    print(f"Stores: {predictions_df['Store'].nunique():,} unique stores")
    print(
        f"Predicted sales range: ${predictions_df['Predicted_Sales'].min():.2f} to ${predictions_df['Predicted_Sales'].max():.2f}"
    )

    if "Actual_Sales" in predictions_df.columns:
        print("\nActual vs Predicted:")
        print(f"  Mean Actual: ${predictions_df['Actual_Sales'].mean():.2f}")
        print(f"  Mean Predicted: ${predictions_df['Predicted_Sales'].mean():.2f}")
        print(f"  Mean APE: {predictions_df['Absolute_Percentage_Error'].mean():.2f}%")

    print("=" * 70)

predict(model_name='rossmann-ensemble', stage='Production', data_path='data/processed/train_features.parquet', output_path='outputs/predictions/production_predictions.csv', start_date=None, end_date=None)

Main prediction pipeline.

Parameters

model_name : str
    Name of registered model in MLflow
stage : str
    Model stage to use ('Production', 'Staging', or version number)
data_path : str
    Path to input data
output_path : str
    Path to save predictions
start_date : str, optional
    Start date for predictions (YYYY-MM-DD)
end_date : str, optional
    End date for predictions (YYYY-MM-DD)

Returns

pd.DataFrame
    Predictions dataframe

Examples
Predict using Production model

predictions = predict(
    model_name='rossmann-ensemble',
    stage='Production',
    data_path='data/processed/train_features.parquet'
)

Predict for specific date range

predictions = predict(
    model_name='rossmann-ensemble',
    stage='Production',
    start_date='2015-07-01',
    end_date='2015-07-31'
)

Source code in src/models/predict.py
def predict(
    model_name: str = "rossmann-ensemble",
    stage: str = "Production",
    data_path: str = "data/processed/train_features.parquet",
    output_path: str = "outputs/predictions/production_predictions.csv",
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
) -> pd.DataFrame:
    """Main prediction pipeline.

    Parameters
    ----------
    model_name : str
        Name of registered model in MLflow
    stage : str
        Model stage to use ('Production', 'Staging', or version number)
    data_path : str
        Path to input data
    output_path : str
        Path to save predictions
    start_date : str, optional
        Start date for predictions (YYYY-MM-DD)
    end_date : str, optional
        End date for predictions (YYYY-MM-DD)

    Returns
    -------
    pd.DataFrame
        Predictions dataframe

    Examples
    --------
    >>> # Predict using Production model
    >>> predictions = predict(
    ...     model_name='rossmann-ensemble',
    ...     stage='Production',
    ...     data_path='data/processed/train_features.parquet'
    ... )
    >>>
    >>> # Predict for specific date range
    >>> predictions = predict(
    ...     model_name='rossmann-ensemble',
    ...     stage='Production',
    ...     start_date='2015-07-01',
    ...     end_date='2015-07-31'
    ... )
    """
    logger.info("=" * 70)
    logger.info("PRODUCTION INFERENCE PIPELINE")
    logger.info("=" * 70)

    # Get model version
    if stage in ["Production", "Staging"]:
        version = get_model_version(model_name, stage=stage)
        logger.info(f"Using {stage} model version: {version}")
    else:
        version = stage  # Assume it's a version number
        logger.info(f"Using model version: {version}")

    # Load model
    logger.info(f"\nLoading model: {model_name} ({stage})")
    model = load_model(model_name, stage=stage)

    # Load inference data
    logger.info("\nLoading inference data...")
    data_df, X_features, feature_cols = load_inference_data(data_path, start_date, end_date)

    # Generate predictions
    logger.info("\nGenerating predictions...")
    predictions_df = generate_predictions(model, X_features, data_df)

    # Save predictions
    logger.info("\nSaving predictions...")
    save_predictions(
        predictions_df,
        output_path,
        model_version=version,
        metadata={
            "model_name": model_name,
            "model_stage": stage,
            "data_path": data_path,
        },
    )

    logger.info("=" * 70)
    logger.info("✓ PREDICTION PIPELINE COMPLETE")
    logger.info("=" * 70)
    logger.info(f"Predictions saved to: {output_path}")
    logger.info(f"Total predictions: {len(predictions_df):,}")

    return predictions_df

save_predictions(predictions_df, output_path, model_version, metadata=None)

Save predictions to CSV with metadata.

Parameters

predictions_df : pd.DataFrame
    Predictions dataframe
output_path : str
    Output file path
model_version : str
    Model version used for predictions
metadata : dict, optional
    Additional metadata to save

Source code in src/models/predict.py
def save_predictions(
    predictions_df: pd.DataFrame,
    output_path: str,
    model_version: str,
    metadata: Optional[dict] = None,
) -> None:
    """Save predictions to CSV with metadata.

    Parameters
    ----------
    predictions_df : pd.DataFrame
        Predictions dataframe
    output_path : str
        Output file path
    model_version : str
        Model version used for predictions
    metadata : dict, optional
        Additional metadata to save
    """
    # Ensure output directory exists
    output_file = Path(output_path)
    ensure_dir(output_file.parent)

    # Save predictions
    predictions_df.to_csv(output_path, index=False)
    logger.info(f"✓ Saved predictions to: {output_path}")

    # Save metadata
    if metadata is None:
        metadata = {}

    metadata.update(
        {
            "model_version": model_version,
            "prediction_date": datetime.now().isoformat(),
            "n_predictions": len(predictions_df),
            "date_range": {
                "start": str(predictions_df["Date"].min()),
                "end": str(predictions_df["Date"].max()),
            },
        }
    )

    metadata_path = output_file.parent / f"{output_file.stem}_metadata.json"
    import json

    with open(metadata_path, "w") as f:
        json.dump(metadata, f, indent=2)

    logger.info(f"✓ Saved metadata to: {metadata_path}")

Usage Examples

Training Baseline Models

# From command line
python -m src.models.train_baselines

# This will:
# 1. Load data/processed/train_features.parquet
# 2. Create time-series CV folds
# 3. Train naive and simple LightGBM models
# 4. Log results to MLflow
# 5. Print RMSPE scores
# From Python
from src.models.train_baselines import train_naive_model, train_simple_lgbm

# Train naive baseline (last week's sales)
rmspe_naive = train_naive_model(df)
print(f"Naive RMSPE: {rmspe_naive:.4f}")

# Train simple LightGBM
rmspe_lgbm = train_simple_lgbm(df)
print(f"Simple LightGBM RMSPE: {rmspe_lgbm:.4f}")

Training Advanced Models

# From command line
python -m src.models.train_advanced

# This will:
# 1. Load features and CV folds
# 2. Train tuned LightGBM, XGBoost, CatBoost
# 3. Perform hyperparameter optimization
# 4. Log all experiments to MLflow
# 5. Save best models to outputs/models/
# From Python
from src.models.train_advanced import train_tuned_lgbm, train_xgboost, train_catboost

# Train tuned models
lgbm_rmspe = train_tuned_lgbm(df, params)
xgb_rmspe = train_xgboost(df, params)
cat_rmspe = train_catboost(df, params)

Creating Ensembles

from src.models.ensembles import weighted_blend, stacked_ensemble

# Weighted blend (simple average or optimized weights)
ensemble_preds = weighted_blend(
    predictions=[lgbm_preds, xgb_preds, cat_preds],
    weights=[0.5, 0.3, 0.2]  # Or optimize with cv
)

# Stacked ensemble (meta-learner)
stacked_preds = stacked_ensemble(
    oof_predictions=[lgbm_oof, xgb_oof, cat_oof],
    test_predictions=[lgbm_test, xgb_test, cat_test],
    y_train=y_train,
    meta_model="lgbm"  # or "linear"
)

Model Types

Baseline Models

Naive Last-Week Model

Predicts each day's sales as the same store's sales from 7 days earlier (the same weekday in the previous week).

Purpose: Establishes minimum performance threshold.

Expected RMSPE: ~0.15-0.20
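
The naive baseline amounts to a per-store 7-day shift. A minimal sketch (the column names Store, Date, and Sales follow the Rossmann schema; the actual train_naive_model implementation may differ):

```python
import pandas as pd

def naive_last_week(df: pd.DataFrame) -> pd.Series:
    """Predict each row's Sales as the same store's Sales 7 days earlier."""
    df = df.sort_values(["Store", "Date"])
    # shift(7) assumes one row per store per calendar day
    return df.groupby("Store")["Sales"].shift(7)
```

The first 7 days of each store have no prediction (NaN), so evaluation only starts once a full week of history is available.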

Simple LightGBM

LightGBM with default parameters, no tuning.

Features used: All standard features from feature engineering

Expected RMSPE: ~0.12-0.15

Advanced Models

Tuned LightGBM

LightGBM with optimized hyperparameters:

  • num_leaves: 31-127
  • learning_rate: 0.01-0.1
  • feature_fraction: 0.7-1.0
  • bagging_fraction: 0.7-1.0
  • min_child_samples: 20-100

Expected RMSPE: ~0.10-0.11
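
A random-search sketch over the ranges above (the project's actual tuning may use a dedicated framework such as Optuna; this only illustrates the documented search space):

```python
import random

SEARCH_SPACE = {
    "num_leaves": (31, 127),
    "learning_rate": (0.01, 0.1),
    "feature_fraction": (0.7, 1.0),
    "bagging_fraction": (0.7, 1.0),
    "min_child_samples": (20, 100),
}

def sample_params(rng: random.Random) -> dict:
    """Draw one LightGBM configuration from the documented ranges."""
    int_params = {"num_leaves", "min_child_samples"}  # integer-valued parameters
    return {
        name: rng.randint(*bounds) if name in int_params else rng.uniform(*bounds)
        for name, bounds in SEARCH_SPACE.items()
    }
```

Each sampled configuration is scored with time-series CV RMSPE and the best one is kept.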

XGBoost

XGBoost with tuned parameters:

  • max_depth: 3-10
  • learning_rate: 0.01-0.1
  • subsample: 0.7-1.0
  • colsample_bytree: 0.7-1.0

Expected RMSPE: ~0.10-0.11

CatBoost

CatBoost with categorical feature handling:

  • Automatically handles StoreType, Assortment, etc.
  • depth: 4-10
  • learning_rate: 0.01-0.1

Expected RMSPE: ~0.10-0.11
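
CatBoost receives the categorical columns through its cat_features argument. A small hypothetical helper (the feature list matches the one used in the RossmannEnsemble example above) that maps those columns to positions:

```python
import pandas as pd

CAT_FEATURES = ["StoreType", "Assortment", "StateHoliday", "PromoInterval"]

def cat_feature_indices(X: pd.DataFrame) -> list:
    """Column positions of the categorical features, in the form
    CatBoost's cat_features parameter accepts for array inputs."""
    return [X.columns.get_loc(c) for c in CAT_FEATURES if c in X.columns]
```

These indices (or the column names themselves, when passing a DataFrame) go to catboost.Pool(..., cat_features=...).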

Ensemble Models

Weighted Blend

Simple weighted average of model predictions.

Weights optimization:

import numpy as np
from scipy.optimize import minimize

# rmspe() is the project's Root Mean Square Percentage Error metric;
# `predictions` is a list of per-model prediction arrays
def objective(weights):
    blend = np.average(predictions, axis=0, weights=weights)
    return rmspe(y_true, blend)

# Constrain weights to be non-negative and sum to 1
constraints = {"type": "eq", "fun": lambda w: w.sum() - 1.0}
bounds = [(0.0, 1.0)] * len(predictions)
initial_weights = np.full(len(predictions), 1.0 / len(predictions))

result = minimize(objective, initial_weights, bounds=bounds, constraints=constraints)
optimal_weights = result.x

Expected RMSPE: ~0.095-0.100

Stacked Ensemble

Uses out-of-fold predictions as features for a meta-learner.

Architecture:

Base Models (LightGBM, XGBoost, CatBoost)
        ↓
Out-of-Fold Predictions
        ↓
Meta-Model (Linear or LightGBM)
        ↓
Final Predictions

Expected RMSPE: ~0.095-0.098
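
The linear meta-learner variant can be sketched as a least-squares fit on the out-of-fold prediction matrix (a simplification; the actual stacked_ensemble may add an intercept or use LightGBM as the meta-model):

```python
import numpy as np

def fit_linear_stack(oof_preds: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Least-squares meta-weights w so that y ≈ oof_preds @ w.

    oof_preds has shape (n_samples, n_base_models), one column per base model.
    """
    w, *_ = np.linalg.lstsq(oof_preds, y, rcond=None)
    return w

def stack_predict(test_preds: np.ndarray, w: np.ndarray) -> np.ndarray:
    """Apply the learned meta-weights to base-model test predictions."""
    return test_preds @ w
```

Because the meta-weights are fit on out-of-fold predictions only, the meta-model never sees predictions a base model made on its own training data.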

MLflow Integration

All models are logged to MLflow with:

  • Parameters: Hyperparameters used
  • Metrics: RMSPE per fold and average
  • Artifacts: Model files, feature importance plots
  • Tags: Model type, experiment name

import mlflow

with mlflow.start_run(run_name="tuned_lgbm"):
    # Log parameters
    mlflow.log_params(params)

    # Train model
    model = train_model(X_train, y_train, params)

    # Log metrics
    mlflow.log_metric("rmspe", rmspe_score)

    # Log model
    mlflow.lightgbm.log_model(model, "model")

Model Evaluation Flow

flowchart TD
    A[train_features.parquet] --> B[Time-Series CV Splits]
    B --> C[Fold 1: Train → Validate]
    B --> D[Fold 2: Train → Validate]
    B --> E[Fold 3: Train → Validate]
    C --> F[Calculate RMSPE per Fold]
    D --> F
    E --> F
    F --> G[Average RMSPE]
    G --> H[MLflow Logging]
    H --> I[Model Selection]
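
The expanding-window splits in the flowchart can be sketched as follows (the 42-day validation window is an assumption for illustration; the project's actual fold logic may use different window lengths):

```python
import pandas as pd

def time_series_folds(dates: pd.Series, n_folds: int = 3, val_days: int = 42):
    """Expanding-window CV: each fold trains on all data before a cutoff
    and validates on the following val_days days."""
    end = dates.max()
    folds = []
    for i in range(n_folds, 0, -1):
        val_end = end - pd.Timedelta(days=(i - 1) * val_days)
        val_start = val_end - pd.Timedelta(days=val_days)
        train_idx = dates.index[dates <= val_start]
        val_idx = dates.index[(dates > val_start) & (dates <= val_end)]
        folds.append((train_idx, val_idx))
    return folds
```

Later folds train on strictly more history, and every validation window follows its training data, so no future information leaks into training.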

Key Functions

train_baselines.py

  • train_naive_model() - Last-week baseline
  • train_simple_lgbm() - LightGBM with defaults

train_advanced.py

  • train_tuned_lgbm() - Hyperparameter-tuned LightGBM
  • train_xgboost() - XGBoost model
  • train_catboost() - CatBoost model

ensembles.py

  • weighted_blend() - Weighted average of predictions
  • stacked_ensemble() - Meta-learner stacking
  • optimize_blend_weights() - Find optimal ensemble weights