Use machine learning to generate intelligent load patterns, predict bottlenecks, and optimize performance automatically
Your load test shows the system handles 1,000 users perfectly. You deploy to production. At 1,200 real users, it crashes. Why? Because real user behavior isn't uniform—it's bursty, unpredictable, and concentrated on specific features. Traditional load testing simulates robots. AI simulates humans.
In this tutorial, you'll use machine learning to create realistic load patterns, predict performance bottlenecks before they happen, and get AI-driven optimization recommendations. You'll move beyond simple "ramp up to 1000 users" scripts to intelligent, data-driven performance testing.
Traditional load tests are unrealistic: they ramp users up in a straight line, spread requests evenly across endpoints, and pause for fixed think times, nothing like the bursty, feature-concentrated traffic real users generate.
💡 Real Data: Amazon has estimated that a single second of delay could cost it $1.6B in sales annually, and Google found that a 400ms delay reduced searches by 0.7%. Performance directly impacts revenue—AI helps you test what matters.
AI transforms performance testing: it learns real user behavior from production logs, generates load profiles with natural peaks and bursts, predicts bottlenecks before they happen, and flags anomalies without hand-tuned thresholds.
# Install performance testing tools
pip install locust requests pandas numpy scikit-learn matplotlib
# Install monitoring tools
pip install psutil prometheus-client
Let's analyze production logs and generate realistic load patterns:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
from collections import defaultdict
import json
class AILoadPatternGenerator:
"""
Generate realistic load patterns based on production data analysis
"""
def __init__(self):
self.user_patterns = {}
self.endpoint_popularity = {}
self.think_time_distribution = {}
def analyze_production_logs(self, log_file):
"""
Analyze production access logs to learn user behavior
Expected log format: timestamp, user_id, endpoint, response_time
"""
print("📊 Analyzing production logs...")
# Load logs (assuming CSV format)
df = pd.read_csv(log_file)
# Analyze endpoint popularity
endpoint_counts = df['endpoint'].value_counts()
total_requests = len(df)
self.endpoint_popularity = {
endpoint: count / total_requests
for endpoint, count in endpoint_counts.items()
}
print(f"✅ Analyzed {len(df)} requests across {len(self.endpoint_popularity)} endpoints")
# Analyze think time (time between requests per user)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.sort_values(['user_id', 'timestamp'])
think_times = []
for user_id, group in df.groupby('user_id'):
if len(group) > 1:
times = group['timestamp'].diff().dt.total_seconds().dropna()
think_times.extend(times.tolist())
# Fit distribution
self.think_time_distribution = {
'mean': np.mean(think_times),
'std': np.std(think_times),
'min': np.min(think_times),
'max': np.max(think_times)
}
print(f"✅ Think time: {self.think_time_distribution['mean']:.2f}s (±{self.think_time_distribution['std']:.2f}s)")
# Analyze traffic patterns over time (detect peaks)
df['hour'] = df['timestamp'].dt.hour
hourly_traffic = df.groupby('hour').size()
self.hourly_pattern = (hourly_traffic / hourly_traffic.max()).to_dict()
print(f"✅ Identified traffic patterns across 24 hours")
return {
'endpoints': self.endpoint_popularity,
'think_time': self.think_time_distribution,
'hourly_pattern': self.hourly_pattern
}
def generate_realistic_think_time(self):
"""Generate realistic think time based on learned distribution"""
# Use log-normal distribution (more realistic than normal)
think_time = np.random.lognormal(
mean=np.log(self.think_time_distribution['mean']),
sigma=0.5
)
# Clamp to reasonable bounds
return np.clip(
think_time,
self.think_time_distribution['min'],
self.think_time_distribution['max']
)
def generate_user_journey(self, num_requests=10):
"""
Generate realistic user journey (sequence of endpoint requests)
"""
journey = []
for _ in range(num_requests):
# Select endpoint based on popularity
endpoint = random.choices(
list(self.endpoint_popularity.keys()),
weights=list(self.endpoint_popularity.values()),
k=1
)[0]
think_time = self.generate_realistic_think_time()
journey.append({
'endpoint': endpoint,
'think_time_before': think_time
})
return journey
def generate_load_profile(self, duration_minutes=60, target_rps=100):
"""
Generate realistic load profile with peaks and valleys
Args:
duration_minutes: Test duration
target_rps: Target requests per second (average)
Returns:
List of (timestamp, rps) tuples
"""
timestamps = []
load_values = []
current_time = datetime.now()
for minute in range(duration_minutes):
timestamp = current_time + timedelta(minutes=minute)
hour = timestamp.hour
# Apply hourly pattern
hour_multiplier = self.hourly_pattern.get(hour, 1.0)
# Add some randomness (bursty traffic)
burst_factor = random.uniform(0.7, 1.3)
# Calculate RPS for this minute
rps = int(target_rps * hour_multiplier * burst_factor)
timestamps.append(timestamp)
load_values.append(rps)
return list(zip(timestamps, load_values))
def export_locust_scenarios(self, output_file='locust_scenarios.py'):
"""
Generate Locust test scenarios based on learned patterns
"""
scenario_code = '''from locust import HttpUser, task, between
import random
class RealisticUser(HttpUser):
"""
Realistic user behavior based on production data analysis
"""
# Realistic think time between requests
wait_time = between({min_wait}, {max_wait})
def on_start(self):
"""Called when a simulated user starts"""
# Simulate login or session creation if needed
pass
'''.format(
min_wait=self.think_time_distribution['min'],
max_wait=self.think_time_distribution['max']
)
# Generate tasks based on endpoint popularity
for endpoint, popularity in sorted(
self.endpoint_popularity.items(),
key=lambda x: x[1],
reverse=True
)[:10]: # Top 10 endpoints
            # Keep a minimum weight of 1 so rare endpoints still get some traffic
            weight = max(1, int(popularity * 100))
# Clean endpoint name for function
func_name = endpoint.replace('/', '_').replace('-', '_').strip('_')
scenario_code += f''' @task({weight})
def {func_name}(self):
"""Request {endpoint} (popularity: {popularity*100:.1f}%)"""
self.client.get("{endpoint}")
'''
with open(output_file, 'w') as f:
f.write(scenario_code)
print(f"✅ Generated Locust scenarios: {output_file}")
print(f" Run with: locust -f {output_file}")
# Usage with sample log data
# Create sample production log
sample_log = pd.DataFrame({
'timestamp': pd.date_range(start='2024-01-01', periods=10000, freq='5s'),
'user_id': [random.randint(1, 100) for _ in range(10000)],
'endpoint': random.choices(
['/api/products', '/api/cart', '/api/checkout', '/api/search', '/api/user'],
weights=[50, 20, 5, 15, 10],
k=10000
),
'response_time': [random.uniform(0.1, 2.0) for _ in range(10000)]
})
sample_log.to_csv('sample_production_logs.csv', index=False)
# Analyze and generate patterns
generator = AILoadPatternGenerator()
patterns = generator.analyze_production_logs('sample_production_logs.csv')
print("\n📊 LEARNED PATTERNS:")
print(f"Top 3 endpoints:")
for endpoint, popularity in sorted(patterns['endpoints'].items(), key=lambda x: x[1], reverse=True)[:3]:
print(f" {endpoint}: {popularity*100:.1f}%")
print(f"\nThink time: {patterns['think_time']['mean']:.1f}s ± {patterns['think_time']['std']:.1f}s")
# Generate user journey
journey = generator.generate_user_journey(num_requests=5)
print("\n🚶 Sample User Journey:")
for i, step in enumerate(journey, 1):
print(f" {i}. Wait {step['think_time_before']:.1f}s → Request {step['endpoint']}")
# Generate load profile
load_profile = generator.generate_load_profile(duration_minutes=60, target_rps=50)
print(f"\n📈 Generated load profile: {len(load_profile)} data points")
# Export Locust scenarios
generator.export_locust_scenarios('realistic_load_test.py')
✅ Realistic Load: Your load test now mimics real user behavior—popular endpoints get more traffic, think times vary naturally, and load has peaks/valleys!
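The AI-guided Locust script later in this tutorial loads its behavior from a learned_patterns.json file. Here's a minimal sketch of one way to produce that file from the `patterns` dict returned by analyze_production_logs() above; the to_plain helper is just a convenience introduced here to convert numpy scalars into JSON-serializable Python types:
# Hypothetical export step: persist learned patterns for reuse by the Locust script.
import json
import numpy as np

def to_plain(obj):
    """Recursively convert numpy scalars and keys into plain Python types."""
    if isinstance(obj, dict):
        return {str(k): to_plain(v) for k, v in obj.items()}
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    return obj

with open('learned_patterns.json', 'w') as f:
    json.dump(to_plain(patterns), f, indent=2)

print("✅ Saved learned patterns to learned_patterns.json")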
Use machine learning to predict where bottlenecks will occur:
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from datetime import datetime
import numpy as np
import pandas as pd
class PerformanceBottleneckPredictor:
"""
Predict performance bottlenecks using machine learning
"""
def __init__(self):
self.model = RandomForestRegressor(n_estimators=100, random_state=42)
self.feature_names = []
def prepare_features(self, metrics_df):
"""
Extract features from performance metrics
Expected columns: timestamp, cpu_usage, memory_usage,
disk_io, network_io, request_count, response_time
"""
features = []
# Time-based features
metrics_df['hour'] = pd.to_datetime(metrics_df['timestamp']).dt.hour
metrics_df['day_of_week'] = pd.to_datetime(metrics_df['timestamp']).dt.dayofweek
# Rolling averages (trend detection)
metrics_df['cpu_rolling_avg'] = metrics_df['cpu_usage'].rolling(window=5).mean()
metrics_df['memory_rolling_avg'] = metrics_df['memory_usage'].rolling(window=5).mean()
# Rate of change
metrics_df['cpu_change_rate'] = metrics_df['cpu_usage'].diff()
metrics_df['memory_change_rate'] = metrics_df['memory_usage'].diff()
# Load features
metrics_df['requests_per_minute'] = metrics_df['request_count']
# Resource utilization ratio
metrics_df['cpu_per_request'] = metrics_df['cpu_usage'] / (metrics_df['request_count'] + 1)
metrics_df['memory_per_request'] = metrics_df['memory_usage'] / (metrics_df['request_count'] + 1)
self.feature_names = [
'hour', 'day_of_week',
'cpu_usage', 'memory_usage', 'disk_io', 'network_io',
'cpu_rolling_avg', 'memory_rolling_avg',
'cpu_change_rate', 'memory_change_rate',
'requests_per_minute',
'cpu_per_request', 'memory_per_request'
]
        # Fill NaN values from rolling/diff operations
        metrics_df = metrics_df.fillna(0)
        # 'response_time' only exists in training data; return None when predicting on live metrics
        target = metrics_df['response_time'] if 'response_time' in metrics_df.columns else None
        return metrics_df[self.feature_names], target
def train(self, historical_metrics_csv):
"""
Train model on historical performance data
"""
print("🤖 Training bottleneck prediction model...")
# Load historical data
df = pd.read_csv(historical_metrics_csv)
# Prepare features
X, y = self.prepare_features(df)
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Train model
self.model.fit(X_train, y_train)
# Evaluate
train_score = self.model.score(X_train, y_train)
test_score = self.model.score(X_test, y_test)
print(f"✅ Model trained!")
print(f" Training R² score: {train_score:.3f}")
print(f" Test R² score: {test_score:.3f}")
# Feature importance
self._print_feature_importance()
return test_score
def _print_feature_importance(self):
"""Show which features matter most"""
importances = self.model.feature_importances_
indices = np.argsort(importances)[::-1]
print("\n📊 Top 5 Bottleneck Indicators:")
for i in range(min(5, len(self.feature_names))):
idx = indices[i]
print(f" {i+1}. {self.feature_names[idx]}: {importances[idx]:.3f}")
def predict_bottlenecks(self, current_metrics):
"""
Predict response time based on current system metrics
Returns predicted response time and risk level
"""
# Prepare features
metrics_df = pd.DataFrame([current_metrics])
# Add dummy timestamp for feature extraction
if 'timestamp' not in metrics_df.columns:
metrics_df['timestamp'] = datetime.now()
X, _ = self.prepare_features(metrics_df)
# Predict
predicted_response_time = self.model.predict(X)[0]
# Classify risk level
if predicted_response_time < 0.5:
risk_level = "LOW"
color = "🟢"
elif predicted_response_time < 1.0:
risk_level = "MEDIUM"
color = "🟡"
elif predicted_response_time < 2.0:
risk_level = "HIGH"
color = "🟠"
else:
risk_level = "CRITICAL"
color = "🔴"
return {
'predicted_response_time': predicted_response_time,
'risk_level': risk_level,
'risk_icon': color
}
def generate_optimization_recommendations(self, current_metrics):
"""
Generate AI-powered optimization recommendations
"""
recommendations = []
metrics = current_metrics
# CPU recommendations
if metrics.get('cpu_usage', 0) > 80:
recommendations.append({
'severity': 'HIGH',
'component': 'CPU',
'issue': f"CPU usage at {metrics['cpu_usage']:.1f}%",
'recommendation': "Consider horizontal scaling or optimizing CPU-intensive operations"
})
# Memory recommendations
if metrics.get('memory_usage', 0) > 85:
recommendations.append({
'severity': 'HIGH',
'component': 'Memory',
'issue': f"Memory usage at {metrics['memory_usage']:.1f}%",
'recommendation': "Check for memory leaks or increase instance memory"
})
# Disk I/O
if metrics.get('disk_io', 0) > 1000:
recommendations.append({
'severity': 'MEDIUM',
'component': 'Disk',
'issue': f"High disk I/O: {metrics['disk_io']} ops/s",
'recommendation': "Consider caching frequently accessed data or using SSD"
})
# Request load
if metrics.get('request_count', 0) > 100:
recommendations.append({
'severity': 'INFO',
'component': 'Load',
'issue': f"High request rate: {metrics['request_count']} req/min",
'recommendation': "Monitor for capacity limits, consider load balancing"
})
return recommendations
# Generate sample historical metrics
sample_metrics = pd.DataFrame({
'timestamp': pd.date_range(start='2024-01-01', periods=1000, freq='1min'),
'cpu_usage': np.random.uniform(20, 90, 1000),
'memory_usage': np.random.uniform(30, 85, 1000),
'disk_io': np.random.uniform(100, 2000, 1000),
'network_io': np.random.uniform(50, 500, 1000),
'request_count': np.random.uniform(10, 150, 1000),
'response_time': np.random.uniform(0.1, 3.0, 1000)
})
# Simulate correlation: higher CPU/memory → higher response time
sample_metrics['response_time'] = (
0.5 +
(sample_metrics['cpu_usage'] / 100) * 1.5 +
(sample_metrics['memory_usage'] / 100) * 1.0 +
(sample_metrics['request_count'] / 150) * 0.5 +
np.random.normal(0, 0.1, 1000)
)
sample_metrics.to_csv('historical_performance_metrics.csv', index=False)
# Train predictor
predictor = PerformanceBottleneckPredictor()
predictor.train('historical_performance_metrics.csv')
# Predict bottlenecks for current state
current_state = {
'cpu_usage': 85,
'memory_usage': 75,
'disk_io': 1500,
'network_io': 300,
'request_count': 120
}
prediction = predictor.predict_bottlenecks(current_state)
print("\n" + "="*60)
print("BOTTLENECK PREDICTION")
print("="*60)
print(f"{prediction['risk_icon']} Risk Level: {prediction['risk_level']}")
print(f"Predicted Response Time: {prediction['predicted_response_time']:.2f}s")
# Get recommendations
recommendations = predictor.generate_optimization_recommendations(current_state)
print("\n📋 OPTIMIZATION RECOMMENDATIONS:")
for rec in recommendations:
print(f"\n[{rec['severity']}] {rec['component']}")
print(f" Issue: {rec['issue']}")
print(f" Recommendation: {rec['recommendation']}")
Detect unusual performance patterns during load testing:
from sklearn.ensemble import IsolationForest
import numpy as np
import pandas as pd
import random
class PerformanceAnomalyDetector:
"""
Detect performance anomalies in real-time using ML
"""
def __init__(self, contamination=0.1):
"""
Args:
contamination: Expected proportion of anomalies (0.1 = 10%)
"""
self.model = IsolationForest(
contamination=contamination,
random_state=42
)
self.baseline_metrics = None
def establish_baseline(self, normal_metrics_df):
"""
Train on normal/baseline performance metrics
"""
print("📊 Establishing performance baseline...")
features = ['response_time', 'cpu_usage', 'memory_usage',
'request_count', 'error_rate']
X = normal_metrics_df[features]
self.model.fit(X)
# Store baseline statistics
self.baseline_metrics = {
'mean': X.mean().to_dict(),
'std': X.std().to_dict(),
'p95': X.quantile(0.95).to_dict()
}
print("✅ Baseline established!")
print(f" Mean response time: {self.baseline_metrics['mean']['response_time']:.2f}s")
print(f" P95 response time: {self.baseline_metrics['p95']['response_time']:.2f}s")
return self.baseline_metrics
def detect_anomalies(self, metrics_df):
"""
Detect anomalies in real-time metrics
Returns DataFrame with anomaly flag and severity
"""
features = ['response_time', 'cpu_usage', 'memory_usage',
'request_count', 'error_rate']
X = metrics_df[features]
# Predict: -1 for anomaly, 1 for normal
predictions = self.model.predict(X)
anomaly_scores = self.model.score_samples(X)
# Add results to dataframe
results = metrics_df.copy()
results['is_anomaly'] = predictions == -1
results['anomaly_score'] = anomaly_scores
        # Calculate severity (how many standard deviations from the baseline)
        results['severity'] = 'NORMAL'
        max_deviation = pd.Series(0.0, index=results.index)
        for feature in features:
            if feature in self.baseline_metrics['mean']:
                baseline_mean = self.baseline_metrics['mean'][feature]
                baseline_std = self.baseline_metrics['std'][feature]
                deviation = abs(results[feature] - baseline_mean) / (baseline_std + 0.001)
                max_deviation = np.maximum(max_deviation, deviation)
        # Classify severity by the worst deviation across all features,
        # so a mild feature can't downgrade a severe one
        results.loc[(max_deviation > 2) & results['is_anomaly'], 'severity'] = 'MEDIUM'
        results.loc[(max_deviation > 3) & results['is_anomaly'], 'severity'] = 'HIGH'
        results.loc[(max_deviation > 5) & results['is_anomaly'], 'severity'] = 'CRITICAL'
return results
def generate_anomaly_report(self, anomalies_df):
"""Generate human-readable anomaly report"""
anomalies = anomalies_df[anomalies_df['is_anomaly']]
if len(anomalies) == 0:
return "✅ No anomalies detected. Performance is normal."
report = f"""
{'='*60}
⚠️ PERFORMANCE ANOMALIES DETECTED
{'='*60}
Total anomalies: {len(anomalies)} out of {len(anomalies_df)} measurements
SEVERITY BREAKDOWN:
"""
severity_counts = anomalies['severity'].value_counts()
for severity, count in severity_counts.items():
report += f" {severity}: {count}\n"
report += "\n🔴 CRITICAL ANOMALIES:\n"
critical = anomalies[anomalies['severity'] == 'CRITICAL']
if len(critical) > 0:
for idx, row in critical.head(5).iterrows():
report += f"\n Timestamp: {row.get('timestamp', 'N/A')}\n"
report += f" Response Time: {row['response_time']:.2f}s "
report += f"(baseline: {self.baseline_metrics['mean']['response_time']:.2f}s)\n"
report += f" CPU: {row['cpu_usage']:.1f}% "
report += f"(baseline: {self.baseline_metrics['mean']['cpu_usage']:.1f}%)\n"
report += f" Error Rate: {row['error_rate']:.1f}%\n"
else:
report += " None\n"
return report
# Generate sample baseline (normal operation)
baseline_metrics = pd.DataFrame({
'timestamp': pd.date_range(start='2024-01-01', periods=500, freq='1min'),
'response_time': np.random.normal(0.3, 0.05, 500),
'cpu_usage': np.random.normal(50, 10, 500),
'memory_usage': np.random.normal(60, 5, 500),
'request_count': np.random.normal(80, 15, 500),
'error_rate': np.random.normal(0.5, 0.2, 500)
})
# Generate test metrics with some anomalies
test_metrics = pd.DataFrame({
'timestamp': pd.date_range(start='2024-01-02', periods=100, freq='1min'),
'response_time': np.random.normal(0.3, 0.05, 100),
'cpu_usage': np.random.normal(50, 10, 100),
'memory_usage': np.random.normal(60, 5, 100),
'request_count': np.random.normal(80, 15, 100),
'error_rate': np.random.normal(0.5, 0.2, 100)
})
# Inject anomalies
anomaly_indices = [10, 25, 40, 60, 85]
for idx in anomaly_indices:
test_metrics.loc[idx, 'response_time'] = random.uniform(2.0, 5.0) # Spike
test_metrics.loc[idx, 'cpu_usage'] = random.uniform(90, 98)
test_metrics.loc[idx, 'error_rate'] = random.uniform(5, 15)
# Train detector
detector = PerformanceAnomalyDetector(contamination=0.1)
detector.establish_baseline(baseline_metrics)
# Detect anomalies
results = detector.detect_anomalies(test_metrics)
# Generate report
report = detector.generate_anomaly_report(results)
print(report)
# Show detected anomalies
detected_anomalies = results[results['is_anomaly']]
print(f"\n✅ Flagged {len(detected_anomalies)} measurements as anomalous")
print(f"Actual anomalies injected: {len(anomaly_indices)}")
# Recall: how many of the injected anomalies were actually caught
# (false positives don't inflate this number)
caught = int(results.loc[anomaly_indices, 'is_anomaly'].sum())
print(f"Detection recall: {caught / len(anomaly_indices) * 100:.1f}%")
✅ Anomaly Detection: The ML model automatically flags unusual performance patterns—response time spikes, memory leaks, error rate increases—without manual threshold configuration!
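During an actual load test you would feed the detector live metrics rather than a prepared DataFrame. Here's a minimal sketch using psutil (installed earlier) to sample CPU and memory; request_count, error_rate, and response_time would come from your load tool or APM, so they are stubbed with placeholders here:
# Hypothetical live-monitoring loop: sample system metrics and check each batch for anomalies.
import time
import psutil
import pandas as pd

def sample_live_metrics(num_samples=6, interval_seconds=5):
    """Collect a small batch of metrics; request/response fields are placeholders."""
    rows = []
    for _ in range(num_samples):
        rows.append({
            'timestamp': pd.Timestamp.now(),
            'cpu_usage': psutil.cpu_percent(interval=1),
            'memory_usage': psutil.virtual_memory().percent,
            'request_count': 0,    # TODO: pull from Locust stats or your APM
            'error_rate': 0.0,     # TODO: pull from Locust stats or your APM
            'response_time': 0.0,  # TODO: pull from Locust stats or your APM
        })
        time.sleep(interval_seconds)
    return pd.DataFrame(rows)

live_batch = sample_live_metrics()
live_results = detector.detect_anomalies(live_batch)
print(detector.generate_anomaly_report(live_results))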
Integrate AI predictions with actual load testing:
# realistic_locust_test.py
from locust import HttpUser, task, between, events
import random
import json
import time
# Load AI-generated patterns
with open('learned_patterns.json', 'r') as f:
patterns = json.load(f)
class AIGuidedUser(HttpUser):
"""
Locust user that behaves like real users based on ML analysis
"""
# Use learned think time distribution
wait_time = between(
patterns['think_time']['min'],
patterns['think_time']['max']
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.session_requests = 0
def on_start(self):
"""Initialize user session"""
# Simulate login
self.client.post("/api/login", json={
"username": f"testuser{random.randint(1, 1000)}",
"password": "testpass123"
})
@task(50) # 50% of requests (most popular)
def browse_products(self):
"""Browse products - most common action"""
self.client.get("/api/products")
self.session_requests += 1
@task(20) # 20% of requests
def view_product_details(self):
"""View specific product"""
product_id = random.randint(1, 100)
self.client.get(f"/api/products/{product_id}")
self.session_requests += 1
@task(15) # 15% of requests
def search_products(self):
"""Search for products"""
query = random.choice(['laptop', 'phone', 'tablet', 'watch'])
self.client.get(f"/api/search?q={query}")
self.session_requests += 1
@task(10) # 10% of requests
def add_to_cart(self):
"""Add item to cart"""
self.client.post("/api/cart", json={
"product_id": random.randint(1, 100),
"quantity": random.randint(1, 3)
})
self.session_requests += 1
@task(5) # 5% of requests (conversion-critical)
def checkout(self):
"""Complete checkout"""
if self.session_requests > 3: # Only after browsing
self.client.post("/api/checkout", json={
"payment_method": "credit_card"
})
self.session_requests += 1
@events.init.add_listener
def on_locust_init(environment, **kwargs):
"""Log AI guidance info"""
print("="*60)
print("🤖 AI-GUIDED LOAD TEST")
print("="*60)
print("Using ML-learned user behavior patterns")
print(f"Think time: {patterns['think_time']['mean']:.1f}s ± {patterns['think_time']['std']:.1f}s")
print("Task distribution based on production analytics")
print("="*60)
# Performance monitoring
@events.request.add_listener
def on_request(request_type, name, response_time, response_length, exception, **kwargs):
"""Monitor requests and detect anomalies"""
# Flag slow requests
if response_time > 2000 and exception is None:
print(f"⚠️ SLOW REQUEST: {name} took {response_time}ms")
# Flag errors
if exception:
print(f"❌ ERROR: {name} - {exception}")
Run the test:
# Run Locust with AI-guided scenarios
locust -f realistic_locust_test.py --host=https://api.example.com
# Access web UI at http://localhost:8089
# Start test with gradually increasing load (e.g., 100 users, spawn rate 10/sec)
⚠️ Testing in Production: Never run load tests against production without permission! Use staging environments that mirror production architecture.
Challenge: Build a complete AI-powered performance testing system that ties these pieces together: learn load patterns from your own production logs, run the AI-guided Locust scenarios, predict bottlenecks from live resource metrics, and report anomalies automatically at the end of each run.
Bonus: Add auto-scaling recommendations based on predicted load!
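One possible starting point for the bonus, a hedged sketch that maps a predicted request rate to an instance count; capacity_per_instance and headroom are assumptions you would measure and tune for your own system, and it assumes the load_profile generated earlier is still in scope:
# Hypothetical auto-scaling helper for the bonus challenge.
import math

def recommend_instance_count(predicted_rps, capacity_per_instance=50, headroom=0.3, min_instances=2):
    """Suggest how many instances to run for a predicted load, with safety headroom."""
    required = predicted_rps * (1 + headroom) / capacity_per_instance
    return max(min_instances, math.ceil(required))

# Example: use the peak of the AI-generated load profile from earlier
peak_rps = max(rps for _, rps in load_profile)
print(f"Peak predicted load: {peak_rps} RPS")
print(f"Recommended instances: {recommend_instance_count(peak_rps)}")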
In the final tutorial, Building AI Testing Pipelines, you'll integrate everything you've built in this series into production CI/CD.
✅ Tutorial Complete! You can now use AI to generate realistic load, predict bottlenecks, and optimize performance—before problems reach production!
Check your understanding of AI-powered performance testing
1. What was Amazon's estimated annual cost of 1 second of delay?
2. What makes AI-generated load patterns more realistic than traditional uniform load?
3. What ML technique is used for real-time performance anomaly detection?
4. What should you analyze from production logs to create realistic load tests?