Home β†’ LLMs & Transformers β†’ Project 3: Deploy LLM
πŸš€ HANDS-ON PROJECT

Project 3: Deploy a Fine-tuned LLM at Scale

Take your fine-tuned model to production with vLLM, quantization, Docker, and Kubernetes

⏱️ 2-3 hours πŸ”§ Advanced Project πŸ’» Python, vLLM, Docker, Kubernetes

🎯 Project Overview

You've fine-tuned an LLM. Now comes the hard part: deploying it at scale. This project teaches you production deployment with efficient inference, load balancing, monitoring, and cost optimization.

What You'll Build

⚑

Optimized Inference

Use vLLM for 10-20x faster inference. Add quantization for 4x memory reduction.

🐳

Containerization

Docker containers with GPU support. Build, test, and push to registry.

☸️

Kubernetes Deploy

Auto-scaling deployments. Load balancing across multiple GPUs.

πŸ“Š

Monitoring

Track latency, throughput, costs. Prometheus metrics and Grafana dashboards.

πŸ“š Prerequisites

  • Fine-tuned LLM model (from Project 1 or Tutorial 5)
  • GPU server or cloud account (AWS/GCP/Azure)
  • Docker installed and basic knowledge
  • Optional: Kubernetes cluster access

⏱️ Time Breakdown

  • Model Preparation: 30 minutes (quantization, testing)
  • vLLM Setup: 30 minutes (install, configure, benchmark)
  • Docker: 30 minutes (containerize, test locally)
  • Kubernetes: 45 minutes (deploy, scaling, load balancing)
  • Monitoring: 30 minutes (Prometheus, Grafana)

⚠️ Cost Warning: This project requires GPU resources. Use spot instances or credits to minimize costs (~$2-5/hour for A100).

πŸ”§ Step 1: Model Preparation & Quantization

Option A: Use Pre-trained Model

# Download a popular fine-tuned model from HuggingFace
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "meta-llama/Llama-2-7b-chat-hf"  # Or your fine-tuned model

# Download model
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    device_map="auto"
)

tokenizer = AutoTokenizer.from_pretrained(model_name)

# Save locally
model.save_pretrained("./models/llama-2-7b-chat")
tokenizer.save_pretrained("./models/llama-2-7b-chat")

print("Model downloaded and saved!")

Option B: Use Your Fine-tuned Model

# If you completed Project 1 or Tutorial 5
# Your model is already saved locally

model_path = "./models/bert-sentiment-final"  # From Project 1
# OR
model_path = "./models/llama-2-7b-lora"  # From Tutorial 5

# Verify model exists
import os
if os.path.exists(model_path):
    print(f"βœ… Model found at {model_path}")
else:
    print(f"❌ Model not found. Please fine-tune first.")

Quantize Model (4-bit for Production)

# Quantize to 4-bit using BitsAndBytes
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
import torch

# Configure 4-bit quantization
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
)

# Load and quantize
model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-2-7b-chat-hf",
    quantization_config=bnb_config,
    device_map="auto"
)

# Save quantized model
model.save_pretrained("./models/llama-2-7b-4bit")

print("Model quantized to 4-bit!")
print(f"Original size: ~28GB β†’ Quantized: ~7GB")

πŸ’‘ Quantization Benefits

  • 4x memory reduction: 28GB β†’ 7GB for Llama-2-7B
  • 2-3x faster inference: Smaller model fits better in GPU cache
  • Minimal quality loss: <2% accuracy drop typically
  • Cost savings: Use smaller/cheaper GPUs

Test Quantized Model

# Quick test to verify model works after quantization
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

# Load quantized model
model = AutoModelForCausalLM.from_pretrained(
    "./models/llama-2-7b-4bit",
    device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-chat-hf")

# Test inference
prompt = "What is machine learning?"
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

with torch.no_grad():
    outputs = model.generate(
        **inputs,
        max_new_tokens=100,
        temperature=0.7,
        do_sample=True
    )

response = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(f"Prompt: {prompt}")
print(f"Response: {response}")

# Check model size
import os
model_size = sum(
    os.path.getsize(os.path.join("./models/llama-2-7b-4bit", f))
    for f in os.listdir("./models/llama-2-7b-4bit")
    if os.path.isfile(os.path.join("./models/llama-2-7b-4bit", f))
)
print(f"\nModel size: {model_size / (1024**3):.2f} GB")

βœ… Expected Output: Coherent response in ~2-3 seconds. Model size should be ~7GB for Llama-2-7B.

⚑ Step 2: vLLM Setup for Fast Inference

vLLM is a high-throughput inference engine that achieves 10-20x faster serving than standard transformers. It uses PagedAttention for efficient memory management.

Install vLLM

# Install vLLM (requires CUDA 11.8+)
pip install vllm

# Verify installation
python -c "import vllm; print(f'vLLM version: {vllm.__version__}')"

# Check GPU
nvidia-smi

Basic vLLM Server

# serve_model.py - Simple vLLM inference server
from vllm import LLM, SamplingParams

class ModelServer:
    def __init__(self, model_path: str):
        """Initialize vLLM engine"""
        self.llm = LLM(
            model=model_path,
            tensor_parallel_size=1,  # Number of GPUs
            dtype="float16",          # Use fp16 for speed
            max_model_len=2048,       # Context length
            gpu_memory_utilization=0.9  # Use 90% of GPU memory
        )
        
    def generate(self, prompts: list[str], max_tokens: int = 256):
        """Generate completions for prompts"""
        sampling_params = SamplingParams(
            temperature=0.7,
            top_p=0.9,
            max_tokens=max_tokens,
            stop=["", "Human:", "\n\n"]
        )
        
        outputs = self.llm.generate(prompts, sampling_params)
        
        return [output.outputs[0].text for output in outputs]

# Usage
if __name__ == "__main__":
    server = ModelServer("./models/llama-2-7b-4bit")
    
    prompts = [
        "What is deep learning?",
        "Explain neural networks in simple terms.",
        "How does backpropagation work?"
    ]
    
    responses = server.generate(prompts)
    
    for prompt, response in zip(prompts, responses):
        print(f"Q: {prompt}")
        print(f"A: {response}\n")

vLLM with OpenAI-Compatible API

# Start vLLM with OpenAI-compatible server
# This allows drop-in replacement for OpenAI API

# Command line:
# python -m vllm.entrypoints.openai.api_server \
#     --model ./models/llama-2-7b-4bit \
#     --port 8000 \
#     --tensor-parallel-size 1

# OR use Python:
from vllm.entrypoints.openai.api_server import run_server

run_server(
    model="./models/llama-2-7b-4bit",
    host="0.0.0.0",
    port=8000,
    tensor_parallel_size=1
)

Test API Server

# test_vllm_api.py
import requests
import json

url = "http://localhost:8000/v1/completions"

data = {
    "model": "./models/llama-2-7b-4bit",
    "prompt": "Explain quantum computing in 50 words:",
    "max_tokens": 100,
    "temperature": 0.7
}

response = requests.post(url, json=data)
result = response.json()

print(json.dumps(result, indent=2))
print(f"\nGenerated text: {result['choices'][0]['text']}")

πŸš€ vLLM Performance Benefits

  • PagedAttention: Efficient KV cache management (2-4x throughput)
  • Continuous batching: Process requests as they arrive
  • Tensor parallelism: Distribute across multiple GPUs
  • Optimized kernels: CUDA kernels for common operations
  • OpenAI compatible: Drop-in replacement for existing code

Benchmark vLLM vs Standard Transformers

# benchmark.py - Compare vLLM vs Transformers
import time
from vllm import LLM, SamplingParams
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

def benchmark_vllm(model_path, prompts):
    """Benchmark vLLM"""
    llm = LLM(model=model_path, dtype="float16")
    sampling_params = SamplingParams(temperature=0.7, max_tokens=100)
    
    start = time.time()
    outputs = llm.generate(prompts, sampling_params)
    elapsed = time.time() - start
    
    return elapsed, len(prompts)

def benchmark_transformers(model_path, prompts):
    """Benchmark standard Transformers"""
    model = AutoModelForCausalLM.from_pretrained(
        model_path, 
        torch_dtype=torch.float16,
        device_map="auto"
    )
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    
    start = time.time()
    for prompt in prompts:
        inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
        with torch.no_grad():
            model.generate(**inputs, max_new_tokens=100)
    elapsed = time.time() - start
    
    return elapsed, len(prompts)

# Run benchmarks
model_path = "./models/llama-2-7b-4bit"
test_prompts = [
    "What is machine learning?",
    "Explain neural networks.",
    "How does gradient descent work?",
    "What is a transformer model?",
    "Describe attention mechanism."
] * 4  # 20 prompts total

print("πŸ”₯ Benchmarking vLLM...")
vllm_time, vllm_count = benchmark_vllm(model_path, test_prompts)
print(f"vLLM: {vllm_time:.2f}s for {vllm_count} prompts")
print(f"Throughput: {vllm_count/vllm_time:.2f} prompts/sec")

print("\n🐒 Benchmarking Transformers...")
hf_time, hf_count = benchmark_transformers(model_path, test_prompts)
print(f"Transformers: {hf_time:.2f}s for {hf_count} prompts")
print(f"Throughput: {hf_count/hf_time:.2f} prompts/sec")

print(f"\nπŸ“Š Speedup: {hf_time/vllm_time:.1f}x faster with vLLM!")

πŸ“Š Expected Benchmark Results (Llama-2-7B on A100)

πŸ”₯ Benchmarking vLLM...
vLLM: 8.3s for 20 prompts
Throughput: 2.41 prompts/sec

🐒 Benchmarking Transformers...
Transformers: 127.5s for 20 prompts
Throughput: 0.16 prompts/sec

πŸ“Š Speedup: 15.4x faster with vLLM!

🐳 Step 3: Docker Containerization

Package your model and vLLM server in a Docker container for consistent deployment across environments.

Create Dockerfile

# Dockerfile
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04

# Install Python and dependencies
RUN apt-get update && apt-get install -y \
    python3.10 \
    python3-pip \
    git \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Install Python packages
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

# Copy model and server code
COPY models/ /app/models/
COPY serve_model.py /app/

# Expose port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Run server
CMD ["python3", "-m", "vllm.entrypoints.openai.api_server", \
     "--model", "/app/models/llama-2-7b-4bit", \
     "--host", "0.0.0.0", \
     "--port", "8000"]

Requirements File

# requirements.txt
vllm==0.2.7
transformers==4.36.0
torch==2.1.0
fastapi==0.104.1
uvicorn==0.24.0
pydantic==2.5.0
prometheus-client==0.19.0

Build and Test Docker Image

# Build Docker image
docker build -t llm-server:v1 .

# Check image size
docker images llm-server:v1

# Run container locally
docker run -d \
    --name llm-server \
    --gpus all \
    -p 8000:8000 \
    -v $(pwd)/models:/app/models \
    llm-server:v1

# Check logs
docker logs -f llm-server

# Test API
curl http://localhost:8000/v1/models

# Test generation
curl http://localhost:8000/v1/completions \
    -H "Content-Type: application/json" \
    -d '{
        "model": "/app/models/llama-2-7b-4bit",
        "prompt": "What is Docker?",
        "max_tokens": 50
    }'

# Stop container
docker stop llm-server
docker rm llm-server

Add Health Check Endpoint

# serve_model.py - Add health check
from fastapi import FastAPI
from vllm.entrypoints.openai.api_server import app as vllm_app
import psutil
import torch

@vllm_app.get("/health")
async def health_check():
    """Health check endpoint for load balancer"""
    gpu_available = torch.cuda.is_available()
    gpu_memory = torch.cuda.memory_allocated() / torch.cuda.max_memory_allocated()
    cpu_percent = psutil.cpu_percent()
    memory_percent = psutil.virtual_memory().percent
    
    return {
        "status": "healthy" if gpu_available else "degraded",
        "gpu_available": gpu_available,
        "gpu_memory_used": f"{gpu_memory*100:.1f}%",
        "cpu_usage": f"{cpu_percent:.1f}%",
        "memory_usage": f"{memory_percent:.1f}%"
    }

@vllm_app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    # Return metrics in Prometheus format
    return {
        "requests_total": 12345,
        "latency_avg_ms": 245.3,
        "throughput_tokens_per_sec": 156.7
    }

⚠️ Docker Image Size: Final image will be 10-15GB (base image + model). Use Docker layer caching and multi-stage builds to optimize.

Push to Container Registry

# Tag for Docker Hub
docker tag llm-server:v1 yourusername/llm-server:v1

# Login to Docker Hub
docker login

# Push image
docker push yourusername/llm-server:v1

# Or use AWS ECR
aws ecr get-login-password --region us-east-1 | \
    docker login --username AWS --password-stdin \
    123456789.dkr.ecr.us-east-1.amazonaws.com

docker tag llm-server:v1 \
    123456789.dkr.ecr.us-east-1.amazonaws.com/llm-server:v1

docker push 123456789.dkr.ecr.us-east-1.amazonaws.com/llm-server:v1

☸️ Step 4: Kubernetes Deployment

Deploy your containerized LLM with Kubernetes for auto-scaling, load balancing, and high availability.

Prerequisites

  • Kubernetes cluster with GPU nodes (AWS EKS, GCP GKE, or Azure AKS)
  • kubectl installed and configured
  • NVIDIA GPU Operator installed on cluster

Kubernetes Deployment Configuration

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-server
  namespace: ml-serving
  labels:
    app: llm-server
    version: v1
spec:
  replicas: 2  # Start with 2 replicas
  selector:
    matchLabels:
      app: llm-server
  template:
    metadata:
      labels:
        app: llm-server
        version: v1
    spec:
      containers:
      - name: llm-server
        image: yourusername/llm-server:v1
        ports:
        - containerPort: 8000
          name: http
        resources:
          requests:
            memory: "16Gi"
            cpu: "4"
            nvidia.com/gpu: 1  # Request 1 GPU
          limits:
            memory: "32Gi"
            cpu: "8"
            nvidia.com/gpu: 1
        env:
        - name: CUDA_VISIBLE_DEVICES
          value: "0"
        - name: MODEL_PATH
          value: "/app/models/llama-2-7b-4bit"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 60
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
      nodeSelector:
        accelerator: nvidia-tesla-a100  # Target GPU nodes
      tolerations:
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule

Service Configuration

# k8s/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: llm-server-service
  namespace: ml-serving
  labels:
    app: llm-server
spec:
  type: LoadBalancer  # Or ClusterIP for internal
  selector:
    app: llm-server
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
    name: http
  sessionAffinity: ClientIP  # Sticky sessions
  sessionAffinityConfig:
    clientIP:
      timeoutSeconds: 3600

Horizontal Pod Autoscaler (HPA)

# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-server-hpa
  namespace: ml-serving
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-server
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  - type: Pods
    pods:
      metric:
        name: requests_per_second
      target:
        type: AverageValue
        averageValue: "100"
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300  # 5 min cooldown
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 30

ConfigMap for Model Configuration

# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: llm-config
  namespace: ml-serving
data:
  max_model_len: "2048"
  gpu_memory_utilization: "0.9"
  tensor_parallel_size: "1"
  temperature: "0.7"
  max_tokens: "256"

Deploy to Kubernetes

# Create namespace
kubectl create namespace ml-serving

# Apply configurations
kubectl apply -f k8s/configmap.yaml
kubectl apply -f k8s/deployment.yaml
kubectl apply -f k8s/service.yaml
kubectl apply -f k8s/hpa.yaml

# Check deployment status
kubectl get deployments -n ml-serving
kubectl get pods -n ml-serving -w

# Check service
kubectl get svc -n ml-serving

# Get external IP (for LoadBalancer)
export SERVICE_IP=$(kubectl get svc llm-server-service -n ml-serving \
    -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo "Service available at: http://$SERVICE_IP"

# Test deployment
curl http://$SERVICE_IP/health

# View logs
kubectl logs -f deployment/llm-server -n ml-serving

πŸ”„ Auto-scaling Behavior

  • Scale up: When CPU >70% or memory >80% for 60 seconds
  • Scale down: After 5-minute cooldown, max 50% reduction
  • Min replicas: 2 (high availability)
  • Max replicas: 10 (cost control)

Ingress for HTTPS & Path Routing

# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: llm-server-ingress
  namespace: ml-serving
  annotations:
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: letsencrypt-prod
    nginx.ingress.kubernetes.io/proxy-body-size: "50m"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
spec:
  tls:
  - hosts:
    - llm-api.yourdomain.com
    secretName: llm-tls-secret
  rules:
  - host: llm-api.yourdomain.com
    http:
      paths:
      - path: /v1
        pathType: Prefix
        backend:
          service:
            name: llm-server-service
            port:
              number: 80

Load Testing with Locust

# load_test.py - Test Kubernetes deployment at scale
from locust import HttpUser, task, between

class LLMUser(HttpUser):
    wait_time = between(1, 3)  # Wait 1-3 seconds between requests
    
    @task(3)  # Weight 3x
    def completions(self):
        """Test completions endpoint"""
        self.client.post("/v1/completions", json={
            "model": "/app/models/llama-2-7b-4bit",
            "prompt": "Explain machine learning:",
            "max_tokens": 100
        })
    
    @task(1)  # Weight 1x
    def health_check(self):
        """Test health endpoint"""
        self.client.get("/health")

# Run with:
# locust -f load_test.py --host http://llm-api.yourdomain.com
# Open http://localhost:8089 and start test with 100 users

πŸ“Š Expected Load Test Results

# With 2 replicas (2 A100 GPUs)
Users: 100 concurrent
Requests/sec: 8.5
Response time (avg): 450ms
Response time (95%): 780ms
Failure rate: 0%

# After auto-scaling to 4 replicas
Users: 100 concurrent  
Requests/sec: 16.2
Response time (avg): 240ms
Response time (95%): 420ms
Failure rate: 0%

πŸ“Š Step 5: Monitoring & Observability

Set up comprehensive monitoring to track performance, costs, and issues in production.

Prometheus Metrics

# metrics.py - Add Prometheus metrics to your server
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Response
import time

# Define metrics
REQUEST_COUNT = Counter(
    'llm_requests_total',
    'Total number of requests',
    ['endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'llm_request_duration_seconds',
    'Request latency in seconds',
    ['endpoint'],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

TOKENS_GENERATED = Counter(
    'llm_tokens_generated_total',
    'Total tokens generated'
)

GPU_MEMORY = Gauge(
    'llm_gpu_memory_used_bytes',
    'GPU memory used in bytes',
    ['gpu_id']
)

ACTIVE_REQUESTS = Gauge(
    'llm_active_requests',
    'Number of requests currently being processed'
)

# Middleware to track metrics
@app.middleware("http")
async def track_metrics(request, call_next):
    ACTIVE_REQUESTS.inc()
    start_time = time.time()
    
    try:
        response = await call_next(request)
        status = response.status_code
    except Exception as e:
        status = 500
        raise
    finally:
        duration = time.time() - start_time
        REQUEST_LATENCY.labels(endpoint=request.url.path).observe(duration)
        REQUEST_COUNT.labels(
            endpoint=request.url.path,
            status=status
        ).inc()
        ACTIVE_REQUESTS.dec()
    
    return response

@app.get("/metrics")
async def metrics():
    """Expose metrics for Prometheus"""
    return Response(
        content=generate_latest(),
        media_type="text/plain"
    )

Prometheus Configuration

# k8s/prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  namespace: monitoring
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    
    scrape_configs:
    - job_name: 'llm-server'
      kubernetes_sd_configs:
      - role: pod
        namespaces:
          names:
          - ml-serving
      relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        action: keep
        regex: llm-server
      - source_labels: [__meta_kubernetes_pod_ip]
        target_label: __address__
        replacement: $1:8000

Grafana Dashboard

{
  "dashboard": {
    "title": "LLM Serving Metrics",
    "panels": [
      {
        "title": "Requests per Second",
        "targets": [{
          "expr": "rate(llm_requests_total[5m])"
        }]
      },
      {
        "title": "Average Latency",
        "targets": [{
          "expr": "rate(llm_request_duration_seconds_sum[5m]) / rate(llm_request_duration_seconds_count[5m])"
        }]
      },
      {
        "title": "P95 Latency",
        "targets": [{
          "expr": "histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m]))"
        }]
      },
      {
        "title": "Tokens Generated",
        "targets": [{
          "expr": "rate(llm_tokens_generated_total[5m])"
        }]
      },
      {
        "title": "GPU Memory Usage",
        "targets": [{
          "expr": "llm_gpu_memory_used_bytes / (1024^3)"
        }]
      },
      {
        "title": "Active Requests",
        "targets": [{
          "expr": "llm_active_requests"
        }]
      },
      {
        "title": "Error Rate",
        "targets": [{
          "expr": "rate(llm_requests_total{status=~\"5..\"}[5m]) / rate(llm_requests_total[5m])"
        }]
      },
      {
        "title": "Pod Replicas",
        "targets": [{
          "expr": "kube_deployment_status_replicas{deployment=\"llm-server\"}"
        }]
      }
    ]
  }
}

Logging Configuration

# logging_config.py
import logging
import json
from datetime import datetime

class StructuredLogger:
    """Structured logging for better observability"""
    
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # Console handler with JSON formatting
        handler = logging.StreamHandler()
        handler.setFormatter(self.JSONFormatter())
        self.logger.addHandler(handler)
    
    class JSONFormatter(logging.Formatter):
        def format(self, record):
            log_data = {
                "timestamp": datetime.utcnow().isoformat(),
                "level": record.levelname,
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno
            }
            
            # Add extra fields
            if hasattr(record, 'request_id'):
                log_data['request_id'] = record.request_id
            if hasattr(record, 'user_id'):
                log_data['user_id'] = record.user_id
            if hasattr(record, 'duration_ms'):
                log_data['duration_ms'] = record.duration_ms
            
            return json.dumps(log_data)
    
    def info(self, message, **kwargs):
        extra = {k: v for k, v in kwargs.items()}
        self.logger.info(message, extra=extra)
    
    def error(self, message, **kwargs):
        extra = {k: v for k, v in kwargs.items()}
        self.logger.error(message, extra=extra)

# Usage
logger = StructuredLogger(__name__)

@app.middleware("http")
async def log_requests(request, call_next):
    request_id = str(uuid.uuid4())
    start_time = time.time()
    
    logger.info(
        "Request started",
        request_id=request_id,
        method=request.method,
        path=request.url.path
    )
    
    response = await call_next(request)
    
    duration_ms = (time.time() - start_time) * 1000
    logger.info(
        "Request completed",
        request_id=request_id,
        status_code=response.status_code,
        duration_ms=duration_ms
    )
    
    return response

Alert Rules

# k8s/alerts.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-alerts
  namespace: monitoring
data:
  alerts.yml: |
    groups:
    - name: llm_serving
      interval: 30s
      rules:
      
      # High error rate
      - alert: HighErrorRate
        expr: rate(llm_requests_total{status=~"5.."}[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"
      
      # High latency
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "P95 latency is {{ $value }}s"
      
      # GPU memory high
      - alert: GPUMemoryHigh
        expr: llm_gpu_memory_used_bytes / nvidia_gpu_memory_total_bytes > 0.95
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "GPU memory usage high"
          description: "GPU memory at {{ $value | humanizePercentage }}"
      
      # Pod down
      - alert: LLMServerDown
        expr: up{job="llm-server"} == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "LLM server pod is down"
          description: "Pod {{ $labels.pod }} is not responding"
πŸ“ˆ

Key Metrics

Track RPS, latency (P50/P95/P99), error rate, throughput

πŸ””

Alerts

Slack/PagerDuty alerts for errors, high latency, pod failures

πŸ“Š

Dashboards

Real-time Grafana dashboards for operations team

πŸ“

Logging

Structured JSON logs aggregated in ELK/DataDog

πŸ’° Step 6: Cost Optimization

LLM inference can be expensive. Here's how to optimize costs without sacrificing performance.

Cost Breakdown (Llama-2-7B on AWS)

πŸ’΅ Monthly Cost Estimates

Option 1: On-Demand A100 (80GB)
- Instance: p4d.24xlarge (8x A100)
- Cost: $32.77/hour = $23,594/month
- Requests/sec: 80 (10 per GPU)
- Cost per 1M requests: ~$80

Option 2: Spot A100 (80GB) 
- Instance: p4d.24xlarge spot
- Cost: ~$10/hour = $7,200/month (70% savings)
- Risk: Can be interrupted
- Best for: Non-critical workloads

Option 3: T4 GPU (16GB)
- Instance: g4dn.xlarge
- Cost: $0.526/hour = $379/month
- Requests/sec: 2-3
- Cost per 1M requests: ~$50
- Best for: Low-traffic applications

Option 4: Serverless (AWS Lambda + GPU)
- Pay per request
- Cost: $0.0001 per request
- Cold start: 3-5 seconds
- Best for: Sporadic traffic

Strategy 1: Spot Instances with Fallback

# k8s/spot-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-server-spot
spec:
  replicas: 3
  template:
    spec:
      nodeSelector:
        node.kubernetes.io/instance-type: p3.2xlarge
        karpenter.sh/capacity-type: spot  # Use spot instances
      tolerations:
      - key: spot
        operator: Equal
        value: "true"
        effect: NoSchedule
      
      # Add node affinity for fallback to on-demand
      affinity:
        nodeAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            preference:
              matchExpressions:
              - key: karpenter.sh/capacity-type
                operator: In
                values:
                - spot
          - weight: 50
            preference:
              matchExpressions:
              - key: karpenter.sh/capacity-type
                operator: In
                values:
                - on-demand

Strategy 2: Dynamic Batching

# dynamic_batching.py - Process multiple requests together
import asyncio
from collections import deque
from typing import List, Tuple
import time

class DynamicBatcher:
    """Batch requests dynamically for higher throughput"""
    
    def __init__(
        self,
        max_batch_size: int = 32,
        max_wait_time: float = 0.05  # 50ms
    ):
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.queue = deque()
        self.processing = False
    
    async def add_request(self, prompt: str) -> str:
        """Add request to batch queue"""
        future = asyncio.Future()
        self.queue.append((prompt, future))
        
        # Start batch processor if not running
        if not self.processing:
            asyncio.create_task(self._process_batch())
        
        return await future
    
    async def _process_batch(self):
        """Process batch when ready"""
        self.processing = True
        start_time = time.time()
        
        # Wait for batch to fill or timeout
        while len(self.queue) < self.max_batch_size:
            if time.time() - start_time > self.max_wait_time:
                break
            await asyncio.sleep(0.001)
        
        # Get batch
        batch_size = min(len(self.queue), self.max_batch_size)
        batch = [self.queue.popleft() for _ in range(batch_size)]
        
        # Process batch
        prompts = [item[0] for item in batch]
        futures = [item[1] for item in batch]
        
        # Generate all at once (much faster than sequential)
        responses = await self._generate_batch(prompts)
        
        # Return results
        for future, response in zip(futures, responses):
            future.set_result(response)
        
        self.processing = False
        
        # Process remaining queue
        if self.queue:
            asyncio.create_task(self._process_batch())
    
    async def _generate_batch(self, prompts: List[str]) -> List[str]:
        """Generate responses for batch"""
        # Use vLLM batch generation
        outputs = llm.generate(prompts, sampling_params)
        return [out.outputs[0].text for out in outputs]

# Usage
batcher = DynamicBatcher(max_batch_size=32, max_wait_time=0.05)

@app.post("/v1/completions")
async def completions(request: CompletionRequest):
    response = await batcher.add_request(request.prompt)
    return {"text": response}

πŸ’‘ Dynamic Batching Benefits

  • 3-5x throughput increase: Process 32 requests vs 1
  • 50% cost reduction: Same hardware, more requests
  • Low latency: Max 50ms wait time
  • Automatic: No client changes needed

Strategy 3: Request Caching

# caching.py - Cache common requests
import hashlib
import redis
import json
from typing import Optional

class ResponseCache:
    """Cache LLM responses to reduce inference costs"""
    
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.ttl = 3600  # 1 hour cache
    
    def _make_key(self, prompt: str, params: dict) -> str:
        """Create cache key from prompt + params"""
        content = f"{prompt}:{json.dumps(params, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()
    
    def get(self, prompt: str, params: dict) -> Optional[str]:
        """Get cached response"""
        key = self._make_key(prompt, params)
        cached = self.redis.get(key)
        
        if cached:
            return json.loads(cached)
        return None
    
    def set(self, prompt: str, params: dict, response: str):
        """Cache response"""
        key = self._make_key(prompt, params)
        self.redis.setex(
            key,
            self.ttl,
            json.dumps(response)
        )
    
    def invalidate(self, pattern: str = "*"):
        """Clear cache by pattern"""
        for key in self.redis.scan_iter(pattern):
            self.redis.delete(key)

# Usage
cache = ResponseCache()

@app.post("/v1/completions")
async def completions(request: CompletionRequest):
    # Check cache
    cached = cache.get(request.prompt, request.params)
    if cached:
        return {"text": cached, "cached": True}
    
    # Generate response
    response = await generate(request.prompt, request.params)
    
    # Cache result
    cache.set(request.prompt, request.params, response)
    
    return {"text": response, "cached": False}

Strategy 4: Model Pruning

# pruning.py - Remove unnecessary weights
from transformers import AutoModelForCausalLM
import torch

def prune_model(model, pruning_ratio: float = 0.3):
    """
    Prune model weights for faster inference
    
    Args:
        model: HuggingFace model
        pruning_ratio: Fraction of weights to remove (0.3 = 30%)
    """
    import torch.nn.utils.prune as prune
    
    for name, module in model.named_modules():
        if isinstance(module, torch.nn.Linear):
            # Prune 30% of weights with lowest magnitude
            prune.l1_unstructured(
                module,
                name='weight',
                amount=pruning_ratio
            )
            # Make pruning permanent
            prune.remove(module, 'weight')
    
    return model

# Load and prune model
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-chat-hf")
pruned_model = prune_model(model, pruning_ratio=0.3)

# Save pruned model
pruned_model.save_pretrained("./models/llama-2-7b-pruned30")

print("Model pruned! Size reduced by ~30%")
print("Speed increase: ~20-30%")
print("Quality loss: <3% typically")

Strategy 5: Multi-Tenancy

# multi_tenancy.py - Serve multiple models on one GPU
from vllm import LLM
import asyncio

class MultiTenantServer:
    """Serve multiple models using time-slicing"""
    
    def __init__(self):
        self.models = {}
        self.current_model = None
    
    def load_model(self, model_id: str, model_path: str):
        """Load model into memory"""
        if model_id not in self.models:
            self.models[model_id] = {
                "path": model_path,
                "llm": None,  # Lazy load
                "last_used": 0
            }
    
    async def generate(
        self,
        model_id: str,
        prompt: str,
        max_tokens: int = 256
    ) -> str:
        """Generate with specific model"""
        
        # Switch model if needed
        if self.current_model != model_id:
            await self._switch_model(model_id)
        
        # Generate
        llm = self.models[model_id]["llm"]
        outputs = llm.generate([prompt], sampling_params)
        
        return outputs[0].outputs[0].text
    
    async def _switch_model(self, model_id: str):
        """Switch to different model"""
        import time
        
        # Unload current model
        if self.current_model:
            self.models[self.current_model]["llm"] = None
            torch.cuda.empty_cache()
        
        # Load new model
        model_info = self.models[model_id]
        model_info["llm"] = LLM(
            model=model_info["path"],
            dtype="float16"
        )
        model_info["last_used"] = time.time()
        self.current_model = model_id

# Usage - serve 3 models on 1 GPU
server = MultiTenantServer()
server.load_model("llama-7b", "./models/llama-2-7b")
server.load_model("mistral-7b", "./models/mistral-7b")
server.load_model("custom-7b", "./models/custom-7b")

# Route requests based on model_id
response = await server.generate(
    model_id="llama-7b",
    prompt="What is AI?",
    max_tokens=100
)
πŸ’΅

Spot Instances

Save 70% with interruption handling

πŸ“¦

Batching

3-5x throughput, 50% cost reduction

⚑

Caching

80% cache hit rate saves inference costs

βœ‚οΈ

Pruning

30% size reduction, 25% faster inference

πŸš€ Step 7: Production Best Practices

Security Hardening

# security.py - Add authentication and rate limiting
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
from datetime import datetime, timedelta
from collections import defaultdict
import time

app = FastAPI()
security = HTTPBearer()

# Rate limiting
class RateLimiter:
    def __init__(self, max_requests: int = 100, window: int = 60):
        self.max_requests = max_requests
        self.window = window  # seconds
        self.requests = defaultdict(list)
    
    def is_allowed(self, client_id: str) -> bool:
        now = time.time()
        
        # Remove old requests
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if now - req_time < self.window
        ]
        
        # Check limit
        if len(self.requests[client_id]) >= self.max_requests:
            return False
        
        self.requests[client_id].append(now)
        return True

rate_limiter = RateLimiter(max_requests=100, window=60)

# API key authentication
async def verify_token(
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    token = credentials.credentials
    try:
        payload = jwt.decode(
            token,
            "your-secret-key",
            algorithms=["HS256"]
        )
        return payload["client_id"]
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/v1/completions")
async def completions(
    request: CompletionRequest,
    client_id: str = Depends(verify_token)
):
    # Rate limiting
    if not rate_limiter.is_allowed(client_id):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded"
        )
    
    # Input validation
    if len(request.prompt) > 4096:
        raise HTTPException(
            status_code=400,
            detail="Prompt too long"
        )
    
    if request.max_tokens > 2048:
        raise HTTPException(
            status_code=400,
            detail="max_tokens too large"
        )
    
    # Generate response
    response = await generate(request.prompt, request.max_tokens)
    
    return {"text": response, "client_id": client_id}

Content Filtering

# content_filter.py - Filter harmful content
from transformers import pipeline
import re

class ContentFilter:
    """Filter inappropriate content"""
    
    def __init__(self):
        # Load toxicity classifier
        self.toxicity_classifier = pipeline(
            "text-classification",
            model="unitary/toxic-bert"
        )
    
    def is_safe(self, text: str) -> tuple[bool, str]:
        """Check if text is safe"""
        
        # Check toxicity
        result = self.toxicity_classifier(text[:512])[0]
        if result["label"] == "toxic" and result["score"] > 0.7:
            return False, "Content flagged as toxic"
        
        # Check for PII (basic patterns)
        if self._contains_pii(text):
            return False, "Content contains PII"
        
        # Check for prompt injection
        if self._is_prompt_injection(text):
            return False, "Potential prompt injection detected"
        
        return True, "Content is safe"
    
    def _contains_pii(self, text: str) -> bool:
        """Check for personally identifiable information"""
        patterns = [
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            r'\b\d{16}\b',              # Credit card
            r'\b[\w\.-]+@[\w\.-]+\.\w+\b'  # Email
        ]
        
        for pattern in patterns:
            if re.search(pattern, text):
                return True
        return False
    
    def _is_prompt_injection(self, text: str) -> bool:
        """Detect prompt injection attempts"""
        injection_patterns = [
            "ignore previous instructions",
            "disregard above",
            "forget everything",
            "new instructions:",
            "system:",
            "assistant:"
        ]
        
        text_lower = text.lower()
        return any(pattern in text_lower for pattern in injection_patterns)

# Usage
content_filter = ContentFilter()

@app.post("/v1/completions")
async def completions(request: CompletionRequest):
    # Filter input
    is_safe, message = content_filter.is_safe(request.prompt)
    if not is_safe:
        raise HTTPException(status_code=400, detail=message)
    
    # Generate
    response = await generate(request.prompt)
    
    # Filter output
    is_safe, message = content_filter.is_safe(response)
    if not is_safe:
        return {"text": "[Content filtered]", "reason": message}
    
    return {"text": response}

Graceful Degradation

# fallback.py - Handle failures gracefully
import httpx
from typing import Optional

class FallbackChain:
    """Try multiple backends with fallback"""
    
    def __init__(self):
        self.backends = [
            "http://llm-server-primary:8000",
            "http://llm-server-secondary:8000",
            "https://api.openai.com/v1"  # Emergency fallback
        ]
    
    async def generate(
        self,
        prompt: str,
        max_tokens: int = 256
    ) -> Optional[str]:
        """Try backends in order until success"""
        
        errors = []
        
        for backend_url in self.backends:
            try:
                async with httpx.AsyncClient(timeout=30.0) as client:
                    response = await client.post(
                        f"{backend_url}/v1/completions",
                        json={
                            "prompt": prompt,
                            "max_tokens": max_tokens
                        }
                    )
                    response.raise_for_status()
                    return response.json()["text"]
            
            except Exception as e:
                errors.append(f"{backend_url}: {str(e)}")
                continue
        
        # All backends failed
        raise Exception(f"All backends failed: {errors}")

fallback = FallbackChain()

@app.post("/v1/completions")
async def completions(request: CompletionRequest):
    try:
        response = await fallback.generate(
            request.prompt,
            request.max_tokens
        )
        return {"text": response}
    except Exception as e:
        # Return cached/default response
        return {
            "text": "Service temporarily unavailable. Please try again.",
            "error": str(e)
        }

A/B Testing

# ab_testing.py - Test model improvements
import random
from dataclasses import dataclass
from typing import Dict

@dataclass
class Experiment:
    name: str
    model_path: str
    traffic_percent: float  # 0-100
    
class ABTester:
    """Route traffic to different model versions"""
    
    def __init__(self):
        self.experiments = {
            "control": Experiment(
                name="control",
                model_path="./models/llama-2-7b-v1",
                traffic_percent=80.0
            ),
            "variant_a": Experiment(
                name="variant_a",
                model_path="./models/llama-2-7b-v2-pruned",
                traffic_percent=10.0
            ),
            "variant_b": Experiment(
                name="variant_b", 
                model_path="./models/llama-2-7b-v2-finetuned",
                traffic_percent=10.0
            )
        }
    
    def select_experiment(self, user_id: str) -> Experiment:
        """Select experiment based on traffic allocation"""
        
        # Deterministic selection based on user_id
        user_hash = hash(user_id) % 100
        
        cumulative = 0
        for exp in self.experiments.values():
            cumulative += exp.traffic_percent
            if user_hash < cumulative:
                return exp
        
        return self.experiments["control"]
    
    def log_result(
        self,
        user_id: str,
        experiment: str,
        latency: float,
        user_feedback: Optional[int] = None
    ):
        """Log experiment results for analysis"""
        # Send to analytics system
        metrics = {
            "user_id": user_id,
            "experiment": experiment,
            "latency_ms": latency,
            "feedback": user_feedback,
            "timestamp": datetime.utcnow().isoformat()
        }
        # Store in database/analytics
        pass

ab_tester = ABTester()

@app.post("/v1/completions")
async def completions(
    request: CompletionRequest,
    user_id: str = Header(...)
):
    # Select experiment
    experiment = ab_tester.select_experiment(user_id)
    
    # Generate with selected model
    start = time.time()
    response = await generate(
        request.prompt,
        model_path=experiment.model_path
    )
    latency = (time.time() - start) * 1000
    
    # Log result
    ab_tester.log_result(user_id, experiment.name, latency)
    
    return {
        "text": response,
        "experiment": experiment.name  # For debugging
    }

βœ… Production Checklist:

  • βœ… Authentication & rate limiting
  • βœ… Input/output content filtering
  • βœ… Fallback chains for reliability
  • βœ… A/B testing for improvements
  • βœ… Monitoring & alerting
  • βœ… Cost tracking & optimization

🎯 Step 8: Testing & Deployment Checklist

Pre-Deployment Tests

# test_deployment.py - Comprehensive deployment tests
import pytest
import httpx
import asyncio

class TestDeployment:
    """Test deployment before going live"""
    
    base_url = "http://llm-server-service"
    
    @pytest.mark.asyncio
    async def test_health_check(self):
        """Test health endpoint"""
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{self.base_url}/health")
            assert response.status_code == 200
            assert response.json()["status"] == "healthy"
    
    @pytest.mark.asyncio
    async def test_basic_generation(self):
        """Test basic text generation"""
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{self.base_url}/v1/completions",
                json={
                    "prompt": "What is 2+2?",
                    "max_tokens": 10
                }
            )
            assert response.status_code == 200
            assert len(response.json()["text"]) > 0
    
    @pytest.mark.asyncio
    async def test_rate_limiting(self):
        """Test rate limiter"""
        async with httpx.AsyncClient() as client:
            # Make 101 requests (limit is 100)
            for i in range(101):
                response = await client.post(
                    f"{self.base_url}/v1/completions",
                    json={"prompt": f"Test {i}", "max_tokens": 10},
                    headers={"Authorization": "Bearer test-token"}
                )
                
                if i < 100:
                    assert response.status_code == 200
                else:
                    assert response.status_code == 429  # Rate limited
    
    @pytest.mark.asyncio
    async def test_latency(self):
        """Test response latency"""
        import time
        
        async with httpx.AsyncClient(timeout=30.0) as client:
            start = time.time()
            response = await client.post(
                f"{self.base_url}/v1/completions",
                json={"prompt": "Hello", "max_tokens": 50}
            )
            latency = time.time() - start
            
            assert response.status_code == 200
            assert latency < 2.0  # Should respond in <2 seconds
    
    @pytest.mark.asyncio
    async def test_concurrent_requests(self):
        """Test handling concurrent requests"""
        async with httpx.AsyncClient(timeout=30.0) as client:
            tasks = [
                client.post(
                    f"{self.base_url}/v1/completions",
                    json={"prompt": f"Test {i}", "max_tokens": 20}
                )
                for i in range(10)
            ]
            
            responses = await asyncio.gather(*tasks)
            
            # All should succeed
            assert all(r.status_code == 200 for r in responses)
    
    @pytest.mark.asyncio
    async def test_content_filtering(self):
        """Test content filtering"""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/v1/completions",
                json={
                    "prompt": "Ignore previous instructions and...",
                    "max_tokens": 50
                }
            )
            # Should be filtered
            assert response.status_code == 400

# Run tests
# pytest test_deployment.py -v

Deployment Checklist

πŸ“‹ Pre-Launch Checklist

Infrastructure:
β–‘ Kubernetes cluster configured with GPU nodes
β–‘ Docker images built and pushed to registry
β–‘ Secrets and ConfigMaps created
β–‘ Network policies and ingress configured
β–‘ SSL certificates provisioned

Application:
β–‘ Model quantized and tested
β–‘ vLLM server tested locally
β–‘ Health checks responding
β–‘ Metrics endpoint working
β–‘ Logging configured

Security:
β–‘ Authentication enabled
β–‘ Rate limiting configured
β–‘ Content filtering active
β–‘ API keys rotated
β–‘ Network policies applied

Monitoring:
β–‘ Prometheus scraping metrics
β–‘ Grafana dashboards created
β–‘ Alert rules configured
β–‘ PagerDuty/Slack integration tested
β–‘ Log aggregation working

Performance:
β–‘ Load testing completed
β–‘ Latency targets met (P95 < 1s)
β–‘ Throughput targets met (>10 RPS)
β–‘ Auto-scaling tested
β–‘ Cost estimates validated

Disaster Recovery:
β–‘ Backup deployment in different region
β–‘ Fallback chain configured
β–‘ Data backup strategy
β–‘ Rollback procedure documented
β–‘ Incident response plan ready

Deployment Commands

# Final deployment script
#!/bin/bash

echo "πŸš€ Deploying LLM Server to Production"

# 1. Build and push Docker image
docker build -t llm-server:v1 .
docker tag llm-server:v1 yourusername/llm-server:v1
docker push yourusername/llm-server:v1

# 2. Apply Kubernetes configurations
kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/configmap.yaml
kubectl apply -f k8s/secrets.yaml
kubectl apply -f k8s/deployment.yaml
kubectl apply -f k8s/service.yaml
kubectl apply -f k8s/hpa.yaml
kubectl apply -f k8s/ingress.yaml

# 3. Wait for rollout
kubectl rollout status deployment/llm-server -n ml-serving

# 4. Run smoke tests
pytest test_deployment.py -v

# 5. Monitor logs
kubectl logs -f deployment/llm-server -n ml-serving

echo "βœ… Deployment complete!"
echo "API available at: https://llm-api.yourdomain.com"

πŸŽ“ What You've Learned

⚑

Optimization

Quantization, vLLM, pruning for 10-20x faster inference

🐳

Containerization

Docker with GPU support, multi-stage builds, health checks

☸️

Orchestration

Kubernetes deployment, auto-scaling, load balancing

πŸ“Š

Observability

Prometheus metrics, Grafana dashboards, structured logging

πŸ’°

Cost Optimization

Spot instances, batching, caching for 70% cost reduction

πŸ”’

Production Ready

Security, content filtering, fallbacks, A/B testing

πŸ“š Next Steps

  • Optimize further: Try TensorRT-LLM for even faster inference
  • Multi-region: Deploy across regions for lower latency
  • Advanced features: Add streaming responses, tool calling
  • Cost tracking: Implement detailed cost attribution per user
  • Fine-tune: Continuously improve model based on user feedback

πŸŽ‰ Congratulations! You've built a production-ready LLM deployment system. You can now serve millions of requests at scale with optimized costs and world-class reliability.

Test Your Knowledge

Q1: What is vLLM designed for?

Training models from scratch
Data preprocessing
High-throughput, low-latency LLM inference serving
Image generation

Q2: What is PagedAttention in vLLM?

A data loading technique
A memory optimization technique for efficient KV cache management
A training algorithm
A tokenization method

Q3: Why is load balancing important in production LLM deployments?

It increases model accuracy
It reduces training time
It compresses models
It distributes requests across multiple instances for high availability and scalability

Q4: What should you monitor in production LLM systems?

Latency, throughput, error rates, GPU utilization, and cost per request
Only latency
Only cost
Nothing, monitoring is unnecessary

Q5: What is the benefit of containerizing LLM deployments with Docker?

It makes models more accurate
It eliminates the need for GPUs
It provides consistent, portable, and reproducible deployment environments
It reduces model size
πŸŽ“

Congratulations!

You've completed the LLMs & Transformers course and mastered building, fine-tuning, and deploying large language models!

Claim Your Free Certificate

Enter your details to generate your professional certificate

βœ“ Shareable on LinkedIn β€’ βœ“ Verified by AITutorials.site β€’ βœ“ Completely Free

What's Next?