🎯 Project Overview
Build an enterprise-grade multi-model serving platform with advanced deployment strategies:
- BentoML: Unified serving framework for multiple models
- A/B Testing: Traffic splitting between model versions
- Canary Deployment: Gradual rollout with automatic rollback
- Model Registry: Version control with metadata tracking
- Cost Tracking: Per-model inference time and resource usage
- Kubernetes: Orchestration with Istio traffic management
System Architecture
User Request → Istio Ingress Gateway
                     │
             Traffic Splitting (A/B)
                     │
        ┌────────────┴────────────┐
        │                         │
  Model A (90%)             Model B (10%)
        │                         │
  BentoML Runner            BentoML Runner
        │                         │
    Prediction                Prediction
        │                         │
   Log Metrics               Log Metrics
        │                         │
        └────────────┬────────────┘
                     │
            Prometheus Metrics
                     │
       Cost & Performance Analysis
                     │
        Winner Selection (Auto)
💡 Prerequisites:
- Kubernetes cluster (local or cloud)
- BentoML and Istio installed
- Multiple trained model versions (see the registration sketch below)
- Prometheus + Grafana for monitoring
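The service in Step 1 looks models up in the local BentoML model store via bentoml.sklearn.get. If you do not have two versions saved yet, the following is a minimal sketch for registering them with synthetic data; note that BentoML assigns its own version string when saving, so substitute the tags it prints (or use :latest) for the v1.0.0 / v2.0.0 placeholders used in the code below. The file name register_models.py and the scikit-learn estimators are illustrative assumptions.
# register_models.py - sketch: save two trained model versions into the BentoML store
import bentoml
from sklearn.datasets import make_regression
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor

# Placeholder training data; use your real house-price dataset here
X, y = make_regression(n_samples=500, n_features=8, random_state=42)

model_old = RandomForestRegressor(n_estimators=50, random_state=42).fit(X, y)
model_new = GradientBoostingRegressor(random_state=42).fit(X, y)

# Save both versions under the name the service looks up
tag_v1 = bentoml.sklearn.save_model("house_price_model", model_old).tag
tag_v2 = bentoml.sklearn.save_model("house_price_model", model_new).tag
print(f"Saved {tag_v1} and {tag_v2}")  # e.g. house_price_model:<generated-version>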
🍱 Step 1: BentoML Multi-Model Service
# service.py
"""
BentoML service with multiple model versions
"""
import bentoml
from bentoml.io import JSON, NumpyNdarray
import numpy as np
from pydantic import BaseModel
import time
class PredictionRequest(BaseModel):
features: list
model_version: str = "latest"
class PredictionResponse(BaseModel):
prediction: float
model_version: str
inference_time_ms: float
model_name: str
# Load multiple model versions and create runners
model_v1 = bentoml.sklearn.get("house_price_model:v1.0.0")
model_v2 = bentoml.sklearn.get("house_price_model:v2.0.0")
runner_v1 = model_v1.to_runner(name="model_v1")
runner_v2 = model_v2.to_runner(name="model_v2")
# Create BentoML service
svc = bentoml.Service("multi_model_service", runners=[runner_v1, runner_v2])
@svc.api(input=JSON(pydantic_model=PredictionRequest),
output=JSON(pydantic_model=PredictionResponse))
async def predict(request: PredictionRequest) -> PredictionResponse:
"""Multi-model prediction endpoint"""
start_time = time.time()
# Convert features to numpy array
features = np.array([request.features])
    # Route to the appropriate runner
    if request.model_version == "v1.0.0":
        runner = runner_v1
        model_name = "model_v1"
    elif request.model_version in ("v2.0.0", "latest"):
        runner = runner_v2
        model_name = "model_v2"
    else:
        raise ValueError(f"Unknown model version: {request.model_version}")
# Make prediction
prediction = await runner.predict.async_run(features)
inference_time = (time.time() - start_time) * 1000 # ms
# Log metrics
log_inference_metrics(
model_name=model_name,
inference_time=inference_time,
request_size=len(request.features)
)
return PredictionResponse(
prediction=float(prediction[0]),
model_version=request.model_version,
inference_time_ms=round(inference_time, 2),
model_name=model_name
)
@svc.api(input=JSON(), output=JSON())
async def health(_: dict) -> dict:
"""Health check endpoint"""
return {
"status": "healthy",
"models": {
"v1.0.0": "available",
"v2.0.0": "available"
}
}
# Prometheus metrics
from prometheus_client import Counter, Histogram, Gauge
prediction_counter = Counter(
'model_predictions_total',
'Total predictions',
['model_name', 'model_version']
)
inference_time_histogram = Histogram(
'inference_time_seconds',
'Inference time',
['model_name'],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0]
)
active_models = Gauge(
'active_models',
'Number of active model versions'
)
def log_inference_metrics(model_name, inference_time, request_size):
"""Log inference metrics to Prometheus"""
prediction_counter.labels(
model_name=model_name,
model_version=model_name.split('_')[-1]
).inc()
inference_time_histogram.labels(
model_name=model_name
).observe(inference_time / 1000) # convert to seconds
Build and Deploy Bento
# bentofile.yaml
service: "service:svc"
labels:
owner: mlops-team
project: multi-model-platform
include:
- "service.py"
- "*.pkl"
python:
packages:
- scikit-learn
- pandas
- numpy
- pydantic
- prometheus-client
# Build Bento
bentoml build
# Containerize
bentoml containerize multi_model_service:latest -t multi-model:latest
# Push to registry
docker tag multi-model:latest gcr.io/project/multi-model:latest
docker push gcr.io/project/multi-model:latest
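Before wiring up Kubernetes, it is worth a quick smoke test against the container (for example, docker run -p 3000:3000 multi-model:latest). A minimal sketch, assuming the Bento is serving on the default port 3000 and the feature vector shown is just example data:
# smoke_test.py - sketch: call the predict endpoint of a locally running Bento
import requests

payload = {
    "features": [3.0, 2.0, 1500.0, 0.25, 1995.0, 1.0, 0.0, 2.0],  # example feature vector
    "model_version": "latest",
}
resp = requests.post("http://localhost:3000/predict", json=payload, timeout=10)
resp.raise_for_status()
print(resp.json())
# Expected shape: {"prediction": ..., "model_version": "latest",
#                  "inference_time_ms": ..., "model_name": "model_v2"}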
⚙️ Step 2: A/B Testing Configuration
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: model-v1
spec:
replicas: 3
selector:
matchLabels:
app: model-service
version: v1
template:
metadata:
labels:
app: model-service
version: v1
spec:
containers:
- name: model
image: gcr.io/project/multi-model:v1.0.0
ports:
- containerPort: 3000
env:
- name: MODEL_VERSION
value: "v1.0.0"
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: model-v2
spec:
replicas: 1
selector:
matchLabels:
app: model-service
version: v2
template:
metadata:
labels:
app: model-service
version: v2
spec:
containers:
- name: model
image: gcr.io/project/multi-model:v2.0.0
ports:
- containerPort: 3000
env:
- name: MODEL_VERSION
value: "v2.0.0"
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
---
apiVersion: v1
kind: Service
metadata:
name: model-service
spec:
selector:
app: model-service
ports:
- port: 80
targetPort: 3000
Istio Traffic Splitting
# k8s/virtualservice.yaml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: model-service
spec:
hosts:
- model-service
http:
- match:
- headers:
x-test-group:
exact: "beta"
route:
- destination:
host: model-service
subset: v2
weight: 100
- route:
- destination:
host: model-service
subset: v1
weight: 90
- destination:
host: model-service
subset: v2
weight: 10
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: model-service
spec:
host: model-service
subsets:
- name: v1
labels:
version: v1
- name: v2
labels:
version: v2
trafficPolicy:
connectionPool:
tcp:
maxConnections: 100
http:
http1MaxPendingRequests: 100
http2MaxRequests: 100
maxRequestsPerConnection: 2
outlierDetection:
consecutiveErrors: 5
interval: 30s
baseEjectionTime: 30s
maxEjectionPercent: 50
minHealthPercent: 40
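With the VirtualService and DestinationRule applied, requests carrying the x-test-group: beta header should always reach the v2 subset, while all other traffic is split 90/10. Below is a rough verification sketch meant to run from a pod inside the mesh; the model_name field is used only as an illustration of which variant answered, since exactly how each subset identifies itself depends on how the two deployments are configured.
# route_check.py - sketch: verify header-based and weighted routing through Istio
from collections import Counter
import requests

URL = "http://model-service/predict"
payload = {"features": [3.0, 2.0, 1500.0, 0.25, 1995.0, 1.0, 0.0, 2.0]}

# Header match: should always land on the v2 subset
beta = requests.post(URL, json=payload, headers={"x-test-group": "beta"}, timeout=10)
print("beta header ->", beta.json().get("model_name"))

# Weighted route: roughly 90/10 across many requests
hits = Counter(
    requests.post(URL, json=payload, timeout=10).json().get("model_name")
    for _ in range(200)
)
print(hits)  # expect roughly 180 responses from v1 and 20 from v2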
📊 Step 3: Automated Winner Selection
# ab_test_analyzer.py
"""
Analyze A/B test results and select winner
"""
import requests
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from scipy import stats
class ABTestAnalyzer:
def __init__(self, prometheus_url='http://localhost:9090'):
self.prometheus_url = prometheus_url
self.min_samples = 1000 # Minimum samples per variant
self.confidence_level = 0.95
    def fetch_metrics(self, days=7):
        """Fetch metrics from Prometheus over the last `days` days"""
        window = f"{days}d"
        # Query prediction counts
        query_counts = f'''
        sum by (model_name) (
            increase(model_predictions_total[{window}])
        )
        '''
        # Query p95 inference times
        query_latency = f'''
        histogram_quantile(0.95,
            sum by (model_name, le) (
                rate(inference_time_seconds_bucket[{window}])
            )
        )
        '''
        # Query error rates
        query_errors = f'''
        sum by (model_name) (
            rate(prediction_errors_total[{window}])
        ) / sum by (model_name) (
            rate(model_predictions_total[{window}])
        )
        '''
counts = self._query_prometheus(query_counts)
latency = self._query_prometheus(query_latency)
errors = self._query_prometheus(query_errors)
return {
'counts': counts,
'latency': latency,
'errors': errors
}
def calculate_winner(self, metrics):
"""Determine winning model variant"""
results = {}
for model in ['model_v1', 'model_v2']:
count = metrics['counts'].get(model, 0)
latency = metrics['latency'].get(model, 0)
error_rate = metrics['errors'].get(model, 0)
            # Calculate a composite score (lower is better)
            score = (
                latency * 0.4 +                    # 40% weight on latency
                error_rate * 1000 * 0.4 +          # 40% weight on errors
                (1 / max(count, 1)) * 100 * 0.2    # 20% weight on traffic volume (guard against zero)
            )
results[model] = {
'count': count,
'latency_p95': latency,
'error_rate': error_rate,
'score': score
}
# Check minimum sample size
v1_count = results['model_v1']['count']
v2_count = results['model_v2']['count']
if v1_count < self.min_samples or v2_count < self.min_samples:
print(f"β οΈ Insufficient samples: v1={v1_count}, v2={v2_count}")
return None
# Statistical significance test
is_significant = self._test_significance(
results['model_v1'],
results['model_v2']
)
if not is_significant:
print("π No statistically significant difference")
return None
# Select winner
winner = min(results.items(), key=lambda x: x[1]['score'])
print("\nπ A/B Test Results:")
print(f" Model v1: score={results['model_v1']['score']:.2f}")
print(f" Model v2: score={results['model_v2']['score']:.2f}")
print(f" Winner: {winner[0]}")
return winner[0]
def _test_significance(self, v1_metrics, v2_metrics):
"""Test statistical significance"""
# Use Z-test for proportions (error rates)
p1 = v1_metrics['error_rate']
p2 = v2_metrics['error_rate']
n1 = v1_metrics['count']
n2 = v2_metrics['count']
# Pooled proportion
p_pool = (p1 * n1 + p2 * n2) / (n1 + n2)
# Standard error
se = np.sqrt(p_pool * (1 - p_pool) * (1/n1 + 1/n2))
# Z-score
z = (p1 - p2) / se
# P-value (two-tailed)
p_value = 2 * (1 - stats.norm.cdf(abs(z)))
return p_value < (1 - self.confidence_level)
def _query_prometheus(self, query):
"""Query Prometheus API"""
response = requests.get(
f"{self.prometheus_url}/api/v1/query",
params={'query': query}
)
data = response.json()['data']['result']
return {
item['metric']['model_name']: float(item['value'][1])
for item in data
}
def promote_winner(self, winner):
"""Promote winning model to 100% traffic"""
if winner == 'model_v2':
print("π Promoting v2 to 100% traffic...")
# Update Istio VirtualService
# kubectl apply -f k8s/virtualservice-v2-100.yaml
print("β
Traffic shifted to v2")
else:
print("β
Keeping v1 at 100% traffic")
# Run A/B test analysis
if __name__ == '__main__':
analyzer = ABTestAnalyzer()
metrics = analyzer.fetch_metrics(days=7)
winner = analyzer.calculate_winner(metrics)
if winner:
analyzer.promote_winner(winner)
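Note that the error-rate query above relies on a prediction_errors_total counter, which the Step 1 service does not yet export. Here is a minimal sketch of how it could be added to service.py so the analyzer has data to work with; the counter name matches the query, everything else is an assumption:
# Additions to service.py (sketch): export an error counter for the A/B analysis
from prometheus_client import Counter

prediction_error_counter = Counter(
    'prediction_errors_total',
    'Total failed predictions',
    ['model_name']
)

# Inside predict(), wrap the runner call so failures are counted:
#     try:
#         prediction = await runner.predict.async_run(features)
#     except Exception:
#         prediction_error_counter.labels(model_name=model_name).inc()
#         raise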
💰 Step 4: Per-Model Cost Tracking
# cost_tracker.py
"""
Track inference costs per model version
"""
import psycopg2
from datetime import datetime, timedelta
import pandas as pd
class CostTracker:
def __init__(self, db_config):
self.conn = psycopg2.connect(**db_config)
self.cost_per_cpu_hour = 0.05 # $0.05 per CPU hour
self.cost_per_gb_hour = 0.01 # $0.01 per GB hour
def track_inference(self, model_name, inference_time_ms,
cpu_usage, memory_mb):
"""Track single inference cost"""
cursor = self.conn.cursor()
# Calculate cost
cpu_hours = cpu_usage * (inference_time_ms / 3600000)
memory_gb_hours = (memory_mb / 1024) * (inference_time_ms / 3600000)
cost = (
cpu_hours * self.cost_per_cpu_hour +
memory_gb_hours * self.cost_per_gb_hour
)
# Insert record
cursor.execute('''
INSERT INTO inference_costs
(model_name, timestamp, inference_time_ms,
cpu_usage, memory_mb, cost)
VALUES (%s, %s, %s, %s, %s, %s)
''', (
model_name,
datetime.utcnow(),
inference_time_ms,
cpu_usage,
memory_mb,
cost
))
self.conn.commit()
cursor.close()
def get_model_costs(self, days=30):
"""Get cost breakdown by model"""
cursor = self.conn.cursor()
cutoff = datetime.utcnow() - timedelta(days=days)
cursor.execute('''
SELECT
model_name,
COUNT(*) as total_inferences,
SUM(cost) as total_cost,
AVG(cost) as avg_cost_per_inference,
AVG(inference_time_ms) as avg_latency_ms,
SUM(cost) / COUNT(*) * 1000000 as cost_per_million
FROM inference_costs
WHERE timestamp > %s
GROUP BY model_name
ORDER BY total_cost DESC
''', (cutoff,))
results = cursor.fetchall()
cursor.close()
df = pd.DataFrame(results, columns=[
'model_name', 'total_inferences', 'total_cost',
'avg_cost_per_inference', 'avg_latency_ms',
'cost_per_million'
])
return df
def generate_cost_report(self, days=30):
"""Generate cost analysis report"""
costs = self.get_model_costs(days=days)
print(f"\nπ° Cost Report ({days} days)")
print("=" * 70)
for _, row in costs.iterrows():
print(f"\n{row['model_name']}:")
print(f" Total inferences: {row['total_inferences']:,.0f}")
print(f" Total cost: ${row['total_cost']:.4f}")
print(f" Avg cost/inference: ${row['avg_cost_per_inference']:.6f}")
print(f" Cost per million: ${row['cost_per_million']:.2f}")
print(f" Avg latency: {row['avg_latency_ms']:.2f}ms")
total = costs['total_cost'].sum()
print(f"\n{'='*70}")
print(f"Total cost (all models): ${total:.2f}")
return costs
# Usage
tracker = CostTracker(db_config={
'host': 'localhost',
'database': 'mlops',
'user': 'postgres',
'password': 'password'
})
# Generate monthly report
report = tracker.generate_cost_report(days=30)
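The tracker assumes an inference_costs table already exists in the mlops database. A one-off setup sketch matching the columns used above (column types are assumptions):
# init_cost_db.py - sketch: create the table CostTracker writes to
import psycopg2

conn = psycopg2.connect(host='localhost', database='mlops',
                        user='postgres', password='password')
with conn, conn.cursor() as cur:
    cur.execute('''
        CREATE TABLE IF NOT EXISTS inference_costs (
            id                BIGSERIAL PRIMARY KEY,
            model_name        TEXT NOT NULL,
            timestamp         TIMESTAMPTZ NOT NULL,
            inference_time_ms DOUBLE PRECISION NOT NULL,
            cpu_usage         DOUBLE PRECISION NOT NULL,
            memory_mb         DOUBLE PRECISION NOT NULL,
            cost              DOUBLE PRECISION NOT NULL
        )
    ''')
conn.close()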
🐤 Step 5: Canary Deployment with Auto-Rollback
# canary_deployer.py
"""
Automated canary deployment with health monitoring
"""
import time
import subprocess
import requests
from kubernetes import client, config
from ab_test_analyzer import ABTestAnalyzer
class CanaryDeployer:
def __init__(self, k8s_namespace='default'):
config.load_kube_config()
self.apps_v1 = client.AppsV1Api()
self.namespace = k8s_namespace
self.analyzer = ABTestAnalyzer()
def deploy_canary(self, new_version, canary_weight=10):
"""Deploy new version with canary strategy"""
print(f"π€ Starting canary deployment: {new_version}")
stages = [
(10, 5), # 10% traffic for 5 minutes
(25, 10), # 25% traffic for 10 minutes
(50, 15), # 50% traffic for 15 minutes
(100, 0) # 100% traffic (complete rollout)
]
for weight, duration_mins in stages:
print(f"\nπ Stage: {weight}% traffic to {new_version}")
# Update traffic split
self._update_traffic_split(new_version, weight)
if duration_mins > 0:
# Monitor health
is_healthy = self._monitor_health(
new_version,
duration_minutes=duration_mins
)
if not is_healthy:
print("β Health check failed - rolling back")
self._rollback(new_version)
return False
print(f"\nβ
Canary deployment complete: {new_version}")
return True
    def _update_traffic_split(self, new_version, weight):
        """Shift `weight` percent of traffic to the new version via Istio"""
        # Sketch: apply a pre-rendered VirtualService manifest for this stage,
        # e.g. k8s/virtualservice-v2-100.yaml
        subprocess.run(
            ["kubectl", "apply", "-f",
             f"k8s/virtualservice-{new_version}-{weight}.yaml"],
            check=True
        )
    def _monitor_health(self, version, duration_minutes):
        """Watch the canary's error rate and p95 latency for this stage"""
        deadline = time.time() + duration_minutes * 60
        while time.time() < deadline:
            # Assumes the canary exports metrics labelled model_name=f"model_{version}"
            metrics = self.analyzer.fetch_metrics(days=1)
            error_rate = metrics['errors'].get(f'model_{version}', 0)
            latency_ms = metrics['latency'].get(f'model_{version}', 0) * 1000
            if error_rate > 0.05:  # 5% error rate
                print(f"  ❌ High error rate: {error_rate:.2%}")
                return False
            if latency_ms > 1000:  # 1 second
                print(f"  ❌ High latency: {latency_ms:.0f}ms")
                return False
            time.sleep(30)
        print("  ✅ Health checks passed")
        return True
def _rollback(self, version):
"""Rollback to previous version"""
print(f"\nπ Rolling back {version}...")
# Shift 100% traffic to current version
self._update_traffic_split(version, 0)
# Scale down canary deployment
self.apps_v1.patch_namespaced_deployment_scale(
name=f'model-{version}',
namespace=self.namespace,
body={'spec': {'replicas': 0}}
)
print("β
Rollback complete")
# Deploy new version with canary
deployer = CanaryDeployer()
success = deployer.deploy_canary('v3.0.0', canary_weight=10)
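The _update_traffic_split sketch above applies a pre-rendered manifest per stage, following the k8s/virtualservice-v2-100.yaml naming used in Step 3. If you would rather generate the weights on the fly, here is a rough alternative that renders the VirtualService and pipes it to kubectl apply -f -, assuming the v1/v2 subsets from Step 2 with v1 as the stable version; the file name traffic_split.py is an assumption.
# traffic_split.py - sketch: render an Istio VirtualService for a given canary weight
import subprocess

VS_TEMPLATE = """apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: model-service
spec:
  hosts:
  - model-service
  http:
  - route:
    - destination:
        host: model-service
        subset: v1
      weight: {stable_weight}
    - destination:
        host: model-service
        subset: v2
      weight: {canary_weight}
"""

def apply_traffic_split(canary_weight: int) -> None:
    manifest = VS_TEMPLATE.format(stable_weight=100 - canary_weight,
                                  canary_weight=canary_weight)
    subprocess.run(["kubectl", "apply", "-f", "-"],
                   input=manifest.encode(), check=True)

# Example: send 25% of traffic to the canary subset
# apply_traffic_split(25)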
🎯 Project Completion Checklist
- ✅ BentoML multi-model service created
- ✅ Kubernetes deployments for multiple versions
- ✅ Istio traffic splitting configured
- ✅ A/B testing with automated winner selection
- ✅ Per-model cost tracking implemented
- ✅ Canary deployment with auto-rollback
- ✅ Prometheus metrics collection
- ✅ Statistical significance testing
- ✅ Complete deployment pipeline tested
🎉 Congratulations on Completing the MLOps Engineer Course!
You've built an enterprise-grade multi-model serving platform with advanced deployment strategies, cost optimization, and automated decision-making. You're now ready to deploy ML systems at scale!
Key Achievements
- Multi-Model Serving: BentoML platform for version management
- Traffic Management: Istio-based A/B testing and canary deployments
- Automated Decisions: Statistical winner selection
- Cost Optimization: Per-model cost tracking and analysis
- Safe Deployments: Auto-rollback on health failures
- Production Ready: Complete enterprise deployment pipeline