Deploy agents at scale with monitoring, optimization, and real-world application patterns
Complete all tutorials to earn your Free AI Agents Certificate
Shareable on LinkedIn · Verified by AITutorials.site · No signup fee
Building an agent in a notebook is one thing. Running it reliably in production serving millions of users is entirely different. Production agent systems require:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import logging

app = FastAPI(title="Agent API")
logger = logging.getLogger(__name__)


class TaskRequest(BaseModel):
    """Incoming agent task submission."""
    task: str       # natural-language task, max 5000 chars (enforced below)
    user_id: str    # used for rate limiting
    timeout: int = 30  # seconds


class TaskResponse(BaseModel):
    """Acknowledgement returned once a task is queued."""
    task_id: str
    status: str
    result: dict


@app.post("/agent/run")
async def run_agent(request: TaskRequest):
    """Execute agent asynchronously"""
    try:
        # Validate request
        if len(request.task) > 5000:
            raise HTTPException(status_code=400, detail="Task too long")
        # Rate limiting
        if is_rate_limited(request.user_id):
            raise HTTPException(status_code=429, detail="Rate limit exceeded")
        # Queue task for async processing
        task_id = queue_agent_task(request.task, request.user_id)
        logger.info(f"Task queued: {task_id}")
        return TaskResponse(
            task_id=task_id,
            status="queued",
            result={}
        )
    except HTTPException:
        # Bug fix: without this clause, the 400/429 errors raised above were
        # swallowed by the generic handler below and returned as 500s.
        raise
    except Exception as e:
        logger.error(f"Error: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")


@app.get("/agent/status/{task_id}")
async def get_status(task_id: str):
    """Check task status"""
    status = get_task_status(task_id)
    return {"task_id": task_id, "status": status}


@app.get("/agent/result/{task_id}")
async def get_result(task_id: str):
    """Retrieve task result"""
    result = get_task_result(task_id)
    if not result:
        raise HTTPException(status_code=404, detail="Task not found")
    return result
Agents triggered by events (messages, webhooks, scheduled jobs):
from kafka import KafkaConsumer, KafkaProducer
import json
class EventDrivenAgent:
    """Kafka-driven worker: consume task events, run the agent, publish results."""

    def __init__(self, agent=None):
        # Bug fix: `self.agent` was read in process_events() but never
        # assigned anywhere; accept it here (default None keeps the old
        # zero-argument constructor working).
        self.agent = agent
        self.consumer = KafkaConsumer('agent-tasks')
        self.producer = KafkaProducer()

    def process_events(self):
        """Continuously process events"""
        for message in self.consumer:
            try:
                event = json.loads(message.value)
                result = self.agent.run(event['task'])
                # Publish result; encode explicitly because KafkaProducer
                # expects bytes unless a value_serializer is configured.
                self.producer.send(
                    'agent-results',
                    value=json.dumps({
                        "event_id": event['id'],
                        "result": result
                    }).encode("utf-8")
                )
            except Exception as e:
                # Handle errors, retry logic
                logger.error(f"Processing failed: {e}")
                self.send_to_dlq(message)  # Dead letter queue

    def send_to_dlq(self, message):
        """Forward a poison message to the dead-letter topic.

        Bug fix: this method was called above but never defined.
        """
        self.producer.send('agent-tasks-dlq', value=message.value)
# Usage
agent = EventDrivenAgent()
agent.process_events()
Each agent component as a microservice:
version: '3'

services:
  # Produces plans from user goals.
  planner-agent:
    image: agent-planner:latest
    ports:
      - "5001:5000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}

  # Carries out planned steps; started after the planner.
  executor-agent:
    image: agent-executor:latest
    ports:
      - "5002:5000"
    depends_on:
      - planner-agent

  # Scores executor output.
  evaluator-agent:
    image: agent-evaluator:latest
    ports:
      - "5003:5000"

  # Single public entry point in front of the three agent services.
  api-gateway:
    image: api-gateway:latest
    ports:
      - "8000:8000"
    depends_on:
      - planner-agent
      - executor-agent
      - evaluator-agent
┌─────────────────────────────────────────────────────────────────┐
│                      Load Balancer (NGINX)                      │
│                 SSL Termination, Rate Limiting                  │
└───────────────────────────────┬─────────────────────────────────┘
                                │
               ┌────────────────┼────────────────┐
               │                │                │
          ┌────┴────┐      ┌────┴────┐      ┌────┴────┐
          │   API   │      │   API   │      │   API   │
          │ Gateway │      │ Gateway │      │ Gateway │
          │ Instance│      │ Instance│      │ Instance│
          └────┬────┘      └────┬────┘      └────┬────┘
               │                │                │
               └────────────────┼────────────────┘
                                │
              ┌─────────────────┼─────────────────┐
              │                 │                 │
       ┌──────┴─────┐     ┌─────┴─────┐     ┌─────┴────┐
       │  Message   │     │   Redis   │     │ Postgres │
       │   Queue    │     │   Cache   │     │ Database │
       │ (RabbitMQ) │     └───────────┘     └──────────┘
       └──────┬─────┘
              │
       ┌──────┴─────────────────────┐
       │      Agent Worker Pool     │
       │   ┌────┐ ┌────┐ ┌────┐     │
       │   │ A1 │ │ A2 │ │ A3 │ ... │
       │   └────┘ └────┘ └────┘     │
       └──────┬─────────────────────┘
              │
       ┌──────┴─────────────────────┐
       │      Monitoring Stack      │
       │    Prometheus + Grafana    │
       │      ELK Stack (Logs)      │
       │       Jaeger (Traces)      │
       └────────────────────────────┘
version: '3.8'

services:
  # ---- Edge: TLS termination + load balancing across the gateways ----
  nginx:
    image: nginx:latest
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./certs:/etc/nginx/certs
    depends_on:
      - api-gateway-1
      - api-gateway-2
      - api-gateway-3
    restart: always

  # ---- API Gateway Instances (3 for HA) ----
  # Fix: the connection URLs previously hardcoded "user:pass" and a
  # credential-less AMQP URL, which do not match the POSTGRES_PASSWORD /
  # RABBITMQ_DEFAULT_USER+RABBITMQ_PASSWORD the backing services are
  # actually configured with below.
  api-gateway-1:
    build: ./gateway
    environment:
      - INSTANCE_ID=gateway-1
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:${POSTGRES_PASSWORD}@postgres:5432/agents
      - RABBITMQ_URL=amqp://admin:${RABBITMQ_PASSWORD}@rabbitmq:5672
    depends_on:
      - redis
      - postgres
      - rabbitmq
    restart: always

  api-gateway-2:
    build: ./gateway
    environment:
      - INSTANCE_ID=gateway-2
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:${POSTGRES_PASSWORD}@postgres:5432/agents
      - RABBITMQ_URL=amqp://admin:${RABBITMQ_PASSWORD}@rabbitmq:5672
    depends_on:
      - redis
      - postgres
      - rabbitmq
    restart: always

  api-gateway-3:
    build: ./gateway
    environment:
      - INSTANCE_ID=gateway-3
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:${POSTGRES_PASSWORD}@postgres:5432/agents
      - RABBITMQ_URL=amqp://admin:${RABBITMQ_PASSWORD}@rabbitmq:5672
    depends_on:
      - redis
      - postgres
      - rabbitmq
    restart: always

  # ---- Message Queue ----
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"   # management UI
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=${RABBITMQ_PASSWORD}
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    restart: always

  # ---- Cache ----
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    restart: always

  # ---- Database ----
  postgres:
    image: postgres:15
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=agents
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: always

  # ---- Agent Workers (scaled 0-20 based on load) ----
  # NOTE: `deploy:` settings only take effect under swarm / compose v2 deploy
  # support; under plain docker-compose use `--scale agent-worker=5`.
  agent-worker:
    build: ./worker
    environment:
      - RABBITMQ_URL=amqp://admin:${RABBITMQ_PASSWORD}@rabbitmq:5672
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:${POSTGRES_PASSWORD}@postgres:5432/agents
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - rabbitmq
      - redis
      - postgres
    deploy:
      replicas: 5
      restart_policy:
        condition: on-failure
        max_attempts: 3

  # ---- Monitoring - Prometheus ----
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
    restart: always

  # ---- Monitoring - Grafana ----
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana-dashboards:/etc/grafana/provisioning/dashboards
    restart: always

  # ---- Logging - Elasticsearch ----
  elasticsearch:
    image: elasticsearch:8.10.0
    ports:
      - "9200:9200"
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    restart: always

  # ---- Logging - Logstash ----
  logstash:
    image: logstash:8.10.0
    ports:
      - "5044:5044"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    depends_on:
      - elasticsearch
    restart: always

  # ---- Logging - Kibana ----
  kibana:
    image: kibana:8.10.0
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch
    restart: always

volumes:
  rabbitmq_data:
  redis_data:
  postgres_data:
  prometheus_data:
  grafana_data:
  elasticsearch_data:
"""
Production-grade monitoring for agent systems
"""
from prometheus_client import Counter, Histogram, Gauge, Summary
from prometheus_client import start_http_server
import time
import functools
from typing import Callable
# Define comprehensive metrics
class AgentMetrics:
    """Centralized Prometheus metric definitions for agent monitoring.

    Metrics are class attributes, so every AgentMetrics instance shares a
    single set of registry entries.
    """

    # How many requests finished, by agent / outcome / customer tier.
    requests_total = Counter(
        'agent_requests_total',
        'Total agent requests',
        ['agent_id', 'status', 'user_tier'],
    )

    # End-to-end latency; buckets span 100 ms to 1 minute.
    request_duration = Histogram(
        'agent_request_duration_seconds',
        'Request duration in seconds',
        ['agent_id', 'endpoint'],
        buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0],
    )

    # Dollar cost of each request, per agent and model.
    cost_per_request = Summary(
        'agent_cost_per_request_usd',
        'Cost per request in USD',
        ['agent_id', 'model'],
    )

    # Requests currently in flight per agent.
    active_requests = Gauge(
        'agent_active_requests',
        'Number of active requests',
        ['agent_id'],
    )

    # Backlog of queued requests per priority class.
    queue_depth = Gauge(
        'agent_queue_depth',
        'Number of queued requests',
        ['priority'],
    )

    # Distribution of 1-5 star ratings.
    user_satisfaction = Histogram(
        'agent_user_satisfaction',
        'User satisfaction rating (1-5)',
        ['agent_id'],
        buckets=[1, 2, 3, 4, 5],
    )

    # Tool invocations, by tool and outcome.
    tool_calls_total = Counter(
        'agent_tool_calls_total',
        'Total tool calls',
        ['agent_id', 'tool_name', 'status'],
    )

    # Errors, broken down by exception type and severity.
    errors_total = Counter(
        'agent_errors_total',
        'Total errors',
        ['agent_id', 'error_type', 'severity'],
    )
def monitor_agent_execution(metrics: AgentMetrics):
    """Decorator that records Prometheus metrics around an agent call.

    The wrapped callable must take the agent id as its first positional
    argument. Duration, outcome and in-flight gauges are recorded whether
    the call succeeds or raises; cost is recorded when the result dict
    carries a 'cost' key.
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(agent_id: str, *args, **kwargs):
            metrics.active_requests.labels(agent_id=agent_id).inc()
            began = time.time()
            outcome = 'success'
            try:
                result = func(agent_id, *args, **kwargs)
                if 'cost' in result:
                    # Spend is attributed to the model that produced it.
                    metrics.cost_per_request.labels(
                        agent_id=agent_id,
                        model=result.get('model', 'unknown'),
                    ).observe(result['cost'])
                return result
            except Exception as exc:
                outcome = 'error'
                metrics.errors_total.labels(
                    agent_id=agent_id,
                    error_type=type(exc).__name__,
                    severity='high',
                ).inc()
                raise
            finally:
                # Always recorded, success or failure.
                metrics.request_duration.labels(
                    agent_id=agent_id,
                    endpoint='execute',
                ).observe(time.time() - began)
                metrics.requests_total.labels(
                    agent_id=agent_id,
                    status=outcome,
                    user_tier='standard',
                ).inc()
                metrics.active_requests.labels(agent_id=agent_id).dec()
        return wrapper
    return decorator
# --- Usage: wire the metrics into an agent entry point ---
metrics = AgentMetrics()

# Expose /metrics on port 8000 for Prometheus to scrape.
start_http_server(8000)


@monitor_agent_execution(metrics)
def execute_agent(agent_id: str, task: str):
    """Execute agent with monitoring"""
    # NOTE(review): `agent` and `calculate_cost` are supplied elsewhere in
    # the tutorial — this snippet does not define them.
    result = agent.run(task)
    result['cost'] = calculate_cost(result)
    return result


# One monitored execution.
result = execute_agent('customer-support-agent', 'Help with refund')
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# --- Distributed tracing bootstrap (OpenTelemetry -> Jaeger) ---
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Ship spans in batches to a local Jaeger agent over UDP/thrift.
jaeger_exporter = JaegerExporter(
    agent_host_name='localhost',
    agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Auto-create client spans for every `requests` HTTP call.
RequestsInstrumentor().instrument()
class TracedAgent:
    """Agent whose plan/execute/synthesize phases each emit a tracing span."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.tracer = trace.get_tracer(__name__)

    def run(self, task: str) -> dict:
        """Run *task* under a root span with one child span per phase."""
        with self.tracer.start_as_current_span("agent.run") as root:
            root.set_attribute("agent.id", self.agent_id)
            root.set_attribute("task.length", len(task))
            try:
                # Phase 1: planning.
                with self.tracer.start_as_current_span("agent.plan"):
                    plan = self._plan(task)
                    root.set_attribute("plan.steps", len(plan))

                # Phase 2: execution, one span per step.
                with self.tracer.start_as_current_span("agent.execute"):
                    step_outputs = []
                    for index, step in enumerate(plan):
                        with self.tracer.start_as_current_span(f"agent.step_{index}"):
                            step_outputs.append(self._execute_step(step))

                # Phase 3: synthesis.
                with self.tracer.start_as_current_span("agent.synthesize"):
                    final_result = self._synthesize(step_outputs)

                root.set_attribute("result.success", True)
                return final_result
            except Exception as exc:
                # Mark the root span failed and attach exception details.
                root.set_attribute("result.success", False)
                root.set_attribute("error.type", type(exc).__name__)
                root.record_exception(exc)
                raise

    def _execute_step(self, step: dict) -> dict:
        """Run one tool call in its own span (input truncated to 100 chars)."""
        with self.tracer.start_as_current_span("tool.call") as span:
            span.set_attribute("tool.name", step['tool'])
            span.set_attribute("tool.input", str(step['input'])[:100])
            # NOTE(review): `self.tools` is assumed to be attached elsewhere;
            # it is not initialised in this snippet.
            output = self.tools[step['tool']].run(step['input'])
            span.set_attribute("tool.output_length", len(str(output)))
            return output
# View traces in Jaeger UI at http://localhost:16686
{
"dashboard": {
"title": "Agent Production Metrics",
"panels": [
{
"title": "Request Rate",
"targets": [{
"expr": "rate(agent_requests_total[5m])"
}],
"type": "graph"
},
{
"title": "Success Rate",
"targets": [{
"expr": "sum(rate(agent_requests_total{status='success'}[5m])) / sum(rate(agent_requests_total[5m]))"
}],
"type": "singlestat"
},
{
"title": "P95 Latency",
"targets": [{
"expr": "histogram_quantile(0.95, agent_request_duration_seconds_bucket)"
}],
"type": "graph"
},
{
"title": "Cost per Hour",
"targets": [{
"expr": "sum(rate(agent_cost_per_request_usd_sum[1h])) * 3600"
}],
"type": "graph"
},
{
"title": "Error Rate by Type",
"targets": [{
"expr": "sum by (error_type) (rate(agent_errors_total[5m]))"
}],
"type": "graph"
},
{
"title": "Queue Depth",
"targets": [{
"expr": "agent_queue_depth"
}],
"type": "graph"
}
]
}
}
import functools
import hashlib
import json
from typing import Optional, Callable

import redis
class IntelligentCache:
    """Two-tier agent result cache: exact SHA-256 key match, then semantic match."""

    def __init__(self, redis_client, embedding_model):
        self.redis = redis_client
        self.embedding_model = embedding_model
        # Running counters reported by get_cache_stats().
        self.hits = 0
        self.misses = 0

    def get_cache_key(self, input_text: str) -> str:
        """Derive a deterministic Redis key from the raw input text."""
        digest = hashlib.sha256(input_text.encode()).hexdigest()
        return f"agent:cache:{digest}"

    def exact_match_cache(self, input_text: str) -> Optional[dict]:
        """Return the cached result for a byte-identical input, else None."""
        payload = self.redis.get(self.get_cache_key(input_text))
        if payload:
            self.hits += 1
            return json.loads(payload)
        self.misses += 1
        return None

    def semantic_match_cache(self, input_text: str, similarity_threshold: float = 0.95) -> Optional[dict]:
        """Return a cached result for a semantically similar query, else None."""
        input_embedding = self.embedding_model.encode(input_text)
        # Nearest-neighbour lookup over previously stored embeddings.
        # (In production, use a vector database like Pinecone/Weaviate.)
        similar_queries = self.redis.ft("embeddings").search(
            query_vector=input_embedding.tolist(),
            num_results=1
        )
        if similar_queries and similar_queries[0].similarity > similarity_threshold:
            payload = self.redis.get(similar_queries[0].id)
            if payload:
                self.hits += 1
                return json.loads(payload)
        self.misses += 1
        return None

    def cache_result(self, input_text: str, result: dict, ttl: int = 3600):
        """Store *result* under the exact-match key, expiring after *ttl* seconds."""
        key = self.get_cache_key(input_text)
        self.redis.setex(key, ttl, json.dumps(result))
        # Keep the embedding around so semantic_match_cache can find it later.
        vector = self.embedding_model.encode(input_text)
        self.redis.hset("embeddings", key, vector.tobytes())

    def get_cache_stats(self) -> dict:
        """Report hit/miss counts, hit rate, and estimated dollars saved."""
        total = self.hits + self.misses
        return {
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": self.hits / total if total > 0 else 0,
            # Assumes roughly $0.002 of avoided model spend per hit.
            "cost_saved_usd": self.hits * 0.002,
        }
# Decorator for cached agent execution
def cached_execution(cache: IntelligentCache, ttl: int = 3600):
    """Decorator: serve results from *cache* before executing the function.

    Lookup order is exact match, then semantic match; on a miss the wrapped
    function runs and its result is cached for *ttl* seconds.
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(input_text: str, *args, **kwargs):
            for lookup in (cache.exact_match_cache, cache.semantic_match_cache):
                hit = lookup(input_text)
                if hit:
                    return hit
            fresh = func(input_text, *args, **kwargs)
            cache.cache_result(input_text, fresh, ttl)
            return fresh
        return wrapper
    return decorator
# Usage
cache = IntelligentCache(redis_client, embedding_model)
@cached_execution(cache, ttl=3600)
def run_agent(task: str):
return agent.run(task)
# First call: cache miss, executes agent
result1 = run_agent("What is the refund policy?")
# Second call: cache hit, instant return!
result2 = run_agent("What is the refund policy?")
# Similar question: semantic cache hit!
result3 = run_agent("Tell me about refunds")
print(cache.get_cache_stats())
# {"hits": 2, "misses": 1, "hit_rate": 0.67, "cost_saved_usd": 0.004}
import asyncio
from collections import deque
import time
class RequestBatcher:
    """Batch requests to reduce API calls"""

    def __init__(self, batch_size: int = 10, max_wait_time: float = 1.0):
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time  # seconds before flushing a partial batch
        self.queue = deque()       # pending (request_id, task) pairs
        self.results = {}          # request_id -> result, consumed by add_request
        self.processing = False    # True while the _process_batch loop is alive

    async def add_request(self, request_id: str, task: str) -> dict:
        """Add request to batch queue"""
        # Add to queue
        self.queue.append((request_id, task))
        # Start processing if not already running
        # NOTE(review): not synchronized — two concurrent callers could both
        # see processing=False and spawn two loops; confirm single-producer use.
        if not self.processing:
            asyncio.create_task(self._process_batch())
        # Wait for result
        # Polls every 100 ms; an asyncio.Event per request would avoid the busy-wait.
        while request_id not in self.results:
            await asyncio.sleep(0.1)
        return self.results.pop(request_id)

    async def _process_batch(self):
        """Process batch when full or timeout reached"""
        self.processing = True
        start_time = time.time()
        while True:
            # Check if should process: either a full batch is ready, or some
            # requests have been waiting longer than max_wait_time.
            should_process = (
                len(self.queue) >= self.batch_size or
                (len(self.queue) > 0 and time.time() - start_time >= self.max_wait_time)
            )
            if should_process:
                # Extract batch
                batch = []
                for _ in range(min(self.batch_size, len(self.queue))):
                    batch.append(self.queue.popleft())
                # Process batch in parallel (single API call)
                tasks = [task for _, task in batch]
                batch_results = await self._execute_batch(tasks)
                # Store results; zip pairs results positionally with requests.
                for (request_id, _), result in zip(batch, batch_results):
                    self.results[request_id] = result
                start_time = time.time()
            # Stop if queue empty
            if len(self.queue) == 0:
                break
            await asyncio.sleep(0.1)
        self.processing = False

    async def _execute_batch(self, tasks: list) -> list:
        """Execute batch of tasks"""
        # Combine tasks into single prompt
        combined_prompt = "Process these tasks:\\n" + "\\n".join(
            f"{i+1}. {task}" for i, task in enumerate(tasks)
        )
        # Single API call
        # NOTE(review): `agent` and `_parse_batch_results` are not defined in
        # this snippet — presumably provided elsewhere; confirm before use.
        result = await agent.run_async(combined_prompt)
        # Parse results
        return self._parse_batch_results(result, len(tasks))
# Reduces API calls by 10x, saves 40-50% on costs!
batcher = RequestBatcher(batch_size=10, max_wait_time=1.0)
# These 10 requests will be batched into 1 API call
results = await asyncio.gather(*[
batcher.add_request(f"req{i}", f"Task {i}")
for i in range(10)
])
class ModelRouter:
    """Route requests to appropriate model based on complexity and cost"""

    def __init__(self):
        # Per-model cost (USD / 1K tokens), rough quality score, typical latency (s).
        self.models = {
            "gpt-4": {"cost_per_1k": 0.03, "quality": 0.95, "latency": 3.0},
            "gpt-3.5-turbo": {"cost_per_1k": 0.002, "quality": 0.85, "latency": 1.5},
            "claude-instant": {"cost_per_1k": 0.0016, "quality": 0.82, "latency": 1.2}
        }

    def select_model(self, task: str, user_tier: str = "standard") -> str:
        """Pick the cheapest model whose quality fits the task's complexity."""
        score = self._estimate_complexity(task)
        # Premium users always get the strongest model.
        if user_tier == "premium":
            return "gpt-4"
        if score < 0.3:
            return "claude-instant"
        if score < 0.7:
            return "gpt-3.5-turbo"
        return "gpt-4"

    def _estimate_complexity(self, task: str) -> float:
        """Heuristic complexity score in [0, 1]: the mean of three signals."""
        # Simple heuristics (in production, use an ML classifier).
        lowered = task.lower()
        complex_keywords = ["analyze", "compare", "explain", "reason", "complex"]
        signals = [
            # Longer prompts tend to be harder (saturates at 1000 chars).
            min(len(task) / 1000, 1.0),
            # Fraction of complexity-indicating keywords present.
            sum(1 for kw in complex_keywords if kw in lowered) / len(complex_keywords),
            # Multiple questions in one prompt usually mean a harder task.
            min(task.count("?") / 3, 1.0),
        ]
        return sum(signals) / len(signals)


# Usage
router = ModelRouter()

# A short, single-question task routes to the cheapest model.
model = router.select_model("What is the weather?")  # claude-instant

# NOTE: this short example also scores low on the heuristics above, so it
# routes cheap too; only genuinely long analytical prompts reach gpt-4.
model = router.select_model("Analyze the economic implications of...")

# Routing cheap tasks to cheap models can cut costs substantially.
from typing import List
import random
class AgentLoadBalancer:
    """Dispatch tasks to whichever agent has the fewest in-flight tasks."""

    def __init__(self, agents: List[Agent]):
        self.agents = agents
        # In-flight task count, keyed by agent id.
        self.agent_metrics = {agent.id: {"load": 0} for agent in agents}

    def select_agent(self) -> Agent:
        """Select least loaded agent"""
        def current_load(candidate):
            return self.agent_metrics[candidate.id]["load"]
        return min(self.agents, key=current_load)

    def dispatch_task(self, task: str):
        """Dispatch task to least loaded agent"""
        chosen = self.select_agent()
        self.agent_metrics[chosen.id]["load"] += 1
        try:
            result = chosen.run(task)
        finally:
            # Release the slot even if the agent raised.
            self.agent_metrics[chosen.id]["load"] -= 1
        return result
# Usage
balancer = AgentLoadBalancer(agents=[agent1, agent2, agent3])
result = balancer.dispatch_task("user task")
from functools import lru_cache
import redis
class CachedAgent:
    """Wrap an agent so identical tasks are served from a Redis cache."""

    def __init__(self, agent, cache_ttl: int = 3600):
        self.agent = agent
        self.cache = redis.Redis(host='localhost', port=6379)
        self.ttl = cache_ttl  # seconds before a cached result expires

    def run(self, task: str):
        """Run task with caching"""
        # Local imports: this snippet's header never imported json, and
        # hashlib is new here.
        import hashlib
        import json
        # Bug fix: the key was built with builtin hash(), which is salted
        # per-process (PYTHONHASHSEED), so keys never matched across worker
        # restarts. SHA-256 gives a stable key.
        cache_key = f"task:{hashlib.sha256(task.encode()).hexdigest()}"
        cached = self.cache.get(cache_key)
        if cached:
            return json.loads(cached)
        # Execute and cache the fresh result.
        result = self.agent.run(task)
        self.cache.setex(
            cache_key,
            self.ttl,
            json.dumps(result)
        )
        return result
# Usage
cached_agent = CachedAgent(agent, cache_ttl=3600)
# First call: executes
result1 = cached_agent.run("expensive task")
# Second call: from cache (instant!)
result2 = cached_agent.run("expensive task")
Performance: Success rate, latency, throughput, cost per task
Quality: Accuracy vs gold standard, user satisfaction, error rate
Cost: API costs, infrastructure costs, cost per successful task
Reliability: Uptime, SLA compliance, error types, recovery time
Behavior: Tool usage patterns, decision distributions, anomalies
from prometheus_client import Counter, Histogram, Gauge
import time

# Outcome counter: one series per terminal status ('success' / 'error').
task_counter = Counter(
    'agent_tasks_total',
    'Total agent tasks',
    ['status']
)

# Wall-clock execution time per task.
task_duration = Histogram(
    'agent_task_duration_seconds',
    'Task execution time'
)

# Cumulative dollar spend across all tasks.
task_cost = Counter(
    'agent_task_cost_total',
    'Total cost of agent tasks'
)
def execute_task_with_monitoring(agent, task):
    """Run *task* on *agent*, recording outcome, latency and cost metrics.

    Returns the agent's result; any exception is counted then re-raised.
    """
    start_time = time.time()
    try:
        result = agent.run(task)
        task_counter.labels(status='success').inc()
        return result
    except Exception:
        task_counter.labels(status='error').inc()
        raise
    finally:
        # Always recorded, success or failure.
        duration = time.time() - start_time
        task_duration.observe(duration)
        cost = estimate_cost(task)
        # Bug fix: the original poked the private `_value` attribute
        # (task_cost._value.get().inc(cost)), bypassing the client's public,
        # thread-safe API. Counter.inc(amount) is the supported call.
        task_cost.inc(cost)
# Metrics exposed at /metrics for Prometheus scraping
# Prometheus alert rules
groups:
  - name: agent_alerts
    rules:
      # Error share of task throughput above 5% for 5 consecutive minutes.
      - alert: HighErrorRate
        expr: rate(agent_tasks_total{status="error"}[5m]) > 0.05
        for: 5m
        annotations:
          summary: "Agent error rate > 5%"

      # Fix: histogram_quantile must be fed the per-bucket series
      # (the `_bucket` suffix) aggregated over a rate window; quantiles
      # over the bare histogram metric name are not valid PromQL.
      - alert: SlowResponseTime
        expr: histogram_quantile(0.95, rate(agent_task_duration_seconds_bucket[5m])) > 30
        for: 5m
        annotations:
          summary: "95th percentile latency > 30s"

      # Spend rate sustained above $100/hour.
      - alert: HighCost
        expr: rate(agent_task_cost_total[1h]) > 100
        for: 5m
        annotations:
          summary: "Hourly cost > $100"
name: Agent CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  # Unit, integration and safety tests gate everything downstream.
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run unit tests
        run: pytest tests/unit --cov=agent --cov-report=xml
      - name: Run integration tests
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: pytest tests/integration
      - name: Run safety tests
        run: |
          python -m agent.safety.red_team_test
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          files: ./coverage.xml

  # Benchmark the agent and fail the build on quality regressions.
  evaluate:
    runs-on: ubuntu-latest
    needs: test
    steps:
      - uses: actions/checkout@v3
      - name: Run evaluation suite
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          python -m agent.evaluation.benchmark
      - name: Check performance regression
        run: |
          python scripts/check_performance.py \\
            --threshold 0.05 \\
            --metric success_rate
      - name: Generate evaluation report
        run: |
          python scripts/generate_report.py \\
            --output evaluation_report.html
      - name: Upload report
        uses: actions/upload-artifact@v3
        with:
          name: evaluation-report
          path: evaluation_report.html

  # develop branch -> staging environment.
  deploy-staging:
    runs-on: ubuntu-latest
    needs: [test, evaluate]
    if: github.ref == 'refs/heads/develop'
    steps:
      - uses: actions/checkout@v3
      - name: Build Docker image
        run: |
          docker build -t agent:${{ github.sha }} .
      - name: Push to registry
        run: |
          echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin
          docker tag agent:${{ github.sha }} registry.example.com/agent:staging
          docker push registry.example.com/agent:staging
      - name: Deploy to staging
        run: |
          kubectl set image deployment/agent-staging agent=registry.example.com/agent:staging
          kubectl rollout status deployment/agent-staging

  # main branch -> production via blue-green switchover.
  deploy-production:
    runs-on: ubuntu-latest
    needs: [test, evaluate]
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - name: Build Docker image
        run: |
          docker build -t agent:${{ github.sha }} .
      - name: Push to registry
        run: |
          docker tag agent:${{ github.sha }} registry.example.com/agent:${{ github.sha }}
          docker tag agent:${{ github.sha }} registry.example.com/agent:latest
          docker push registry.example.com/agent:${{ github.sha }}
          docker push registry.example.com/agent:latest
      - name: Blue-Green deployment
        run: |
          # Deploy to green environment
          kubectl apply -f k8s/deployment-green.yaml
          # Wait for green to be ready
          kubectl wait --for=condition=available --timeout=300s deployment/agent-green
          # Run smoke tests on green
          python scripts/smoke_test.py --target green
          # Switch traffic to green
          kubectl patch service agent-service -p '{"spec":{"selector":{"version":"green"}}}'
          # Monitor for 5 minutes
          sleep 300
          # If successful, scale down blue
          kubectl scale deployment agent-blue --replicas=0

  # Report the production-deploy outcome either way.
  notify:
    runs-on: ubuntu-latest
    needs: [deploy-production]
    if: always()
    steps:
      - name: Notify Slack
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "Deployment ${{ job.status }}: Agent v${{ github.sha }}",
              "status": "${{ job.status }}"
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
"""
Automated rollback on deployment issues
"""
import subprocess
import time
import requests
class DeploymentMonitor:
    """Watch a fresh deployment's health metrics and roll back automatically.

    thresholds: {"error_rate": fraction (e.g. 0.05), "latency": seconds}
    """

    def __init__(self, service_url: str, thresholds: dict):
        self.service_url = service_url
        self.thresholds = thresholds

    def monitor_deployment(self, duration: int = 300):
        """Poll health signals every 10s for *duration* seconds.

        Returns True if the window passes cleanly; rolls back and returns
        False if a single poll shows all three problems at once.
        """
        start_time = time.time()
        while time.time() - start_time < duration:
            # Bug fix: issues are now collected per poll. The original list
            # accumulated across iterations, so three unrelated transient
            # blips spread over the whole window would trigger a rollback.
            issues = []
            # Check error rate
            error_rate = self._get_error_rate()
            if error_rate > self.thresholds['error_rate']:
                issues.append(f"Error rate too high: {error_rate}")
            # Check latency
            p95_latency = self._get_p95_latency()
            if p95_latency > self.thresholds['latency']:
                issues.append(f"Latency too high: {p95_latency}s")
            # Check availability
            if not self._check_availability():
                issues.append("Service not available")
            # All three signals bad at once -> roll back.
            if len(issues) >= 3:
                print(f"Critical issues detected: {issues}")
                self.rollback()
                return False
            time.sleep(10)
        print("Deployment successful!")
        return True

    def _get_error_rate(self) -> float:
        """Get current error rate from Prometheus"""
        query = 'rate(agent_requests_total{status="error"}[1m])'
        response = requests.get(
            'http://prometheus:9090/api/v1/query',
            params={'query': query}
        )
        result = response.json()['data']['result']
        return float(result[0]['value'][1]) if result else 0.0

    def _get_p95_latency(self) -> float:
        """Get P95 latency"""
        query = 'histogram_quantile(0.95, agent_request_duration_seconds_bucket)'
        response = requests.get(
            'http://prometheus:9090/api/v1/query',
            params={'query': query}
        )
        result = response.json()['data']['result']
        return float(result[0]['value'][1]) if result else 0.0

    def _check_availability(self) -> bool:
        """Check service availability"""
        try:
            response = requests.get(f"{self.service_url}/health", timeout=5)
            return response.status_code == 200
        except requests.RequestException:
            # Bug fix: was a bare `except:` which also swallowed
            # KeyboardInterrupt/SystemExit; only network failures mean "down".
            return False

    def rollback(self):
        """Rollback to previous version"""
        print("Rolling back deployment...")
        # Get previous version (informational; kubectl undo picks it itself).
        result = subprocess.run(
            ["kubectl", "rollout", "history", "deployment/agent"],
            capture_output=True,
            text=True
        )
        # Rollback
        subprocess.run([
            "kubectl", "rollout", "undo", "deployment/agent"
        ])
        # Wait for rollback
        subprocess.run([
            "kubectl", "rollout", "status", "deployment/agent"
        ])
        # Notify team
        self._send_alert("Deployment rolled back due to issues")

    def _send_alert(self, message: str):
        """Notify the team of a rollback.

        Bug fix: rollback() called this method but it was never defined,
        so every rollback ended in an AttributeError. Wire Slack/PagerDuty
        here; printing is the safe default.
        """
        print(f"ALERT: {message}")
# Usage in deployment pipeline
monitor = DeploymentMonitor(
service_url="http://agent-service",
thresholds={
"error_rate": 0.05, # 5%
"latency": 5.0, # 5 seconds
}
)
success = monitor.monitor_deployment(duration=300)
if not success:
exit(1)
import asyncio
from typing import List, Dict
class ParallelAgent:
    """Agent that executes independent tools in parallel"""

    async def run_async(self, task: str) -> Dict:
        """Plan, execute (parallelising independent steps), then synthesize."""
        plan = await self._plan_async(task)
        # Partition the plan into waves of mutually independent steps.
        waves = self._group_independent_steps(plan)
        outcomes = []
        for wave in waves:
            # All steps in one wave run concurrently.
            wave_outcomes = await asyncio.gather(*[
                self._execute_step_async(step) for step in wave
            ])
            outcomes.extend(wave_outcomes)
        return await self._synthesize_async(outcomes)

    def _group_independent_steps(self, plan: List[Dict]) -> List[List[Dict]]:
        """Split *plan* into waves of steps that can run concurrently.

        A step that depends on any id already placed in the current wave
        closes that wave and opens a new one.
        """
        waves: List[List[Dict]] = []
        wave: List[Dict] = []
        seen_ids = set()
        for step in plan:
            blocked = step.get('dependencies') and step['dependencies'] & seen_ids
            if blocked:
                if wave:
                    waves.append(wave)
                wave = [step]
                seen_ids = {step['id']}
            else:
                wave.append(step)
                seen_ids.add(step['id'])
        if wave:
            waves.append(wave)
        return waves

    async def _execute_step_async(self, step: Dict) -> Dict:
        """Run one step's tool and tag the output with the step id."""
        # NOTE(review): `self.tools` is assumed to be attached elsewhere.
        tool = self.tools[step['tool']]
        output = await tool.run_async(step['input'])
        return {"step_id": step['id'], "result": output}
# Usage - 5x faster for independent tools!
agent = ParallelAgent()
result = await agent.run_async("Research competitors and analyze market trends")
# Executes research and analysis in parallel instead of sequentially
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
import redis
from redis.connection import ConnectionPool
class ResourceManager:
    """Shared, pooled connections to Postgres and Redis."""

    def __init__(self):
        # Pooled SQLAlchemy engine: 20 persistent connections plus up to 10
        # overflow; connections are health-checked before use and recycled
        # after an hour to dodge server-side idle timeouts.
        self.db_engine = create_engine(
            'postgresql://user:pass@localhost/agents',
            poolclass=QueuePool,
            pool_size=20,
            max_overflow=10,
            pool_pre_ping=True,
            pool_recycle=3600
        )
        # All Redis clients share a single pool capped at 50 sockets.
        self.redis_pool = ConnectionPool(
            host='localhost',
            port=6379,
            max_connections=50,
            decode_responses=True
        )
        self.redis = redis.Redis(connection_pool=self.redis_pool)

    def get_db_connection(self):
        """Check a connection out of the engine's pool (close() returns it)."""
        return self.db_engine.connect()

    def get_cache_client(self):
        """Return the shared Redis client (pooling is handled internally)."""
        return self.redis
# Share resource manager across workers
resource_manager = ResourceManager()
# Each request reuses connections instead of creating new ones
conn = resource_manager.get_db_connection()
# ... use connection ...
conn.close() # Returns to pool, doesn't actually close!
from sqlalchemy.orm import selectinload, joinedload
class OptimizedAgentDataAccess:
    """Efficient database queries for agent data"""
    # NOTE(review): `session`, `Agent`, `ToolUsage`, `TaskResult`, `func`,
    # `desc`, `datetime`, `timedelta` and `List` are assumed to be in scope
    # from elsewhere in the project — none are defined in this snippet.

    def get_agent_with_history(self, agent_id: str, limit: int = 100):
        """Get agent with execution history - optimized"""
        # BAD: N+1 query problem
        # agent = session.query(Agent).get(agent_id)
        # for execution in agent.executions:  # Each loops -> 1 query!
        #     print(execution.result)
        # GOOD: Single query with JOIN
        # NOTE(review): `selectinload(...).limit(...)` is not a standard
        # SQLAlchemy loader-option method — confirm against the project's
        # SQLAlchemy version/helpers before relying on it.
        agent = (
            session.query(Agent)
            .options(selectinload(Agent.executions).limit(limit))
            .filter(Agent.id == agent_id)
            .one()
        )
        return agent

    def get_popular_tools(self, days: int = 7):
        """Get tool usage statistics - optimized"""
        # Use database aggregation instead of loading all rows into Python:
        # count and average are computed server-side, grouped per tool.
        result = session.query(
            ToolUsage.tool_name,
            func.count(ToolUsage.id).label('usage_count'),
            func.avg(ToolUsage.duration).label('avg_duration')
        ).filter(
            ToolUsage.timestamp >= datetime.now() - timedelta(days=days)
        ).group_by(
            ToolUsage.tool_name
        ).order_by(
            desc('usage_count')
        ).limit(10).all()
        return result

    def batch_get_results(self, task_ids: List[str]):
        """Get multiple results in single query"""
        # BAD: Multiple queries
        # results = [get_result(id) for id in task_ids]
        # GOOD: Single batch query using IN (...)
        results = (
            session.query(TaskResult)
            .filter(TaskResult.task_id.in_(task_ids))
            .all()
        )
        # Keyed by task_id for O(1) lookup by the caller.
        return {r.task_id: r for r in results}
# Reduces database queries from thousands to tens!
class DebugAgent:
    """Wrapper that optionally records a full execution trace of another agent.

    With debug_mode off it delegates straight to the wrapped agent; with it
    on, the agent's `_plan` and `_execute` hooks are temporarily intercepted
    so every stage of the run lands in `self.trace`.
    """

    def __init__(self, agent, debug_mode: bool = False):
        self.agent = agent            # wrapped agent exposing run/_plan/_execute
        self.debug_mode = debug_mode  # toggle tracing per user/request
        self.trace = []               # trace entries from the most recent debug run

    def _record(self, entry: Dict) -> None:
        """Append one trace entry, stamping it with the current time."""
        entry["timestamp"] = time.time()
        self.trace.append(entry)

    def run(self, task: str) -> Dict:
        """Execute the task, tracing it only when debug_mode is on."""
        if not self.debug_mode:
            return self.agent.run(task)
        return self._run_debug(task)

    def _run_debug(self, task: str) -> Dict:
        """Execute the task while capturing input, plan, steps, and outcome."""
        self.trace = []
        self._record({"stage": "input", "data": task})

        # Keep the un-instrumented hooks so they can be put back afterwards.
        plan_fn = self.agent._plan
        execute_fn = self.agent._execute

        def plan_with_trace(t):
            planned = plan_fn(t)
            self._record({"stage": "plan", "data": planned})
            return planned

        def execute_with_trace(step):
            started = time.time()
            outcome = execute_fn(step)
            self._record({
                "stage": "execute",
                "step": step,
                "result": outcome,
                "duration": time.time() - started,
            })
            return outcome

        # Instrument the agent for the duration of this run only.
        self.agent._plan = plan_with_trace
        self.agent._execute = execute_with_trace
        try:
            outcome = self.agent.run(task)
            self._record({"stage": "complete", "data": outcome})
            return outcome
        except Exception as exc:
            self._record({
                "stage": "error",
                "error": str(exc),
                "traceback": traceback.format_exc(),
            })
            raise
        finally:
            # Always restore the original hooks, even on failure.
            self.agent._plan = plan_fn
            self.agent._execute = execute_fn

    def get_trace(self) -> List[Dict]:
        """Return the trace recorded by the most recent debug run."""
        return self.trace

    def export_trace(self, filepath: str):
        """Write the trace to *filepath* as pretty-printed JSON."""
        with open(filepath, 'w') as f:
            json.dump(self.trace, f, indent=2)
# Usage - enable debug mode for specific users/requests
# (`agent` and `problematic_task` are defined earlier in the tutorial)
debug_agent = DebugAgent(agent, debug_mode=True)
result = debug_agent.run(problematic_task)
# If error occurs, analyze trace
trace = debug_agent.get_trace()
debug_agent.export_trace("incident_trace.json")
class FeedbackLoop:
    """Collect user feedback on agent runs and trigger retraining on low quality."""

    def __init__(self, agent, feedback_db):
        """
        Args:
            agent: agent exposing finetune(training_data).
            feedback_db: store exposing store(record), get_recent(days=...),
                and get_cases_with_issue(issue).
        """
        self.agent = agent
        self.feedback_db = feedback_db

    def collect_feedback(self, task_id: str, user_rating: int, notes: str):
        """Persist one piece of user feedback for a completed task."""
        self.feedback_db.store({
            "task_id": task_id,
            "rating": user_rating,
            "notes": notes,
            "timestamp": datetime.now()
        })

    def analyze_feedback(self, days: int = 7, low_rating_threshold: int = 3,
                         min_failures: int = 5):
        """Analyze recent feedback and recommend an action.

        Args:
            days: look-back window (default keeps the prior hard-coded 7 days).
            low_rating_threshold: ratings below this count as failures
                (default keeps the prior hard-coded 3).
            min_failures: retraining triggers when failures exceed this
                (default keeps the prior hard-coded 5).

        Returns:
            {"action": "retrain", "issues": [...]} when failures exceed the
            threshold, otherwise {"action": "continue"}.
        """
        feedback = self.feedback_db.get_recent(days=days)
        # Find failing patterns
        low_ratings = [f for f in feedback if f['rating'] < low_rating_threshold]
        if len(low_ratings) > min_failures:
            # extract_common_themes is defined elsewhere in this module
            common_issues = extract_common_themes(low_ratings)
            return {"action": "retrain", "issues": common_issues}
        return {"action": "continue"}

    def retrain(self, issues: list):
        """Fine-tune the agent on cases exhibiting the reported issues."""
        # Collect problematic cases
        training_data = []
        for issue in issues:
            cases = self.feedback_db.get_cases_with_issue(issue)
            training_data.extend(cases)
        # Fine-tune agent
        self.agent.finetune(training_data)
# Usage
# (`agent` and `feedback_db` are created earlier in the tutorial)
loop = FeedbackLoop(agent, feedback_db)
action = loop.analyze_feedback()
if action["action"] == "retrain":
    loop.retrain(action["issues"])
Handle tickets, look up account info, resolve issues, escalate to humans when needed. Reduce support costs 50-70%.
Query databases, generate insights, create visualizations. Enable non-technical users to analyze data.
Manage inventory, process orders, handle returns, optimize pricing. Automate fulfillment workflows.
Research topics, draft content, edit, publish. Scale content production 10x with agent assistance.
Write code, run tests, debug errors, refactor. Speed up development significantly.
Lead qualification, pitch customization, follow-up scheduling. Increase sales efficiency and conversion.
Agent technology is rapidly evolving. Expect:
You've completed the AI Agents course! You now understand:
You're ready to build production agent systems!
Q1: What is the primary goal of horizontal scaling for agent systems?
Q2: Which metric is most critical for monitoring production agent performance?
Q3: What is the purpose of connection pooling in production agents?
Q4: Why is observability important for production agent systems?
Q5: What is a key consideration when deploying agents in containers (Docker/Kubernetes)?
You've completed the AI Agents course and mastered building autonomous intelligent systems!
Enter your details to generate your professional certificate
✓ Shareable on LinkedIn • ✓ Verified by AITutorials.site • ✓ Completely Free
What's Next?