# Models API

This module contains model training functions for baseline and advanced models.

## Overview

The modeling pipeline includes:

- **Baseline models**: naive forecasts and a simple LightGBM
- **Advanced models**: tuned LightGBM, XGBoost, and CatBoost
- **Ensemble models**: weighted blending and stacking

All models are evaluated using RMSPE (Root Mean Square Percentage Error) with time-series cross-validation.
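As a reference for the metric, here is a minimal RMSPE sketch. It follows the standard Rossmann competition definition, which excludes zero-sales rows; the repo's own evaluation code may differ in detail.

```python
import numpy as np

def rmspe(y_true, y_pred) -> float:
    """Root Mean Square Percentage Error; rows with zero actuals are ignored."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    mask = y_true != 0  # percentage error is undefined at zero sales
    pct_err = (y_true[mask] - y_pred[mask]) / y_true[mask]
    return float(np.sqrt(np.mean(pct_err**2)))

print(rmspe([100, 200], [90, 220]))  # +10% and -10% errors blend to ~0.1
```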
## Module Reference

### Ensemble Model

Ensemble model for Rossmann sales forecasting.

This module provides a custom MLflow PyFunc wrapper that encapsulates LightGBM, XGBoost, and CatBoost models into a weighted ensemble for production deployment.
#### `RossmannEnsemble`

Bases: `mlflow.pyfunc.PythonModel`

Custom MLflow PyFunc model that wraps LightGBM, XGBoost, and CatBoost models into a weighted ensemble for production deployment.

This allows the entire ensemble to be registered as a single deployable artifact in the MLflow Model Registry, encapsulating all ensemble logic.

**Parameters**

- `lgb_model` (`lightgbm.Booster`): trained LightGBM model
- `xgb_model` (`xgboost.Booster`): trained XGBoost model
- `cb_model` (`catboost.CatBoost`): trained CatBoost model
- `weights` (`dict`): ensemble weights for each model, e.g. `{'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1}`
- `cat_features` (`list`): list of categorical feature names

**Examples**

```python
ensemble = RossmannEnsemble(
    lgb_model=lgb_model,
    xgb_model=xgb_model,
    cb_model=cb_model,
    weights={'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1},
    cat_features=['StoreType', 'Assortment', 'StateHoliday', 'PromoInterval'],
)
predictions = ensemble.predict(None, X_test)
```
Source code in `src/models/ensemble.py`

```python
class RossmannEnsemble(mlflow.pyfunc.PythonModel):
    """Custom MLflow PyFunc model that wraps LightGBM, XGBoost, and CatBoost models into a
    weighted ensemble for production deployment.

    This allows the entire ensemble to be registered as a single deployable
    artifact in MLflow Model Registry, encapsulating all ensemble logic.

    Parameters
    ----------
    lgb_model : lightgbm.Booster
        Trained LightGBM model
    xgb_model : xgboost.Booster
        Trained XGBoost model
    cb_model : catboost.CatBoost
        Trained CatBoost model
    weights : dict
        Ensemble weights for each model, e.g., {'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1}
    cat_features : list
        List of categorical feature names

    Examples
    --------
    >>> ensemble = RossmannEnsemble(
    ...     lgb_model=lgb_model,
    ...     xgb_model=xgb_model,
    ...     cb_model=cb_model,
    ...     weights={'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1},
    ...     cat_features=['StoreType', 'Assortment', 'StateHoliday', 'PromoInterval']
    ... )
    >>> predictions = ensemble.predict(None, X_test)
    """

    def __init__(self, lgb_model, xgb_model, cb_model, weights, cat_features):
        """Initialize the ensemble model with trained base models and weights.

        Parameters
        ----------
        lgb_model : lightgbm.Booster
            Trained LightGBM model
        xgb_model : xgboost.Booster
            Trained XGBoost model
        cb_model : catboost.CatBoost
            Trained CatBoost model
        weights : dict
            Ensemble weights, must sum to 1.0
        cat_features : list
            Categorical feature names
        """
        self.lgb_model = lgb_model
        self.xgb_model = xgb_model
        self.cb_model = cb_model
        self.weights = weights
        self.cat_features = cat_features

        # Validate weights sum to 1.0
        total_weight = sum(weights.values())
        if not np.isclose(total_weight, 1.0):
            raise ValueError(
                f"Ensemble weights must sum to 1.0, got {total_weight}. Weights: {weights}"
            )

    def predict(self, context, model_input):
        """Generate ensemble predictions by combining predictions from all three models.

        Parameters
        ----------
        context : mlflow.pyfunc.PythonModelContext
            MLflow model context (unused, required by interface)
        model_input : pd.DataFrame
            Input features for prediction

        Returns
        -------
        np.ndarray
            Weighted ensemble predictions
        """
        import catboost as cb
        import xgboost as xgb

        # LightGBM predictions
        lgb_preds = self.lgb_model.predict(model_input)

        # XGBoost predictions (needs categorical encoding)
        X_xgb = model_input.copy()
        for col in X_xgb.columns:
            if X_xgb[col].dtype.name == "category":
                X_xgb[col] = X_xgb[col].cat.codes
        dmatrix = xgb.DMatrix(X_xgb)
        xgb_preds = self.xgb_model.predict(dmatrix)

        # CatBoost predictions
        pool = cb.Pool(model_input, cat_features=self.cat_features)
        cb_preds = self.cb_model.predict(pool)

        # Weighted ensemble
        ensemble_preds = (
            self.weights["lightgbm"] * lgb_preds
            + self.weights["xgboost"] * xgb_preds
            + self.weights["catboost"] * cb_preds
        )
        return ensemble_preds
```
#### `__init__(lgb_model, xgb_model, cb_model, weights, cat_features)`

Initialize the ensemble model with trained base models and weights.

**Parameters**

- `lgb_model` (`lightgbm.Booster`): trained LightGBM model
- `xgb_model` (`xgboost.Booster`): trained XGBoost model
- `cb_model` (`catboost.CatBoost`): trained CatBoost model
- `weights` (`dict`): ensemble weights; must sum to 1.0
- `cat_features` (`list`): categorical feature names

Source code in `src/models/ensemble.py`

```python
def __init__(self, lgb_model, xgb_model, cb_model, weights, cat_features):
    """Initialize the ensemble model with trained base models and weights.

    Parameters
    ----------
    lgb_model : lightgbm.Booster
        Trained LightGBM model
    xgb_model : xgboost.Booster
        Trained XGBoost model
    cb_model : catboost.CatBoost
        Trained CatBoost model
    weights : dict
        Ensemble weights, must sum to 1.0
    cat_features : list
        Categorical feature names
    """
    self.lgb_model = lgb_model
    self.xgb_model = xgb_model
    self.cb_model = cb_model
    self.weights = weights
    self.cat_features = cat_features

    # Validate weights sum to 1.0
    total_weight = sum(weights.values())
    if not np.isclose(total_weight, 1.0):
        raise ValueError(
            f"Ensemble weights must sum to 1.0, got {total_weight}. Weights: {weights}"
        )
```
#### `predict(context, model_input)`

Generate ensemble predictions by combining predictions from all three models.

**Parameters**

- `context` (`mlflow.pyfunc.PythonModelContext`): MLflow model context (unused, required by the interface)
- `model_input` (`pd.DataFrame`): input features for prediction

**Returns**

- `np.ndarray`: weighted ensemble predictions

Source code in `src/models/ensemble.py`

```python
def predict(self, context, model_input):
    """Generate ensemble predictions by combining predictions from all three models.

    Parameters
    ----------
    context : mlflow.pyfunc.PythonModelContext
        MLflow model context (unused, required by interface)
    model_input : pd.DataFrame
        Input features for prediction

    Returns
    -------
    np.ndarray
        Weighted ensemble predictions
    """
    import catboost as cb
    import xgboost as xgb

    # LightGBM predictions
    lgb_preds = self.lgb_model.predict(model_input)

    # XGBoost predictions (needs categorical encoding)
    X_xgb = model_input.copy()
    for col in X_xgb.columns:
        if X_xgb[col].dtype.name == "category":
            X_xgb[col] = X_xgb[col].cat.codes
    dmatrix = xgb.DMatrix(X_xgb)
    xgb_preds = self.xgb_model.predict(dmatrix)

    # CatBoost predictions
    pool = cb.Pool(model_input, cat_features=self.cat_features)
    cb_preds = self.cb_model.predict(pool)

    # Weighted ensemble
    ensemble_preds = (
        self.weights["lightgbm"] * lgb_preds
        + self.weights["xgboost"] * xgb_preds
        + self.weights["catboost"] * cb_preds
    )
    return ensemble_preds
```
#### `create_ensemble(lgb_model, xgb_model, cb_model, weights=None, cat_features=None)`

Factory function to create a `RossmannEnsemble` instance.

**Parameters**

- `lgb_model` (`lightgbm.Booster`): trained LightGBM model
- `xgb_model` (`xgboost.Booster`): trained XGBoost model
- `cb_model` (`catboost.CatBoost`): trained CatBoost model
- `weights` (`dict`, optional): ensemble weights. Default: `{'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1}`
- `cat_features` (`list`, optional): categorical feature names. Default: `['StoreType', 'Assortment', 'StateHoliday', 'PromoInterval']`

**Returns**

- `RossmannEnsemble`: initialized ensemble model

**Examples**

```python
ensemble = create_ensemble(lgb_model, xgb_model, cb_model)
predictions = ensemble.predict(None, X_test)
```

Source code in `src/models/ensemble.py`

```python
def create_ensemble(lgb_model, xgb_model, cb_model, weights=None, cat_features=None):
    """Factory function to create a RossmannEnsemble instance.

    Parameters
    ----------
    lgb_model : lightgbm.Booster
        Trained LightGBM model
    xgb_model : xgboost.Booster
        Trained XGBoost model
    cb_model : catboost.CatBoost
        Trained CatBoost model
    weights : dict, optional
        Ensemble weights. Default: {'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1}
    cat_features : list, optional
        Categorical feature names.
        Default: ['StoreType', 'Assortment', 'StateHoliday', 'PromoInterval']

    Returns
    -------
    RossmannEnsemble
        Initialized ensemble model

    Examples
    --------
    >>> ensemble = create_ensemble(lgb_model, xgb_model, cb_model)
    >>> predictions = ensemble.predict(None, X_test)
    """
    if weights is None:
        weights = {"lightgbm": 0.3, "xgboost": 0.6, "catboost": 0.1}
    if cat_features is None:
        cat_features = ["StoreType", "Assortment", "StateHoliday", "PromoInterval"]

    return RossmannEnsemble(
        lgb_model=lgb_model,
        xgb_model=xgb_model,
        cb_model=cb_model,
        weights=weights,
        cat_features=cat_features,
    )
```
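The ensemble's prediction step reduces to a convex combination of the three base-model outputs. A minimal sketch with hypothetical prediction arrays standing in for the real models:

```python
import numpy as np

# Stand-in predictions from the three base models (hypothetical values)
lgb_preds = np.array([100.0, 200.0])
xgb_preds = np.array([110.0, 190.0])
cb_preds = np.array([90.0, 210.0])

weights = {"lightgbm": 0.3, "xgboost": 0.6, "catboost": 0.1}
assert np.isclose(sum(weights.values()), 1.0)  # same check RossmannEnsemble.__init__ enforces

blended = (
    weights["lightgbm"] * lgb_preds
    + weights["xgboost"] * xgb_preds
    + weights["catboost"] * cb_preds
)
print(blended)  # [105. 195.]
```

Because the weights sum to 1, the blended prediction always lies within the range spanned by the base-model predictions.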
### Model Registry

MLflow Model Registry utilities for Rossmann forecasting.

This module provides helper functions for managing models in the MLflow Model Registry, including registration, promotion, and loading.
#### `get_mlflow_client()`

Get an MLflow tracking client.

**Returns**

- `MlflowClient`: initialized MLflow client

Source code in `src/models/model_registry.py`

```python
def get_mlflow_client() -> MlflowClient:
    """Get an MLflow tracking client.

    Returns
    -------
    MlflowClient
        Initialized MLflow client
    """
    return MlflowClient()
```
#### `get_model_info(model_name, version=None)`

Get detailed information about a registered model.

**Parameters**

- `model_name` (`str`): name of the registered model
- `version` (`str`, optional): model version; if None, returns info for all versions

**Returns**

- `dict`: model information including version, stage, metrics, etc.

**Examples**

```python
info = get_model_info('rossmann-ensemble', version='1')
print(f"Model stage: {info['current_stage']}")
print(f"Run ID: {info['run_id']}")
```

Source code in `src/models/model_registry.py`

```python
def get_model_info(model_name: str, version: Optional[str] = None) -> dict[str, Any]:
    """Get detailed information about a registered model.

    Parameters
    ----------
    model_name : str
        Name of the registered model
    version : str, optional
        Model version. If None, gets info for all versions

    Returns
    -------
    dict
        Model information including version, stage, metrics, etc.

    Examples
    --------
    >>> info = get_model_info('rossmann-ensemble', version='1')
    >>> print(f"Model stage: {info['current_stage']}")
    >>> print(f"Run ID: {info['run_id']}")
    """
    client = get_mlflow_client()

    if version:
        mv = client.get_model_version(name=model_name, version=version)
        return {
            "name": mv.name,
            "version": mv.version,
            "current_stage": mv.current_stage,
            "description": mv.description,
            "run_id": mv.run_id,
            "status": mv.status,
            "creation_timestamp": mv.creation_timestamp,
            "last_updated_timestamp": mv.last_updated_timestamp,
        }
    else:
        # Get all versions
        model = client.get_registered_model(model_name)
        versions = client.search_model_versions(f"name='{model_name}'")
        return {
            "name": model.name,
            "description": model.description,
            "creation_timestamp": model.creation_timestamp,
            "last_updated_timestamp": model.last_updated_timestamp,
            "versions": [
                {
                    "version": mv.version,
                    "stage": mv.current_stage,
                    "run_id": mv.run_id,
                    "status": mv.status,
                }
                for mv in versions
            ],
        }
```
#### `get_model_version(model_name, stage='Production')`

Get the version number of a model in a specific stage.

**Parameters**

- `model_name` (`str`): name of the registered model
- `stage` (`str`, default='Production'): model stage (`'Staging'`, `'Production'`, or `'None'`)

**Returns**

- `str` or `None`: model version number, or None if no model is in the stage

**Examples**

```python
version = get_model_version('rossmann-ensemble', stage='Production')
print(f"Production version: {version}")
```

Source code in `src/models/model_registry.py`

```python
def get_model_version(model_name: str, stage: str = "Production") -> Optional[str]:
    """Get the version number of a model in a specific stage.

    Parameters
    ----------
    model_name : str
        Name of the registered model
    stage : str, default='Production'
        Model stage ('Staging', 'Production', or 'None')

    Returns
    -------
    str or None
        Model version number, or None if no model in stage

    Examples
    --------
    >>> version = get_model_version('rossmann-ensemble', stage='Production')
    >>> print(f"Production version: {version}")
    """
    client = get_mlflow_client()
    versions = client.get_latest_versions(model_name, stages=[stage])

    if not versions:
        logger.warning(f"No model found for {model_name} in {stage} stage")
        return None

    version = versions[0].version
    logger.info(f"{model_name} {stage} version: {version}")
    return version
```
#### `list_registered_models()`

List all registered models in the MLflow Model Registry.

**Returns**

- `list`: list of registered model names

**Examples**

```python
models = list_registered_models()
for model_name in models:
    print(model_name)
```

Source code in `src/models/model_registry.py`

```python
def list_registered_models() -> list:
    """List all registered models in MLflow Model Registry.

    Returns
    -------
    list
        List of registered model names

    Examples
    --------
    >>> models = list_registered_models()
    >>> for model_name in models:
    ...     print(model_name)
    """
    client = get_mlflow_client()
    registered_models = client.search_registered_models()
    model_names = [rm.name for rm in registered_models]
    logger.info(f"Found {len(model_names)} registered models")
    return model_names
```
#### `load_model(model_name, stage='Production')`

Load a model from the MLflow Model Registry.

**Parameters**

- `model_name` (`str`): name of the registered model
- `stage` (`str`, default='Production'): model stage to load (`'Staging'`, `'Production'`, or a version number)

**Returns**

- `mlflow.pyfunc.PyFuncModel`: loaded model, ready for predictions

**Examples**

```python
# Load production model
model = load_model('rossmann-ensemble', stage='Production')
predictions = model.predict(X_test)

# Load specific version
model = load_model('rossmann-ensemble', stage='1')
predictions = model.predict(X_test)
```

Source code in `src/models/model_registry.py`

```python
def load_model(model_name: str, stage: str = "Production"):
    """Load a model from MLflow Model Registry.

    Parameters
    ----------
    model_name : str
        Name of the registered model
    stage : str, default='Production'
        Model stage to load ('Staging', 'Production', or version number)

    Returns
    -------
    mlflow.pyfunc.PyFuncModel
        Loaded model ready for predictions

    Examples
    --------
    >>> # Load production model
    >>> model = load_model('rossmann-ensemble', stage='Production')
    >>> predictions = model.predict(X_test)
    >>>
    >>> # Load specific version
    >>> model = load_model('rossmann-ensemble', stage='1')
    >>> predictions = model.predict(X_test)
    """
    # Stage names ("Staging", "Production", "Archived") and plain version
    # numbers share the same registry URI scheme
    model_uri = f"models:/{model_name}/{stage}"

    logger.info(f"Loading model from: {model_uri}")
    model = mlflow.pyfunc.load_model(model_uri)
    logger.info("✓ Model loaded successfully")
    return model
```
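`load_model` resolves both stage names and version numbers through the same `models:/<name>/<stage-or-version>` URI scheme. A small sketch of that URI construction (the helper name is illustrative, not part of the module):

```python
def model_uri(model_name: str, stage_or_version: str) -> str:
    # MLflow resolves stage names and version numbers with the same URI scheme
    return f"models:/{model_name}/{stage_or_version}"

print(model_uri("rossmann-ensemble", "Production"))  # models:/rossmann-ensemble/Production
print(model_uri("rossmann-ensemble", "2"))           # models:/rossmann-ensemble/2
```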
#### `promote_model(model_name, version, stage, archive_existing=True)`

Promote a model version to a specific stage.

**Parameters**

- `model_name` (`str`): name of the registered model
- `version` (`str`): model version to promote
- `stage` (`str`): target stage (`'Staging'`, `'Production'`, or `'Archived'`)
- `archive_existing` (`bool`, default=True): if True, archive existing models in the target stage

**Examples**

```python
# Promote to Staging
promote_model('rossmann-ensemble', version='1', stage='Staging')

# Promote to Production
promote_model('rossmann-ensemble', version='2', stage='Production')
```

Source code in `src/models/model_registry.py`

```python
def promote_model(
    model_name: str,
    version: str,
    stage: str,
    archive_existing: bool = True,
) -> None:
    """Promote a model version to a specific stage.

    Parameters
    ----------
    model_name : str
        Name of the registered model
    version : str
        Model version to promote
    stage : str
        Target stage ('Staging', 'Production', or 'Archived')
    archive_existing : bool, default=True
        If True, archive existing models in the target stage

    Examples
    --------
    >>> # Promote to Staging
    >>> promote_model('rossmann-ensemble', version='1', stage='Staging')

    >>> # Promote to Production
    >>> promote_model('rossmann-ensemble', version='2', stage='Production')
    """
    client = get_mlflow_client()

    # Archive existing models in the target stage if requested
    if archive_existing and stage != "Archived":
        existing_versions = client.get_latest_versions(model_name, stages=[stage])
        for mv in existing_versions:
            logger.info(f"Archiving {model_name} version {mv.version} (was in {stage})")
            client.transition_model_version_stage(
                name=model_name, version=mv.version, stage="Archived"
            )

    # Promote the specified version
    logger.info(f"Promoting {model_name} version {version} to {stage}")
    client.transition_model_version_stage(name=model_name, version=version, stage=stage)
    logger.info(f"✓ {model_name} version {version} is now in {stage}")
```
#### `register_ensemble_model(ensemble_model, model_name, run_id=None, conda_env=None, signature=None, input_example=None, description=None)`

Register an ensemble model to the MLflow Model Registry.

**Parameters**

- `ensemble_model` (`RossmannEnsemble`): trained ensemble model instance
- `model_name` (`str`): name for the registered model (e.g. `'rossmann-ensemble'`)
- `run_id` (`str`, optional): MLflow run ID; if None, uses the active run
- `conda_env` (`dict`, optional): conda environment specification
- `signature` (`mlflow.models.ModelSignature`, optional): model signature
- `input_example` (`pd.DataFrame`, optional): example input for model testing
- `description` (`str`, optional): model description

**Returns**

- `str`: model version number

**Examples**

```python
from models.ensemble import create_ensemble

ensemble = create_ensemble(lgb_model, xgb_model, cb_model)
version = register_ensemble_model(
    ensemble_model=ensemble,
    model_name='rossmann-ensemble',
    signature=signature,
    input_example=X_sample,
)
print(f"Registered model version: {version}")
```

Source code in `src/models/model_registry.py`

```python
def register_ensemble_model(
    ensemble_model,
    model_name: str,
    run_id: Optional[str] = None,
    conda_env: Optional[dict[str, Any]] = None,
    signature=None,
    input_example=None,
    description: Optional[str] = None,
) -> str:
    """Register an ensemble model to MLflow Model Registry.

    Parameters
    ----------
    ensemble_model : RossmannEnsemble
        Trained ensemble model instance
    model_name : str
        Name for the registered model (e.g., 'rossmann-ensemble')
    run_id : str, optional
        MLflow run ID. If None, uses active run
    conda_env : dict, optional
        Conda environment specification
    signature : mlflow.models.ModelSignature, optional
        Model signature
    input_example : pd.DataFrame, optional
        Example input for model testing
    description : str, optional
        Model description

    Returns
    -------
    str
        Model version number

    Examples
    --------
    >>> from models.ensemble import create_ensemble
    >>> ensemble = create_ensemble(lgb_model, xgb_model, cb_model)
    >>> version = register_ensemble_model(
    ...     ensemble_model=ensemble,
    ...     model_name='rossmann-ensemble',
    ...     signature=signature,
    ...     input_example=X_sample
    ... )
    >>> print(f"Registered model version: {version}")
    """
    logger.info(f"Registering ensemble model: {model_name}")

    # Log the model
    model_info = mlflow.pyfunc.log_model(
        artifact_path="ensemble_model",
        python_model=ensemble_model,
        registered_model_name=model_name,
        conda_env=conda_env,
        signature=signature,
        input_example=input_example,
    )

    # Get version from model info
    version = model_info.registered_model_version

    # Update description if provided
    if description:
        client = get_mlflow_client()
        client.update_model_version(name=model_name, version=version, description=description)

    logger.info(f"✓ Registered {model_name} version {version}")
    return version
```
### Training Pipeline

Production ensemble model training pipeline for Rossmann forecasting.

This module provides the main training workflow for retraining the ensemble model with the latest data and the best hyperparameters from Optuna tuning.
#### `load_best_hyperparameters(config_path='config/best_hyperparameters.json')`

Load the best hyperparameters from Optuna tuning.

**Parameters**

- `config_path` (`str`): path to the best-hyperparameters JSON file

**Returns**

- `dict`: hyperparameters for each model type

Source code in `src/models/train_ensemble.py`

```python
def load_best_hyperparameters(
    config_path: str = "config/best_hyperparameters.json",
) -> dict:
    """Load best hyperparameters from Optuna tuning.

    Parameters
    ----------
    config_path : str
        Path to best hyperparameters JSON file

    Returns
    -------
    dict
        Hyperparameters for each model type
    """
    logger.info(f"Loading hyperparameters from: {config_path}")
    with open(config_path) as f:
        best_params = json.load(f)

    logger.info(f"Best model: {best_params['metadata']['best_model']}")
    logger.info(f"Best CV RMSPE: {best_params['metadata']['best_rmspe']:.6f}")
    return best_params
```
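The training functions index into the loaded dict as `best_params['<model>']['hyperparameters'][...]` and `best_params['metadata'][...]`. A hypothetical minimal layout for `config/best_hyperparameters.json`, inferred from those accesses; the real file carries more keys and the actual tuned values:

```python
import json

# Hypothetical file contents, inferred from the keys the training code reads
best_params = {
    "metadata": {"best_model": "xgboost", "best_rmspe": 0.098},
    "lightgbm": {"hyperparameters": {"num_leaves": 255, "learning_rate": 0.05}},
    "xgboost": {"hyperparameters": {"max_depth": 10, "learning_rate": 0.05}},
    "catboost": {"hyperparameters": {"depth": 8, "learning_rate": 0.05}},
}

# Round-trip through JSON, as load_best_hyperparameters would read it from disk
loaded = json.loads(json.dumps(best_params, indent=2))
print(loaded["metadata"]["best_model"])  # xgboost
```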
#### `load_training_data(data_path='data/processed/train_features.parquet', holdout_days=42)`

Load and split the training data into train and holdout sets.

**Parameters**

- `data_path` (`str`): path to the processed features parquet file
- `holdout_days` (`int`, default=42): number of days to use for holdout validation (6 weeks)

**Returns**

- `tuple` of `pd.DataFrame`: `(train_df, holdout_df)`

Source code in `src/models/train_ensemble.py`

```python
def load_training_data(
    data_path: str = "data/processed/train_features.parquet",
    holdout_days: int = 42,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Load and split training data into train and holdout sets.

    Parameters
    ----------
    data_path : str
        Path to the processed features parquet file
    holdout_days : int, default=42
        Number of days to use for holdout validation (6 weeks)

    Returns
    -------
    tuple of pd.DataFrame
        (train_df, holdout_df)
    """
    logger.info(f"Loading data from: {data_path}")
    df = read_parquet(data_path)
    logger.info(f"Loaded data shape: {df.shape}")
    logger.info(f"Date range: {df['Date'].min()} to {df['Date'].max()}")

    # Create train/holdout split
    max_date = df["Date"].max()
    holdout_start = max_date - pd.Timedelta(days=holdout_days - 1)

    train_df = df[df["Date"] < holdout_start].copy()
    holdout_df = df[df["Date"] >= holdout_start].copy()

    logger.info(f"Train set: {len(train_df):,} rows")
    logger.info(f"  Date range: {train_df['Date'].min()} to {train_df['Date'].max()}")
    logger.info(f"Holdout set: {len(holdout_df):,} rows")
    logger.info(f"  Date range: {holdout_df['Date'].min()} to {holdout_df['Date'].max()}")
    return train_df, holdout_df
```
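The date-based split keeps the most recent `holdout_days` days out of training, which avoids leaking future observations into the model. A toy sketch of the same arithmetic on synthetic data (not the real feature set):

```python
import pandas as pd

# Toy frame standing in for the processed features (hypothetical data)
df = pd.DataFrame({
    "Date": pd.date_range("2015-01-01", periods=100, freq="D"),
    "Sales": range(100),
})

holdout_days = 42
max_date = df["Date"].max()
# The "- 1" makes the holdout window inclusive of max_date: exactly 42 days
holdout_start = max_date - pd.Timedelta(days=holdout_days - 1)

train_df = df[df["Date"] < holdout_start]
holdout_df = df[df["Date"] >= holdout_start]

print(len(train_df), len(holdout_df))  # 58 42
```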
#### `main(data_path='data/processed/train_features.parquet', config_path='config/best_hyperparameters.json', model_name='rossmann-ensemble', ensemble_weights=None, run_name='production_ensemble_training')`

Main training pipeline for the ensemble model.

**Parameters**

- `data_path` (`str`): path to processed features
- `config_path` (`str`): path to best-hyperparameters JSON
- `model_name` (`str`): name for the registered model in MLflow
- `ensemble_weights` (`dict`, optional): ensemble weights. Default: `{'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1}`
- `run_name` (`str`): name for the MLflow run

**Returns**

- `str`: model version number

Source code in `src/models/train_ensemble.py`

```python
def main(
    data_path: str = "data/processed/train_features.parquet",
    config_path: str = "config/best_hyperparameters.json",
    model_name: str = "rossmann-ensemble",
    ensemble_weights: Optional[dict[str, float]] = None,
    run_name: str = "production_ensemble_training",
):
    """Main training pipeline for ensemble model.

    Parameters
    ----------
    data_path : str
        Path to processed features
    config_path : str
        Path to best hyperparameters JSON
    model_name : str
        Name for registered model in MLflow
    ensemble_weights : dict, optional
        Ensemble weights. Default: {'lightgbm': 0.3, 'xgboost': 0.6, 'catboost': 0.1}
    run_name : str
        Name for MLflow run

    Returns
    -------
    str
        Model version number
    """
    # Default ensemble weights
    if ensemble_weights is None:
        ensemble_weights = {"lightgbm": 0.30, "xgboost": 0.60, "catboost": 0.10}

    logger.info("=" * 70)
    logger.info("PRODUCTION ENSEMBLE MODEL TRAINING")
    logger.info("=" * 70)

    # Setup MLflow
    experiment_id = setup_mlflow()
    logger.info(f"MLflow experiment ID: {experiment_id}")

    # Start MLflow run
    with mlflow.start_run(run_name=run_name) as run:
        logger.info(f"MLflow run ID: {run.info.run_id}")

        # Log DVC data version
        log_dvc_data_version(data_path)

        # Load data
        train_df, holdout_df = load_training_data(data_path)

        # Load best hyperparameters
        best_params = load_best_hyperparameters(config_path)

        # Prepare training data
        X_train, y_train, feature_cols, cat_features = prepare_training_data(train_df)

        # Log dataset info
        mlflow.log_param("train_size", len(train_df))
        mlflow.log_param("holdout_size", len(holdout_df))
        mlflow.log_param("n_features", len(feature_cols))

        # Log ensemble weights
        for model_type, weight in ensemble_weights.items():
            mlflow.log_param(f"{model_type}_weight", weight)

        # Train individual models
        logger.info("\nTraining individual models...")
        lgb_model = train_lightgbm(X_train, y_train, best_params, cat_features)
        xgb_model = train_xgboost(X_train, y_train, best_params)
        cb_model = train_catboost(X_train, y_train, best_params, cat_features)
        logger.info("✓ All models trained successfully!")

        # Create ensemble
        logger.info("\nCreating ensemble model...")
        ensemble = create_ensemble(
            lgb_model=lgb_model,
            xgb_model=xgb_model,
            cb_model=cb_model,
            weights=ensemble_weights,
            cat_features=cat_features,
        )

        # Prepare holdout data for input example
        from evaluation.cv import remove_missing_features

        holdout_data = holdout_df[holdout_df["Open"] == 1].copy()
        holdout_data, _ = remove_missing_features(holdout_data, feature_cols)
        X_holdout = holdout_data[feature_cols]

        # Note: we don't create a signature here due to categorical dtype issues;
        # MLflow will infer it automatically when the model is saved.

        # Define conda environment
        conda_env = {
            "channels": ["conda-forge", "defaults"],
            "dependencies": [
                f"python={sys.version_info.major}.{sys.version_info.minor}",
                "pip",
                {
                    "pip": [
                        f"lightgbm=={lgb.__version__}",
                        f"xgboost=={xgb.__version__}",
                        f"catboost=={cb.__version__}",
                        "pandas",
                        "numpy",
                        "scikit-learn",
                    ]
                },
            ],
            "name": "rossmann_ensemble_env",
        }

        # Register ensemble model
        logger.info(f"\nRegistering ensemble model: {model_name}")
        version = register_ensemble_model(
            ensemble_model=ensemble,
            model_name=model_name,
            conda_env=conda_env,
            signature=None,  # Let MLflow infer automatically
            input_example=X_holdout.head(5),
            description=f"Rossmann ensemble (weights: {ensemble_weights})",
        )

        logger.info("=" * 70)
        logger.info(f"✓ Training complete! Model version: {version}")
        logger.info("=" * 70)

        return version
```
#### `prepare_training_data(train_df)`

Prepare training data by filtering to open stores and extracting features.

**Parameters**

- `train_df` (`pd.DataFrame`): raw training data

**Returns**

- `tuple`: `(X_train, y_train, feature_cols, cat_features)`

Source code in `src/models/train_ensemble.py`

```python
def prepare_training_data(
    train_df: pd.DataFrame,
) -> tuple[pd.DataFrame, np.ndarray, list, list]:
    """Prepare training data by filtering open stores and extracting features.

    Parameters
    ----------
    train_df : pd.DataFrame
        Raw training data

    Returns
    -------
    tuple
        (X_train, y_train, feature_cols, cat_features)
    """
    # Filter to open stores only
    train_data = train_df[train_df["Open"] == 1].copy()

    # Define feature columns
    exclude_cols = ["Sales", "Date", "Store", "Customers"]
    feature_cols = [col for col in train_df.columns if col not in exclude_cols]

    # Remove rows with missing features
    train_data = train_data.dropna(subset=feature_cols)

    X_train = train_data[feature_cols]
    y_train = train_data["Sales"].values

    # Identify categorical features
    cat_features = [col for col in feature_cols if X_train[col].dtype.name == "category"]

    logger.info(f"Training data: {len(train_data):,} rows (open stores only)")
    logger.info(f"Features: {len(feature_cols)}")
    logger.info(f"Categorical features: {len(cat_features)}")
    return X_train, y_train, feature_cols, cat_features
```
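Categorical features are detected purely from pandas dtypes, so the upstream feature pipeline must emit `category` columns for them to be picked up. A toy illustration of the detection:

```python
import pandas as pd

# Toy frame: one categorical column, one plain numeric column (hypothetical data)
X = pd.DataFrame({
    "StoreType": pd.Categorical(["a", "b", "a"]),
    "Promo": [0, 1, 0],
})

# Same dtype-based detection prepare_training_data uses
cat_features = [col for col in X.columns if X[col].dtype.name == "category"]
print(cat_features)  # ['StoreType']
```

A plain `object` (string) column would not be detected, which is why casting to `category` in feature engineering matters.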
#### `train_catboost(X_train, y_train, best_params, cat_features)`

Train a CatBoost model with the best hyperparameters.

**Parameters**

- `X_train` (`pd.DataFrame`): training features
- `y_train` (`np.ndarray`): training targets
- `best_params` (`dict`): best hyperparameters from Optuna
- `cat_features` (`list`): categorical feature names

**Returns**

- `cb.CatBoost`: trained CatBoost model

Source code in `src/models/train_ensemble.py`

```python
def train_catboost(
    X_train: pd.DataFrame,
    y_train: np.ndarray,
    best_params: dict,
    cat_features: list,
) -> cb.CatBoost:
    """Train CatBoost model with best hyperparameters.

    Parameters
    ----------
    X_train : pd.DataFrame
        Training features
    y_train : np.ndarray
        Training targets
    best_params : dict
        Best hyperparameters from Optuna
    cat_features : list
        Categorical feature names

    Returns
    -------
    cb.CatBoost
        Trained CatBoost model
    """
    logger.info("Training CatBoost...")
    start_time = time.time()

    cb_params = {
        "loss_function": "RMSE",
        "eval_metric": "RMSE",
        "depth": int(best_params["catboost"]["hyperparameters"]["depth"]),
        "learning_rate": best_params["catboost"]["hyperparameters"]["learning_rate"],
        "l2_leaf_reg": best_params["catboost"]["hyperparameters"]["l2_leaf_reg"],
        "random_strength": best_params["catboost"]["hyperparameters"]["random_strength"],
        "bagging_temperature": best_params["catboost"]["hyperparameters"]["bagging_temperature"],
        "border_count": int(best_params["catboost"]["hyperparameters"]["border_count"]),
        "iterations": 1500,
        "verbose": False,
        "random_seed": 42,
    }

    # Log hyperparameters to MLflow
    for key, value in cb_params.items():
        mlflow.log_param(f"cb_{key}", value)

    train_pool = cb.Pool(X_train, label=y_train, cat_features=cat_features)
    cb_model = cb.CatBoost(cb_params)
    cb_model.fit(train_pool)

    cb_time = time.time() - start_time
    logger.info(f"CatBoost training complete in {cb_time:.2f}s")
    mlflow.log_metric("cb_train_time_seconds", cb_time)
    return cb_model
```
#### `train_lightgbm(X_train, y_train, best_params, cat_features)`

Train a LightGBM model with the best hyperparameters.

**Parameters**

- `X_train` (`pd.DataFrame`): training features
- `y_train` (`np.ndarray`): training targets
- `best_params` (`dict`): best hyperparameters from Optuna
- `cat_features` (`list`): categorical feature names

**Returns**

- `lgb.Booster`: trained LightGBM model

Source code in `src/models/train_ensemble.py`

```python
def train_lightgbm(
    X_train: pd.DataFrame,
    y_train: np.ndarray,
    best_params: dict,
    cat_features: list,
) -> lgb.Booster:
    """Train LightGBM model with best hyperparameters.

    Parameters
    ----------
    X_train : pd.DataFrame
        Training features
    y_train : np.ndarray
        Training targets
    best_params : dict
        Best hyperparameters from Optuna
    cat_features : list
        Categorical feature names

    Returns
    -------
    lgb.Booster
        Trained LightGBM model
    """
    logger.info("Training LightGBM...")
    start_time = time.time()

    lgb_params = {
        "objective": "regression",
        "metric": "rmse",
        "boosting_type": "gbdt",
        "num_leaves": int(best_params["lightgbm"]["hyperparameters"]["num_leaves"]),
        "learning_rate": best_params["lightgbm"]["hyperparameters"]["learning_rate"],
        "feature_fraction": best_params["lightgbm"]["hyperparameters"]["feature_fraction"],
        "bagging_fraction": best_params["lightgbm"]["hyperparameters"]["bagging_fraction"],
        "bagging_freq": int(best_params["lightgbm"]["hyperparameters"]["bagging_freq"]),
        "max_depth": int(best_params["lightgbm"]["hyperparameters"]["max_depth"]),
        "min_child_samples": int(best_params["lightgbm"]["hyperparameters"]["min_child_samples"]),
        "reg_alpha": best_params["lightgbm"]["hyperparameters"]["reg_alpha"],
        "reg_lambda": best_params["lightgbm"]["hyperparameters"]["reg_lambda"],
        "verbose": -1,
        "seed": 42,
    }

    # Log hyperparameters to MLflow
    for key, value in lgb_params.items():
        mlflow.log_param(f"lgb_{key}", value)

    lgb_train = lgb.Dataset(X_train, label=y_train, categorical_feature=cat_features)
    lgb_model = lgb.train(
        lgb_params, lgb_train, num_boost_round=1600, callbacks=[lgb.log_evaluation(period=0)]
    )

    lgb_time = time.time() - start_time
    logger.info(f"LightGBM training complete in {lgb_time:.2f}s")
    mlflow.log_metric("lgb_train_time_seconds", lgb_time)
    return lgb_model
```
#### `train_xgboost(X_train, y_train, best_params)`

Train an XGBoost model with the best hyperparameters.

**Parameters**

- `X_train` (`pd.DataFrame`): training features
- `y_train` (`np.ndarray`): training targets
- `best_params` (`dict`): best hyperparameters from Optuna

**Returns**

- `xgb.Booster`: trained XGBoost model

Source code in `src/models/train_ensemble.py`

```python
def train_xgboost(X_train: pd.DataFrame, y_train: np.ndarray, best_params: dict) -> xgb.Booster:
    """Train XGBoost model with best hyperparameters.

    Parameters
    ----------
    X_train : pd.DataFrame
        Training features
    y_train : np.ndarray
        Training targets
    best_params : dict
        Best hyperparameters from Optuna

    Returns
    -------
    xgb.Booster
        Trained XGBoost model
    """
    logger.info("Training XGBoost...")
    start_time = time.time()

    # XGBoost needs categorical features as codes
    X_train_xgb = X_train.copy()
    for col in X_train_xgb.columns:
        if X_train_xgb[col].dtype.name == "category":
            X_train_xgb[col] = X_train_xgb[col].cat.codes

    xgb_params = {
        "objective": "reg:squarederror",
        "eval_metric": "rmse",
        "max_depth": int(best_params["xgboost"]["hyperparameters"]["max_depth"]),
        "learning_rate": best_params["xgboost"]["hyperparameters"]["learning_rate"],
        "subsample": best_params["xgboost"]["hyperparameters"]["subsample"],
        "colsample_bytree": best_params["xgboost"]["hyperparameters"]["colsample_bytree"],
        "min_child_weight": int(best_params["xgboost"]["hyperparameters"]["min_child_weight"]),
        "reg_alpha": best_params["xgboost"]["hyperparameters"]["reg_alpha"],
        "reg_lambda": best_params["xgboost"]["hyperparameters"]["reg_lambda"],
        "gamma": best_params["xgboost"]["hyperparameters"]["gamma"],
        "seed": 42,
        "verbosity": 0,
    }

    # Log hyperparameters to MLflow
    for key, value in xgb_params.items():
        mlflow.log_param(f"xgb_{key}", value)

    dtrain = xgb.DMatrix(X_train_xgb, label=y_train)
    xgb_model = xgb.train(xgb_params, dtrain, num_boost_round=1600, verbose_eval=False)

    xgb_time = time.time() - start_time
    logger.info(f"XGBoost training complete in {xgb_time:.2f}s")
    mlflow.log_metric("xgb_train_time_seconds", xgb_time)
    return xgb_model
```
Model Validation
Model validation and promotion for Rossmann forecasting.
This module validates a candidate model against performance thresholds and promotes it to Staging if
it passes all checks.
check_performance_threshold(metrics, threshold_rmspe=0.10, strict=False)
Check if model meets performance thresholds.
Parameters
metrics : dict
Model evaluation metrics
threshold_rmspe : float, default=0.10
Maximum acceptable RMSPE (10% error threshold)
strict : bool, default=False
If True, use stricter threshold (0.09856 - top 50 leaderboard)
Returns
bool
True if model passes threshold checks
Source code in src/models/validate_model.py
| def check_performance_threshold(
metrics: dict[str, float],
threshold_rmspe: float = 0.10,
strict: bool = False,
) -> bool:
"""Check if model meets performance thresholds.
Parameters
----------
metrics : dict
Model evaluation metrics
threshold_rmspe : float, default=0.10
Maximum acceptable RMSPE (10% error threshold)
strict : bool, default=False
If True, use stricter threshold (0.09856 - top 50 leaderboard)
Returns
-------
bool
True if model passes threshold checks
"""
if strict:
threshold_rmspe = 0.09856 # Top 50 Kaggle leaderboard
logger.info("=" * 70)
logger.info("PERFORMANCE THRESHOLD CHECKS")
logger.info("=" * 70)
# Check RMSPE
rmspe_pass = metrics["rmspe"] <= threshold_rmspe
logger.info(
f"RMSPE: {metrics['rmspe']:.6f} <= {threshold_rmspe:.6f} "
f"{'✓ PASS' if rmspe_pass else '✗ FAIL'}"
)
# Check for reasonable predictions (not extreme values)
predictions_reasonable = metrics["mae"] < 2000 # Average error less than $2000 seems reasonable
logger.info(
f"MAE: {metrics['mae']:.2f} < 2000 " f"{'✓ PASS' if predictions_reasonable else '✗ FAIL'}"
)
all_pass = rmspe_pass and predictions_reasonable
logger.info("=" * 70)
if all_pass:
logger.info("✓ ALL CHECKS PASSED")
else:
logger.info("✗ VALIDATION FAILED")
logger.info("=" * 70)
return all_pass
|
evaluate_model(model, X_test, y_test)
Evaluate model on test data.
Parameters
model : mlflow.pyfunc.PyFuncModel
Loaded model
X_test : pd.DataFrame
Test features
y_test : np.ndarray
Test targets
Returns
dict
Evaluation metrics
Source code in src/models/validate_model.py
| def evaluate_model(model, X_test: pd.DataFrame, y_test: np.ndarray) -> dict[str, float]:
"""Evaluate model on test data.
Parameters
----------
model : mlflow.pyfunc.PyFuncModel
Loaded model
X_test : pd.DataFrame
Test features
y_test : np.ndarray
Test targets
Returns
-------
dict
Evaluation metrics
"""
logger.info("Generating predictions...")
predictions = model.predict(X_test)
logger.info("Calculating metrics...")
metrics = {
"rmspe": float(rmspe(y_test, predictions)),
"rmse": float(np.sqrt(np.mean((y_test - predictions) ** 2))),
"mae": float(np.mean(np.abs(y_test - predictions))),
"mape": float(np.mean(np.abs((y_test - predictions) / y_test)) * 100),
}
logger.info("Model Performance:")
logger.info(f" RMSPE: {metrics['rmspe']:.6f}")
logger.info(f" RMSE: {metrics['rmse']:.2f}")
logger.info(f" MAE: {metrics['mae']:.2f}")
logger.info(f" MAPE: {metrics['mape']:.2f}%")
return metrics
|
load_holdout_data(data_path='data/processed/train_features.parquet', holdout_days=42)
Load holdout validation data.
Parameters
data_path : str
Path to processed features
holdout_days : int
Number of days for holdout set
Returns
tuple
(X_holdout, y_holdout, feature_cols)
Source code in src/models/validate_model.py
| def load_holdout_data(
data_path: str = "data/processed/train_features.parquet",
holdout_days: int = 42,
) -> tuple:
"""Load holdout validation data.
Parameters
----------
data_path : str
Path to processed features
holdout_days : int
Number of days for holdout set
Returns
-------
tuple
(X_holdout, y_holdout, feature_cols)
"""
logger.info(f"Loading holdout data from: {data_path}")
df = read_parquet(data_path)
# Create holdout split
max_date = df["Date"].max()
holdout_start = max_date - pd.Timedelta(days=holdout_days - 1)
holdout_df = df[df["Date"] >= holdout_start].copy()
# Filter to open stores
holdout_data = holdout_df[holdout_df["Open"] == 1].copy()
# Define features
exclude_cols = ["Sales", "Date", "Store", "Customers"]
feature_cols = [col for col in df.columns if col not in exclude_cols]
# Remove missing features
holdout_data, _ = remove_missing_features(holdout_data, feature_cols)
X_holdout = holdout_data[feature_cols]
y_holdout = holdout_data["Sales"].values
logger.info(f"Holdout data: {len(holdout_data):,} rows (open stores)")
logger.info(f"Date range: {holdout_data['Date'].min()} to {holdout_data['Date'].max()}")
return X_holdout, y_holdout, feature_cols
|
main()
Main validation workflow.
Source code in src/models/validate_model.py
| def main():
"""Main validation workflow."""
import argparse
parser = argparse.ArgumentParser(description="Validate and promote Rossmann ensemble model")
parser.add_argument("--model-name", default="rossmann-ensemble", help="Model name in registry")
parser.add_argument("--version", type=str, help="Model version to validate")
parser.add_argument(
"--threshold",
type=float,
default=0.10,
help="RMSPE threshold (default: 0.10)",
)
parser.add_argument(
"--strict",
action="store_true",
help="Use strict threshold (0.09856 - top 50)",
)
parser.add_argument(
"--no-auto-promote",
action="store_true",
help="Disable automatic promotion to Staging",
)
parser.add_argument(
"--promote-to-production",
action="store_true",
help="Promote Staging model to Production (manual step)",
)
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
if args.promote_to_production:
promote_to_production(model_name=args.model_name, version=args.version)
else:
results = validate_and_promote(
model_name=args.model_name,
version=args.version,
threshold_rmspe=args.threshold,
strict=args.strict,
auto_promote=not args.no_auto_promote,
)
# Print summary
print("\n" + "=" * 70)
print("VALIDATION RESULTS")
print("=" * 70)
print(f"Model: {results['model_name']} v{results['version']}")
print(f"RMSPE: {results['metrics']['rmspe']:.6f}")
print(f"Threshold: {results['threshold_rmspe']:.6f}")
print(f"Passed: {'✓ YES' if results['passed'] else '✗ NO'}")
print(f"Stage: {results['stage']}")
print("=" * 70)
if results["passed"] and results["promoted"]:
print("\n✓ Model validated and promoted to Staging!")
print("\nNext steps:")
print(" 1. Test Staging model in production-like environment")
print(" 2. If satisfied, promote to Production:")
print(
f" python src/models/validate_model.py --promote-to-production --version {results['version']}"
)
elif results["passed"]:
print("\n✓ Model passed validation!")
print("\nTo promote to Staging:")
print(f" python src/models/validate_model.py --version {results['version']}")
else:
print("\n✗ Model did not pass validation")
print("\nConsider:")
print(" - Retraining with more data")
print(" - Tuning hyperparameters further")
print(" - Adjusting ensemble weights")
|
promote_to_production(model_name='rossmann-ensemble', version=None)
Promote a Staging model to Production.
This should typically be done manually after testing the Staging model.
Parameters
model_name : str
Name of registered model
version : str, optional
Model version to promote. If None, uses current Staging version
Source code in src/models/validate_model.py
| def promote_to_production(
model_name: str = "rossmann-ensemble",
version: Optional[str] = None,
) -> None:
"""Promote a Staging model to Production.
This should typically be done manually after testing the Staging model.
Parameters
----------
model_name : str
Name of registered model
version : str, optional
Model version to promote. If None, uses current Staging version
"""
logger.info("=" * 70)
logger.info("PROMOTE TO PRODUCTION")
logger.info("=" * 70)
# Get Staging version if not specified
if version is None:
version = get_model_version(model_name, stage="Staging")
if version is None:
raise ValueError(f"No Staging version found for {model_name}")
logger.info(f"Promoting Staging version: {version}")
# Promote to Production
logger.info(f"\nPromoting {model_name} v{version} to Production...")
promote_model(model_name, version=version, stage="Production")
logger.info("=" * 70)
logger.info(f"✓ {model_name} v{version} is now in Production")
logger.info("=" * 70)
|
validate_and_promote(model_name='rossmann-ensemble', version=None, data_path='data/processed/train_features.parquet', threshold_rmspe=0.10, strict=False, auto_promote=True)
Validate a model and promote to Staging if passing.
Parameters
model_name : str
Name of registered model
version : str, optional
Model version to validate. If None, uses latest version
data_path : str
Path to processed data
threshold_rmspe : float
Performance threshold
strict : bool
Use strict threshold (0.09856)
auto_promote : bool
Automatically promote to Staging if passing
Returns
dict
Validation results including metrics and promotion status
Source code in src/models/validate_model.py
| def validate_and_promote(
model_name: str = "rossmann-ensemble",
version: Optional[str] = None,
data_path: str = "data/processed/train_features.parquet",
threshold_rmspe: float = 0.10,
strict: bool = False,
auto_promote: bool = True,
) -> dict[str, Any]:
"""Validate a model and promote to Staging if passing.
Parameters
----------
model_name : str
Name of registered model
version : str, optional
Model version to validate. If None, uses latest version
data_path : str
Path to processed data
threshold_rmspe : float
Performance threshold
strict : bool
Use strict threshold (0.09856)
auto_promote : bool
Automatically promote to Staging if passing
Returns
-------
dict
Validation results including metrics and promotion status
"""
logger.info("=" * 70)
logger.info("MODEL VALIDATION & PROMOTION")
logger.info("=" * 70)
# Get version if not specified
if version is None:
# Find the latest version (any stage)
from mlflow.tracking import MlflowClient
client = MlflowClient()
versions = client.search_model_versions(f"name='{model_name}'")
if not versions:
raise ValueError(f"No versions found for model: {model_name}")
version = max([int(mv.version) for mv in versions])
logger.info(f"Using latest version: {version}")
else:
logger.info(f"Validating version: {version}")
# Load model
logger.info(f"\nLoading model: {model_name} version {version}")
model = load_model(model_name, stage=str(version))
# Load holdout data
logger.info("\nLoading holdout validation data...")
X_holdout, y_holdout, feature_cols = load_holdout_data(data_path)
# Evaluate model
logger.info("\nEvaluating model performance...")
metrics = evaluate_model(model, X_holdout, y_holdout)
# Check thresholds
logger.info("\nChecking performance thresholds...")
passed = check_performance_threshold(metrics, threshold_rmspe, strict)
# Promotion decision
promoted = False
if passed and auto_promote:
logger.info("\n✓ Model passed validation, promoting to Staging...")
promote_model(model_name, version=str(version), stage="Staging")
promoted = True
logger.info(f"✓ Model {model_name} v{version} promoted to Staging")
elif passed:
logger.info("\n✓ Model passed validation (auto_promote=False, manual promotion required)")
else:
logger.info("\n✗ Model did not pass validation, not promoting")
results = {
"model_name": model_name,
"version": version,
"metrics": metrics,
"threshold_rmspe": threshold_rmspe if not strict else 0.09856,
"passed": passed,
"promoted": promoted,
"stage": "Staging" if promoted else "None",
}
logger.info("=" * 70)
logger.info("VALIDATION SUMMARY")
logger.info("=" * 70)
logger.info(f"Model: {model_name} v{version}")
logger.info(f"RMSPE: {metrics['rmspe']:.6f}")
logger.info(f"Passed: {passed}")
logger.info(f"Promoted to Staging: {promoted}")
logger.info("=" * 70)
return results
|
Inference Pipeline
Production inference pipeline for Rossmann forecasting.
This module provides functionality to load the production model from MLflow and generate predictions
on new data.
generate_predictions(model, X_features, data_df)
Generate predictions using loaded model.
Parameters
model : mlflow.pyfunc.PyFuncModel
Loaded production model
X_features : pd.DataFrame
Feature matrix for predictions
data_df : pd.DataFrame
Original data with Store, Date, etc.
Returns
pd.DataFrame
Predictions with metadata (Store, Date, Predicted_Sales)
Source code in src/models/predict.py
| def generate_predictions(
model,
X_features: pd.DataFrame,
data_df: pd.DataFrame,
) -> pd.DataFrame:
"""Generate predictions using loaded model.
Parameters
----------
model : mlflow.pyfunc.PyFuncModel
Loaded production model
X_features : pd.DataFrame
Feature matrix for predictions
data_df : pd.DataFrame
Original data with Store, Date, etc.
Returns
-------
pd.DataFrame
Predictions with metadata (Store, Date, Predicted_Sales)
"""
logger.info(f"Generating predictions for {len(X_features):,} rows...")
# Generate predictions
predictions = model.predict(X_features)
# Create predictions dataframe
predictions_df = pd.DataFrame(
{
"Store": data_df["Store"].values,
"Date": data_df["Date"].values,
"DayOfWeek": data_df["DayOfWeek"].values if "DayOfWeek" in data_df.columns else None,
"Predicted_Sales": predictions,
}
)
# Add actual sales if available (for comparison)
if "Sales" in data_df.columns:
predictions_df["Actual_Sales"] = data_df["Sales"].values
predictions_df["Prediction_Error"] = (
predictions_df["Actual_Sales"] - predictions_df["Predicted_Sales"]
)
predictions_df["Absolute_Percentage_Error"] = (
np.abs(predictions_df["Prediction_Error"]) / predictions_df["Actual_Sales"]
) * 100
logger.info(f"✓ Generated {len(predictions):,} predictions")
logger.info(f" Prediction range: ${predictions.min():.2f} to ${predictions.max():.2f}")
if "Actual_Sales" in predictions_df.columns:
from evaluation.metrics import rmspe
rmspe_score = rmspe(
predictions_df["Actual_Sales"].values,
predictions_df["Predicted_Sales"].values,
)
logger.info(f" RMSPE: {rmspe_score:.6f}")
return predictions_df
|
load_inference_data(data_path='data/processed/train_features.parquet', start_date=None, end_date=None)
Load data for inference.
Parameters
data_path : str
Path to processed features parquet file
start_date : str, optional
Start date for predictions (YYYY-MM-DD). If None, uses all data
end_date : str, optional
End date for predictions (YYYY-MM-DD). If None, uses all data
Returns
tuple
(data_df, X_features, feature_cols)
Source code in src/models/predict.py
| def load_inference_data(
data_path: str = "data/processed/train_features.parquet",
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> tuple:
"""Load data for inference.
Parameters
----------
data_path : str
Path to processed features parquet file
start_date : str, optional
Start date for predictions (YYYY-MM-DD). If None, uses all data
end_date : str, optional
End date for predictions (YYYY-MM-DD). If None, uses all data
Returns
-------
tuple
(data_df, X_features, feature_cols)
"""
logger.info(f"Loading data from: {data_path}")
df = read_parquet(data_path)
# Filter by date range if specified
if start_date:
df = df[df["Date"] >= pd.to_datetime(start_date)]
logger.info(f"Filtered to start_date >= {start_date}")
if end_date:
df = df[df["Date"] <= pd.to_datetime(end_date)]
logger.info(f"Filtered to end_date <= {end_date}")
# Filter to open stores only (predictions only needed for open stores)
df_open = df[df["Open"] == 1].copy()
logger.info(f"Loaded {len(df_open):,} rows (open stores)")
logger.info(f"Date range: {df_open['Date'].min()} to {df_open['Date'].max()}")
# Define feature columns
exclude_cols = ["Sales", "Date", "Store", "Customers"]
feature_cols = [col for col in df.columns if col not in exclude_cols]
# Handle missing features (if any)
df_open = df_open.dropna(subset=feature_cols)
X_features = df_open[feature_cols]
logger.info(f"Features: {len(feature_cols)}")
return df_open, X_features, feature_cols
|
main()
Main inference workflow with CLI.
Source code in src/models/predict.py
| def main():
"""Main inference workflow with CLI."""
import argparse
parser = argparse.ArgumentParser(
description="Generate predictions using production Rossmann model"
)
parser.add_argument("--model-name", default="rossmann-ensemble", help="Model name in registry")
parser.add_argument(
"--stage",
default="Production",
help="Model stage ('Production', 'Staging', or version number)",
)
parser.add_argument(
"--data-path",
default="data/processed/train_features.parquet",
help="Input data path",
)
parser.add_argument(
"--output-path",
default="outputs/predictions/production_predictions.csv",
help="Output predictions path",
)
parser.add_argument("--start-date", help="Start date for predictions (YYYY-MM-DD)")
parser.add_argument("--end-date", help="End date for predictions (YYYY-MM-DD)")
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
predictions_df = predict(
model_name=args.model_name,
stage=args.stage,
data_path=args.data_path,
output_path=args.output_path,
start_date=args.start_date,
end_date=args.end_date,
)
# Print summary statistics
print("\n" + "=" * 70)
print("PREDICTION SUMMARY")
print("=" * 70)
print(f"Total predictions: {len(predictions_df):,}")
print(f"Date range: {predictions_df['Date'].min()} to {predictions_df['Date'].max()}")
print(f"Stores: {predictions_df['Store'].nunique():,} unique stores")
print(
f"Predicted sales range: ${predictions_df['Predicted_Sales'].min():.2f} to ${predictions_df['Predicted_Sales'].max():.2f}"
)
if "Actual_Sales" in predictions_df.columns:
print("\nActual vs Predicted:")
print(f" Mean Actual: ${predictions_df['Actual_Sales'].mean():.2f}")
print(f" Mean Predicted: ${predictions_df['Predicted_Sales'].mean():.2f}")
print(f" Mean APE: {predictions_df['Absolute_Percentage_Error'].mean():.2f}%")
print("=" * 70)
|
predict(model_name='rossmann-ensemble', stage='Production', data_path='data/processed/train_features.parquet', output_path='outputs/predictions/production_predictions.csv', start_date=None, end_date=None)
Main prediction pipeline.
Parameters
model_name : str
Name of registered model in MLflow
stage : str
Model stage to use ('Production', 'Staging', or version number)
data_path : str
Path to input data
output_path : str
Path to save predictions
start_date : str, optional
Start date for predictions (YYYY-MM-DD)
end_date : str, optional
End date for predictions (YYYY-MM-DD)
Returns
pd.DataFrame
Predictions dataframe
Examples
Predict using Production model
predictions = predict(
... model_name='rossmann-ensemble',
... stage='Production',
... data_path='data/processed/train_features.parquet'
... )
Predict for specific date range
predictions = predict(
... model_name='rossmann-ensemble',
... stage='Production',
... start_date='2015-07-01',
... end_date='2015-07-31'
... )
Source code in src/models/predict.py
| def predict(
model_name: str = "rossmann-ensemble",
stage: str = "Production",
data_path: str = "data/processed/train_features.parquet",
output_path: str = "outputs/predictions/production_predictions.csv",
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> pd.DataFrame:
"""Main prediction pipeline.
Parameters
----------
model_name : str
Name of registered model in MLflow
stage : str
Model stage to use ('Production', 'Staging', or version number)
data_path : str
Path to input data
output_path : str
Path to save predictions
start_date : str, optional
Start date for predictions (YYYY-MM-DD)
end_date : str, optional
End date for predictions (YYYY-MM-DD)
Returns
-------
pd.DataFrame
Predictions dataframe
Examples
--------
>>> # Predict using Production model
>>> predictions = predict(
... model_name='rossmann-ensemble',
... stage='Production',
... data_path='data/processed/train_features.parquet'
... )
>>>
>>> # Predict for specific date range
>>> predictions = predict(
... model_name='rossmann-ensemble',
... stage='Production',
... start_date='2015-07-01',
... end_date='2015-07-31'
... )
"""
logger.info("=" * 70)
logger.info("PRODUCTION INFERENCE PIPELINE")
logger.info("=" * 70)
# Get model version
if stage in ["Production", "Staging"]:
version = get_model_version(model_name, stage=stage)
logger.info(f"Using {stage} model version: {version}")
else:
version = stage # Assume it's a version number
logger.info(f"Using model version: {version}")
# Load model
logger.info(f"\nLoading model: {model_name} ({stage})")
model = load_model(model_name, stage=stage)
# Load inference data
logger.info("\nLoading inference data...")
data_df, X_features, feature_cols = load_inference_data(data_path, start_date, end_date)
# Generate predictions
logger.info("\nGenerating predictions...")
predictions_df = generate_predictions(model, X_features, data_df)
# Save predictions
logger.info("\nSaving predictions...")
save_predictions(
predictions_df,
output_path,
model_version=version,
metadata={
"model_name": model_name,
"model_stage": stage,
"data_path": data_path,
},
)
logger.info("=" * 70)
logger.info("✓ PREDICTION PIPELINE COMPLETE")
logger.info("=" * 70)
logger.info(f"Predictions saved to: {output_path}")
logger.info(f"Total predictions: {len(predictions_df):,}")
return predictions_df
|
save_predictions(predictions_df, output_path, model_version, metadata=None)
Save predictions to CSV with metadata.
Parameters
predictions_df : pd.DataFrame
Predictions dataframe
output_path : str
Output file path
model_version : str
Model version used for predictions
metadata : dict, optional
Additional metadata to save
Source code in src/models/predict.py
| def save_predictions(
predictions_df: pd.DataFrame,
output_path: str,
model_version: str,
metadata: Optional[dict] = None,
) -> None:
"""Save predictions to CSV with metadata.
Parameters
----------
predictions_df : pd.DataFrame
Predictions dataframe
output_path : str
Output file path
model_version : str
Model version used for predictions
metadata : dict, optional
Additional metadata to save
"""
# Ensure output directory exists
output_file = Path(output_path)
ensure_dir(output_file.parent)
# Save predictions
predictions_df.to_csv(output_path, index=False)
logger.info(f"✓ Saved predictions to: {output_path}")
# Save metadata
if metadata is None:
metadata = {}
metadata.update(
{
"model_version": model_version,
"prediction_date": datetime.now().isoformat(),
"n_predictions": len(predictions_df),
"date_range": {
"start": str(predictions_df["Date"].min()),
"end": str(predictions_df["Date"].max()),
},
}
)
metadata_path = output_file.parent / f"{output_file.stem}_metadata.json"
import json
with open(metadata_path, "w") as f:
json.dump(metadata, f, indent=2)
logger.info(f"✓ Saved metadata to: {metadata_path}")
|
Usage Examples
Training Baseline Models
# From command line
python -m src.models.train_baselines
# This will:
# 1. Load data/processed/train_features.parquet
# 2. Create time-series CV folds
# 3. Train naive and simple LightGBM models
# 4. Log results to MLflow
# 5. Print RMSPE scores
# From Python
from src.models.train_baselines import train_naive_model, train_simple_lgbm
# Train naive baseline (last week's sales)
rmspe_naive = train_naive_model(df)
print(f"Naive RMSPE: {rmspe_naive:.4f}")
# Train simple LightGBM
rmspe_lgbm = train_simple_lgbm(df)
print(f"Simple LightGBM RMSPE: {rmspe_lgbm:.4f}")
Training Advanced Models
# From command line
python -m src.models.train_advanced
# This will:
# 1. Load features and CV folds
# 2. Train tuned LightGBM, XGBoost, CatBoost
# 3. Perform hyperparameter optimization
# 4. Log all experiments to MLflow
# 5. Save best models to outputs/models/
# From Python
from src.models.train_advanced import train_tuned_lgbm, train_xgboost, train_catboost
# Train tuned models
lgbm_rmspe = train_tuned_lgbm(df, params)
xgb_rmspe = train_xgboost(df, params)
cat_rmspe = train_catboost(df, params)
Creating Ensembles
from src.models.ensembles import weighted_blend, stacked_ensemble
# Weighted blend (simple average or optimized weights)
ensemble_preds = weighted_blend(
    predictions=[lgbm_preds, xgb_preds, cat_preds],
    weights=[0.5, 0.3, 0.2],  # or optimize with CV
)
# Stacked ensemble (meta-learner)
stacked_preds = stacked_ensemble(
    oof_predictions=[lgbm_oof, xgb_oof, cat_oof],
    test_predictions=[lgbm_test, xgb_test, cat_test],
    y_train=y_train,
    meta_model="lgbm",  # or "linear"
)
Model Types
Baseline Models
Naive Last-Week Model
Predicts today's sales = sales from 7 days ago (same day of week last week).
Purpose: Establishes minimum performance threshold.
Expected RMSPE: ~0.15-0.20
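A minimal sketch of this baseline and the RMSPE it is scored with, assuming a long-format frame with Store, Date, and Sales columns (`naive_last_week` and this `rmspe` are illustrative stand-ins, not the project's `train_naive_model` or its metrics module):

```python
import numpy as np
import pandas as pd

def naive_last_week(df: pd.DataFrame) -> pd.Series:
    """Predict each day's sales as the same store's sales 7 days earlier."""
    df = df.sort_values(["Store", "Date"])
    return df.groupby("Store")["Sales"].shift(7)

def rmspe(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Root Mean Square Percentage Error, ignoring zero-sales days."""
    mask = y_true != 0
    return float(np.sqrt(np.mean(((y_true[mask] - y_pred[mask]) / y_true[mask]) ** 2)))
```

The first 7 days per store have no prediction (NaN), which is why the real pipeline evaluates on a holdout window well inside the data range.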
Simple LightGBM
LightGBM with default parameters, no tuning.
Features used: All standard features from feature engineering
Expected RMSPE: ~0.12-0.15
Advanced Models
Tuned LightGBM
LightGBM with optimized hyperparameters:
num_leaves: 31-127
learning_rate: 0.01-0.1
feature_fraction: 0.7-1.0
bagging_fraction: 0.7-1.0
min_child_samples: 20-100
Expected RMSPE: ~0.10-0.11
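With Optuna, these ranges could be expressed roughly as the following search space (a sketch: `suggest_lgbm_params` is a hypothetical helper name, and the CV/RMSPE objective that consumes it is elided):

```python
def suggest_lgbm_params(trial):
    """Map the tuning ranges above onto an Optuna trial."""
    return {
        "objective": "regression",
        "metric": "rmse",
        "num_leaves": trial.suggest_int("num_leaves", 31, 127),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.1, log=True),
        "feature_fraction": trial.suggest_float("feature_fraction", 0.7, 1.0),
        "bagging_fraction": trial.suggest_float("bagging_fraction", 0.7, 1.0),
        "min_child_samples": trial.suggest_int("min_child_samples", 20, 100),
    }
```

Sampling learning_rate on a log scale is a common choice when the range spans an order of magnitude.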
XGBoost
XGBoost with tuned parameters:
max_depth: 3-10
learning_rate: 0.01-0.1
subsample: 0.7-1.0
colsample_bytree: 0.7-1.0
Expected RMSPE: ~0.10-0.11
CatBoost
CatBoost with categorical feature handling:
- Automatically handles StoreType, Assortment, etc.
depth: 4-10
learning_rate: 0.01-0.1
Expected RMSPE: ~0.10-0.11
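Unlike XGBoost, CatBoost takes categorical columns directly via its cat_features argument rather than requiring integer codes. A sketch of wiring that up (`get_cat_features` is a hypothetical helper; the commented fit call assumes catboost is installed):

```python
import pandas as pd

def get_cat_features(df: pd.DataFrame) -> list[str]:
    """Columns CatBoost should treat as categorical (category/object dtypes)."""
    return [c for c in df.columns if df[c].dtype.name in ("category", "object")]

# Passed to CatBoost at fit time, e.g.:
# from catboost import CatBoostRegressor
# model = CatBoostRegressor(depth=6, learning_rate=0.05, verbose=0)
# model.fit(X_train, y_train, cat_features=get_cat_features(X_train))
```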
Ensemble Models
Weighted Blend
Simple weighted average of model predictions.
Weights optimization (weights constrained to be non-negative and sum to 1; `predictions`, `y_true`, and `rmspe` come from the surrounding training code):
import numpy as np
from scipy.optimize import minimize

def objective(weights):
    blend = np.average(predictions, axis=0, weights=weights)
    return rmspe(y_true, blend)

n_models = len(predictions)
constraints = {"type": "eq", "fun": lambda w: w.sum() - 1}
bounds = [(0, 1)] * n_models
initial_weights = np.full(n_models, 1 / n_models)
result = minimize(objective, initial_weights, bounds=bounds, constraints=constraints)
optimal_weights = result.x
Expected RMSPE: ~0.095-0.100
Stacked Ensemble
Uses out-of-fold predictions as features for a meta-learner.
Architecture:
Base Models (LightGBM, XGBoost, CatBoost)
↓
Out-of-Fold Predictions
↓
Meta-Model (Linear or LightGBM)
↓
Final Predictions
Expected RMSPE: ~0.095-0.098
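A minimal numpy-only sketch of this architecture with a linear meta-model (`stack` is a hypothetical helper; the project's `stacked_ensemble` also supports a LightGBM meta-learner):

```python
import numpy as np

def stack(oof_preds, test_preds, y_train):
    """Fit a linear meta-model on out-of-fold base predictions,
    then apply it to the base models' test predictions."""
    X_meta = np.column_stack(oof_preds)   # (n_train, n_models)
    X_test = np.column_stack(test_preds)  # (n_test, n_models)
    # Least-squares fit of meta-weights on the OOF matrix
    meta_w, *_ = np.linalg.lstsq(X_meta, y_train, rcond=None)
    return X_test @ meta_w
```

Fitting on out-of-fold predictions (never the base models' in-fold fits) is what keeps the meta-learner from overfitting to leaked training signal.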
MLflow Integration
All models are logged to MLflow with:
- Parameters: Hyperparameters used
- Metrics: RMSPE per fold and average
- Artifacts: Model files, feature importance plots
- Tags: Model type, experiment name
import mlflow

with mlflow.start_run(run_name="tuned_lgbm"):
    # Log parameters
    mlflow.log_params(params)
    # Train model
    model = train_model(X_train, y_train, params)
    # Log metrics
    mlflow.log_metric("rmspe", rmspe_score)
    # Log model
    mlflow.lightgbm.log_model(model, "model")
Model Evaluation Flow
flowchart TD
A[train_features.parquet] --> B[Time-Series CV Splits]
B --> C[Fold 1: Train → Validate]
B --> D[Fold 2: Train → Validate]
B --> E[Fold 3: Train → Validate]
C --> F[Calculate RMSPE per Fold]
D --> F
E --> F
F --> G[Average RMSPE]
G --> H[MLflow Logging]
H --> I[Model Selection]
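One way the rolling-origin splits in the flowchart could be generated (a sketch: `time_series_folds` is a hypothetical helper with a 42-day validation window, and the project's actual fold logic may differ):

```python
import pandas as pd

def time_series_folds(dates: pd.Series, n_folds: int = 3, val_days: int = 42):
    """Yield (train_mask, val_mask) pairs. Each fold validates on a
    `val_days`-day window ending at a rolling cutoff, training on all
    strictly earlier dates, so no future data leaks into training."""
    max_date = dates.max()
    for i in range(n_folds, 0, -1):
        val_end = max_date - pd.Timedelta(days=(i - 1) * val_days)
        val_start = val_end - pd.Timedelta(days=val_days - 1)
        train_mask = dates < val_start
        val_mask = (dates >= val_start) & (dates <= val_end)
        yield train_mask, val_mask
```

Per-fold RMSPE scores from these splits are then averaged, as in the `F --> G` step above.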
Key Functions
train_baselines.py
train_naive_model() - Last-week baseline
train_simple_lgbm() - LightGBM with defaults
train_advanced.py
train_tuned_lgbm() - Hyperparameter-tuned LightGBM
train_xgboost() - XGBoost model
train_catboost() - CatBoost model
ensembles.py
weighted_blend() - Weighted average of predictions
stacked_ensemble() - Meta-learner stacking
optimize_blend_weights() - Find optimal ensemble weights