🎯 Project Overview
You've fine-tuned an LLM. Now comes the hard part: deploying it at scale. This project teaches you production deployment with efficient inference, load balancing, monitoring, and cost optimization.
What You'll Build
Optimized Inference
Use vLLM for 10-20x faster inference. Add quantization for 4x memory reduction.
Containerization
Docker containers with GPU support. Build, test, and push to registry.
Kubernetes Deploy
Auto-scaling deployments. Load balancing across multiple GPUs.
Monitoring
Track latency, throughput, costs. Prometheus metrics and Grafana dashboards.
📋 Prerequisites
- Fine-tuned LLM model (from Project 1 or Tutorial 5)
- GPU server or cloud account (AWS/GCP/Azure)
- Docker installed and basic knowledge
- Optional: Kubernetes cluster access
⏱️ Time Breakdown
- Model Preparation: 30 minutes (quantization, testing)
- vLLM Setup: 30 minutes (install, configure, benchmark)
- Docker: 30 minutes (containerize, test locally)
- Kubernetes: 45 minutes (deploy, scaling, load balancing)
- Monitoring: 30 minutes (Prometheus, Grafana)
⚠️ Cost Warning: This project requires GPU resources. Use spot instances or credits to minimize costs (~$2-5/hour for A100).
🔧 Step 1: Model Preparation & Quantization
Option A: Use Pre-trained Model
# Download a popular fine-tuned model from HuggingFace
from transformers import AutoModelForCausalLM, AutoTokenizer
model_name = "meta-llama/Llama-2-7b-chat-hf" # Or your fine-tuned model
# Download model
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype="auto",
device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Save locally
model.save_pretrained("./models/llama-2-7b-chat")
tokenizer.save_pretrained("./models/llama-2-7b-chat")
print("Model downloaded and saved!")
Option B: Use Your Fine-tuned Model
# If you completed Project 1 or Tutorial 5
# Your model is already saved locally
model_path = "./models/bert-sentiment-final" # From Project 1 (a classifier - note that vLLM below only serves generative, decoder-only models)
# OR
model_path = "./models/llama-2-7b-lora" # From Tutorial 5 (use a causal LM like this for the rest of the project)
# Verify model exists
import os
if os.path.exists(model_path):
    print(f"✅ Model found at {model_path}")
else:
    print("❌ Model not found. Please fine-tune first.")
Quantize Model (4-bit for Production)
# Quantize to 4-bit using BitsAndBytes
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
import torch
# Configure 4-bit quantization
bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_use_double_quant=True,
)
# Load and quantize
model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-2-7b-chat-hf",
quantization_config=bnb_config,
device_map="auto"
)
# Save quantized model
model.save_pretrained("./models/llama-2-7b-4bit")
print("Model quantized to 4-bit!")
print(f"Original size: ~28GB β Quantized: ~7GB")
💡 Quantization Benefits
- ~4x memory reduction: ~14 GB (fp16) → ~4 GB for Llama-2-7B (see the quick memory math below)
- Inference speed: can improve when the smaller weights relieve memory pressure, but bitsandbytes NF4 adds dequantization overhead, so benchmark rather than assume a 2-3x gain
- Minimal quality loss: typically a small accuracy drop (often under ~2%)
- Cost savings: Use smaller/cheaper GPUs
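To see where these numbers come from, here is a quick back-of-the-envelope sketch (weights only; real checkpoints also store embeddings, norms, and quantization metadata, so treat the results as estimates):
# memory_math.py - rough per-precision footprint for a 7B-parameter model
PARAMS = 7e9  # approximate Llama-2-7B parameter count
bytes_per_param = {"fp32": 4, "fp16": 2, "int8": 1, "nf4": 0.5}
for dtype, nbytes in bytes_per_param.items():
    print(f"{dtype}: ~{PARAMS * nbytes / 1024**3:.1f} GB")
# fp16: ~13.0 GB, nf4: ~3.3 GB -> roughly the 4x reduction quoted above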
Test Quantized Model
# Quick test to verify model works after quantization
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
# Load quantized model
model = AutoModelForCausalLM.from_pretrained(
"./models/llama-2-7b-4bit",
device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-chat-hf")
# Test inference
prompt = "What is machine learning?"
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=100,
temperature=0.7,
do_sample=True
)
response = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(f"Prompt: {prompt}")
print(f"Response: {response}")
# Check model size
import os
model_size = sum(
os.path.getsize(os.path.join("./models/llama-2-7b-4bit", f))
for f in os.listdir("./models/llama-2-7b-4bit")
if os.path.isfile(os.path.join("./models/llama-2-7b-4bit", f))
)
print(f"\nModel size: {model_size / (1024**3):.2f} GB")
✅ Expected Output: A coherent response in a few seconds. The 4-bit checkpoint should be roughly 4 GB on disk for Llama-2-7B.
⚡ Step 2: vLLM Setup for Fast Inference
vLLM is a high-throughput inference engine that typically serves 10-20x more requests per second than naive Transformers generation, thanks to PagedAttention for efficient KV-cache management and continuous batching. One caveat for this project: vLLM loads standard fp16/bf16 checkpoints and AWQ/GPTQ-quantized models, but it does not read bitsandbytes 4-bit checkpoints like the one saved in Step 1 - point it at your fp16 weights (or an AWQ/GPTQ variant) and treat the llama-2-7b-4bit paths below as placeholders for whichever format you actually serve.
Install vLLM
# Install vLLM (requires CUDA 11.8+)
pip install vllm
# Verify installation
python -c "import vllm; print(f'vLLM version: {vllm.__version__}')"
# Check GPU
nvidia-smi
Basic vLLM Server
# serve_model.py - Simple vLLM inference server
from vllm import LLM, SamplingParams
class ModelServer:
def __init__(self, model_path: str):
"""Initialize vLLM engine"""
self.llm = LLM(
model=model_path,
tensor_parallel_size=1, # Number of GPUs
dtype="float16", # Use fp16 for speed
max_model_len=2048, # Context length
gpu_memory_utilization=0.9 # Use 90% of GPU memory
)
def generate(self, prompts: list[str], max_tokens: int = 256):
"""Generate completions for prompts"""
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.9,
max_tokens=max_tokens,
stop=["</s>", "Human:", "\n\n"]
)
outputs = self.llm.generate(prompts, sampling_params)
return [output.outputs[0].text for output in outputs]
# Usage
if __name__ == "__main__":
server = ModelServer("./models/llama-2-7b-4bit")
prompts = [
"What is deep learning?",
"Explain neural networks in simple terms.",
"How does backpropagation work?"
]
responses = server.generate(prompts)
for prompt, response in zip(prompts, responses):
print(f"Q: {prompt}")
print(f"A: {response}\n")
vLLM with OpenAI-Compatible API
# Start vLLM with OpenAI-compatible server
# This allows drop-in replacement for OpenAI API
# Command line:
# python -m vllm.entrypoints.openai.api_server \
# --model ./models/llama-2-7b-4bit \
# --port 8000 \
# --tensor-parallel-size 1
# OR launch the same CLI from Python - vLLM does not expose a public
# run_server() helper, so shell out to the module instead:
import subprocess
subprocess.run([
    "python", "-m", "vllm.entrypoints.openai.api_server",
    "--model", "./models/llama-2-7b-4bit",
    "--host", "0.0.0.0",
    "--port", "8000",
    "--tensor-parallel-size", "1",
])
Test API Server
# test_vllm_api.py
import requests
import json
url = "http://localhost:8000/v1/completions"
data = {
"model": "./models/llama-2-7b-4bit",
"prompt": "Explain quantum computing in 50 words:",
"max_tokens": 100,
"temperature": 0.7
}
response = requests.post(url, json=data)
result = response.json()
print(json.dumps(result, indent=2))
print(f"\nGenerated text: {result['choices'][0]['text']}")
🚀 vLLM Performance Benefits
- PagedAttention: Efficient KV cache management (2-4x throughput)
- Continuous batching: Process requests as they arrive
- Tensor parallelism: Distribute across multiple GPUs
- Optimized kernels: CUDA kernels for common operations
- OpenAI compatible: Drop-in replacement for existing code (see the client sketch below)
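Because the server speaks the OpenAI protocol, the official openai Python client can talk to it directly. A minimal sketch, assuming openai>=1.0 is installed and the server above is listening on port 8000 (the api_key value is arbitrary because vLLM does not verify it by default):
# client_example.py - call the vLLM OpenAI-compatible server with the openai client
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")
completion = client.completions.create(
    model="./models/llama-2-7b-4bit",  # must match the --model path the server was launched with
    prompt="Explain quantum computing in 50 words:",
    max_tokens=100,
    temperature=0.7,
)
print(completion.choices[0].text)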
Benchmark vLLM vs Standard Transformers
# benchmark.py - Compare vLLM vs Transformers
import time
from vllm import LLM, SamplingParams
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
def benchmark_vllm(model_path, prompts):
"""Benchmark vLLM"""
llm = LLM(model=model_path, dtype="float16")
sampling_params = SamplingParams(temperature=0.7, max_tokens=100)
start = time.time()
outputs = llm.generate(prompts, sampling_params)
elapsed = time.time() - start
return elapsed, len(prompts)
def benchmark_transformers(model_path, prompts):
"""Benchmark standard Transformers"""
model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=torch.float16,
device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_path)
start = time.time()
for prompt in prompts:
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
with torch.no_grad():
model.generate(**inputs, max_new_tokens=100)
elapsed = time.time() - start
return elapsed, len(prompts)
# Run benchmarks
model_path = "./models/llama-2-7b-4bit"
test_prompts = [
"What is machine learning?",
"Explain neural networks.",
"How does gradient descent work?",
"What is a transformer model?",
"Describe attention mechanism."
] * 4 # 20 prompts total
print("🔥 Benchmarking vLLM...")
vllm_time, vllm_count = benchmark_vllm(model_path, test_prompts)
print(f"vLLM: {vllm_time:.2f}s for {vllm_count} prompts")
print(f"Throughput: {vllm_count/vllm_time:.2f} prompts/sec")
print("\n🐢 Benchmarking Transformers...")
hf_time, hf_count = benchmark_transformers(model_path, test_prompts)
print(f"Transformers: {hf_time:.2f}s for {hf_count} prompts")
print(f"Throughput: {hf_count/hf_time:.2f} prompts/sec")
print(f"\n🚀 Speedup: {hf_time/vllm_time:.1f}x faster with vLLM!")
📊 Expected Benchmark Results (Llama-2-7B on A100)
🔥 Benchmarking vLLM...
vLLM: 8.3s for 20 prompts
Throughput: 2.41 prompts/sec
🐢 Benchmarking Transformers...
Transformers: 127.5s for 20 prompts
Throughput: 0.16 prompts/sec
🚀 Speedup: 15.4x faster with vLLM!
🐳 Step 3: Docker Containerization
Package your model and vLLM server in a Docker container for consistent deployment across environments.
Create Dockerfile
# Dockerfile
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04
# Install Python and dependencies
RUN apt-get update && apt-get install -y \
python3.10 \
python3-pip \
git \
curl \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Install Python packages
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
# Copy model and server code
COPY models/ /app/models/
COPY serve_model.py /app/
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run server
CMD ["python3", "-m", "vllm.entrypoints.openai.api_server", \
"--model", "/app/models/llama-2-7b-4bit", \
"--host", "0.0.0.0", \
"--port", "8000"]
Requirements File
# requirements.txt
vllm==0.2.7
transformers==4.36.0
torch==2.1.0
fastapi==0.104.1
uvicorn==0.24.0
pydantic==2.5.0
prometheus-client==0.19.0
psutil==5.9.6
Build and Test Docker Image
# Build Docker image
docker build -t llm-server:v1 .
# Check image size
docker images llm-server:v1
# Run container locally
docker run -d \
--name llm-server \
--gpus all \
-p 8000:8000 \
-v $(pwd)/models:/app/models \
llm-server:v1
# Check logs
docker logs -f llm-server
# Test API
curl http://localhost:8000/v1/models
# Test generation
curl http://localhost:8000/v1/completions \
-H "Content-Type: application/json" \
-d '{
"model": "/app/models/llama-2-7b-4bit",
"prompt": "What is Docker?",
"max_tokens": 50
}'
# Stop container
docker stop llm-server
docker rm llm-server
Add Health Check Endpoint
# serve_model.py - Add health check
from fastapi import FastAPI
from vllm.entrypoints.openai.api_server import app as vllm_app
import psutil
import torch
@vllm_app.get("/health")
async def health_check():
"""Health check endpoint for load balancer"""
gpu_available = torch.cuda.is_available()
gpu_memory = (torch.cuda.memory_allocated() / torch.cuda.get_device_properties(0).total_memory) if gpu_available else 0.0
cpu_percent = psutil.cpu_percent()
memory_percent = psutil.virtual_memory().percent
return {
"status": "healthy" if gpu_available else "degraded",
"gpu_available": gpu_available,
"gpu_memory_used": f"{gpu_memory*100:.1f}%",
"cpu_usage": f"{cpu_percent:.1f}%",
"memory_usage": f"{memory_percent:.1f}%"
}
@vllm_app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint"""
# Return metrics in Prometheus format
return {
"requests_total": 12345,
"latency_avg_ms": 245.3,
"throughput_tokens_per_sec": 156.7
}
⚠️ Docker Image Size: Final image will be 10-15GB (base image + model). Use Docker layer caching and multi-stage builds to optimize.
Push to Container Registry
# Tag for Docker Hub
docker tag llm-server:v1 yourusername/llm-server:v1
# Login to Docker Hub
docker login
# Push image
docker push yourusername/llm-server:v1
# Or use AWS ECR
aws ecr get-login-password --region us-east-1 | \
docker login --username AWS --password-stdin \
123456789.dkr.ecr.us-east-1.amazonaws.com
docker tag llm-server:v1 \
123456789.dkr.ecr.us-east-1.amazonaws.com/llm-server:v1
docker push 123456789.dkr.ecr.us-east-1.amazonaws.com/llm-server:v1
☸️ Step 4: Kubernetes Deployment
Deploy your containerized LLM with Kubernetes for auto-scaling, load balancing, and high availability.
Prerequisites
- Kubernetes cluster with GPU nodes (AWS EKS, GCP GKE, or Azure AKS)
- kubectl installed and configured
- NVIDIA GPU Operator installed on cluster
Kubernetes Deployment Configuration
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: llm-server
namespace: ml-serving
labels:
app: llm-server
version: v1
spec:
replicas: 2 # Start with 2 replicas
selector:
matchLabels:
app: llm-server
template:
metadata:
labels:
app: llm-server
version: v1
spec:
containers:
- name: llm-server
image: yourusername/llm-server:v1
ports:
- containerPort: 8000
name: http
resources:
requests:
memory: "16Gi"
cpu: "4"
nvidia.com/gpu: 1 # Request 1 GPU
limits:
memory: "32Gi"
cpu: "8"
nvidia.com/gpu: 1
env:
- name: CUDA_VISIBLE_DEVICES
value: "0"
- name: MODEL_PATH
value: "/app/models/llama-2-7b-4bit"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 60
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
nodeSelector:
accelerator: nvidia-tesla-a100 # Target GPU nodes
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
Service Configuration
# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-server-service
namespace: ml-serving
labels:
app: llm-server
spec:
type: LoadBalancer # Or ClusterIP for internal
selector:
app: llm-server
ports:
- protocol: TCP
port: 80
targetPort: 8000
name: http
sessionAffinity: ClientIP # Sticky sessions
sessionAffinityConfig:
clientIP:
timeoutSeconds: 3600
Horizontal Pod Autoscaler (HPA)
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: llm-server-hpa
namespace: ml-serving
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: llm-server
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
- type: Pods
pods:
metric:
name: requests_per_second
target:
type: AverageValue
averageValue: "100"
behavior:
scaleDown:
stabilizationWindowSeconds: 300 # 5 min cooldown
policies:
- type: Percent
value: 50
periodSeconds: 60
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 30
ConfigMap for Model Configuration
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: llm-config
namespace: ml-serving
data:
max_model_len: "2048"
gpu_memory_utilization: "0.9"
tensor_parallel_size: "1"
temperature: "0.7"
max_tokens: "256"
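One way to consume these values inside the container is to expose the ConfigMap as environment variables (e.g. with envFrom/configMapRef on the Deployment, which is not shown above) and read them with sensible defaults at startup. A small sketch; the helper name is an assumption, not part of vLLM:
# config.py - read serving parameters injected from the ConfigMap, with safe defaults
import os

def get_serving_config() -> dict:
    """Collect vLLM serving parameters from environment variables."""
    return {
        "max_model_len": int(os.getenv("max_model_len", "2048")),
        "gpu_memory_utilization": float(os.getenv("gpu_memory_utilization", "0.9")),
        "tensor_parallel_size": int(os.getenv("tensor_parallel_size", "1")),
        "temperature": float(os.getenv("temperature", "0.7")),
        "max_tokens": int(os.getenv("max_tokens", "256")),
    }

if __name__ == "__main__":
    print(get_serving_config())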
Deploy to Kubernetes
# Create namespace
kubectl create namespace ml-serving
# Apply configurations
kubectl apply -f k8s/configmap.yaml
kubectl apply -f k8s/deployment.yaml
kubectl apply -f k8s/service.yaml
kubectl apply -f k8s/hpa.yaml
# Check deployment status
kubectl get deployments -n ml-serving
kubectl get pods -n ml-serving -w
# Check service
kubectl get svc -n ml-serving
# Get external IP (for LoadBalancer)
export SERVICE_IP=$(kubectl get svc llm-server-service -n ml-serving \
-o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo "Service available at: http://$SERVICE_IP"
# Test deployment
curl http://$SERVICE_IP/health
# View logs
kubectl logs -f deployment/llm-server -n ml-serving
📈 Auto-scaling Behavior
- Scale up: when average CPU exceeds 70% or memory exceeds 80% (60-second stabilization window)
- Scale down: after a 5-minute cooldown, removing at most 50% of replicas per minute
- Min replicas: 2 (high availability)
- Max replicas: 10 (cost control)
- Note: the requests_per_second Pods metric only works with a custom-metrics adapter (e.g. prometheus-adapter); without one, the CPU and memory targets still apply
Ingress for HTTPS & Path Routing
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: llm-server-ingress
namespace: ml-serving
annotations:
kubernetes.io/ingress.class: nginx
cert-manager.io/cluster-issuer: letsencrypt-prod
nginx.ingress.kubernetes.io/proxy-body-size: "50m"
nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
spec:
tls:
- hosts:
- llm-api.yourdomain.com
secretName: llm-tls-secret
rules:
- host: llm-api.yourdomain.com
http:
paths:
- path: /v1
pathType: Prefix
backend:
service:
name: llm-server-service
port:
number: 80
Load Testing with Locust
# load_test.py - Test Kubernetes deployment at scale
from locust import HttpUser, task, between
class LLMUser(HttpUser):
wait_time = between(1, 3) # Wait 1-3 seconds between requests
@task(3) # Weight 3x
def completions(self):
"""Test completions endpoint"""
self.client.post("/v1/completions", json={
"model": "/app/models/llama-2-7b-4bit",
"prompt": "Explain machine learning:",
"max_tokens": 100
})
@task(1) # Weight 1x
def health_check(self):
"""Test health endpoint"""
self.client.get("/health")
# Run with:
# locust -f load_test.py --host http://llm-api.yourdomain.com
# Open http://localhost:8089 and start test with 100 users
📊 Expected Load Test Results
# With 2 replicas (2 A100 GPUs)
Users: 100 concurrent
Requests/sec: 8.5
Response time (avg): 450ms
Response time (95%): 780ms
Failure rate: 0%
# After auto-scaling to 4 replicas
Users: 100 concurrent
Requests/sec: 16.2
Response time (avg): 240ms
Response time (95%): 420ms
Failure rate: 0%
📊 Step 5: Monitoring & Observability
Set up comprehensive monitoring to track performance, costs, and issues in production.
Prometheus Metrics
# metrics.py - Add Prometheus metrics to your server
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Response
import time
# Define metrics
REQUEST_COUNT = Counter(
'llm_requests_total',
'Total number of requests',
['endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
'llm_request_duration_seconds',
'Request latency in seconds',
['endpoint'],
buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)
TOKENS_GENERATED = Counter(
'llm_tokens_generated_total',
'Total tokens generated'
)
GPU_MEMORY = Gauge(
'llm_gpu_memory_used_bytes',
'GPU memory used in bytes',
['gpu_id']
)
ACTIVE_REQUESTS = Gauge(
'llm_active_requests',
'Number of requests currently being processed'
)
# Middleware to track metrics
@app.middleware("http")
async def track_metrics(request, call_next):
ACTIVE_REQUESTS.inc()
start_time = time.time()
try:
response = await call_next(request)
status = response.status_code
except Exception as e:
status = 500
raise
finally:
duration = time.time() - start_time
REQUEST_LATENCY.labels(endpoint=request.url.path).observe(duration)
REQUEST_COUNT.labels(
endpoint=request.url.path,
status=status
).inc()
ACTIVE_REQUESTS.dec()
return response
@app.get("/metrics")
async def metrics():
"""Expose metrics for Prometheus"""
return Response(
content=generate_latest(),
media_type="text/plain"
)
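The token and GPU-memory metrics above are declared but never updated. One way to wire them in is a small helper called after each generate(); this is a sketch under the assumption that the objects from metrics.py are importable, and the helper name is illustrative rather than a vLLM API:
# record_generation.py - update token/GPU gauges after a vLLM generate() call
import torch
from metrics import TOKENS_GENERATED, GPU_MEMORY

def record_generation(outputs):
    """Record generated-token counts and current GPU memory usage."""
    # Each RequestOutput carries the sampled completion(s); count their tokens
    generated = sum(len(o.outputs[0].token_ids) for o in outputs)
    TOKENS_GENERATED.inc(generated)
    # Sample allocated memory on every visible GPU
    if torch.cuda.is_available():
        for gpu_id in range(torch.cuda.device_count()):
            GPU_MEMORY.labels(gpu_id=str(gpu_id)).set(torch.cuda.memory_allocated(gpu_id))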
Prometheus Configuration
# k8s/prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-config
namespace: monitoring
data:
prometheus.yml: |
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'llm-server'
kubernetes_sd_configs:
- role: pod
namespaces:
names:
- ml-serving
relabel_configs:
- source_labels: [__meta_kubernetes_pod_label_app]
action: keep
regex: llm-server
- source_labels: [__meta_kubernetes_pod_ip]
target_label: __address__
replacement: $1:8000
Grafana Dashboard
{
"dashboard": {
"title": "LLM Serving Metrics",
"panels": [
{
"title": "Requests per Second",
"targets": [{
"expr": "rate(llm_requests_total[5m])"
}]
},
{
"title": "Average Latency",
"targets": [{
"expr": "rate(llm_request_duration_seconds_sum[5m]) / rate(llm_request_duration_seconds_count[5m])"
}]
},
{
"title": "P95 Latency",
"targets": [{
"expr": "histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m]))"
}]
},
{
"title": "Tokens Generated",
"targets": [{
"expr": "rate(llm_tokens_generated_total[5m])"
}]
},
{
"title": "GPU Memory Usage",
"targets": [{
"expr": "llm_gpu_memory_used_bytes / (1024^3)"
}]
},
{
"title": "Active Requests",
"targets": [{
"expr": "llm_active_requests"
}]
},
{
"title": "Error Rate",
"targets": [{
"expr": "rate(llm_requests_total{status=~\"5..\"}[5m]) / rate(llm_requests_total[5m])"
}]
},
{
"title": "Pod Replicas",
"targets": [{
"expr": "kube_deployment_status_replicas{deployment=\"llm-server\"}"
}]
}
]
}
}
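Rather than clicking the dashboard together by hand, you can push this JSON through Grafana's HTTP API. A sketch, assuming the JSON above is saved as llm_dashboard.json and you have a Grafana API token with editor rights (the URL and token values are placeholders):
# upload_dashboard.py - create/update the dashboard via Grafana's HTTP API
import json
import requests

GRAFANA_URL = "http://grafana.monitoring:3000"   # placeholder
API_TOKEN = "YOUR_GRAFANA_API_TOKEN"             # placeholder

with open("llm_dashboard.json") as f:
    dashboard = json.load(f)["dashboard"]

resp = requests.post(
    f"{GRAFANA_URL}/api/dashboards/db",
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    json={"dashboard": dashboard, "overwrite": True},
)
resp.raise_for_status()
print("Dashboard saved:", resp.json().get("url"))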
Logging Configuration
# logging_config.py
import logging
import json
from datetime import datetime
class StructuredLogger:
"""Structured logging for better observability"""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
# Console handler with JSON formatting
handler = logging.StreamHandler()
handler.setFormatter(self.JSONFormatter())
self.logger.addHandler(handler)
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
# Add extra fields
if hasattr(record, 'request_id'):
log_data['request_id'] = record.request_id
if hasattr(record, 'user_id'):
log_data['user_id'] = record.user_id
if hasattr(record, 'duration_ms'):
log_data['duration_ms'] = record.duration_ms
return json.dumps(log_data)
def info(self, message, **kwargs):
extra = {k: v for k, v in kwargs.items()}
self.logger.info(message, extra=extra)
def error(self, message, **kwargs):
extra = {k: v for k, v in kwargs.items()}
self.logger.error(message, extra=extra)
# Usage (assumes the FastAPI `app` from the serving code)
import time
import uuid
logger = StructuredLogger(__name__)
@app.middleware("http")
async def log_requests(request, call_next):
request_id = str(uuid.uuid4())
start_time = time.time()
logger.info(
"Request started",
request_id=request_id,
method=request.method,
path=request.url.path
)
response = await call_next(request)
duration_ms = (time.time() - start_time) * 1000
logger.info(
"Request completed",
request_id=request_id,
status_code=response.status_code,
duration_ms=duration_ms
)
return response
Alert Rules
# k8s/alerts.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-alerts
namespace: monitoring
data:
alerts.yml: |
groups:
- name: llm_serving
interval: 30s
rules:
# High error rate
- alert: HighErrorRate
expr: rate(llm_requests_total{status=~"5.."}[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value | humanizePercentage }}"
# High latency
- alert: HighLatency
expr: histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m])) > 2
for: 5m
labels:
severity: warning
annotations:
summary: "High latency detected"
description: "P95 latency is {{ $value }}s"
# GPU memory high
- alert: GPUMemoryHigh
expr: llm_gpu_memory_used_bytes / nvidia_gpu_memory_total_bytes > 0.95
for: 5m
labels:
severity: warning
annotations:
summary: "GPU memory usage high"
description: "GPU memory at {{ $value | humanizePercentage }}"
# Pod down
- alert: LLMServerDown
expr: up{job="llm-server"} == 0
for: 2m
labels:
severity: critical
annotations:
summary: "LLM server pod is down"
description: "Pod {{ $labels.pod }} is not responding"
Key Metrics
Track RPS, latency (P50/P95/P99), error rate, throughput
Alerts
Slack/PagerDuty alerts for errors, high latency, pod failures
Dashboards
Real-time Grafana dashboards for operations team
Logging
Structured JSON logs aggregated in ELK/DataDog
💰 Step 6: Cost Optimization
LLM inference can be expensive. Here's how to optimize costs without sacrificing performance.
Cost Breakdown (Llama-2-7B on AWS)
💵 Monthly Cost Estimates (assumptions, not quotes - see the quick calculator below)
Option 1: On-Demand A100 (80GB)
- Instance: p4d.24xlarge (8x A100)
- Cost: $32.77/hour = $23,594/month
- Requests/sec: 80 (10 per GPU)
- Cost per 1M requests: ~$115 at that rate
Option 2: Spot A100 (80GB)
- Instance: p4d.24xlarge spot
- Cost: ~$10/hour = $7,200/month (70% savings)
- Risk: Can be interrupted
- Best for: Non-critical workloads
Option 3: T4 GPU (16GB)
- Instance: g4dn.xlarge
- Cost: $0.526/hour = $379/month
- Requests/sec: 2-3
- Cost per 1M requests: ~$60
- Best for: Low-traffic applications
Option 4: Serverless GPU platforms (e.g., Modal, RunPod Serverless - AWS Lambda itself does not offer GPUs)
- Pay per request or per second of GPU time
- Cost: roughly $0.0001-0.001 per request, depending on output length
- Cold start: several seconds to tens of seconds while the model loads
- Best for: Sporadic traffic
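The quick calculator below reproduces this arithmetic so you can plug in your own prices and measured throughput; the numbers here are the assumptions listed above, not measurements:
# cost_math.py - back-of-the-envelope cost comparison
options = {
    "on-demand p4d.24xlarge": {"usd_per_hour": 32.77, "req_per_sec": 80},
    "spot p4d.24xlarge":      {"usd_per_hour": 10.00, "req_per_sec": 80},
    "g4dn.xlarge (T4)":       {"usd_per_hour": 0.526, "req_per_sec": 2.5},
}
for name, o in options.items():
    monthly = o["usd_per_hour"] * 720  # ~720 hours per month
    per_million = o["usd_per_hour"] / (o["req_per_sec"] * 3600) * 1_000_000
    print(f"{name}: ${monthly:,.0f}/month, ~${per_million:,.0f} per 1M requests")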
Strategy 1: Spot Instances with Fallback
# k8s/spot-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: llm-server-spot
spec:
replicas: 3
template:
spec:
nodeSelector:
node.kubernetes.io/instance-type: p3.2xlarge
# (don't pin capacity-type in the nodeSelector - the weighted affinity below prefers spot and falls back to on-demand)
tolerations:
- key: spot
operator: Equal
value: "true"
effect: NoSchedule
# Add node affinity for fallback to on-demand
affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
preference:
matchExpressions:
- key: karpenter.sh/capacity-type
operator: In
values:
- spot
- weight: 50
preference:
matchExpressions:
- key: karpenter.sh/capacity-type
operator: In
values:
- on-demand
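Spot capacity can be reclaimed on short notice, which Kubernetes surfaces to the pod as a SIGTERM before the grace period expires. A minimal drain sketch; names like draining and active_requests are illustrative, and in practice uvicorn installs its own signal handlers, so you may prefer hooking the shutdown/lifespan event:
# graceful_drain.py - stop accepting traffic and drain in-flight work on SIGTERM
import asyncio
import signal

from fastapi import FastAPI, Response

app = FastAPI()
draining = False        # flipped when SIGTERM arrives (spot reclaim / pod eviction)
active_requests = 0     # increment/decrement around each generation call

def _handle_sigterm(signum, frame):
    global draining
    draining = True

signal.signal(signal.SIGTERM, _handle_sigterm)

@app.get("/health")
async def health():
    # Fail the readiness check while draining so the Service stops sending new requests
    if draining:
        return Response(content='{"status": "draining"}',
                        media_type="application/json", status_code=503)
    return {"status": "healthy"}

@app.on_event("shutdown")
async def wait_for_inflight():
    # Give in-flight generations a chance to finish (keep below terminationGracePeriodSeconds)
    for _ in range(250):
        if active_requests == 0:
            break
        await asyncio.sleep(0.1)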
Strategy 2: Dynamic Batching
# dynamic_batching.py - Process multiple requests together
import asyncio
from collections import deque
from typing import List, Tuple
import time
class DynamicBatcher:
"""Batch requests dynamically for higher throughput"""
def __init__(
self,
max_batch_size: int = 32,
max_wait_time: float = 0.05 # 50ms
):
self.max_batch_size = max_batch_size
self.max_wait_time = max_wait_time
self.queue = deque()
self.processing = False
async def add_request(self, prompt: str) -> str:
"""Add request to batch queue"""
future = asyncio.Future()
self.queue.append((prompt, future))
# Start batch processor if not running
if not self.processing:
asyncio.create_task(self._process_batch())
return await future
async def _process_batch(self):
"""Process batch when ready"""
self.processing = True
start_time = time.time()
# Wait for batch to fill or timeout
while len(self.queue) < self.max_batch_size:
if time.time() - start_time > self.max_wait_time:
break
await asyncio.sleep(0.001)
# Get batch
batch_size = min(len(self.queue), self.max_batch_size)
batch = [self.queue.popleft() for _ in range(batch_size)]
# Process batch
prompts = [item[0] for item in batch]
futures = [item[1] for item in batch]
# Generate all at once (much faster than sequential)
responses = await self._generate_batch(prompts)
# Return results
for future, response in zip(futures, responses):
future.set_result(response)
self.processing = False
# Process remaining queue
if self.queue:
asyncio.create_task(self._process_batch())
async def _generate_batch(self, prompts: List[str]) -> List[str]:
"""Generate responses for batch"""
# Use vLLM batch generation
outputs = llm.generate(prompts, sampling_params)
return [out.outputs[0].text for out in outputs]
# Usage
batcher = DynamicBatcher(max_batch_size=32, max_wait_time=0.05)
@app.post("/v1/completions")
async def completions(request: CompletionRequest):
response = await batcher.add_request(request.prompt)
return {"text": response}
💡 Dynamic Batching Benefits
- 3-5x throughput increase: process up to 32 requests per forward pass instead of 1
- Lower cost per request: the same hardware serves more traffic
- Bounded latency: at most ~50ms of added queueing delay
- Automatic: no client changes needed
- Note: vLLM already performs continuous batching internally, so this pattern matters most in front of plain Transformers or other non-batching backends
Strategy 3: Request Caching
# caching.py - Cache common requests
import hashlib
import redis
import json
from typing import Optional
class ResponseCache:
"""Cache LLM responses to reduce inference costs"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
self.ttl = 3600 # 1 hour cache
def _make_key(self, prompt: str, params: dict) -> str:
"""Create cache key from prompt + params"""
content = f"{prompt}:{json.dumps(params, sort_keys=True)}"
return hashlib.sha256(content.encode()).hexdigest()
def get(self, prompt: str, params: dict) -> Optional[str]:
"""Get cached response"""
key = self._make_key(prompt, params)
cached = self.redis.get(key)
if cached:
return json.loads(cached)
return None
def set(self, prompt: str, params: dict, response: str):
"""Cache response"""
key = self._make_key(prompt, params)
self.redis.setex(
key,
self.ttl,
json.dumps(response)
)
def invalidate(self, pattern: str = "*"):
"""Clear cache by pattern"""
for key in self.redis.scan_iter(pattern):
self.redis.delete(key)
# Usage
cache = ResponseCache()
@app.post("/v1/completions")
async def completions(request: CompletionRequest):
# Check cache
cached = cache.get(request.prompt, request.params)
if cached:
return {"text": cached, "cached": True}
# Generate response
response = await generate(request.prompt, request.params)
# Cache result
cache.set(request.prompt, request.params, response)
return {"text": response, "cached": False}
Strategy 4: Model Pruning
# pruning.py - Remove unnecessary weights
from transformers import AutoModelForCausalLM
import torch
def prune_model(model, pruning_ratio: float = 0.3):
"""
Prune model weights for faster inference
Args:
model: HuggingFace model
pruning_ratio: Fraction of weights to remove (0.3 = 30%)
"""
import torch.nn.utils.prune as prune
for name, module in model.named_modules():
if isinstance(module, torch.nn.Linear):
# Prune 30% of weights with lowest magnitude
prune.l1_unstructured(
module,
name='weight',
amount=pruning_ratio
)
# Make pruning permanent
prune.remove(module, 'weight')
return model
# Load and prune model
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-chat-hf")
pruned_model = prune_model(model, pruning_ratio=0.3)
# Save pruned model
# Note: l1_unstructured only zeroes weights in place - the dense checkpoint stays the
# same size, and dense GPU kernels will not run faster unless you export to a sparse
# format, use sparsity-aware kernels, or switch to structured pruning
pruned_model.save_pretrained("./models/llama-2-7b-pruned30")
print("Model pruned: ~30% of Linear weights zeroed")
print("Speed/size gains require sparse storage or structured pruning")
print("Quality impact: typically a few percent - re-evaluate before deploying")
Strategy 5: Multi-Tenancy
# multi_tenancy.py - Serve multiple models on one GPU
from vllm import LLM
import asyncio
import torch
class MultiTenantServer:
"""Serve multiple models using time-slicing"""
def __init__(self):
self.models = {}
self.current_model = None
def load_model(self, model_id: str, model_path: str):
"""Load model into memory"""
if model_id not in self.models:
self.models[model_id] = {
"path": model_path,
"llm": None, # Lazy load
"last_used": 0
}
async def generate(
self,
model_id: str,
prompt: str,
max_tokens: int = 256
) -> str:
"""Generate with specific model"""
# Switch model if needed
if self.current_model != model_id:
await self._switch_model(model_id)
# Generate
llm = self.models[model_id]["llm"]
outputs = llm.generate([prompt], sampling_params)
return outputs[0].outputs[0].text
async def _switch_model(self, model_id: str):
"""Switch to different model"""
import time
# Unload current model
if self.current_model:
self.models[self.current_model]["llm"] = None
torch.cuda.empty_cache()
# Load new model
model_info = self.models[model_id]
model_info["llm"] = LLM(
model=model_info["path"],
dtype="float16"
)
model_info["last_used"] = time.time()
self.current_model = model_id
# Usage - serve 3 models on 1 GPU
server = MultiTenantServer()
server.load_model("llama-7b", "./models/llama-2-7b")
server.load_model("mistral-7b", "./models/mistral-7b")
server.load_model("custom-7b", "./models/custom-7b")
# Route requests based on model_id (call from inside an async request handler, since generate() is awaited)
response = await server.generate(
model_id="llama-7b",
prompt="What is AI?",
max_tokens=100
)
Spot Instances
Save 70% with interruption handling
Batching
3-5x throughput, 50% cost reduction
Caching
High hit rates on repeated prompts skip inference entirely
Pruning
~30% of weights zeroed; realizing speed/size gains needs structured pruning or sparse kernels
🔒 Step 7: Production Best Practices
Security Hardening
# security.py - Add authentication and rate limiting
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from datetime import datetime, timedelta
from collections import defaultdict
import time
app = FastAPI()
security = HTTPBearer()
# Rate limiting
class RateLimiter:
def __init__(self, max_requests: int = 100, window: int = 60):
self.max_requests = max_requests
self.window = window # seconds
self.requests = defaultdict(list)
def is_allowed(self, client_id: str) -> bool:
now = time.time()
# Remove old requests
self.requests[client_id] = [
req_time for req_time in self.requests[client_id]
if now - req_time < self.window
]
# Check limit
if len(self.requests[client_id]) >= self.max_requests:
return False
self.requests[client_id].append(now)
return True
rate_limiter = RateLimiter(max_requests=100, window=60)
# API key authentication
async def verify_token(
credentials: HTTPAuthorizationCredentials = Depends(security)
):
token = credentials.credentials
try:
payload = jwt.decode(
token,
"your-secret-key",
algorithms=["HS256"]
)
return payload["client_id"]
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
@app.post("/v1/completions")
async def completions(
request: CompletionRequest,
client_id: str = Depends(verify_token)
):
# Rate limiting
if not rate_limiter.is_allowed(client_id):
raise HTTPException(
status_code=429,
detail="Rate limit exceeded"
)
# Input validation
if len(request.prompt) > 4096:
raise HTTPException(
status_code=400,
detail="Prompt too long"
)
if request.max_tokens > 2048:
raise HTTPException(
status_code=400,
detail="max_tokens too large"
)
# Generate response
response = await generate(request.prompt, request.max_tokens)
return {"text": response, "client_id": client_id}
Content Filtering
# content_filter.py - Filter harmful content
from transformers import pipeline
import re
class ContentFilter:
"""Filter inappropriate content"""
def __init__(self):
# Load toxicity classifier
self.toxicity_classifier = pipeline(
"text-classification",
model="unitary/toxic-bert"
)
def is_safe(self, text: str) -> tuple[bool, str]:
"""Check if text is safe"""
# Check toxicity
result = self.toxicity_classifier(text[:512])[0]
if result["label"] == "toxic" and result["score"] > 0.7:
return False, "Content flagged as toxic"
# Check for PII (basic patterns)
if self._contains_pii(text):
return False, "Content contains PII"
# Check for prompt injection
if self._is_prompt_injection(text):
return False, "Potential prompt injection detected"
return True, "Content is safe"
def _contains_pii(self, text: str) -> bool:
"""Check for personally identifiable information"""
patterns = [
r'\b\d{3}-\d{2}-\d{4}\b', # SSN
r'\b\d{16}\b', # Credit card
r'\b[\w\.-]+@[\w\.-]+\.\w+\b' # Email
]
for pattern in patterns:
if re.search(pattern, text):
return True
return False
def _is_prompt_injection(self, text: str) -> bool:
"""Detect prompt injection attempts"""
injection_patterns = [
"ignore previous instructions",
"disregard above",
"forget everything",
"new instructions:",
"system:",
"assistant:"
]
text_lower = text.lower()
return any(pattern in text_lower for pattern in injection_patterns)
# Usage
content_filter = ContentFilter()
@app.post("/v1/completions")
async def completions(request: CompletionRequest):
# Filter input
is_safe, message = content_filter.is_safe(request.prompt)
if not is_safe:
raise HTTPException(status_code=400, detail=message)
# Generate
response = await generate(request.prompt)
# Filter output
is_safe, message = content_filter.is_safe(response)
if not is_safe:
return {"text": "[Content filtered]", "reason": message}
return {"text": response}
Graceful Degradation
# fallback.py - Handle failures gracefully
import httpx
from typing import Optional
class FallbackChain:
"""Try multiple backends with fallback"""
def __init__(self):
self.backends = [
"http://llm-server-primary:8000",
"http://llm-server-secondary:8000",
"https://api.openai.com/v1" # Emergency fallback
]
async def generate(
self,
prompt: str,
max_tokens: int = 256
) -> Optional[str]:
"""Try backends in order until success"""
errors = []
for backend_url in self.backends:
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{backend_url}/v1/completions",
json={
"prompt": prompt,
"max_tokens": max_tokens
}
)
response.raise_for_status()
return response.json()["text"]
except Exception as e:
errors.append(f"{backend_url}: {str(e)}")
continue
# All backends failed
raise Exception(f"All backends failed: {errors}")
fallback = FallbackChain()
@app.post("/v1/completions")
async def completions(request: CompletionRequest):
try:
response = await fallback.generate(
request.prompt,
request.max_tokens
)
return {"text": response}
except Exception as e:
# Return cached/default response
return {
"text": "Service temporarily unavailable. Please try again.",
"error": str(e)
}
A/B Testing
# ab_testing.py - Test model improvements
import hashlib
import time
from dataclasses import dataclass
from datetime import datetime
from fastapi import Header
from typing import Dict, Optional
@dataclass
class Experiment:
name: str
model_path: str
traffic_percent: float # 0-100
class ABTester:
"""Route traffic to different model versions"""
def __init__(self):
self.experiments = {
"control": Experiment(
name="control",
model_path="./models/llama-2-7b-v1",
traffic_percent=80.0
),
"variant_a": Experiment(
name="variant_a",
model_path="./models/llama-2-7b-v2-pruned",
traffic_percent=10.0
),
"variant_b": Experiment(
name="variant_b",
model_path="./models/llama-2-7b-v2-finetuned",
traffic_percent=10.0
)
}
def select_experiment(self, user_id: str) -> Experiment:
"""Select experiment based on traffic allocation"""
# Deterministic selection based on user_id - use a stable hash, since Python's
# built-in hash() is salted per process and would differ across pods
user_hash = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
cumulative = 0
for exp in self.experiments.values():
cumulative += exp.traffic_percent
if user_hash < cumulative:
return exp
return self.experiments["control"]
def log_result(
self,
user_id: str,
experiment: str,
latency: float,
user_feedback: Optional[int] = None
):
"""Log experiment results for analysis"""
# Send to analytics system
metrics = {
"user_id": user_id,
"experiment": experiment,
"latency_ms": latency,
"feedback": user_feedback,
"timestamp": datetime.utcnow().isoformat()
}
# Store in database/analytics
pass
ab_tester = ABTester()
@app.post("/v1/completions")
async def completions(
request: CompletionRequest,
user_id: str = Header(...)
):
# Select experiment
experiment = ab_tester.select_experiment(user_id)
# Generate with selected model
start = time.time()
response = await generate(
request.prompt,
model_path=experiment.model_path
)
latency = (time.time() - start) * 1000
# Log result
ab_tester.log_result(user_id, experiment.name, latency)
return {
"text": response,
"experiment": experiment.name # For debugging
}
✅ Production Checklist:
- ✅ Authentication & rate limiting
- ✅ Input/output content filtering
- ✅ Fallback chains for reliability
- ✅ A/B testing for improvements
- ✅ Monitoring & alerting
- ✅ Cost tracking & optimization
🎯 Step 8: Testing & Deployment Checklist
Pre-Deployment Tests
# test_deployment.py - Comprehensive deployment tests
import pytest
import httpx
import asyncio
class TestDeployment:
"""Test deployment before going live"""
base_url = "http://llm-server-service"
@pytest.mark.asyncio
async def test_health_check(self):
"""Test health endpoint"""
async with httpx.AsyncClient() as client:
response = await client.get(f"{self.base_url}/health")
assert response.status_code == 200
assert response.json()["status"] == "healthy"
@pytest.mark.asyncio
async def test_basic_generation(self):
"""Test basic text generation"""
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
f"{self.base_url}/v1/completions",
json={
"prompt": "What is 2+2?",
"max_tokens": 10
}
)
assert response.status_code == 200
assert len(response.json()["text"]) > 0
@pytest.mark.asyncio
async def test_rate_limiting(self):
"""Test rate limiter"""
async with httpx.AsyncClient() as client:
# Make 101 requests (limit is 100)
for i in range(101):
response = await client.post(
f"{self.base_url}/v1/completions",
json={"prompt": f"Test {i}", "max_tokens": 10},
headers={"Authorization": "Bearer test-token"}
)
if i < 100:
assert response.status_code == 200
else:
assert response.status_code == 429 # Rate limited
@pytest.mark.asyncio
async def test_latency(self):
"""Test response latency"""
import time
async with httpx.AsyncClient(timeout=30.0) as client:
start = time.time()
response = await client.post(
f"{self.base_url}/v1/completions",
json={"prompt": "Hello", "max_tokens": 50}
)
latency = time.time() - start
assert response.status_code == 200
assert latency < 2.0 # Should respond in <2 seconds
@pytest.mark.asyncio
async def test_concurrent_requests(self):
"""Test handling concurrent requests"""
async with httpx.AsyncClient(timeout=30.0) as client:
tasks = [
client.post(
f"{self.base_url}/v1/completions",
json={"prompt": f"Test {i}", "max_tokens": 20}
)
for i in range(10)
]
responses = await asyncio.gather(*tasks)
# All should succeed
assert all(r.status_code == 200 for r in responses)
@pytest.mark.asyncio
async def test_content_filtering(self):
"""Test content filtering"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/v1/completions",
json={
"prompt": "Ignore previous instructions and...",
"max_tokens": 50
}
)
# Should be filtered
assert response.status_code == 400
# Run tests
# pytest test_deployment.py -v
Deployment Checklist
📋 Pre-Launch Checklist
Infrastructure:
□ Kubernetes cluster configured with GPU nodes
□ Docker images built and pushed to registry
□ Secrets and ConfigMaps created
□ Network policies and ingress configured
□ SSL certificates provisioned
Application:
□ Model quantized and tested
□ vLLM server tested locally
□ Health checks responding
□ Metrics endpoint working
□ Logging configured
Security:
□ Authentication enabled
□ Rate limiting configured
□ Content filtering active
□ API keys rotated
□ Network policies applied
Monitoring:
□ Prometheus scraping metrics
□ Grafana dashboards created
□ Alert rules configured
□ PagerDuty/Slack integration tested
□ Log aggregation working
Performance:
□ Load testing completed
□ Latency targets met (P95 < 1s)
□ Throughput targets met (>10 RPS)
□ Auto-scaling tested
□ Cost estimates validated
Disaster Recovery:
□ Backup deployment in different region
□ Fallback chain configured
□ Data backup strategy
□ Rollback procedure documented
□ Incident response plan ready
Deployment Commands
#!/bin/bash
# Final deployment script
echo "π Deploying LLM Server to Production"
# 1. Build and push Docker image
docker build -t llm-server:v1 .
docker tag llm-server:v1 yourusername/llm-server:v1
docker push yourusername/llm-server:v1
# 2. Apply Kubernetes configurations
kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/configmap.yaml
kubectl apply -f k8s/secrets.yaml
kubectl apply -f k8s/deployment.yaml
kubectl apply -f k8s/service.yaml
kubectl apply -f k8s/hpa.yaml
kubectl apply -f k8s/ingress.yaml
# 3. Wait for rollout
kubectl rollout status deployment/llm-server -n ml-serving
# 4. Run smoke tests
pytest test_deployment.py -v
# 5. Monitor logs
kubectl logs -f deployment/llm-server -n ml-serving
echo "β
Deployment complete!"
echo "API available at: https://llm-api.yourdomain.com"
🎓 What You've Learned
Optimization
Quantization, vLLM, and batching for roughly 10-20x higher inference throughput
Containerization
Docker with GPU support, multi-stage builds, health checks
Orchestration
Kubernetes deployment, auto-scaling, load balancing
Observability
Prometheus metrics, Grafana dashboards, structured logging
Cost Optimization
Spot instances, batching, and caching for up to ~70% cost reduction
Production Ready
Security, content filtering, fallbacks, A/B testing
📚 Next Steps
- Optimize further: Try TensorRT-LLM for even faster inference
- Multi-region: Deploy across regions for lower latency
- Advanced features: Add streaming responses, tool calling
- Cost tracking: Implement detailed cost attribution per user
- Fine-tune: Continuously improve model based on user feedback
🎉 Congratulations! You've built a production-ready LLM deployment system. You can now serve millions of requests at scale with optimized costs and solid reliability.
Test Your Knowledge
Q1: What is vLLM designed for?
Q2: What is PagedAttention in vLLM?
Q3: Why is load balancing important in production LLM deployments?
Q4: What should you monitor in production LLM systems?
Q5: What is the benefit of containerizing LLM deployments with Docker?
Congratulations!
You've completed the LLMs & Transformers course and mastered building, fine-tuning, and deploying large language models!