🏢 Enterprise ML Challenges
Your ML system is deployed and working. But as you scale:
- Features computed differently in training vs serving (training-serving skew)
- Multiple teams building duplicate feature pipelines
- No visibility into which models use which data
- Compliance requirements: explainability, fairness, data privacy
- Cloud costs spiraling out of control
- Security vulnerabilities in model APIs
- No documentation, so only the original developer understands the system
Production best practices solve these enterprise challenges with feature stores, governance frameworks, cost optimization, and systematic documentation.
💡 Production ML Pillars:
- Feature Stores: Centralized feature management
- Model Governance: Lineage, versioning, approvals
- Compliance: Explainability, fairness, privacy
- Cost Optimization: Resource management
- Security: Authentication, authorization, encryption
- Documentation: Model cards, data sheets, runbooks
🏪 Feature Stores
What is a Feature Store?
A feature store is a centralized repository for ML features that:
- Stores feature definitions and transformations
- Serves features consistently for training and inference
- Prevents training-serving skew
- Enables feature reuse across teams
- Provides point-in-time correct features for training
- Tracks feature lineage and usage
Feast (Open Source Feature Store)
Installation
pip install feast
Define Feature Repository
"""
Feature definitions with Feast
"""
# feature_repo/features.py
from datetime import timedelta
from feast import Entity, Feature, FeatureView, ValueType
from feast.data_source import FileSource
# Define entity (unique identifier)
# Note: Feature/ValueType below follow older Feast releases; newer Feast
# versions use Field and a schema= argument - adjust to your installed version.
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="User ID"
)
# Data source
user_features_source = FileSource(
    path="data/user_features.parquet",
    event_timestamp_column="event_timestamp",
)
# Feature view
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="age", dtype=ValueType.INT64),
        Feature(name="total_purchases", dtype=ValueType.INT64),
        Feature(name="avg_purchase_amount", dtype=ValueType.FLOAT),
        Feature(name="days_since_last_purchase", dtype=ValueType.INT64),
    ],
    online=True,
    source=user_features_source,
    tags={"team": "ml", "project": "churn_prediction"},
)
Initialize Feature Store
# feature_repo/feature_store.yaml
project: ml_project
registry: data/registry.db
provider: local
online_store:
  type: sqlite
  path: data/online_store.db
# Apply feature definitions
cd feature_repo
feast apply
Retrieve Historical Features (Training)
"""
Get historical features for training
"""
from feast import FeatureStore
from datetime import datetime
import pandas as pd
store = FeatureStore(repo_path="feature_repo")
# Entity dataframe (users and timestamps)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": [
        datetime(2024, 1, 1),
        datetime(2024, 1, 2),
        datetime(2024, 1, 3),
    ]
})
# Get historical features (point-in-time correct)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:age",
        "user_features:total_purchases",
        "user_features:avg_purchase_amount",
        "user_features:days_since_last_purchase",
    ],
).to_df()
print(training_df)
Serve Features (Inference)
"""
Get online features for real-time prediction
"""
# Materialize features to online store
from datetime import datetime, timedelta
store.materialize(
    start_date=datetime.now() - timedelta(days=7),
    end_date=datetime.now()
)

# Get online features (low latency)
# Assumes the FastAPI `app` and trained `model` from the serving tutorial are in scope
@app.post("/predict")
async def predict(user_id: int):
    """Prediction endpoint with Feast features"""
    # Fetch features from online store
    features = store.get_online_features(
        features=[
            "user_features:age",
            "user_features:total_purchases",
            "user_features:avg_purchase_amount",
            "user_features:days_since_last_purchase",
        ],
        entity_rows=[{"user_id": user_id}],
    ).to_dict()
    # Convert to model input (order must match training)
    feature_vector = [
        features['age'][0],
        features['total_purchases'][0],
        features['avg_purchase_amount'][0],
        features['days_since_last_purchase'][0],
    ]
    # Predict
    prediction = model.predict([feature_vector])[0]
    return {"user_id": user_id, "churn_probability": float(prediction)}
Tecton (Enterprise Feature Store)
"""
Tecton feature definitions with transformations
"""
# Illustrative Tecton-style definitions; exact class names vary by SDK version
from tecton import Entity, Feature, FeatureView, RequestDataSource, transformation
from tecton.types import Field, String, Int64, Float64
from datetime import datetime, timedelta
# Define entity
user = Entity(
    name="user",
    join_keys=[Field("user_id", Int64)],
    description="User entity"
)
# Feature transformation
@transformation(mode="pandas")
def user_transaction_stats(transactions):
    """Compute user transaction statistics"""
    return transactions.groupby('user_id').agg({
        'amount': ['count', 'mean', 'sum'],
        'timestamp': 'max'
    }).reset_index()
# Feature view with transformation
user_transaction_features = FeatureView(
    name="user_transaction_features",
    entities=[user],
    features=[
        Feature("total_transactions", Int64),
        Feature("avg_transaction_amount", Float64),
        Feature("total_spend", Float64),
    ],
    transformation=user_transaction_stats,
    ttl=timedelta(days=30),
    online=True,
    offline=True,
    feature_start_time=datetime(2024, 1, 1),
)
# Real-time features
@transformation(mode="python", input=RequestDataSource())
def user_context_features(request):
    """Real-time request features"""
    return {
        'hour_of_day': request['timestamp'].hour,
        'day_of_week': request['timestamp'].weekday(),
        'device_type': request['device'],
    }
user_context = FeatureView(
    name="user_context",
    entities=[user],
    features=[
        Feature("hour_of_day", Int64),
        Feature("day_of_week", Int64),
        Feature("device_type", String),
    ],
    transformation=user_context_features,
    online=True,
)
Feature Store Benefits
- Consistency: Same features for training and serving
- Reusability: Share features across teams and models
- Point-in-Time: Correct historical features for training
- Low Latency: Fast online feature serving
📋 Model Governance
What is Model Governance?
Model governance ensures models are developed responsibly, meet regulatory requirements, and maintain quality standards throughout their lifecycle.
Model Registry with Metadata
"""
Comprehensive model metadata tracking
"""
import mlflow
from mlflow.tracking import MlflowClient
def register_model_with_governance(model, metadata):
    """Register model with governance metadata"""
    with mlflow.start_run() as run:
        # Log model
        mlflow.sklearn.log_model(model, "model")
        # Log governance metadata
        mlflow.log_params({
            "model_owner": metadata["owner"],
            "model_purpose": metadata["purpose"],
            "training_data_version": metadata["data_version"],
            "compliance_status": metadata["compliance"],
        })
        # Log metrics
        mlflow.log_metrics({
            "accuracy": metadata["accuracy"],
            "fairness_score": metadata["fairness"],
            "explainability_score": metadata["explainability"],
        })
        # Log artifacts
        mlflow.log_dict(metadata["data_lineage"], "data_lineage.json")
        mlflow.log_dict(metadata["model_card"], "model_card.json")
        # Register
        model_uri = f"runs:/{run.info.run_id}/model"
        registered = mlflow.register_model(model_uri, metadata["model_name"])
        # Add governance tags
        client = MlflowClient()
        client.set_model_version_tag(
            name=metadata["model_name"],
            version=registered.version,
            key="approved_by",
            value=metadata["approver"]
        )
        client.set_model_version_tag(
            name=metadata["model_name"],
            version=registered.version,
            key="compliance_reviewed",
            value="true"
        )
        return registered

# Usage
metadata = {
    "model_name": "credit_risk_model",
    "owner": "data-science-team",
    "purpose": "Credit risk assessment",
    "data_version": "v2024.03",
    "compliance": "GDPR_compliant",
    "accuracy": 0.92,
    "fairness": 0.88,
    "explainability": 0.85,
    "approver": "john.doe@company.com",
    "data_lineage": {
        "sources": ["customer_db", "transaction_db"],
        "transformations": ["feature_engineering_v3"],
        "timestamp": "2024-03-15T10:00:00Z"
    },
    "model_card": {
        "intended_use": "Credit approval decisions",
        "limitations": "Not suitable for customers under 18",
        "ethical_considerations": "Bias mitigation applied"
    }
}
register_model_with_governance(model, metadata)
Approval Workflow
"""
Model approval workflow
"""
from datetime import datetime
from mlflow.tracking import MlflowClient

class ModelApprovalWorkflow:
    def __init__(self, model_name, version):
        self.model_name = model_name
        self.version = version
        self.client = MlflowClient()

    def request_approval(self, requester):
        """Request model approval"""
        # Transition to staging
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=self.version,
            stage="Staging"
        )
        # Add approval request tag
        self.client.set_model_version_tag(
            name=self.model_name,
            version=self.version,
            key="approval_status",
            value="pending"
        )
        self.client.set_model_version_tag(
            name=self.model_name,
            version=self.version,
            key="requested_by",
            value=requester
        )
        # Notify approvers (e.g. email/Slack) - helper assumed to exist elsewhere
        send_approval_request(self.model_name, self.version, requester)

    def approve(self, approver, comments=""):
        """Approve model for production"""
        # Validation checks
        checks = self.run_validation_checks()
        if not all(checks.values()):
            raise ValueError(f"Validation failed: {checks}")
        # Record approval
        self.client.set_model_version_tag(
            name=self.model_name,
            version=self.version,
            key="approval_status",
            value="approved"
        )
        self.client.set_model_version_tag(
            name=self.model_name,
            version=self.version,
            key="approved_by",
            value=approver
        )
        self.client.set_model_version_tag(
            name=self.model_name,
            version=self.version,
            key="approval_date",
            value=datetime.now().isoformat()
        )
        # Promote to production
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=self.version,
            stage="Production"
        )
        print(f"✅ Model v{self.version} approved and promoted to production")

    def run_validation_checks(self):
        """Run governance validation checks (individual checks are defined by your team)"""
        return {
            "accuracy_threshold": self.check_accuracy(),
            "fairness_check": self.check_fairness(),
            "explainability": self.check_explainability(),
            "data_quality": self.check_data_quality(),
            "compliance": self.check_compliance(),
        }
# Usage
workflow = ModelApprovalWorkflow("credit_risk_model", version=5)
workflow.request_approval(requester="data-scientist@company.com")
# Later, after review
workflow.approve(approver="ml-lead@company.com")
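The validation methods referenced above (check_accuracy(), check_fairness(), and so on) are left for your team to implement. As a minimal sketch, one such method added to ModelApprovalWorkflow might pull the metric logged on the run behind this model version - this assumes evaluation metrics were logged with MLflow as in the registration example:
def check_accuracy(self, threshold=0.85):
    """Sketch: compare the run's logged accuracy against a minimum threshold"""
    version = self.client.get_model_version(self.model_name, str(self.version))
    run = self.client.get_run(version.run_id)
    return run.data.metrics.get("accuracy", 0.0) >= threshold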
⚖️ Compliance & Responsible AI
Model Explainability with SHAP
"""
Add explainability to models
"""
import mlflow
import shap
import matplotlib.pyplot as plt

def explain_prediction(model, X_test, instance_idx=0):
    """Generate SHAP explanations"""
    # Create explainer
    explainer = shap.TreeExplainer(model)
    # Calculate SHAP values
    shap_values = explainer.shap_values(X_test)
    # Explain single prediction
    shap.initjs()
    shap.force_plot(
        explainer.expected_value,
        shap_values[instance_idx],
        X_test.iloc[instance_idx],
        matplotlib=True,
        show=False
    )
    plt.savefig('explanation.png')
    # Feature importance
    shap.summary_plot(shap_values, X_test, show=False)
    plt.savefig('feature_importance.png')
    # Log to MLflow (requires an active run)
    mlflow.log_artifact('explanation.png')
    mlflow.log_artifact('feature_importance.png')
    return shap_values
# Add to prediction endpoint (PredictRequest is the request schema from the serving tutorial)
@app.post("/predict_with_explanation")
async def predict_explainable(request: PredictRequest):
    """Prediction with explanation"""
    features_df = pd.DataFrame([request.features])
    prediction = model.predict(features_df)[0]
    # Generate explanation
    explainer = shap.TreeExplainer(model)
    # Note: for classifiers, shap_values() may return one array per class;
    # index into the class of interest in that case
    shap_values = explainer.shap_values(features_df)[0]
    # Top contributing features
    feature_importance = dict(zip(
        features_df.columns,
        shap_values
    ))
    top_features = sorted(
        feature_importance.items(),
        key=lambda x: abs(x[1]),
        reverse=True
    )[:5]
    return {
        "prediction": float(prediction),
        "explanation": {
            "top_features": [
                {"feature": f, "contribution": float(c)}
                for f, c in top_features
            ]
        }
    }
Fairness Evaluation
"""
Evaluate model fairness across demographic groups
"""
import mlflow
from fairlearn.metrics import (
    demographic_parity_difference,
    equalized_odds_difference
)

def evaluate_fairness(model, X_test, y_test, sensitive_feature):
    """Assess model fairness"""
    predictions = model.predict(X_test)
    # Demographic parity
    dp_diff = demographic_parity_difference(
        y_true=y_test,
        y_pred=predictions,
        sensitive_features=sensitive_feature
    )
    # Equalized odds
    eo_diff = equalized_odds_difference(
        y_true=y_test,
        y_pred=predictions,
        sensitive_features=sensitive_feature
    )
    print(f"Demographic Parity Difference: {dp_diff:.4f}")
    print(f"Equalized Odds Difference: {eo_diff:.4f}")
    # Log fairness metrics (requires an active MLflow run)
    mlflow.log_metrics({
        "demographic_parity_diff": dp_diff,
        "equalized_odds_diff": eo_diff,
    })
    # Fail if unfair
    if abs(dp_diff) > 0.1 or abs(eo_diff) > 0.1:
        raise ValueError("Model fails fairness criteria")
    return dp_diff, eo_diff
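A minimal usage sketch, assuming the test set carries a hypothetical "gender" column that is used only for evaluation and never as a model input:
sensitive = X_test["gender"]  # hypothetical sensitive attribute column
dp_diff, eo_diff = evaluate_fairness(
    model,
    X_test.drop(columns=["gender"]),  # model features exclude the sensitive attribute
    y_test,
    sensitive_feature=sensitive,
)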
Data Privacy (Differential Privacy)
"""
Train model with differential privacy
"""
from diffprivlib.models import LogisticRegression
# Train with privacy guarantees
private_model = LogisticRegression(
    epsilon=1.0,   # Privacy budget
    data_norm=5.0  # Upper bound on the L2 norm of each sample (bounds sensitivity)
)
private_model.fit(X_train, y_train)
print(f"Model trained with ε={private_model.epsilon} differential privacy")
💰 Cost Optimization
Resource Right-Sizing
"""
Monitor and optimize resource usage
"""
import functools
import time
import pandas as pd
import psutil

class ResourceMonitor:
    def __init__(self):
        self.metrics = []

    def track_prediction(self, prediction_func):
        """Track resource usage during prediction"""
        @functools.wraps(prediction_func)
        def wrapper(*args, **kwargs):
            # Before
            cpu_before = psutil.cpu_percent()
            memory_before = psutil.Process().memory_info().rss / 1024**2
            start_time = time.time()
            # Execute
            result = prediction_func(*args, **kwargs)
            # After
            duration = time.time() - start_time
            cpu_after = psutil.cpu_percent()
            memory_after = psutil.Process().memory_info().rss / 1024**2
            self.metrics.append({
                'duration': duration,
                'cpu_usage': cpu_after - cpu_before,
                'memory_mb': memory_after - memory_before,
            })
            return result
        return wrapper

    def analyze(self):
        """Analyze resource usage and recommend sizing"""
        df = pd.DataFrame(self.metrics)
        avg_duration = df['duration'].mean()
        p95_duration = df['duration'].quantile(0.95)
        avg_memory = df['memory_mb'].mean()
        print(f"Avg latency: {avg_duration:.3f}s")
        print(f"P95 latency: {p95_duration:.3f}s")
        print(f"Avg memory: {avg_memory:.1f}MB")
        # Recommendations
        if avg_memory < 512:
            print("💡 Consider downsizing to 512MB instance")
        if p95_duration < 0.1:
            print("💡 Model is fast - consider batching requests")

monitor = ResourceMonitor()

@app.post("/predict")
@monitor.track_prediction
def predict(request: PredictRequest):
    # Plain `def` endpoint so the sync wrapper above can time the call directly
    # (an async endpoint would need an async-aware wrapper)
    prediction = model.predict([request.features])[0]
    return {"prediction": float(prediction)}
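After the service has handled some traffic, call analyze() to get the sizing recommendations. One option is a small admin endpoint - a sketch with a hypothetical path, for illustration only:
@app.get("/admin/resource-report")
def resource_report():
    # Print sizing recommendations from the collected samples
    monitor.analyze()
    return {"samples_collected": len(monitor.metrics)}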
Batch Prediction for Cost Efficiency
"""
Batch predictions to reduce costs
"""
# Instead of real-time predictions for all use cases
# Use scheduled batch predictions for non-critical paths
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def batch_predict():
    """Run batch predictions on accumulated requests"""
    # Fetch pending requests (fetch_pending_requests / store_prediction are
    # application-specific helpers assumed to exist)
    requests = fetch_pending_requests()
    # Batch predict (much faster than individual calls)
    features = [r['features'] for r in requests]
    predictions = model.predict(features)
    # Store results
    for request, prediction in zip(requests, predictions):
        store_prediction(request['id'], prediction)
    print(f"Processed {len(requests)} predictions")

dag = DAG(
    'batch_predictions',
    start_date=datetime(2024, 1, 1),      # required by Airflow
    schedule_interval='*/15 * * * *',     # Every 15 minutes
    catchup=False,
)

PythonOperator(
    task_id='batch_predict',
    python_callable=batch_predict,
    dag=dag
)
Spot Instances for Training
# Use spot instances for training (70% cost reduction)
# Kubernetes with spot instances
apiVersion: v1
kind: Pod
metadata:
name: ml-training
spec:
nodeSelector:
node.kubernetes.io/instance-type: spot
tolerations:
- key: "spot"
operator: "Equal"
value: "true"
effect: "NoSchedule"
containers:
- name: trainer
image: ml-trainer:latest
resources:
requests:
memory: "8Gi"
cpu: "4"
📝 Documentation Best Practices
Model Cards
# model_card.yaml
model_details:
  name: "Customer Churn Prediction Model"
  version: "2.1.0"
  date: "2024-03-15"
  owner: "Data Science Team"
  contact: "ds-team@company.com"
intended_use:
  primary_use: "Identify customers at risk of churning"
  users: "Customer success teams, marketing"
  out_of_scope: "Not for individual customer targeting without human review"
training_data:
  sources:
    - "Customer database (2022-2024)"
    - "Transaction history"
    - "Support tickets"
  size: "1.2M customers"
  preprocessing: "Feature engineering pipeline v3"
metrics:
  accuracy: 0.89
  precision: 0.87
  recall: 0.85
  auc_roc: 0.93
ethical_considerations:
  fairness: "Tested across demographic groups - no significant bias detected"
  privacy: "PII removed, differential privacy applied"
  limitations: "Performance degrades for new product lines"
recommendations:
  monitoring: "Monitor weekly, retrain monthly"
  thresholds: "Confidence > 0.7 for automated actions"
Runbooks
# Model Deployment Runbook
## Pre-Deployment Checklist
- [ ] Model passes all validation tests
- [ ] Fairness metrics within acceptable range
- [ ] Performance validated on hold-out test set
- [ ] Model card completed and reviewed
- [ ] Approval obtained from ML lead
## Deployment Steps
1. **Register Model**
```python
import mlflow
mlflow.register_model("runs:/abc123/model", "customer_churn")
```
2. **Deploy to Staging**
```bash
kubectl apply -f k8s/staging-deployment.yaml
```
3. **Run Shadow Mode** (48 hours)
- Monitor shadow vs production agreement
- Check for performance degradation
4. **Promote to Production**
```bash
kubectl apply -f k8s/production-deployment.yaml
```
## Monitoring
- Dashboard: http://grafana.company.com/ml-models
- Alerts: #ml-alerts Slack channel
- On-call: data-science-oncall@company.com
## Rollback Procedure
If accuracy drops below 85% or error rate exceeds 5%:
```python
# Roll back by promoting the previous version
from mlflow.tracking import MlflowClient
MlflowClient().transition_model_version_stage(
    name="customer_churn", version="14", stage="Production"
)
```
## Common Issues
- **High latency**: Check feature store connection
- **Low accuracy**: Verify input data quality
- **Fairness violations**: Trigger retraining with balanced data
🎯 Summary
You've mastered production ML best practices:
- Feature Stores: Centralized feature management with Feast and Tecton
- Governance: Model registry, lineage, and approval workflows
- Compliance: Explainability, fairness, and privacy guarantees
- Cost Optimization: Resource monitoring and efficient infrastructure
- Security: Authentication, authorization, and data protection
- Documentation: Model cards, runbooks, and knowledge sharing
Key Takeaways
- Use feature stores to prevent training-serving skew and enable feature reuse
- Implement model governance with registries, approvals, and lineage tracking
- Ensure compliance with explainability, fairness, and privacy safeguards
- Optimize costs through right-sizing, batching, and spot instances
- Document models thoroughly with model cards and runbooks
- Monitor production systems continuously and maintain rollback capabilities
- Build responsible AI systems that are transparent, fair, and secure
🎉 Congratulations on Completing the Core Tutorials!
You've learned all the foundational MLOps concepts! Now put them into practice with hands-on projects where you'll build complete end-to-end ML systems integrating everything you've learned.
Test Your Knowledge
Q1: What problem do feature stores solve?
Q2: What is model governance?
Q3: Why use SHAP for explainability?
Q4: Cost optimization strategy for ML training?
Q5: What should a model card include?