
🚀 Project: Multi-Model Serving Platform

Build a production serving platform with BentoML, A/B testing, model versioning, and cost tracking

🎯 Project 4 📊 Advanced ⏱️ 3 hours


🎯 Project Overview

Build an enterprise-grade multi-model serving platform with advanced deployment strategies:

  • 🍱 BentoML: Unified serving framework for multiple models
  • ⚖️ A/B Testing: Traffic splitting between model versions
  • 🔀 Canary Deployment: Gradual rollout with automatic rollback
  • 📦 Model Registry: Version control with metadata tracking
  • 💰 Cost Tracking: Per-model inference time and resource usage
  • ☸️ Kubernetes: Orchestration with Istio traffic management

System Architecture

User Request → Istio Ingress Gateway
                        ↓
              Traffic Splitting (A/B)
                        ↓
            ┌───────────┴───────────┐
            ↓                       ↓
    Model A (90%)           Model B (10%)
            ↓                       ↓
    BentoML Runner         BentoML Runner
            ↓                       ↓
        Prediction             Prediction
            ↓                       ↓
        Log Metrics            Log Metrics
            ↓                       ↓
            └───────────┬───────────┘
                        ↓
              Prometheus Metrics
                        ↓
           Cost & Performance Analysis
                        ↓
            Winner Selection (Auto)

💡 Prerequisites:

  • Kubernetes cluster (local or cloud)
  • BentoML and Istio installed
  • Multiple trained model versions
  • Prometheus + Grafana for monitoring

🍱 Step 1: BentoML Multi-Model Service
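
The service below pulls two tagged models from the local BentoML model store, so both versions have to be registered there first. A minimal sketch of that step, assuming scikit-learn estimators (the toy data, estimator choices, and metadata are illustrative; BentoML assigns its own version suffix to each saved tag, so pin the service to whatever tags `bentoml models list` reports):

# register_models.py
"""
Save two trained model versions into the local BentoML model store (sketch)
"""
import bentoml
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import GradientBoostingRegressor

# Toy data standing in for the real house-price features
X = np.random.rand(500, 8)
y = X @ np.random.rand(8) * 100_000 + 50_000

for estimator, semver in [
    (LinearRegression(), "v1.0.0"),
    (GradientBoostingRegressor(), "v2.0.0"),
]:
    estimator.fit(X, y)
    saved = bentoml.sklearn.save_model(
        "house_price_model",
        estimator,
        metadata={"semver": semver},  # track the human-readable version
    )
    print(f"saved {saved.tag}")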

# service.py
"""
BentoML service with multiple model versions
"""
import bentoml
from bentoml.io import JSON, NumpyNdarray
import numpy as np
from pydantic import BaseModel
import time

class PredictionRequest(BaseModel):
    features: list
    model_version: str = "latest"

class PredictionResponse(BaseModel):
    prediction: float
    model_version: str
    inference_time_ms: float
    model_name: str

# Load multiple model versions from the local model store
model_v1 = bentoml.sklearn.get("house_price_model:v1.0.0")
model_v2 = bentoml.sklearn.get("house_price_model:v2.0.0")

# Keep references to the runners so the API can route between them
runner_v1 = model_v1.to_runner(name="model_v1")
runner_v2 = model_v2.to_runner(name="model_v2")

# Create BentoML service
svc = bentoml.Service("multi_model_service", runners=[runner_v1, runner_v2])

@svc.api(input=JSON(pydantic_model=PredictionRequest), 
         output=JSON(pydantic_model=PredictionResponse))
async def predict(request: PredictionRequest) -> PredictionResponse:
    """Multi-model prediction endpoint"""
    
    start_time = time.time()
    
    # Convert features to numpy array
    features = np.array([request.features])
    
    # Route to the appropriate runner
    if request.model_version == "v1.0.0":
        runner = runner_v1
        model_name = "model_v1"
    elif request.model_version in ("v2.0.0", "latest"):
        runner = runner_v2
        model_name = "model_v2"
    else:
        raise ValueError(f"Unknown model version: {request.model_version}")
    
    # Make prediction
    prediction = await runner.predict.async_run(features)
    
    inference_time = (time.time() - start_time) * 1000  # ms
    
    # Log metrics
    log_inference_metrics(
        model_name=model_name,
        inference_time=inference_time,
        request_size=len(request.features)
    )
    
    return PredictionResponse(
        prediction=float(prediction[0]),
        model_version=request.model_version,
        inference_time_ms=round(inference_time, 2),
        model_name=model_name
    )

@svc.api(input=JSON(), output=JSON())
async def health(_: dict) -> dict:
    """Health check endpoint"""
    return {
        "status": "healthy",
        "models": {
            "v1.0.0": "available",
            "v2.0.0": "available"
        }
    }

# Prometheus metrics
from prometheus_client import Counter, Histogram, Gauge

prediction_counter = Counter(
    'model_predictions_total',
    'Total predictions',
    ['model_name', 'model_version']
)

inference_time_histogram = Histogram(
    'inference_time_seconds',
    'Inference time',
    ['model_name'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0]
)

active_models = Gauge(
    'active_models',
    'Number of active model versions'
)

def log_inference_metrics(model_name, inference_time, request_size):
    """Log inference metrics to Prometheus"""
    prediction_counter.labels(
        model_name=model_name,
        model_version=model_name.split('_')[-1]
    ).inc()
    
    inference_time_histogram.labels(
        model_name=model_name
    ).observe(inference_time / 1000)  # convert to seconds

Build and Deploy Bento

# bentofile.yaml
service: "service:svc"
labels:
  owner: mlops-team
  project: multi-model-platform
include:
  - "service.py"
  - "*.pkl"
python:
  packages:
    - scikit-learn
    - pandas
    - numpy
    - pydantic
    - prometheus-client

# Build Bento
bentoml build

# Containerize
bentoml containerize multi_model_service:latest -t multi-model:latest

# Push to registry
docker tag multi-model:latest gcr.io/project/multi-model:latest
docker push gcr.io/project/multi-model:latest
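
Before shipping the image, you can smoke-test the Bento locally with `bentoml serve service:svc` and call the predict endpoint. A quick sketch using requests (the port, route, and example feature vector assume BentoML's default HTTP server and the PredictionRequest schema above):

# smoke_test.py
"""
Call the locally served Bento (BentoML's HTTP server defaults to port 3000)
"""
import requests

payload = {
    "features": [3, 2, 1500, 0.25, 1995, 1, 0, 2],  # illustrative feature vector
    "model_version": "v2.0.0",
}

resp = requests.post("http://localhost:3000/predict", json=payload, timeout=10)
resp.raise_for_status()

result = resp.json()
print(f"{result['model_name']}: {result['prediction']:.2f} "
      f"({result['inference_time_ms']} ms)")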

βš–οΈ Step 2: A/B Testing Configuration

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-v1
spec:
  replicas: 3
  selector:
    matchLabels:
      app: model-service
      version: v1
  template:
    metadata:
      labels:
        app: model-service
        version: v1
    spec:
      containers:
      - name: model
        image: gcr.io/project/multi-model:v1.0.0
        ports:
        - containerPort: 3000
        env:
        - name: MODEL_VERSION
          value: "v1.0.0"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: model-service
      version: v2
  template:
    metadata:
      labels:
        app: model-service
        version: v2
    spec:
      containers:
      - name: model
        image: gcr.io/project/multi-model:v2.0.0
        ports:
        - containerPort: 3000
        env:
        - name: MODEL_VERSION
          value: "v2.0.0"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
---
apiVersion: v1
kind: Service
metadata:
  name: model-service
spec:
  selector:
    app: model-service
  ports:
  - port: 80
    targetPort: 3000
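
After applying the manifests (kubectl apply -f k8s/deployment.yaml), confirm both versions are actually serving before layering traffic rules on top. A small readiness check with the official Kubernetes Python client (the default namespace and deployment names mirror the manifests above):

# check_rollout.py
"""
Verify both model Deployments report all replicas ready
"""
from kubernetes import client, config

config.load_kube_config()
apps = client.AppsV1Api()

for name in ("model-v1", "model-v2"):
    dep = apps.read_namespaced_deployment(name=name, namespace="default")
    ready = dep.status.ready_replicas or 0
    desired = dep.spec.replicas
    marker = "✅" if ready == desired else "⚠️"
    print(f"{marker} {name}: {ready}/{desired} replicas ready")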

Istio Traffic Splitting

# k8s/virtualservice.yaml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: model-service
spec:
  hosts:
  - model-service
  http:
  - match:
    - headers:
        x-test-group:
          exact: "beta"
    route:
    - destination:
        host: model-service
        subset: v2
      weight: 100
  - route:
    - destination:
        host: model-service
        subset: v1
      weight: 90
    - destination:
        host: model-service
        subset: v2
      weight: 10
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: model-service
spec:
  host: model-service
  subsets:
  - name: v1
    labels:
      version: v1
  - name: v2
    labels:
      version: v2
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 100
        http2MaxRequests: 100
        maxRequestsPerConnection: 2
    outlierDetection:
      consecutiveErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
      minHealthPercent: 40
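
With these rules in place, the header match gives testers a deterministic path to v2 while everyone else gets the weighted 90/10 split. A sketch of both request types sent from a pod inside the mesh (the in-cluster hostname and payload are illustrative):

# ab_client.py
"""
Exercise the Istio routing rules from inside the cluster
"""
import requests

url = "http://model-service/predict"
payload = {"features": [3, 2, 1500, 0.25, 1995, 1, 0, 2], "model_version": "latest"}

# Regular traffic: subject to the 90/10 weighted split
print(requests.post(url, json=payload, timeout=10).json())

# Beta testers: the x-test-group header pins the request to the v2 subset
print(requests.post(url, json=payload,
                    headers={"x-test-group": "beta"}, timeout=10).json())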

πŸ† Step 3: Automated Winner Selection

# ab_test_analyzer.py
"""
Analyze A/B test results and select winner
"""
import requests
import numpy as np
from datetime import datetime, timedelta
import pandas as pd
from scipy import stats

class ABTestAnalyzer:
    def __init__(self, prometheus_url='http://localhost:9090'):
        self.prometheus_url = prometheus_url
        self.min_samples = 1000  # Minimum samples per variant
        self.confidence_level = 0.95
    
    def fetch_metrics(self, days=7):
        """Fetch metrics from Prometheus over a trailing window of `days` days"""
        
        window = f"{days}d"
        
        # Query prediction counts
        query_counts = f'''
            sum by (model_name) (
                increase(model_predictions_total[{window}])
            )
        '''
        
        # Query p95 inference times
        query_latency = f'''
            histogram_quantile(0.95,
                sum by (model_name, le) (
                    rate(inference_time_seconds_bucket[{window}])
                )
            )
        '''
        
        # Query error rates
        query_errors = f'''
            sum by (model_name) (
                rate(prediction_errors_total[{window}])
            ) / sum by (model_name) (
                rate(model_predictions_total[{window}])
            )
        '''
        
        counts = self._query_prometheus(query_counts)
        latency = self._query_prometheus(query_latency)
        errors = self._query_prometheus(query_errors)
        
        return {
            'counts': counts,
            'latency': latency,
            'errors': errors
        }
    
    def calculate_winner(self, metrics):
        """Determine winning model variant"""
        
        results = {}
        
        for model in ['model_v1', 'model_v2']:
            count = metrics['counts'].get(model, 0)
            latency = metrics['latency'].get(model, 0)
            error_rate = metrics['errors'].get(model, 0)
            
            # Calculate score (lower is better); guard against zero-traffic variants
            score = (
                latency * 0.4 +                     # 40% weight on latency
                error_rate * 1000 * 0.4 +           # 40% weight on errors
                (1 / max(count, 1)) * 100 * 0.2     # 20% weight on traffic volume
            )
            
            results[model] = {
                'count': count,
                'latency_p95': latency,
                'error_rate': error_rate,
                'score': score
            }
        
        # Check minimum sample size
        v1_count = results['model_v1']['count']
        v2_count = results['model_v2']['count']
        
        if v1_count < self.min_samples or v2_count < self.min_samples:
            print(f"⚠️  Insufficient samples: v1={v1_count}, v2={v2_count}")
            return None
        
        # Statistical significance test
        is_significant = self._test_significance(
            results['model_v1'],
            results['model_v2']
        )
        
        if not is_significant:
            print("πŸ“Š No statistically significant difference")
            return None
        
        # Select winner
        winner = min(results.items(), key=lambda x: x[1]['score'])
        
        print("\nπŸ† A/B Test Results:")
        print(f"  Model v1: score={results['model_v1']['score']:.2f}")
        print(f"  Model v2: score={results['model_v2']['score']:.2f}")
        print(f"  Winner: {winner[0]}")
        
        return winner[0]
    
    def _test_significance(self, v1_metrics, v2_metrics):
        """Test statistical significance"""
        
        # Use Z-test for proportions (error rates)
        p1 = v1_metrics['error_rate']
        p2 = v2_metrics['error_rate']
        n1 = v1_metrics['count']
        n2 = v2_metrics['count']
        
        # Pooled proportion
        p_pool = (p1 * n1 + p2 * n2) / (n1 + n2)
        
        # Standard error
        se = np.sqrt(p_pool * (1 - p_pool) * (1/n1 + 1/n2))
        
        # Z-score
        z = (p1 - p2) / se
        
        # P-value (two-tailed)
        p_value = 2 * (1 - stats.norm.cdf(abs(z)))
        
        return p_value < (1 - self.confidence_level)
    
    def _query_prometheus(self, query):
        """Query Prometheus API"""
        
        response = requests.get(
            f"{self.prometheus_url}/api/v1/query",
            params={'query': query}
        )
        
        data = response.json()['data']['result']
        
        return {
            item['metric']['model_name']: float(item['value'][1])
            for item in data
        }
    
    def promote_winner(self, winner):
        """Promote winning model to 100% traffic"""
        
        if winner == 'model_v2':
            print("πŸš€ Promoting v2 to 100% traffic...")
            
            # Update Istio VirtualService
            # kubectl apply -f k8s/virtualservice-v2-100.yaml
            
            print("βœ… Traffic shifted to v2")
        else:
            print("βœ… Keeping v1 at 100% traffic")

# Run A/B test analysis
if __name__ == '__main__':
    analyzer = ABTestAnalyzer()
    metrics = analyzer.fetch_metrics(days=7)
    winner = analyzer.calculate_winner(metrics)
    
    if winner:
        analyzer.promote_winner(winner)
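
To see what the z-test inside _test_significance actually decides, here is a standalone worked example on made-up numbers (the error rates and sample counts below are purely illustrative):

# significance_example.py
"""
Two-proportion z-test on hypothetical A/B error rates
"""
import numpy as np
from scipy import stats

p1, n1 = 0.020, 5000  # v1: 2.0% errors over 5,000 requests (hypothetical)
p2, n2 = 0.015, 5000  # v2: 1.5% errors over 5,000 requests (hypothetical)

# Pooled proportion and standard error, as in ABTestAnalyzer._test_significance
p_pool = (p1 * n1 + p2 * n2) / (n1 + n2)
se = np.sqrt(p_pool * (1 - p_pool) * (1 / n1 + 1 / n2))

z = (p1 - p2) / se
p_value = 2 * (1 - stats.norm.cdf(abs(z)))

# With these numbers z is roughly 1.9 and the p-value roughly 0.06, so the
# difference does not clear the 0.05 threshold despite v2's lower error rate.
print(f"z = {z:.2f}, p-value = {p_value:.3f}, significant = {p_value < 0.05}")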

💰 Step 4: Per-Model Cost Tracking

# cost_tracker.py
"""
Track inference costs per model version
"""
import psycopg2
from datetime import datetime, timedelta
import pandas as pd

class CostTracker:
    def __init__(self, db_config):
        self.conn = psycopg2.connect(**db_config)
        self.cost_per_cpu_hour = 0.05  # $0.05 per CPU hour
        self.cost_per_gb_hour = 0.01   # $0.01 per GB hour
    
    def track_inference(self, model_name, inference_time_ms, 
                       cpu_usage, memory_mb):
        """Track single inference cost"""
        
        cursor = self.conn.cursor()
        
        # Calculate cost
        cpu_hours = cpu_usage * (inference_time_ms / 3600000)
        memory_gb_hours = (memory_mb / 1024) * (inference_time_ms / 3600000)
        
        cost = (
            cpu_hours * self.cost_per_cpu_hour +
            memory_gb_hours * self.cost_per_gb_hour
        )
        
        # Insert record
        cursor.execute('''
            INSERT INTO inference_costs 
            (model_name, timestamp, inference_time_ms, 
             cpu_usage, memory_mb, cost)
            VALUES (%s, %s, %s, %s, %s, %s)
        ''', (
            model_name,
            datetime.utcnow(),
            inference_time_ms,
            cpu_usage,
            memory_mb,
            cost
        ))
        
        self.conn.commit()
        cursor.close()
    
    def get_model_costs(self, days=30):
        """Get cost breakdown by model"""
        
        cursor = self.conn.cursor()
        
        cutoff = datetime.utcnow() - timedelta(days=days)
        
        cursor.execute('''
            SELECT 
                model_name,
                COUNT(*) as total_inferences,
                SUM(cost) as total_cost,
                AVG(cost) as avg_cost_per_inference,
                AVG(inference_time_ms) as avg_latency_ms,
                SUM(cost) / COUNT(*) * 1000000 as cost_per_million
            FROM inference_costs
            WHERE timestamp > %s
            GROUP BY model_name
            ORDER BY total_cost DESC
        ''', (cutoff,))
        
        results = cursor.fetchall()
        cursor.close()
        
        df = pd.DataFrame(results, columns=[
            'model_name', 'total_inferences', 'total_cost',
            'avg_cost_per_inference', 'avg_latency_ms', 
            'cost_per_million'
        ])
        
        return df
    
    def generate_cost_report(self, days=30):
        """Generate cost analysis report"""
        
        costs = self.get_model_costs(days=days)
        
        print(f"\nπŸ’° Cost Report ({days} days)")
        print("=" * 70)
        
        for _, row in costs.iterrows():
            print(f"\n{row['model_name']}:")
            print(f"  Total inferences: {row['total_inferences']:,.0f}")
            print(f"  Total cost: ${row['total_cost']:.4f}")
            print(f"  Avg cost/inference: ${row['avg_cost_per_inference']:.6f}")
            print(f"  Cost per million: ${row['cost_per_million']:.2f}")
            print(f"  Avg latency: {row['avg_latency_ms']:.2f}ms")
        
        total = costs['total_cost'].sum()
        print(f"\n{'='*70}")
        print(f"Total cost (all models): ${total:.2f}")
        
        return costs

# Usage
tracker = CostTracker(db_config={
    'host': 'localhost',
    'database': 'mlops',
    'user': 'postgres',
    'password': 'password'
})

# Generate monthly report
report = tracker.generate_cost_report(days=30)
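
The tracker assumes an inference_costs table already exists in the mlops database. A minimal schema sketch created through the same psycopg2 connection settings (column types are inferred from the INSERT above and may need tuning):

# init_cost_db.py
"""
Create the inference_costs table used by CostTracker (sketch)
"""
import psycopg2

conn = psycopg2.connect(host='localhost', database='mlops',
                        user='postgres', password='password')

with conn, conn.cursor() as cursor:
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS inference_costs (
            id                BIGSERIAL PRIMARY KEY,
            model_name        TEXT NOT NULL,
            timestamp         TIMESTAMPTZ NOT NULL,
            inference_time_ms DOUBLE PRECISION NOT NULL,
            cpu_usage         DOUBLE PRECISION NOT NULL,
            memory_mb         DOUBLE PRECISION NOT NULL,
            cost              NUMERIC(14, 8) NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_inference_costs_model_ts
            ON inference_costs (model_name, timestamp);
    ''')

conn.close()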

🐀 Step 5: Canary Deployment with Auto-Rollback

# canary_deployer.py
"""
Automated canary deployment with health monitoring
"""
import time
import subprocess

from kubernetes import client, config

from ab_test_analyzer import ABTestAnalyzer

class CanaryDeployer:
    def __init__(self, k8s_namespace='default'):
        config.load_kube_config()
        self.apps_v1 = client.AppsV1Api()
        self.namespace = k8s_namespace
        self.analyzer = ABTestAnalyzer()
    
    def deploy_canary(self, new_version, canary_weight=10):
        """Deploy new version with canary strategy"""
        
        print(f"🐀 Starting canary deployment: {new_version}")
        
        stages = [
            (10, 5),   # 10% traffic for 5 minutes
            (25, 10),  # 25% traffic for 10 minutes
            (50, 15),  # 50% traffic for 15 minutes
            (100, 0)   # 100% traffic (complete rollout)
        ]
        
        for weight, duration_mins in stages:
            print(f"\nπŸ“Š Stage: {weight}% traffic to {new_version}")
            
            # Update traffic split
            self._update_traffic_split(new_version, weight)
            
            if duration_mins > 0:
                # Monitor health
                is_healthy = self._monitor_health(
                    new_version,
                    duration_minutes=duration_mins
                )
                
                if not is_healthy:
                    print("❌ Health check failed - rolling back")
                    self._rollback(new_version)
                    return False
        
        print(f"\nβœ… Canary deployment complete: {new_version}")
        return True
    
    def _update_traffic_split(self, new_version, weight):
        """Update Istio traffic split"""
        
        # Apply a VirtualService sending `weight`% of traffic to the canary subset
        # (the manifest path is illustrative; template it to match your cluster)
        subprocess.run(['kubectl', 'apply', '-f',
                        f'k8s/virtualservice-{new_version}-{weight}.yaml'], check=True)
    
    def _monitor_health(self, new_version, duration_minutes):
        """Monitor canary error rate and latency for the stage duration"""
        
        end_time = time.time() + duration_minutes * 60
        while time.time() < end_time:
            time.sleep(30)  # poll every 30 seconds
            
            # Prometheus model_name labels follow the model_vN convention
            canary_name = f"model_{new_version.split('.')[0]}"
            metrics = self.analyzer.fetch_metrics()
            error_rate = metrics['errors'].get(canary_name, 0)
            latency = metrics['latency'].get(canary_name, 0) * 1000  # s -> ms
            
            if error_rate > 0.05:  # 5% error rate
                print(f"  ❌ High error rate: {error_rate:.2%}")
                return False
            if latency > 1000:  # 1 second
                print(f"  ❌ High latency: {latency:.0f}ms")
                return False
        
        print("  ✅ Health checks passed")
        return True
    
    def _rollback(self, version):
        """Rollback to previous version"""
        
        print(f"\nπŸ”„ Rolling back {version}...")
        
        # Shift 100% traffic to current version
        self._update_traffic_split(version, 0)
        
        # Scale down canary deployment
        self.apps_v1.patch_namespaced_deployment_scale(
            name=f'model-{version}',
            namespace=self.namespace,
            body={'spec': {'replicas': 0}}
        )
        
        print("βœ… Rollback complete")

# Deploy new version with canary
deployer = CanaryDeployer()
success = deployer.deploy_canary('v3.0.0', canary_weight=10)

🎯 Project Completion Checklist

  • ✅ BentoML multi-model service created
  • ✅ Kubernetes deployments for multiple versions
  • ✅ Istio traffic splitting configured
  • ✅ A/B testing with automated winner selection
  • ✅ Per-model cost tracking implemented
  • ✅ Canary deployment with auto-rollback
  • ✅ Prometheus metrics collection
  • ✅ Statistical significance testing
  • ✅ Complete deployment pipeline tested

🎉 Congratulations on Completing the MLOps Engineer Course!

You've built an enterprise-grade multi-model serving platform with advanced deployment strategies, cost optimization, and automated decision-making. You're now ready to deploy ML systems at scale!

Key Achievements

  • Multi-Model Serving: BentoML platform for version management
  • Traffic Management: Istio-based A/B testing and canary deployments
  • Automated Decisions: Statistical winner selection
  • Cost Optimization: Per-model cost tracking and analysis
  • Safe Deployments: Auto-rollback on health failures
  • Production Ready: Complete enterprise deployment pipeline
