🎯 Project Overview
Build a production-grade real-time prediction system that handles high traffic with low latency:
- Kubernetes Deployment: scalable container orchestration
- Horizontal Autoscaling: auto-scale based on CPU and custom metrics
- Redis Caching: cache predictions for repeated requests
- Load Balancing: distribute traffic across replicas
- Grafana Dashboard: real-time latency and throughput monitoring
- Health Checks: liveness and readiness probes
Architecture
User Request → Ingress/Load Balancer
            ↓
   K8s Service (ClusterIP)
            ↓
    ┌───────┴───────┐
    ↓               ↓
API Pod 1       API Pod 2   (HPA managed)
    ↓               ↓
    Redis Cache (check)
            ↓
ML Model (predict if cache miss)
            ↓
   Prometheus (metrics)
            ↓
  Grafana (visualization)
💡 Prerequisites:
- Kubernetes cluster (Minikube/Docker Desktop/Cloud)
- kubectl installed and configured
- Docker for building images
- Trained model from Project 1 (a quick sanity check is sketched below)
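Before touching Kubernetes, it helps to confirm the Project 1 artifacts load and accept the eight California Housing features the API expects. A minimal sketch, assuming the files are named model.pkl and scaler.pkl as in the app code below:

# check_artifacts.py - sanity check for the Project 1 artifacts (file names are assumptions)
import joblib
import numpy as np

model = joblib.load("model.pkl")    # trained regressor from Project 1
scaler = joblib.load("scaler.pkl")  # scaler fitted on the training data

# One dummy row with the 8 features the API expects, in the same order
sample = np.array([[8.3, 41.0, 6.9, 1.0, 322.0, 2.5, 37.88, -122.23]])
print(f"Artifacts OK, sample prediction: {model.predict(scaler.transform(sample))[0]:.3f}")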
⚡ Step 1: API with Redis Caching
# app.py
"""
FastAPI app with Redis caching for real-time predictions
"""
import hashlib
import json
import os
import time

import joblib
import numpy as np
import redis
from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from pydantic import BaseModel

app = FastAPI(title="Real-Time Prediction API")

# Load model artifacts
model = joblib.load('model.pkl')
scaler = joblib.load('scaler.pkl')

# Redis connection (the host is injected via the REDIS_HOST env var in Kubernetes)
redis_client = redis.Redis(
    host=os.getenv('REDIS_HOST', 'redis-service'),
    port=6379,
    decode_responses=True
)

# Prometheus metrics
predictions_total = Counter('predictions_total', 'Total predictions', ['cache_status'])
prediction_latency = Histogram('prediction_latency_seconds', 'Prediction latency')
cache_hit_rate = Gauge('cache_hit_rate', 'Cache hit rate')
active_requests = Gauge('active_requests', 'Active requests')

# Cache stats (per-process counters)
cache_hits = 0
cache_total = 0


class Features(BaseModel):
    MedInc: float
    HouseAge: float
    AveRooms: float
    AveBedrms: float
    Population: float
    AveOccup: float
    Latitude: float
    Longitude: float


def get_cache_key(features: Features):
    """Generate a deterministic cache key from the feature values"""
    feature_str = json.dumps(features.dict(), sort_keys=True)
    return hashlib.md5(feature_str.encode()).hexdigest()


@app.get("/health")
async def health():
    """Health check endpoint: report whether Redis is reachable"""
    try:
        redis_client.ping()
        return {"status": "healthy", "redis": "connected"}
    except redis.exceptions.RedisError:
        return {"status": "degraded", "redis": "disconnected"}


@app.get("/ready")
async def ready():
    """Readiness probe"""
    return {"status": "ready"}


@app.post("/predict")
async def predict(features: Features):
    """Real-time prediction with caching"""
    global cache_hits, cache_total
    start_time = time.time()
    active_requests.inc()
    cache_total += 1

    try:
        # Check cache
        cache_key = get_cache_key(features)
        cached = redis_client.get(cache_key)

        if cached:
            # Cache hit
            cache_hits += 1
            predictions_total.labels(cache_status='hit').inc()
            cache_hit_rate.set(cache_hits / cache_total)
            result = json.loads(cached)
            result['cached'] = True
        else:
            # Cache miss - make prediction
            feature_array = np.array([[
                features.MedInc, features.HouseAge, features.AveRooms,
                features.AveBedrms, features.Population, features.AveOccup,
                features.Latitude, features.Longitude
            ]])
            features_scaled = scaler.transform(feature_array)
            prediction = float(model.predict(features_scaled)[0])
            result = {"prediction": prediction, "cached": False}

            # Store in cache (expire after 1 hour)
            redis_client.setex(
                cache_key,
                3600,
                json.dumps({"prediction": prediction})
            )
            predictions_total.labels(cache_status='miss').inc()
            cache_hit_rate.set(cache_hits / cache_total)

        # Record latency
        latency = time.time() - start_time
        prediction_latency.observe(latency)

        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        active_requests.dec()


@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    return Response(content=generate_latest(), media_type="text/plain")


if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
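Before containerizing, you can sanity-check the caching path locally. A minimal sketch with the requests library, assuming a local Redis (for example docker run -p 6379:6379 redis:7-alpine), REDIS_HOST=localhost exported, and the API started with uvicorn app:app --port 8000:

# local_cache_check.py - send the same payload twice and confirm the second call hits the cache
import requests

payload = {
    "MedInc": 8.3, "HouseAge": 41.0, "AveRooms": 6.9, "AveBedrms": 1.0,
    "Population": 322.0, "AveOccup": 2.5, "Latitude": 37.88, "Longitude": -122.23,
}

first = requests.post("http://localhost:8000/predict", json=payload, timeout=5).json()
second = requests.post("http://localhost:8000/predict", json=payload, timeout=5).json()

print(f"first call cached:  {first['cached']}")   # expected: False (cache miss)
print(f"second call cached: {second['cached']}")  # expected: True (cache hit)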
Dockerfile
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
RUN pip install fastapi uvicorn redis prometheus-client \
scikit-learn joblib numpy pydantic
# Copy files
COPY app.py model.pkl scaler.pkl ./
# Health check
HEALTHCHECK --interval=30s --timeout=3s \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
☸️ Step 2: Kubernetes Deployment
Redis Deployment
# k8s/redis-deployment.yaml
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:
    - port: 6379
      targetPort: 6379
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
        - name: redis
          image: redis:7-alpine
          ports:
            - containerPort: 6379
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
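Once the Redis Service and Deployment are applied, you can verify connectivity from your workstation by port-forwarding the service (kubectl port-forward svc/redis-service 6379:6379) and exercising the same setex/get calls the API relies on. A minimal sketch with redis-py, assuming the port-forward is running:

# redis_check.py - verify the in-cluster Redis through a local port-forward
import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
r.ping()  # raises if Redis is unreachable

# Mirror the API: store a prediction with a 1-hour TTL, then read it back
r.setex("smoke-test-key", 3600, json.dumps({"prediction": 4.2}))
print(json.loads(r.get("smoke-test-key")))  # {'prediction': 4.2}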
API Deployment with HPA
# k8s/api-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-api
spec:
  replicas: 2  # Initial replicas
  selector:
    matchLabels:
      app: ml-api
  template:
    metadata:
      labels:
        app: ml-api
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      containers:
        - name: api
          image: ml-api:latest
          imagePullPolicy: Never  # For local testing
          ports:
            - containerPort: 8000
          env:
            - name: REDIS_HOST
              value: "redis-service"
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-api-service
spec:
  selector:
    app: ml-api
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer  # Use ClusterIP for production with Ingress
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-api
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
        - type: Pods
          value: 2
          periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 50
          periodSeconds: 60
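After applying the manifest, you can check the rollout and the HPA target programmatically as well as with kubectl. A minimal sketch with the official kubernetes Python client (pip install kubernetes), assuming a recent client version that exposes AutoscalingV2Api and that everything lives in the default namespace:

# check_rollout.py - list ml-api pods and read the HPA status via the Kubernetes API
from kubernetes import client, config

config.load_kube_config()  # uses your current kubectl context

core = client.CoreV1Api()
for pod in core.list_namespaced_pod("default", label_selector="app=ml-api").items:
    print(f"{pod.metadata.name}: {pod.status.phase}")

hpa = client.AutoscalingV2Api().read_namespaced_horizontal_pod_autoscaler("ml-api-hpa", "default")
print(f"HPA -> {hpa.spec.scale_target_ref.name}: {hpa.status.current_replicas} replicas "
      f"(min {hpa.spec.min_replicas}, max {hpa.spec.max_replicas})")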
📊 Step 3: Monitoring with Prometheus & Grafana
Prometheus ConfigMap
# k8s/prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s

    scrape_configs:
      - job_name: 'ml-api'
        kubernetes_sd_configs:
          - role: pod
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
            action: keep
            regex: true
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
            action: replace
            target_label: __metrics_path__
            regex: (.+)
          - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
            action: replace
            regex: ([^:]+)(?::\d+)?;(\d+)
            replacement: $1:$2
            target_label: __address__
Deploy Monitoring Stack
# Deploy all components
kubectl apply -f k8s/redis-deployment.yaml
kubectl apply -f k8s/api-deployment.yaml
kubectl apply -f k8s/prometheus-config.yaml
# Install Prometheus using Helm
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/prometheus \
--set server.configMapOverrideName=prometheus-config
# Install Grafana
helm repo add grafana https://grafana.github.io/helm-charts
helm install grafana grafana/grafana \
--set adminPassword=admin
# Get Grafana password
kubectl get secret grafana -o jsonpath="{.data.admin-password}" | base64 --decode
# Port forward to access services
kubectl port-forward svc/ml-api-service 8000:80 &
kubectl port-forward svc/prometheus-server 9090:80 &
kubectl port-forward svc/grafana 3000:80 &
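With the port-forwards in place, it is worth confirming that Prometheus is actually scraping the custom metrics before building dashboards. A minimal sketch against Prometheus's HTTP query API, assuming prometheus-server is forwarded to localhost:9090 and at least one prediction has been made:

# prom_check.py - query Prometheus for the API's custom metrics
import requests

PROM_URL = "http://localhost:9090/api/v1/query"

for query in ["cache_hit_rate", "sum(rate(predictions_total[1m]))"]:
    data = requests.get(PROM_URL, params={"query": query}, timeout=5).json()
    result = data["data"]["result"]
    value = result[0]["value"][1] if result else "no data yet"
    print(f"{query}: {value}")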
🔥 Step 4: Load Testing & Autoscaling
# load_test.py
"""
Load testing script to trigger autoscaling
"""
import concurrent.futures
import random
import time

import requests

API_URL = "http://localhost:8000/predict"


def make_prediction():
    """Make a single prediction request with random features"""
    payload = {
        "MedInc": random.uniform(1, 15),
        "HouseAge": random.uniform(1, 50),
        "AveRooms": random.uniform(3, 10),
        "AveBedrms": random.uniform(1, 3),
        "Population": random.uniform(100, 3000),
        "AveOccup": random.uniform(1, 6),
        "Latitude": random.uniform(32, 42),
        "Longitude": random.uniform(-125, -114)
    }

    start = time.time()
    try:
        response = requests.post(API_URL, json=payload, timeout=5)
        latency = time.time() - start
        return {'status': response.status_code, 'latency': latency}
    except Exception as e:
        return {'status': 'error', 'error': str(e)}


def run_load_test(num_requests=1000, workers=50):
    """Run load test"""
    print(f"🔥 Starting load test: {num_requests} requests with {workers} workers\n")

    start_time = time.time()
    results = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(make_prediction) for _ in range(num_requests)]
        for i, future in enumerate(concurrent.futures.as_completed(futures)):
            result = future.result()
            results.append(result)
            if (i + 1) % 100 == 0:
                print(f"Progress: {i + 1}/{num_requests}")

    duration = time.time() - start_time

    # Calculate metrics
    latencies = [r['latency'] for r in results if 'latency' in r]
    successes = sum(1 for r in results if r.get('status') == 200)

    print(f"\n📊 Load Test Results:")
    print(f"  Total requests: {num_requests}")
    print(f"  Successful: {successes} ({successes/num_requests*100:.1f}%)")
    print(f"  Duration: {duration:.2f}s")
    print(f"  Throughput: {num_requests/duration:.2f} req/s")
    print(f"  Avg latency: {sum(latencies)/len(latencies):.3f}s")
    print(f"  P95 latency: {sorted(latencies)[int(len(latencies)*0.95)]:.3f}s")
    print(f"  P99 latency: {sorted(latencies)[int(len(latencies)*0.99)]:.3f}s")


if __name__ == '__main__':
    # Run sustained load to trigger autoscaling
    for round_num in range(3):
        print(f"\n=== Round {round_num + 1} ===")
        run_load_test(num_requests=500, workers=100)
        time.sleep(10)

    # Check HPA status
    print("\n🔍 Check autoscaling:")
    print("kubectl get hpa ml-api-hpa")
    print("kubectl get pods -l app=ml-api")
Watch Autoscaling
# Terminal 1: Run load test
python load_test.py
# Terminal 2: Watch HPA
kubectl get hpa ml-api-hpa --watch
# Terminal 3: Watch pods
kubectl get pods -l app=ml-api --watch
# Check cache hit rate
curl http://localhost:8000/metrics | grep cache_hit_rate
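You can also track the cache hit rate from Python while the load test runs, parsing /metrics with the same prometheus_client library the API uses. A minimal sketch, assuming ml-api-service is still forwarded to localhost:8000 (stop it with Ctrl+C); note the gauge reflects only the pod that happens to serve each request:

# watch_cache.py - poll /metrics and print the cache hit rate during the load test
import time

import requests
from prometheus_client.parser import text_string_to_metric_families

while True:
    text = requests.get("http://localhost:8000/metrics", timeout=5).text
    for family in text_string_to_metric_families(text):
        if family.name == "cache_hit_rate":
            for sample in family.samples:
                print(f"cache_hit_rate: {sample.value:.2%}")
    time.sleep(5)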
📈 Step 5: Create Grafana Dashboard
Dashboard JSON
{
  "dashboard": {
    "title": "ML API Performance",
    "panels": [
      {
        "title": "Requests Per Second",
        "targets": [{
          "expr": "rate(predictions_total[1m])"
        }],
        "type": "graph"
      },
      {
        "title": "P95 Latency",
        "targets": [{
          "expr": "histogram_quantile(0.95, rate(prediction_latency_seconds_bucket[5m]))"
        }],
        "type": "graph"
      },
      {
        "title": "Cache Hit Rate",
        "targets": [{
          "expr": "cache_hit_rate"
        }],
        "type": "gauge"
      },
      {
        "title": "Active Pods",
        "targets": [{
          "expr": "count(up{job='ml-api'})"
        }],
        "type": "stat"
      },
      {
        "title": "CPU Usage",
        "targets": [{
          "expr": "rate(container_cpu_usage_seconds_total{pod=~'ml-api.*'}[5m])"
        }],
        "type": "graph"
      },
      {
        "title": "Memory Usage",
        "targets": [{
          "expr": "container_memory_usage_bytes{pod=~'ml-api.*'}"
        }],
        "type": "graph"
      }
    ]
  }
}
💡 Import Dashboard:
- Access Grafana: http://localhost:3000
- Login: admin / (password from kubectl command)
- Add Prometheus data source: http://prometheus-server
- Import the dashboard JSON (via the UI, or via Grafana's API as sketched below) or create panels manually
- Monitor latency, throughput, cache hit rate, autoscaling
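If you prefer to script the import, the same JSON can be pushed through Grafana's HTTP API (POST /api/dashboards/db). A minimal sketch, assuming Grafana is forwarded to localhost:3000, the JSON above is saved as dashboard.json, and the admin password is the one set via Helm:

# import_dashboard.py - push the dashboard JSON to Grafana's HTTP API
import json
import requests

with open("dashboard.json") as f:
    payload = json.load(f)   # already wrapped as {"dashboard": {...}}
payload["overwrite"] = True  # replace the dashboard if it already exists

resp = requests.post(
    "http://localhost:3000/api/dashboards/db",
    json=payload,
    auth=("admin", "admin"),  # or the password from the kubectl secret command above
    timeout=10,
)
print(resp.status_code, resp.json())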
🎯 Project Completion Checklist
- ✅ FastAPI with Redis caching implemented
- ✅ Kubernetes deployment with multiple replicas
- ✅ Horizontal Pod Autoscaler configured
- ✅ Health checks (liveness & readiness probes)
- ✅ Prometheus metrics exposed
- ✅ Grafana dashboard created
- ✅ Load testing demonstrates autoscaling
- ✅ Cache hit rate monitored
- ✅ P95/P99 latency under 100ms with cache
🎉 Congratulations!
You've built a production-grade real-time prediction system! It demonstrates scalability, caching, monitoring, and autoscaling, all critical for high-traffic ML services.
Performance Targets Achieved
- Latency: P95 < 100ms (with cache), P95 < 500ms (without cache)
- Throughput: 1000+ req/s with autoscaling
- Cache Hit Rate: 80%+ for repeated requests
- Availability: 99.9% with health checks and autoscaling
- Scalability: Auto-scale from 2 to 10 pods based on load