⚡ Project: Real-Time Prediction System

Deploy high-performance ML inference with Kubernetes autoscaling, Redis caching, load balancing, and Grafana monitoring

🎯 Project 2 📊 Intermediate ⏱️ 2-3 hours

🎯 Project Overview

Build a production-grade real-time prediction system that handles high traffic with low latency:

  • ☸️ Kubernetes Deployment: scalable container orchestration
  • 🚀 Horizontal Autoscaling: auto-scale based on CPU and memory utilization
  • ⚡ Redis Caching: cache predictions for repeated requests
  • ⚖️ Load Balancing: distribute traffic across replicas
  • 📊 Grafana Dashboard: real-time latency and throughput monitoring
  • 🔄 Health Checks: liveness and readiness probes

Architecture

User Request → Ingress/Load Balancer
                ↓
     K8s Service (ClusterIP)
                ↓
     ┌──────────┴──────────┐
     ↓                     ↓
API Pod 1            API Pod 2 (HPA managed)
     ↓                     ↓
     └──────────┬──────────┘
                ↓
      Redis Cache (check)
                ↓
    ML Model (predict if cache miss)
                ↓
      Prometheus (metrics)
                ↓
     Grafana (visualization)

💡 Prerequisites (a quick verification snippet follows this list):

  • Kubernetes cluster (Minikube/Docker Desktop/Cloud)
  • kubectl installed and configured
  • Docker for building images
  • Trained model from Project 1
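
Before starting, you can confirm the tooling is in place; this assumes model.pkl and scaler.pkl from Project 1 sit in your working directory:

# Verify prerequisites
kubectl version --client     # kubectl installed
kubectl get nodes            # cluster reachable
docker version               # Docker daemon running
ls model.pkl scaler.pkl      # model artifacts from Project 1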

⚡ Step 1: API with Redis Caching

# app.py
"""
FastAPI app with Redis caching for real-time predictions
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi.responses import Response
import redis
import joblib
import numpy as np
import json
import time
import hashlib
import os

app = FastAPI(title="Real-Time Prediction API")

# Load model
model = joblib.load('model.pkl')
scaler = joblib.load('scaler.pkl')

# Redis connection (host/port come from the environment in Kubernetes; see the Deployment manifest)
redis_client = redis.Redis(
    host=os.environ.get('REDIS_HOST', 'redis-service'),
    port=int(os.environ.get('REDIS_PORT', '6379')),
    decode_responses=True
)

# Prometheus metrics
predictions_total = Counter('predictions_total', 'Total predictions', ['cache_status'])
prediction_latency = Histogram('prediction_latency_seconds', 'Prediction latency')
cache_hit_rate = Gauge('cache_hit_rate', 'Cache hit rate')
active_requests = Gauge('active_requests', 'Active requests')

# Cache stats
cache_hits = 0
cache_total = 0

class Features(BaseModel):
    MedInc: float
    HouseAge: float
    AveRooms: float
    AveBedrms: float
    Population: float
    AveOccup: float
    Latitude: float
    Longitude: float

def get_cache_key(features: Features):
    """Generate cache key from features"""
    feature_str = json.dumps(features.dict(), sort_keys=True)
    return hashlib.md5(feature_str.encode()).hexdigest()

@app.get("/health")
async def health():
    """Health check endpoint"""
    try:
        redis_client.ping()
        return {"status": "healthy", "redis": "connected"}
    except Exception:
        return {"status": "degraded", "redis": "disconnected"}

@app.get("/ready")
async def ready():
    """Readiness probe"""
    return {"status": "ready"}

@app.post("/predict")
async def predict(features: Features):
    """Real-time prediction with caching"""
    global cache_hits, cache_total
    
    start_time = time.time()
    active_requests.inc()
    cache_total += 1
    
    try:
        # Check cache
        cache_key = get_cache_key(features)
        cached = redis_client.get(cache_key)
        
        if cached:
            # Cache hit
            cache_hits += 1
            predictions_total.labels(cache_status='hit').inc()
            cache_hit_rate.set(cache_hits / cache_total)
            
            result = json.loads(cached)
            result['cached'] = True
        else:
            # Cache miss - make prediction
            feature_array = np.array([[
                features.MedInc, features.HouseAge, features.AveRooms,
                features.AveBedrms, features.Population, features.AveOccup,
                features.Latitude, features.Longitude
            ]])
            
            features_scaled = scaler.transform(feature_array)
            prediction = float(model.predict(features_scaled)[0])
            
            result = {"prediction": prediction, "cached": False}
            
            # Store in cache (expire after 1 hour)
            redis_client.setex(
                cache_key,
                3600,
                json.dumps({"prediction": prediction})
            )
            
            predictions_total.labels(cache_status='miss').inc()
            cache_hit_rate.set(cache_hits / cache_total)
        
        # Record latency
        latency = time.time() - start_time
        prediction_latency.observe(latency)
        
        return result
    
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        active_requests.dec()

@app.get("/metrics")
async def metrics():
    """Prometheus metrics"""
    return Response(content=generate_latest(), media_type="text/plain")

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
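
Once the API is running and reachable on localhost:8000 (directly, or through the port-forward set up later), a quick way to sanity-check the caching path is to send the same request twice; the feature values below are arbitrary examples:

# First call returns "cached": false (miss); the identical second call returns "cached": true (hit)
curl -s -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{"MedInc": 8.3, "HouseAge": 41, "AveRooms": 6.9, "AveBedrms": 1.0, "Population": 322, "AveOccup": 2.5, "Latitude": 37.88, "Longitude": -122.23}'

# Send the exact same payload again and compare the "cached" field
curl -s -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{"MedInc": 8.3, "HouseAge": 41, "AveRooms": 6.9, "AveBedrms": 1.0, "Population": 322, "AveOccup": 2.5, "Latitude": 37.88, "Longitude": -122.23}'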

Dockerfile

FROM python:3.9-slim

WORKDIR /app

# Install dependencies
RUN pip install fastapi uvicorn redis prometheus-client \
    scikit-learn joblib numpy pydantic requests

# Copy files
COPY app.py model.pkl scaler.pkl ./

# Health check
HEALTHCHECK --interval=30s --timeout=3s \
  CMD python -c "import requests; requests.get('http://localhost:8000/health')"

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
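
Note that the API Deployment in Step 2 uses imagePullPolicy: Never, so the image must already exist inside the cluster's container runtime. A minimal sketch for Minikube (adjust for kind, Docker Desktop, or push to a registry instead):

# Build the image locally
docker build -t ml-api:latest .

# Make it available to Minikube's runtime
minikube image load ml-api:latest

# Alternative: build directly against Minikube's Docker daemon
# eval $(minikube docker-env) && docker build -t ml-api:latest .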

☸️ Step 2: Kubernetes Deployment

Redis Deployment

# k8s/redis-deployment.yaml
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:
    - port: 6379
      targetPort: 6379
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
      - name: redis
        image: redis:7-alpine
        ports:
        - containerPort: 6379
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"

API Deployment with HPA

# k8s/api-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-api
spec:
  replicas: 2  # Initial replicas
  selector:
    matchLabels:
      app: ml-api
  template:
    metadata:
      labels:
        app: ml-api
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      containers:
      - name: api
        image: ml-api:latest
        imagePullPolicy: Never  # For local testing
        ports:
        - containerPort: 8000
        env:
        - name: REDIS_HOST
          value: "redis-service"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-api-service
spec:
  selector:
    app: ml-api
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer  # Use ClusterIP for production with Ingress
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
      - type: Percent
        value: 100
        periodSeconds: 15
      - type: Pods
        value: 2
        periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
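
The HPA's CPU and memory targets only resolve if metrics-server is running in the cluster; without it, kubectl get hpa shows <unknown> for the targets. A quick check (the addons command is Minikube-specific; other clusters need metrics-server installed separately):

# Enable resource metrics on Minikube
minikube addons enable metrics-server

# Confirm metrics and HPA status
kubectl top pods
kubectl get hpa ml-api-hpa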

📊 Step 3: Monitoring with Prometheus & Grafana

Prometheus ConfigMap

# k8s/prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
    
    scrape_configs:
      - job_name: 'ml-api'
        kubernetes_sd_configs:
          - role: pod
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
            action: keep
            regex: true
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
            action: replace
            target_label: __metrics_path__
            regex: (.+)
          - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
            action: replace
            regex: ([^:]+)(?::\d+)?;(\d+)
            replacement: $1:$2
            target_label: __address__

Deploy Monitoring Stack

# Deploy all components
kubectl apply -f k8s/redis-deployment.yaml
kubectl apply -f k8s/api-deployment.yaml
kubectl apply -f k8s/prometheus-config.yaml

# Install Prometheus using Helm
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/prometheus \
  --set server.configMapOverrideName=prometheus-config

# Install Grafana
helm repo add grafana https://grafana.github.io/helm-charts
helm install grafana grafana/grafana \
  --set adminPassword=admin

# Get Grafana password
kubectl get secret grafana -o jsonpath="{.data.admin-password}" | base64 --decode

# Port forward to access services
kubectl port-forward svc/ml-api-service 8000:80 &
kubectl port-forward svc/prometheus-server 9090:80 &
kubectl port-forward svc/grafana 3000:80 &
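
With the port-forwards in place, you can confirm Prometheus actually discovered the annotated API pods and run an ad-hoc query over its standard HTTP API (the grep is just a rough filter on the targets payload):

# Scrape targets: the ml-api pods should report "up"
curl -s http://localhost:9090/api/v1/targets | grep -o '"health":"[^"]*"'

# Ad-hoc query: prediction rate split by cache hit/miss
curl -s 'http://localhost:9090/api/v1/query?query=rate(predictions_total[1m])'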

🔥 Step 4: Load Testing & Autoscaling

# load_test.py
"""
Load testing script to trigger autoscaling
"""
import requests
import concurrent.futures
import time
import random

API_URL = "http://localhost:8000/predict"

def make_prediction():
    """Make single prediction"""
    payload = {
        "MedInc": random.uniform(1, 15),
        "HouseAge": random.uniform(1, 50),
        "AveRooms": random.uniform(3, 10),
        "AveBedrms": random.uniform(1, 3),
        "Population": random.uniform(100, 3000),
        "AveOccup": random.uniform(1, 6),
        "Latitude": random.uniform(32, 42),
        "Longitude": random.uniform(-125, -114)
    }
    
    start = time.time()
    try:
        response = requests.post(API_URL, json=payload, timeout=5)
        latency = time.time() - start
        return {'status': response.status_code, 'latency': latency}
    except Exception as e:
        return {'status': 'error', 'error': str(e)}

def run_load_test(num_requests=1000, workers=50):
    """Run load test"""
    print(f"🔥 Starting load test: {num_requests} requests with {workers} workers\n")
    
    start_time = time.time()
    results = []
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(make_prediction) for _ in range(num_requests)]
        
        for i, future in enumerate(concurrent.futures.as_completed(futures)):
            result = future.result()
            results.append(result)
            
            if (i + 1) % 100 == 0:
                print(f"Progress: {i + 1}/{num_requests}")
    
    duration = time.time() - start_time
    
    # Calculate metrics
    latencies = [r['latency'] for r in results if 'latency' in r]
    successes = sum(1 for r in results if r.get('status') == 200)
    
    print(f"\n📊 Load Test Results:")
    print(f"  Total requests: {num_requests}")
    print(f"  Successful: {successes} ({successes/num_requests*100:.1f}%)")
    print(f"  Duration: {duration:.2f}s")
    print(f"  Throughput: {num_requests/duration:.2f} req/s")
    print(f"  Avg latency: {sum(latencies)/len(latencies):.3f}s")
    print(f"  P95 latency: {sorted(latencies)[int(len(latencies)*0.95)]:.3f}s")
    print(f"  P99 latency: {sorted(latencies)[int(len(latencies)*0.99)]:.3f}s")

if __name__ == '__main__':
    # Run sustained load to trigger autoscaling
    for round_num in range(3):
        print(f"\n=== Round {round_num + 1} ===")
        run_load_test(num_requests=500, workers=100)
        time.sleep(10)
    
    # Check HPA status
    print("\n📈 Check autoscaling:")
    print("kubectl get hpa ml-api-hpa")
    print("kubectl get pods -l app=ml-api")

Watch Autoscaling

# Terminal 1: Run load test
python load_test.py

# Terminal 2: Watch HPA
kubectl get hpa ml-api-hpa --watch

# Terminal 3: Watch pods
kubectl get pods -l app=ml-api --watch

# Check cache hit rate
curl http://localhost:8000/metrics | grep cache_hit_rate

📈 Step 5: Create Grafana Dashboard

Dashboard JSON

{
  "dashboard": {
    "title": "ML API Performance",
    "panels": [
      {
        "title": "Requests Per Second",
        "targets": [{
          "expr": "rate(predictions_total[1m])"
        }],
        "type": "graph"
      },
      {
        "title": "P95 Latency",
        "targets": [{
          "expr": "histogram_quantile(0.95, rate(prediction_latency_seconds_bucket[5m]))"
        }],
        "type": "graph"
      },
      {
        "title": "Cache Hit Rate",
        "targets": [{
          "expr": "cache_hit_rate"
        }],
        "type": "gauge"
      },
      {
        "title": "Active Pods",
        "targets": [{
          "expr": "count(up{job='ml-api'})"
        }],
        "type": "stat"
      },
      {
        "title": "CPU Usage",
        "targets": [{
          "expr": "rate(container_cpu_usage_seconds_total{pod=~'ml-api.*'}[5m])"
        }],
        "type": "graph"
      },
      {
        "title": "Memory Usage",
        "targets": [{
          "expr": "container_memory_usage_bytes{pod=~'ml-api.*'}"
        }],
        "type": "graph"
      }
    ]
  }
}

💡 Import Dashboard:

  • Access Grafana: http://localhost:3000
  • Login: admin / (password from kubectl command)
  • Add Prometheus data source: http://prometheus-server
  • Import the dashboard JSON (a scripted curl import follows this list) or create panels manually
  • Monitor latency, throughput, cache hit rate, autoscaling
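
If you prefer to script the import rather than click through the UI, Grafana's HTTP API accepts the dashboard payload above; this sketch assumes it is saved as dashboard.json (depending on the Grafana version you may need to add panel fields such as gridPos or a datasource):

# Fetch the admin password and import the dashboard via Grafana's API
GRAFANA_PASSWORD=$(kubectl get secret grafana -o jsonpath="{.data.admin-password}" | base64 --decode)

curl -s -X POST http://localhost:3000/api/dashboards/db \
  -u "admin:${GRAFANA_PASSWORD}" \
  -H "Content-Type: application/json" \
  -d @dashboard.json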

🎯 Project Completion Checklist

  • ✅ FastAPI with Redis caching implemented
  • ✅ Kubernetes deployment with multiple replicas
  • ✅ Horizontal Pod Autoscaler configured
  • ✅ Health checks (liveness & readiness probes)
  • ✅ Prometheus metrics exposed
  • ✅ Grafana dashboard created
  • ✅ Load testing demonstrates autoscaling
  • ✅ Cache hit rate monitored
  • ✅ P95/P99 latency under 100ms with cache

🎉 Congratulations!

You've built a production-grade real-time prediction system! This project demonstrates scalability, caching, monitoring, and autoscaling, all critical capabilities for high-traffic ML services.

Performance Targets Achieved

  • Latency: P95 < 100ms (with cache), P95 < 500ms (without cache)
  • Throughput: 1000+ req/s with autoscaling
  • Cache Hit Rate: 80%+ for repeated requests
  • Availability: 99.9% with health checks and autoscaling
  • Scalability: Auto-scale from 2 to 10 pods based on load