---
name: ml-engineering
description: Expert ML engineering patterns for production systems: model serving architectures, feature stores, experiment tracking with MLflow, training-serving skew prevention, model drift monitoring, and GPU-optimized Kubernetes deployments. Trigger phrases: deploying ML models, feature pipeline,
---
# ML Engineering
Production ML systems fail not because of bad models, but because of bad engineering. Training accuracy means nothing if your serving pipeline corrupts features, your model drifts silently, or your infrastructure can't handle real traffic. This skill covers the patterns that separate ML demos from production ML systems.
## Core Mental Model
Think of a production ML system as three separate concerns that must stay synchronized: training (where models are built), serving (where predictions happen), and monitoring (where you learn if serving is broken). The most common production failure — training-serving skew — happens when these three components diverge. Every architectural decision should be evaluated against the question: "Will this keep training and serving in sync at feature computation time?"
## Online vs Batch Serving

**Batch serving (offline predictions):**
- Pre-compute predictions for all entities; store in a lookup table
- Latency: doesn't matter (minutes to hours acceptable)
- Use when: recommendations for email campaigns, next-day forecasting, report generation
- Risk: predictions go stale between batch runs

**Online serving (real-time predictions):**
- Model loaded in memory; requests scored synchronously
- Latency: must be <100ms p99 for user-facing; <10ms for critical paths
- Use when: fraud detection, search ranking, personalization at click time
- Risk: scaling challenges, dependency on feature freshness

**Lambda architecture (hybrid):**
- Batch predictions as fallback + online for fresh signals
- E.g.: batch-computed user embeddings (stable) + real-time session features (fresh)
```python
# Decision tree: online vs batch
def choose_serving_pattern(latency_sla_ms, feature_freshness_required, qps):
    if latency_sla_ms < 50:
        return "online_lightweight"  # simple features, fast model (linear, shallow tree)
    elif latency_sla_ms < 500 and feature_freshness_required == "real-time":
        return "online_full"  # full feature pipeline, GPU if needed
    elif feature_freshness_required == "daily":
        return "batch_with_online_fallback"
    else:
        return "batch"
```
## Feature Store Architecture
The feature store is the bridge between training and serving. Without it, you'll rewrite feature logic twice (once in Python for training, once in Java/Go for serving), and they'll eventually diverge.
```
┌───────────────────┐   ┌─────────────────┐   ┌──────────────────┐
│ Feature Pipelines │──▶│  Feature Store  │──▶│  Training Jobs   │
│   (Spark/Flink)   │   │ ┌─────────────┐ │   └──────────────────┘
└───────────────────┘   │ │   Offline   │ │   ┌──────────────────┐
                        │ │  (S3/Hive)  │ │──▶│ Serving (Redis)  │
                        │ ├─────────────┤ │   └──────────────────┘
                        │ │   Online    │ │
                        │ │   (Redis)   │ │
                        │ └─────────────┘ │
                        └─────────────────┘
```
```python
# Feast feature store example
from datetime import timedelta

from feast import Entity, FeatureStore, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Define entity
user = Entity(name="user_id", join_keys=["user_id"])

# Define feature source (same for training and serving)
user_features_source = FileSource(
    path="s3://my-bucket/user_features.parquet",
    timestamp_field="event_timestamp",
)

# Define feature view
user_feature_view = FeatureView(
    name="user_features",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="purchase_count_7d", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_purchase", dtype=Int64),
    ],
    source=user_features_source,
)

store = FeatureStore(repo_path=".")
store.apply([user, user_feature_view])  # register definitions before first use

# Training: point-in-time correct feature retrieval
training_df = store.get_historical_features(
    entity_df=training_labels_df,  # has user_id + event_timestamp columns
    features=["user_features:purchase_count_7d", "user_features:avg_order_value"],
).to_df()

# Serving: low-latency online lookup (same feature logic)
online_features = store.get_online_features(
    features=["user_features:purchase_count_7d", "user_features:avg_order_value"],
    entity_rows=[{"user_id": "user_123"}],
).to_dict()
```
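Under the hood, `get_historical_features` performs a point-in-time join: for each label row it takes the latest feature values at or before that row's timestamp. A minimal pandas illustration of the idea (illustrative only, not Feast's actual implementation):

```python
import pandas as pd

# Feature snapshots: multiple versions per user over time
features = pd.DataFrame({
    "user_id": ["u1", "u1", "u2"],
    "event_timestamp": pd.to_datetime(["2024-01-01", "2024-01-10", "2024-01-05"]),
    "purchase_count_7d": [3, 7, 1],
})

# Label events: each needs the features as they were AT that moment
labels = pd.DataFrame({
    "user_id": ["u1", "u1", "u2"],
    "event_timestamp": pd.to_datetime(["2024-01-05", "2024-01-12", "2024-01-06"]),
    "label": [0, 1, 0],
})

# merge_asof with direction="backward" picks the latest feature row
# at or before each label's timestamp, per user
training_df = pd.merge_asof(
    labels.sort_values("event_timestamp"),
    features.sort_values("event_timestamp"),
    on="event_timestamp",
    by="user_id",
    direction="backward",
)
print(training_df[["user_id", "label", "purchase_count_7d"]])
```

Note that the u1 label on 2024-01-12 correctly gets the 2024-01-10 feature value (7), not the stale 2024-01-01 one; a naive join on `user_id` alone would leak or misalign feature versions.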
## Training-Serving Skew Prevention
This is the #1 production ML failure mode. It happens when the features computed at training time differ from those computed at serving time — even subtly.
```python
# BAD: Feature logic duplicated in two places

# training_pipeline.py
def compute_user_age_days(signup_date, reference_date):
    return (reference_date - signup_date).days

# serving_api.py (written 6 months later by a different team)
def compute_user_age_days(signup_date):
    return (datetime.now() - signup_date).days  # Bug: timezone issue, different reference point
```

```python
# GOOD: Single source of truth for feature computation

# features/user_features.py (used by BOTH training and serving)
def compute_user_features(user_id: str, reference_timestamp: datetime) -> dict:
    """
    Canonical feature computation. Import this in the training pipeline AND the serving API.
    Never reimplement feature logic — always import from here.
    """
    user = db.get_user(user_id)
    return {
        "account_age_days": (reference_timestamp - user.signup_date).days,
        "purchase_count_30d": db.count_purchases(user_id, reference_timestamp, days=30),
        "is_premium": user.subscription_tier == "premium",
    }
```
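A complementary guard is a parity check: log the features used at serving time and periodically recompute them through the training path, flagging any disagreement. The helper below is a hypothetical sketch, not a standard library function:

```python
import math

def feature_parity_report(served: dict, recomputed: dict, rtol: float = 1e-6) -> dict:
    """Compare features logged at serving time against a training-path recompute.

    Returns the subset of feature names whose values disagree.
    """
    mismatches = {}
    for name in served.keys() | recomputed.keys():
        a, b = served.get(name), recomputed.get(name)
        if a is None or b is None:
            mismatches[name] = (a, b)  # feature missing on one side
        elif isinstance(a, float) or isinstance(b, float):
            if not math.isclose(float(a), float(b), rel_tol=rtol):
                mismatches[name] = (a, b)
        elif a != b:
            mismatches[name] = (a, b)
    return mismatches

# A drifted reimplementation shows up immediately:
served = {"account_age_days": 120, "is_premium": True}
recomputed = {"account_age_days": 121, "is_premium": True}  # off-by-one timezone bug
print(feature_parity_report(served, recomputed))  # {'account_age_days': (120, 121)}
```

Run this on a sampled slice of production traffic; a non-empty report is an early warning long before model metrics degrade.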
## MLflow Experiment Tracking

```python
import mlflow
import mlflow.xgboost
import pandas as pd
from mlflow.models import infer_signature
from sklearn.metrics import average_precision_score, roc_auc_score
from xgboost import XGBClassifier

mlflow.set_experiment("fraud-detection-v2")

with mlflow.start_run(run_name="xgboost-baseline"):
    # Log parameters
    params = {
        "n_estimators": 200,
        "max_depth": 6,
        "learning_rate": 0.1,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
    }
    mlflow.log_params(params)

    # Train (xgboost >= 2.0: early stopping is a constructor arg, not a fit() arg)
    model = XGBClassifier(**params, early_stopping_rounds=20)
    model.fit(X_train, y_train, eval_set=[(X_val, y_val)])

    # Log metrics
    y_pred = model.predict_proba(X_test)[:, 1]
    mlflow.log_metric("auc_roc", roc_auc_score(y_test, y_pred))
    mlflow.log_metric("avg_precision", average_precision_score(y_test, y_pred))
    mlflow.log_metric("best_iteration", model.best_iteration)

    # Log the model with signature and input example
    signature = infer_signature(X_train, model.predict(X_train))
    mlflow.xgboost.log_model(
        model,
        "model",
        signature=signature,
        input_example=X_train.head(5),
        registered_model_name="fraud-detector",
    )

    # Log feature importance as an artifact
    importance_df = pd.DataFrame({
        "feature": X_train.columns,
        "importance": model.feature_importances_,
    }).sort_values("importance", ascending=False)
    importance_df.to_csv("feature_importance.csv", index=False)
    mlflow.log_artifact("feature_importance.csv")

# Promote to staging after evaluation
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="fraud-detector",
    version=3,
    stage="Staging",
    archive_existing_versions=False,
)
```
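Stage transitions are most useful when gated on metrics rather than intuition. A hedged sketch of such a gate (`should_promote` and its thresholds are hypothetical, not an MLflow API):

```python
def should_promote(candidate: dict, production: dict,
                   min_auc_gain: float = 0.005,
                   max_latency_regression: float = 1.2) -> bool:
    """Decide whether a candidate run's metrics justify a stage transition.

    candidate/production: metric dicts, e.g. {"auc_roc": 0.91, "p99_latency_ms": 40}.
    Promote only if AUC improves meaningfully AND p99 latency doesn't regress badly.
    """
    auc_ok = candidate["auc_roc"] >= production["auc_roc"] + min_auc_gain
    latency_ok = (candidate["p99_latency_ms"]
                  <= production["p99_latency_ms"] * max_latency_regression)
    return auc_ok and latency_ok

prod = {"auc_roc": 0.90, "p99_latency_ms": 40}
print(should_promote({"auc_roc": 0.92, "p99_latency_ms": 45}, prod))  # True
print(should_promote({"auc_roc": 0.91, "p99_latency_ms": 90}, prod))  # False: latency regression
```

Call a gate like this before `transition_model_version_stage` so promotions are reproducible decisions you can audit later.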
## Model Drift Detection

**Data drift:** input feature distribution has changed (population shift, data pipeline bug).
**Concept drift:** the relationship between features and target has changed (real-world change, adversarial shift).
```python
# PSI (Population Stability Index) for data drift detection
import numpy as np

def compute_psi(expected, actual, buckets=10, epsilon=1e-6):
    """
    PSI < 0.1: No significant drift
    PSI 0.1-0.2: Moderate drift — monitor closely
    PSI > 0.2: Significant drift — investigate, consider retraining
    """
    breakpoints = np.percentile(expected, np.linspace(0, 100, buckets + 1))
    breakpoints[0] = -np.inf
    breakpoints[-1] = np.inf
    # Deduplicate edges: skewed data can produce repeated percentiles,
    # which np.histogram rejects as non-increasing bins
    breakpoints = np.unique(breakpoints)
    expected_counts = np.histogram(expected, bins=breakpoints)[0] / len(expected)
    actual_counts = np.histogram(actual, bins=breakpoints)[0] / len(actual)
    # Clip to avoid log(0)
    expected_counts = np.clip(expected_counts, epsilon, None)
    actual_counts = np.clip(actual_counts, epsilon, None)
    psi = np.sum((actual_counts - expected_counts) * np.log(actual_counts / expected_counts))
    return psi
```
```python
# KS test for distribution shift
import pandas as pd
from scipy import stats

def check_drift(reference_data: pd.DataFrame, current_data: pd.DataFrame,
                threshold_psi=0.2, threshold_ks_pvalue=0.05) -> dict:
    drift_report = {}
    for col in reference_data.columns:
        psi = compute_psi(reference_data[col], current_data[col])
        ks_stat, ks_pvalue = stats.ks_2samp(reference_data[col], current_data[col])
        drift_report[col] = {
            "psi": psi,
            "psi_alert": psi > threshold_psi,
            "ks_pvalue": ks_pvalue,
            "ks_alert": ks_pvalue < threshold_ks_pvalue,
        }
    return drift_report
```
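A quick sanity check of the thresholds, with the PSI computation re-inlined so the snippet stands alone: an unshifted sample scores near zero, while a one-sigma mean shift lands well past the 0.2 alert line.

```python
import numpy as np

def psi(expected, actual, buckets=10, epsilon=1e-6):
    # Same logic as compute_psi above, inlined for a standalone demo
    edges = np.percentile(expected, np.linspace(0, 100, buckets + 1))
    edges[0], edges[-1] = -np.inf, np.inf
    e = np.clip(np.histogram(expected, bins=edges)[0] / len(expected), epsilon, None)
    a = np.clip(np.histogram(actual, bins=edges)[0] / len(actual), epsilon, None)
    return np.sum((a - e) * np.log(a / e))

rng = np.random.default_rng(42)
reference = rng.normal(0, 1, 10_000)
same = rng.normal(0, 1, 10_000)       # same distribution, different sample
shifted = rng.normal(1.0, 1, 10_000)  # mean shifted by one standard deviation

print(f"no drift: PSI = {psi(reference, same):.3f}")     # well under 0.1
print(f"1-sigma:  PSI = {psi(reference, shifted):.3f}")  # well over 0.2
```

In practice, run a check like this per feature on a schedule and alert on the PSI bands from the docstring above.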
## Shadow Mode + Canary Deployment

```python
# Shadow mode: the new model runs alongside production; results are logged, never served
import asyncio

from fastapi import FastAPI
from pydantic import BaseModel

class PredictionRequest(BaseModel):
    id: str
    features: dict

app = FastAPI()

@app.post("/predict")
async def predict(request: PredictionRequest):
    # Production model always wins
    prod_prediction = production_model.predict(request.features)
    # Shadow model runs in the background; its result is discarded
    asyncio.create_task(
        shadow_model_predict_and_log(request, shadow_model, prod_prediction)
    )
    return {"prediction": prod_prediction, "model_version": "v1.2"}

async def shadow_model_predict_and_log(request, shadow_model, prod_prediction):
    try:
        shadow_result = shadow_model.predict(request.features)
        # Log for offline comparison — never serve this
        metrics_client.record({
            "shadow_prediction": shadow_result,
            "prod_prediction": prod_prediction,  # passed in explicitly, not read from a closure
            "request_id": request.id,
        })
    except Exception as e:
        logger.warning(f"Shadow model failed: {e}")  # Never affects users
```
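For the canary stage, routing should be deterministic per user so the same user always hits the same model and session-level metrics stay comparable. A minimal hash-bucketing sketch (`route_model` is a hypothetical helper, not a framework API):

```python
import hashlib

def route_model(user_id: str, canary_fraction: float = 0.05) -> str:
    """Deterministic canary routing: hash the user id into [0, 1)
    and compare against the canary traffic fraction."""
    bucket = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 10_000 / 10_000
    return "canary" if bucket < canary_fraction else "production"

# With enough users, the observed share converges on the configured fraction
assignments = [route_model(f"user_{i}") for i in range(10_000)]
canary_share = assignments.count("canary") / len(assignments)
print(f"canary share: {canary_share:.3f}")
```

Stepping the fraction through 0.05 → 0.25 → 1.0 gives the staged rollout described in the Quick Reference, with no per-user flapping between models.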
## Kubernetes + KEDA for GPU Auto-scaling

```yaml
# keda-gpu-scaler.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: ml-inference-scaler
spec:
  scaleTargetRef:
    name: ml-inference-deployment
  minReplicaCount: 1
  maxReplicaCount: 10
  triggers:
    - type: prometheus
      metadata:
        serverAddress: http://prometheus:9090
        metricName: inference_queue_depth
        threshold: "5"  # Scale up when >5 requests are queued per pod
        query: sum(inference_queue_size) / count(kube_pod_info{pod=~"ml-inference.*"})
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-inference-deployment
spec:
  template:
    spec:
      containers:
        - name: inference-server
          image: my-registry/ml-model:v1.2
          resources:
            limits:
              nvidia.com/gpu: "1"
            requests:
              nvidia.com/gpu: "1"
              memory: "8Gi"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4
```
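KEDA hands the Prometheus value to the Horizontal Pod Autoscaler, whose scaling rule is `desired = ceil(current * metric / target)`, clamped to the ScaledObject's replica bounds. A small sketch of how the trigger above behaves:

```python
import math

def desired_replicas(current_replicas: int, metric_value: float, target: float,
                     min_replicas: int = 1, max_replicas: int = 10) -> int:
    """HPA scaling rule as used by KEDA: ceil(current * current_metric / target),
    clamped to the ScaledObject's minReplicaCount/maxReplicaCount."""
    desired = math.ceil(current_replicas * metric_value / target)
    return max(min_replicas, min(max_replicas, desired))

# Queue depth per pod vs the threshold of 5 from the ScaledObject above
print(desired_replicas(current_replicas=2, metric_value=12, target=5))  # 5
print(desired_replicas(current_replicas=5, metric_value=2, target=5))   # 2
print(desired_replicas(current_replicas=2, metric_value=60, target=5))  # 10 (clamped)
```

This is why the threshold is per-pod: the query divides total queue size by pod count, so the formula scales replicas until queue depth per pod falls back under 5.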
## ONNX Cross-Framework Deployment

```python
# Export a PyTorch model to ONNX for framework-agnostic serving
import numpy as np
import onnx
import onnxruntime as ort
import torch

# `model` is assumed to be a trained torch.nn.Module
model.eval()  # export in inference mode (disables dropout, batchnorm updates)

# Export
dummy_input = torch.randn(1, 3, 224, 224)  # batch_size=1, channels=3, H=224, W=224
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    opset_version=17,
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
)

# Validate
onnx_model = onnx.load("model.onnx")
onnx.checker.check_model(onnx_model)

# Inference with ONNX Runtime (often faster than PyTorch for CPU inference)
session = ort.InferenceSession(
    "model.onnx",
    providers=["CUDAExecutionProvider", "CPUExecutionProvider"],  # GPU with CPU fallback
)
input_array = dummy_input.numpy().astype(np.float32)
outputs = session.run(None, {"input": input_array})
```
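After exporting, it is worth asserting numeric parity between the two runtimes on the same input before shipping the ONNX artifact. A sketch with stand-in arrays (`torch_out` and `onnx_out` here are fabricated placeholders for `model(x)` and `session.run(...)` outputs):

```python
import numpy as np

# Stand-ins for model(dummy_input).detach().numpy() and session.run(...)[0]
torch_out = np.array([[0.12, 0.88]], dtype=np.float32)
onnx_out = np.array([[0.1200001, 0.8799999]], dtype=np.float32)

# ONNX operator implementations can differ in the last few ULPs,
# so allow a small tolerance rather than demanding exact equality
np.testing.assert_allclose(torch_out, onnx_out, rtol=1e-4, atol=1e-5)
print("parity check passed")
```

If the assertion fails, the usual suspects are a missing `model.eval()`, mismatched input preprocessing, or an unsupported op silently decomposed during export.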
## Anti-Patterns

❌ **Recomputing features differently in training vs serving**
Each reimplementation diverges. Use a shared feature library or feature store.

❌ **Using `datetime.now()` in feature computation**
Never use wall-clock time in features. Always pass `reference_timestamp` explicitly — this is what breaks point-in-time correctness in training.

❌ **No baseline model**
Always have a simple baseline (moving average, most-frequent class, rule-based). If your ML model doesn't beat it, you have a data or framing problem, not a model problem.

❌ **Monitoring only model metrics, not data pipeline health**
Null rates, schema changes, and distribution shifts in your feature pipeline are invisible without explicit monitoring. Much of what looks like "model degradation" is actually pipeline degradation.

❌ **Loading the model from disk on every request**
Load at startup and cache in memory. Model loading can take seconds.

❌ **Training with future data leakage**
For time-series problems, always split train/test by time, not randomly. A random split gives optimistic results that won't generalize.
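The time-based split from the last anti-pattern can be sketched as follows (`time_split` is a minimal illustrative helper, not a library function):

```python
import numpy as np
import pandas as pd

def time_split(df: pd.DataFrame, timestamp_col: str, test_frac: float = 0.2):
    """Split by time, not randomly: everything after the cutoff is test data,
    so the model never trains on the future it is evaluated against."""
    df = df.sort_values(timestamp_col)
    cutoff = int(len(df) * (1 - test_frac))
    return df.iloc[:cutoff], df.iloc[cutoff:]

events = pd.DataFrame({
    "event_timestamp": pd.date_range("2024-01-01", periods=100, freq="D"),
    "target": np.random.default_rng(0).integers(0, 2, 100),
})
train, test = time_split(events, "event_timestamp")

# Every training row strictly precedes every test row: no future leakage
assert train["event_timestamp"].max() < test["event_timestamp"].min()
print(len(train), len(test))  # 80 20
```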
## Quick Reference

**Serving pattern decision:**
- p99 < 50ms + simple features → lightweight REST (FastAPI + sklearn)
- p99 < 500ms + complex features → online serving (FastAPI + GPU + feature store)
- p99 < 5000ms → async queue (Celery/Redis + model worker)
- Freshness > 1 hour OK → batch predictions + lookup table

**Drift thresholds:**
- PSI < 0.1 → no action
- PSI 0.1-0.2 → alert, monitor daily
- PSI > 0.2 → investigate + consider retraining
- KS p < 0.05 → statistically significant shift

**Deployment stages:**
Shadow mode (0% traffic, log only) → Canary (5%) → Staged (25%) → Full rollout

**MLflow model stages:**
None → Staging (after eval) → Production (after canary) → Archived (after replacement)