
AI for Performance & Load Testing

Use machine learning to generate intelligent load patterns, predict bottlenecks, and optimize performance automatically

Your load test shows the system handles 1,000 users perfectly. You deploy to production. At 1,200 real users, it crashes. Why? Because real user behavior isn't uniform—it's bursty, unpredictable, and concentrated on specific features. Traditional load testing simulates robots. AI simulates humans.

In this tutorial, you'll use machine learning to create realistic load patterns, predict performance bottlenecks before they happen, and get AI-driven optimization recommendations. You'll move beyond simple "ramp up to 1000 users" scripts to intelligent, data-driven performance testing.

The Traditional Load Testing Problem

Traditional load tests are unrealistic in several predictable ways (a minimal example of this kind of script is sketched after the tip below):

  1. Load ramps up uniformly ("add 10 users per second until 1,000"), while real traffic arrives in bursts.
  2. Every simulated user follows the same sequence of requests with the same fixed think time.
  3. Traffic is spread evenly across endpoints, when in production a handful of endpoints receive most of the requests.
  4. Time-of-day patterns are ignored, so peak-hour behavior is never exercised.

💡 Real Data: Amazon estimated that a single second of added delay would cost it $1.6B in annual sales, and Google found that a 400ms delay reduced searches by 0.7%. Performance directly impacts revenue—AI helps you test what matters.
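
For contrast, here is the kind of script this tutorial moves away from: a minimal, hypothetical Locust test in which every simulated user behaves identically. It is a sketch for illustration only.

from locust import HttpUser, task, constant

class UniformRobotUser(HttpUser):
    """The 'robot' behavior traditional load tests simulate."""
    wait_time = constant(1)  # every user pauses exactly 1 second between requests

    @task
    def hit_products(self):
        # every user requests the same endpoint, forever
        self.client.get("/api/products")

# Typical invocation: a flat ramp to a fixed user count
# locust -f uniform_test.py -u 1000 -r 10 --host=https://staging.example.com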

Understanding AI-Driven Performance Testing

AI transforms performance testing in four ways, each covered below:

  1. Learning realistic load patterns from production logs instead of guessing them
  2. Predicting bottlenecks from system metrics before response times degrade
  3. Detecting performance anomalies in real time without hand-tuned thresholds
  4. Generating optimization recommendations from the metrics it observes

Setting Up Performance Testing Tools

# Install performance testing tools
pip install locust requests pandas numpy scikit-learn matplotlib

# Install monitoring tools
pip install psutil prometheus-client

Building an AI Load Pattern Generator

Let's analyze production logs and generate realistic load patterns:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
from collections import defaultdict
import json

class AILoadPatternGenerator:
    """
    Generate realistic load patterns based on production data analysis
    """
    
    def __init__(self):
        self.user_patterns = {}
        self.endpoint_popularity = {}
        self.think_time_distribution = {}
    
    def analyze_production_logs(self, log_file):
        """
        Analyze production access logs to learn user behavior
        
        Expected log format: timestamp, user_id, endpoint, response_time
        """
        print("📊 Analyzing production logs...")
        
        # Load logs (assuming CSV format)
        df = pd.read_csv(log_file)
        
        # Analyze endpoint popularity
        endpoint_counts = df['endpoint'].value_counts()
        total_requests = len(df)
        
        self.endpoint_popularity = {
            endpoint: count / total_requests 
            for endpoint, count in endpoint_counts.items()
        }
        
        print(f"✅ Analyzed {len(df)} requests across {len(self.endpoint_popularity)} endpoints")
        
        # Analyze think time (time between requests per user)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df = df.sort_values(['user_id', 'timestamp'])
        
        think_times = []
        for user_id, group in df.groupby('user_id'):
            if len(group) > 1:
                times = group['timestamp'].diff().dt.total_seconds().dropna()
                think_times.extend(times.tolist())
        
        # Fit distribution
        self.think_time_distribution = {
            'mean': np.mean(think_times),
            'std': np.std(think_times),
            'min': np.min(think_times),
            'max': np.max(think_times)
        }
        
        print(f"✅ Think time: {self.think_time_distribution['mean']:.2f}s (±{self.think_time_distribution['std']:.2f}s)")
        
        # Analyze traffic patterns over time (detect peaks)
        df['hour'] = df['timestamp'].dt.hour
        hourly_traffic = df.groupby('hour').size()
        
        self.hourly_pattern = (hourly_traffic / hourly_traffic.max()).to_dict()
        
        print(f"✅ Identified traffic patterns across 24 hours")
        
        return {
            'endpoints': self.endpoint_popularity,
            'think_time': self.think_time_distribution,
            'hourly_pattern': self.hourly_pattern
        }
    
    def generate_realistic_think_time(self):
        """Generate realistic think time based on learned distribution"""
        # Use log-normal distribution (more realistic than normal)
        think_time = np.random.lognormal(
            mean=np.log(self.think_time_distribution['mean']),
            sigma=0.5
        )
        
        # Clamp to reasonable bounds
        return np.clip(
            think_time,
            self.think_time_distribution['min'],
            self.think_time_distribution['max']
        )
    
    def generate_user_journey(self, num_requests=10):
        """
        Generate realistic user journey (sequence of endpoint requests)
        """
        journey = []
        
        for _ in range(num_requests):
            # Select endpoint based on popularity
            endpoint = random.choices(
                list(self.endpoint_popularity.keys()),
                weights=list(self.endpoint_popularity.values()),
                k=1
            )[0]
            
            think_time = self.generate_realistic_think_time()
            
            journey.append({
                'endpoint': endpoint,
                'think_time_before': think_time
            })
        
        return journey
    
    def generate_load_profile(self, duration_minutes=60, target_rps=100):
        """
        Generate realistic load profile with peaks and valleys
        
        Args:
            duration_minutes: Test duration
            target_rps: Target requests per second (average)
        
        Returns:
            List of (timestamp, rps) tuples
        """
        timestamps = []
        load_values = []
        
        current_time = datetime.now()
        
        for minute in range(duration_minutes):
            timestamp = current_time + timedelta(minutes=minute)
            hour = timestamp.hour
            
            # Apply hourly pattern
            hour_multiplier = self.hourly_pattern.get(hour, 1.0)
            
            # Add some randomness (bursty traffic)
            burst_factor = random.uniform(0.7, 1.3)
            
            # Calculate RPS for this minute
            rps = int(target_rps * hour_multiplier * burst_factor)
            
            timestamps.append(timestamp)
            load_values.append(rps)
        
        return list(zip(timestamps, load_values))
    
    def export_locust_scenarios(self, output_file='locust_scenarios.py'):
        """
        Generate Locust test scenarios based on learned patterns
        """
        
        scenario_code = '''from locust import HttpUser, task, between
import random

class RealisticUser(HttpUser):
    """
    Realistic user behavior based on production data analysis
    """
    
    # Realistic think time between requests
    wait_time = between({min_wait}, {max_wait})
    
    def on_start(self):
        """Called when a simulated user starts"""
        # Simulate login or session creation if needed
        pass
    
'''.format(
            min_wait=self.think_time_distribution['min'],
            max_wait=self.think_time_distribution['max']
        )
        
        # Generate tasks based on endpoint popularity
        for endpoint, popularity in sorted(
            self.endpoint_popularity.items(), 
            key=lambda x: x[1], 
            reverse=True
        )[:10]:  # Top 10 endpoints
            
            # Ensure every included endpoint gets a nonzero task weight
            weight = max(1, int(popularity * 100))
            
            # Clean endpoint name for function
            func_name = endpoint.replace('/', '_').replace('-', '_').strip('_')
            
            scenario_code += f'''    @task({weight})
    def {func_name}(self):
        """Request {endpoint} (popularity: {popularity*100:.1f}%)"""
        self.client.get("{endpoint}")
    
'''
        
        with open(output_file, 'w') as f:
            f.write(scenario_code)
        
        print(f"✅ Generated Locust scenarios: {output_file}")
        print(f"   Run with: locust -f {output_file}")

# Usage with sample log data
# Create sample production log
sample_log = pd.DataFrame({
    'timestamp': pd.date_range(start='2024-01-01', periods=10000, freq='5s'),
    'user_id': [random.randint(1, 100) for _ in range(10000)],
    'endpoint': random.choices(
        ['/api/products', '/api/cart', '/api/checkout', '/api/search', '/api/user'],
        weights=[50, 20, 5, 15, 10],
        k=10000
    ),
    'response_time': [random.uniform(0.1, 2.0) for _ in range(10000)]
})

sample_log.to_csv('sample_production_logs.csv', index=False)

# Analyze and generate patterns
generator = AILoadPatternGenerator()
patterns = generator.analyze_production_logs('sample_production_logs.csv')

print("\n📊 LEARNED PATTERNS:")
print(f"Top 3 endpoints:")
for endpoint, popularity in sorted(patterns['endpoints'].items(), key=lambda x: x[1], reverse=True)[:3]:
    print(f"  {endpoint}: {popularity*100:.1f}%")

print(f"\nThink time: {patterns['think_time']['mean']:.1f}s ± {patterns['think_time']['std']:.1f}s")

# Generate user journey
journey = generator.generate_user_journey(num_requests=5)
print("\n🚶 Sample User Journey:")
for i, step in enumerate(journey, 1):
    print(f"  {i}. Wait {step['think_time_before']:.1f}s → Request {step['endpoint']}")

# Generate load profile
load_profile = generator.generate_load_profile(duration_minutes=60, target_rps=50)
print(f"\n📈 Generated load profile: {len(load_profile)} data points")

# Export Locust scenarios
generator.export_locust_scenarios('realistic_load_test.py')
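
# The Locust integration later in this tutorial reads 'learned_patterns.json'.
# Persisting the learned patterns here keeps the two examples connected
# (values and keys are cast to plain Python types so json can serialize the numpy results)
with open('learned_patterns.json', 'w') as f:
    json.dump({
        'endpoints': {k: float(v) for k, v in patterns['endpoints'].items()},
        'think_time': {k: float(v) for k, v in patterns['think_time'].items()},
        'hourly_pattern': {int(k): float(v) for k, v in patterns['hourly_pattern'].items()}
    }, f, indent=2)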

Realistic Load: Your load test now mimics real user behavior—popular endpoints get more traffic, think times vary naturally, and load has peaks/valleys!
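
Before feeding the profile into a test, it helps to eyeball it. Here is a quick sketch using the matplotlib installed in the setup step; it assumes the load_profile list generated above, and the output file name is arbitrary.

import matplotlib
matplotlib.use('Agg')  # render to a file so no display is needed
import matplotlib.pyplot as plt

# Unpack the (timestamp, rps) tuples produced by generate_load_profile()
times, rps_values = zip(*load_profile)

plt.figure(figsize=(10, 4))
plt.plot(times, rps_values)
plt.xlabel('Time')
plt.ylabel('Requests per second')
plt.title('AI-generated load profile (hourly pattern + bursts)')
plt.tight_layout()
plt.savefig('load_profile.png')
print("✅ Saved load profile chart: load_profile.png")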

ML-Based Bottleneck Prediction

Use machine learning to predict where bottlenecks will occur:

from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from datetime import datetime
import numpy as np
import pandas as pd

class PerformanceBottleneckPredictor:
    """
    Predict performance bottlenecks using machine learning
    """
    
    def __init__(self):
        self.model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.feature_names = []
    
    def prepare_features(self, metrics_df):
        """
        Extract features from performance metrics
        
        Expected columns: timestamp, cpu_usage, memory_usage, 
                         disk_io, network_io, request_count, response_time
        """
        features = []
        
        # Time-based features
        metrics_df['hour'] = pd.to_datetime(metrics_df['timestamp']).dt.hour
        metrics_df['day_of_week'] = pd.to_datetime(metrics_df['timestamp']).dt.dayofweek
        
        # Rolling averages (trend detection)
        metrics_df['cpu_rolling_avg'] = metrics_df['cpu_usage'].rolling(window=5).mean()
        metrics_df['memory_rolling_avg'] = metrics_df['memory_usage'].rolling(window=5).mean()
        
        # Rate of change
        metrics_df['cpu_change_rate'] = metrics_df['cpu_usage'].diff()
        metrics_df['memory_change_rate'] = metrics_df['memory_usage'].diff()
        
        # Load features
        metrics_df['requests_per_minute'] = metrics_df['request_count']
        
        # Resource utilization ratio
        metrics_df['cpu_per_request'] = metrics_df['cpu_usage'] / (metrics_df['request_count'] + 1)
        metrics_df['memory_per_request'] = metrics_df['memory_usage'] / (metrics_df['request_count'] + 1)
        
        self.feature_names = [
            'hour', 'day_of_week', 
            'cpu_usage', 'memory_usage', 'disk_io', 'network_io',
            'cpu_rolling_avg', 'memory_rolling_avg',
            'cpu_change_rate', 'memory_change_rate',
            'requests_per_minute',
            'cpu_per_request', 'memory_per_request'
        ]
        
        # Fill NaN values from rolling/diff operations
        metrics_df = metrics_df.fillna(0)
        
        return metrics_df[self.feature_names], metrics_df['response_time']
    
    def train(self, historical_metrics_csv):
        """
        Train model on historical performance data
        """
        print("🤖 Training bottleneck prediction model...")
        
        # Load historical data
        df = pd.read_csv(historical_metrics_csv)
        
        # Prepare features
        X, y = self.prepare_features(df)
        
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        # Train model
        self.model.fit(X_train, y_train)
        
        # Evaluate
        train_score = self.model.score(X_train, y_train)
        test_score = self.model.score(X_test, y_test)
        
        print(f"✅ Model trained!")
        print(f"   Training R² score: {train_score:.3f}")
        print(f"   Test R² score: {test_score:.3f}")
        
        # Feature importance
        self._print_feature_importance()
        
        return test_score
    
    def _print_feature_importance(self):
        """Show which features matter most"""
        importances = self.model.feature_importances_
        indices = np.argsort(importances)[::-1]
        
        print("\n📊 Top 5 Bottleneck Indicators:")
        for i in range(min(5, len(self.feature_names))):
            idx = indices[i]
            print(f"   {i+1}. {self.feature_names[idx]}: {importances[idx]:.3f}")
    
    def predict_bottlenecks(self, current_metrics):
        """
        Predict response time based on current system metrics
        
        Returns predicted response time and risk level
        """
        # Prepare features
        metrics_df = pd.DataFrame([current_metrics])
        
        # Add dummy timestamp for feature extraction
        if 'timestamp' not in metrics_df.columns:
            metrics_df['timestamp'] = datetime.now()
        
        # prepare_features() also returns the target column, so add a placeholder
        if 'response_time' not in metrics_df.columns:
            metrics_df['response_time'] = 0.0
        
        X, _ = self.prepare_features(metrics_df)
        
        # Predict
        predicted_response_time = self.model.predict(X)[0]
        
        # Classify risk level
        if predicted_response_time < 0.5:
            risk_level = "LOW"
            color = "🟢"
        elif predicted_response_time < 1.0:
            risk_level = "MEDIUM"
            color = "🟡"
        elif predicted_response_time < 2.0:
            risk_level = "HIGH"
            color = "🟠"
        else:
            risk_level = "CRITICAL"
            color = "🔴"
        
        return {
            'predicted_response_time': predicted_response_time,
            'risk_level': risk_level,
            'risk_icon': color
        }
    
    def generate_optimization_recommendations(self, current_metrics):
        """
        Generate AI-powered optimization recommendations
        """
        recommendations = []
        
        metrics = current_metrics
        
        # CPU recommendations
        if metrics.get('cpu_usage', 0) > 80:
            recommendations.append({
                'severity': 'HIGH',
                'component': 'CPU',
                'issue': f"CPU usage at {metrics['cpu_usage']:.1f}%",
                'recommendation': "Consider horizontal scaling or optimizing CPU-intensive operations"
            })
        
        # Memory recommendations
        if metrics.get('memory_usage', 0) > 85:
            recommendations.append({
                'severity': 'HIGH',
                'component': 'Memory',
                'issue': f"Memory usage at {metrics['memory_usage']:.1f}%",
                'recommendation': "Check for memory leaks or increase instance memory"
            })
        
        # Disk I/O
        if metrics.get('disk_io', 0) > 1000:
            recommendations.append({
                'severity': 'MEDIUM',
                'component': 'Disk',
                'issue': f"High disk I/O: {metrics['disk_io']} ops/s",
                'recommendation': "Consider caching frequently accessed data or using SSD"
            })
        
        # Request load
        if metrics.get('request_count', 0) > 100:
            recommendations.append({
                'severity': 'INFO',
                'component': 'Load',
                'issue': f"High request rate: {metrics['request_count']} req/min",
                'recommendation': "Monitor for capacity limits, consider load balancing"
            })
        
        return recommendations

# Generate sample historical metrics
sample_metrics = pd.DataFrame({
    'timestamp': pd.date_range(start='2024-01-01', periods=1000, freq='1min'),
    'cpu_usage': np.random.uniform(20, 90, 1000),
    'memory_usage': np.random.uniform(30, 85, 1000),
    'disk_io': np.random.uniform(100, 2000, 1000),
    'network_io': np.random.uniform(50, 500, 1000),
    'request_count': np.random.uniform(10, 150, 1000),
    'response_time': np.random.uniform(0.1, 3.0, 1000)
})

# Simulate correlation: higher CPU/memory → higher response time
sample_metrics['response_time'] = (
    0.5 + 
    (sample_metrics['cpu_usage'] / 100) * 1.5 +
    (sample_metrics['memory_usage'] / 100) * 1.0 +
    (sample_metrics['request_count'] / 150) * 0.5 +
    np.random.normal(0, 0.1, 1000)
)

sample_metrics.to_csv('historical_performance_metrics.csv', index=False)

# Train predictor
predictor = PerformanceBottleneckPredictor()
predictor.train('historical_performance_metrics.csv')

# Predict bottlenecks for current state
current_state = {
    'cpu_usage': 85,
    'memory_usage': 75,
    'disk_io': 1500,
    'network_io': 300,
    'request_count': 120
}

prediction = predictor.predict_bottlenecks(current_state)

print("\n" + "="*60)
print("BOTTLENECK PREDICTION")
print("="*60)
print(f"{prediction['risk_icon']} Risk Level: {prediction['risk_level']}")
print(f"Predicted Response Time: {prediction['predicted_response_time']:.2f}s")

# Get recommendations
recommendations = predictor.generate_optimization_recommendations(current_state)

print("\n📋 OPTIMIZATION RECOMMENDATIONS:")
for rec in recommendations:
    print(f"\n[{rec['severity']}] {rec['component']}")
    print(f"  Issue: {rec['issue']}")
    print(f"  Recommendation: {rec['recommendation']}")

Real-Time Performance Anomaly Detection

Detect unusual performance patterns during load testing:

from sklearn.ensemble import IsolationForest
import numpy as np
import pandas as pd
import random

class PerformanceAnomalyDetector:
    """
    Detect performance anomalies in real-time using ML
    """
    
    def __init__(self, contamination=0.1):
        """
        Args:
            contamination: Expected proportion of anomalies (0.1 = 10%)
        """
        self.model = IsolationForest(
            contamination=contamination,
            random_state=42
        )
        self.baseline_metrics = None
    
    def establish_baseline(self, normal_metrics_df):
        """
        Train on normal/baseline performance metrics
        """
        print("📊 Establishing performance baseline...")
        
        features = ['response_time', 'cpu_usage', 'memory_usage', 
                   'request_count', 'error_rate']
        
        X = normal_metrics_df[features]
        
        self.model.fit(X)
        
        # Store baseline statistics
        self.baseline_metrics = {
            'mean': X.mean().to_dict(),
            'std': X.std().to_dict(),
            'p95': X.quantile(0.95).to_dict()
        }
        
        print("✅ Baseline established!")
        print(f"   Mean response time: {self.baseline_metrics['mean']['response_time']:.2f}s")
        print(f"   P95 response time: {self.baseline_metrics['p95']['response_time']:.2f}s")
        
        return self.baseline_metrics
    
    def detect_anomalies(self, metrics_df):
        """
        Detect anomalies in real-time metrics
        
        Returns DataFrame with anomaly flag and severity
        """
        features = ['response_time', 'cpu_usage', 'memory_usage', 
                   'request_count', 'error_rate']
        
        X = metrics_df[features]
        
        # Predict: -1 for anomaly, 1 for normal
        predictions = self.model.predict(X)
        anomaly_scores = self.model.score_samples(X)
        
        # Add results to dataframe
        results = metrics_df.copy()
        results['is_anomaly'] = predictions == -1
        results['anomaly_score'] = anomaly_scores
        
        # Calculate severity (how far from normal)
        results['severity'] = 'NORMAL'
        
        for feature in features:
            if feature in self.baseline_metrics['mean']:
                baseline_mean = self.baseline_metrics['mean'][feature]
                baseline_std = self.baseline_metrics['std'][feature]
                
                deviation = abs(results[feature] - baseline_mean) / (baseline_std + 0.001)
                
                # Classify severity
                results.loc[(deviation > 2) & results['is_anomaly'], 'severity'] = 'MEDIUM'
                results.loc[(deviation > 3) & results['is_anomaly'], 'severity'] = 'HIGH'
                results.loc[(deviation > 5) & results['is_anomaly'], 'severity'] = 'CRITICAL'
        
        return results
    
    def generate_anomaly_report(self, anomalies_df):
        """Generate human-readable anomaly report"""
        anomalies = anomalies_df[anomalies_df['is_anomaly']]
        
        if len(anomalies) == 0:
            return "✅ No anomalies detected. Performance is normal."
        
        report = f"""
{'='*60}
⚠️ PERFORMANCE ANOMALIES DETECTED
{'='*60}

Total anomalies: {len(anomalies)} out of {len(anomalies_df)} measurements

SEVERITY BREAKDOWN:
"""
        
        severity_counts = anomalies['severity'].value_counts()
        for severity, count in severity_counts.items():
            report += f"  {severity}: {count}\n"
        
        report += "\n🔴 CRITICAL ANOMALIES:\n"
        
        critical = anomalies[anomalies['severity'] == 'CRITICAL']
        
        if len(critical) > 0:
            for idx, row in critical.head(5).iterrows():
                report += f"\n  Timestamp: {row.get('timestamp', 'N/A')}\n"
                report += f"  Response Time: {row['response_time']:.2f}s "
                report += f"(baseline: {self.baseline_metrics['mean']['response_time']:.2f}s)\n"
                report += f"  CPU: {row['cpu_usage']:.1f}% "
                report += f"(baseline: {self.baseline_metrics['mean']['cpu_usage']:.1f}%)\n"
                report += f"  Error Rate: {row['error_rate']:.1f}%\n"
        else:
            report += "  None\n"
        
        return report

# Generate sample baseline (normal operation)
baseline_metrics = pd.DataFrame({
    'timestamp': pd.date_range(start='2024-01-01', periods=500, freq='1min'),
    'response_time': np.random.normal(0.3, 0.05, 500),
    'cpu_usage': np.random.normal(50, 10, 500),
    'memory_usage': np.random.normal(60, 5, 500),
    'request_count': np.random.normal(80, 15, 500),
    'error_rate': np.random.normal(0.5, 0.2, 500)
})

# Generate test metrics with some anomalies
test_metrics = pd.DataFrame({
    'timestamp': pd.date_range(start='2024-01-02', periods=100, freq='1min'),
    'response_time': np.random.normal(0.3, 0.05, 100),
    'cpu_usage': np.random.normal(50, 10, 100),
    'memory_usage': np.random.normal(60, 5, 100),
    'request_count': np.random.normal(80, 15, 100),
    'error_rate': np.random.normal(0.5, 0.2, 100)
})

# Inject anomalies
anomaly_indices = [10, 25, 40, 60, 85]
for idx in anomaly_indices:
    test_metrics.loc[idx, 'response_time'] = random.uniform(2.0, 5.0)  # Spike
    test_metrics.loc[idx, 'cpu_usage'] = random.uniform(90, 98)
    test_metrics.loc[idx, 'error_rate'] = random.uniform(5, 15)

# Train detector
detector = PerformanceAnomalyDetector(contamination=0.1)
detector.establish_baseline(baseline_metrics)

# Detect anomalies
results = detector.detect_anomalies(test_metrics)

# Generate report
report = detector.generate_anomaly_report(results)
print(report)

# Show detected anomalies
detected_anomalies = results[results['is_anomaly']]
caught = int(results.loc[anomaly_indices, 'is_anomaly'].sum())
print(f"\n✅ Flagged {len(detected_anomalies)} measurements as anomalous")
print(f"Injected anomalies caught: {caught} of {len(anomaly_indices)} ({caught / len(anomaly_indices) * 100:.0f}% recall)")

Anomaly Detection: The ML model automatically flags unusual performance patterns—response time spikes, memory leaks, error rate increases—without manual threshold configuration!
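
In a live test you would score measurements as they arrive rather than a finished DataFrame. One simple pattern, sketched here against the detector above, is to keep a short rolling window of samples (each a dict with the five monitored features) and score the window on every tick:

from collections import deque

WINDOW = 30  # keep the most recent 30 one-minute samples
recent = deque(maxlen=WINDOW)

def on_new_sample(sample):
    """Call once per metrics interval with the latest measurement."""
    recent.append(sample)
    if len(recent) < 5:
        return  # not enough context to score yet
    scored = detector.detect_anomalies(pd.DataFrame(list(recent)))
    latest = scored.iloc[-1]
    if latest['is_anomaly']:
        print(f"⚠️ {latest['severity']} anomaly: response_time={latest['response_time']:.2f}s, "
              f"cpu={latest['cpu_usage']:.0f}%, error_rate={latest['error_rate']:.1f}%")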

Integration with Locust (Load Testing)

Integrate AI predictions with actual load testing:

# realistic_locust_test.py

from locust import HttpUser, task, between, events
import random
import json
import time

# Load AI-generated patterns (exported earlier by AILoadPatternGenerator)
with open('learned_patterns.json', 'r') as f:
    patterns = json.load(f)

class AIGuidedUser(HttpUser):
    """
    Locust user that behaves like real users based on ML analysis
    """
    
    # Use learned think time distribution
    wait_time = between(
        patterns['think_time']['min'],
        patterns['think_time']['max']
    )
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.session_requests = 0
    
    def on_start(self):
        """Initialize user session"""
        # Simulate login
        self.client.post("/api/login", json={
            "username": f"testuser{random.randint(1, 1000)}",
            "password": "testpass123"
        })
    
    @task(50)  # 50% of requests (most popular)
    def browse_products(self):
        """Browse products - most common action"""
        self.client.get("/api/products")
        self.session_requests += 1
    
    @task(20)  # 20% of requests
    def view_product_details(self):
        """View specific product"""
        product_id = random.randint(1, 100)
        self.client.get(f"/api/products/{product_id}")
        self.session_requests += 1
    
    @task(15)  # 15% of requests
    def search_products(self):
        """Search for products"""
        query = random.choice(['laptop', 'phone', 'tablet', 'watch'])
        self.client.get(f"/api/search?q={query}")
        self.session_requests += 1
    
    @task(10)  # 10% of requests
    def add_to_cart(self):
        """Add item to cart"""
        self.client.post("/api/cart", json={
            "product_id": random.randint(1, 100),
            "quantity": random.randint(1, 3)
        })
        self.session_requests += 1
    
    @task(5)  # 5% of requests (conversion-critical)
    def checkout(self):
        """Complete checkout"""
        if self.session_requests > 3:  # Only after browsing
            self.client.post("/api/checkout", json={
                "payment_method": "credit_card"
            })
            self.session_requests += 1


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    """Log AI guidance info"""
    print("="*60)
    print("🤖 AI-GUIDED LOAD TEST")
    print("="*60)
    print("Using ML-learned user behavior patterns")
    print(f"Think time: {patterns['think_time']['mean']:.1f}s ± {patterns['think_time']['std']:.1f}s")
    print("Task distribution based on production analytics")
    print("="*60)


# Performance monitoring
@events.request.add_listener
def on_request(request_type, name, response_time, response_length, exception, **kwargs):
    """Monitor requests and detect anomalies"""
    
    # Flag slow requests
    if response_time > 2000 and exception is None:
        print(f"⚠️ SLOW REQUEST: {name} took {response_time}ms")
    
    # Flag errors
    if exception:
        print(f"❌ ERROR: {name} - {exception}")

Run the test:

# Run Locust with AI-guided scenarios
locust -f realistic_locust_test.py --host=https://api.example.com

# Access web UI at http://localhost:8089
# Start test with gradually increasing load (e.g., 100 users, spawn rate 10/sec)
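
The web UI run above uses a fixed user count. To replay the AI-generated profile (peaks and valleys instead of a flat ramp), Locust's LoadTestShape hook can drive the user count over time. The sketch below assumes you have converted the (timestamp, rps) profile into per-minute user counts; the RPS-to-users conversion (users ≈ RPS × average think time) is a rough heuristic you would tune. With a shape class present, Locust follows the shape rather than -u/-r, so the test can also run headless, e.g. locust -f realistic_locust_test.py --headless --host=https://staging.example.com

# Add to the same file as the user classes above
from locust import LoadTestShape

# Hypothetical per-minute user counts derived from the AI load profile,
# e.g. users_per_minute = [int(rps * avg_think_time) for _, rps in load_profile]
USERS_PER_MINUTE = [40, 55, 70, 90, 120, 100, 80, 60]

class AIProfileShape(LoadTestShape):
    """Replay the learned traffic shape instead of a flat ramp."""

    def tick(self):
        run_time = self.get_run_time()
        minute = int(run_time // 60)
        if minute >= len(USERS_PER_MINUTE):
            return None  # profile exhausted, stop the test
        return (USERS_PER_MINUTE[minute], 10)  # (target user count, spawn rate per second)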

Best Practices

  1. Learn from production: Analyze real user patterns, don't guess
  2. Test realistic scenarios: Bursty load, varied think times, popular endpoints
  3. Establish baselines: Train ML on normal performance before testing
  4. Monitor key metrics: Response time, CPU, memory, error rate, throughput
  5. Detect anomalies early: Use ML to spot unusual patterns before catastrophic failure
  6. Iterate and refine: Update models as application evolves
  7. Test at peak hours: Use hourly patterns to simulate real traffic
  8. Combine AI with human insight: ML finds patterns, humans interpret business impact

⚠️ Testing in Production: Never run load tests against production without permission! Use staging environments that mirror production architecture.

Practice Exercise

Challenge: Build a complete AI-powered performance testing system that:

  1. Analyzes production logs to learn user behavior patterns
  2. Generates realistic Locust test scenarios
  3. Trains ML model to predict bottlenecks
  4. Runs load test with AI-generated load profile
  5. Detects performance anomalies in real-time
  6. Generates optimization recommendations
  7. Creates comprehensive HTML report with visualizations

Bonus: Add auto-scaling recommendations based on predicted load!
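
For the bonus item, one possible starting point (a sketch, not a prescribed solution) is to turn a predicted peak load and a per-instance capacity estimate into a replica recommendation:

import math

def recommend_replicas(predicted_peak_rps, rps_per_instance, headroom=0.3, current_replicas=1):
    """Suggest an instance count for a predicted peak, with safety headroom."""
    needed = math.ceil(predicted_peak_rps * (1 + headroom) / rps_per_instance)
    if needed > current_replicas:
        action = "scale up"
    elif needed < current_replicas:
        action = "scale down"
    else:
        action = "hold"
    return {'recommended_replicas': needed, 'action': action}

# Example: a 450 RPS peak is predicted and each instance comfortably handles 120 RPS
print(recommend_replicas(predicted_peak_rps=450, rps_per_instance=120, current_replicas=3))
# → {'recommended_replicas': 5, 'action': 'scale up'}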

Key Takeaways

  1. Realistic load comes from data: endpoint popularity, think-time distributions, and hourly patterns learned from production logs beat hand-written ramps.
  2. A regression model trained on historical metrics can predict response-time degradation before users feel it.
  3. Isolation Forest flags performance anomalies without manually configured thresholds.
  4. AI points at the bottleneck; humans decide the business trade-offs.

What's Next?

In the final tutorial, Building AI Testing Pipelines, you'll integrate everything you've built here into a production CI/CD pipeline.

Tutorial Complete! You can now use AI to generate realistic load, predict bottlenecks, and optimize performance—before problems reach production!

🎯 Test Your Knowledge: AI Performance Testing

Check your understanding of AI-powered performance testing

1. What was Amazon's estimated annual cost of 1 second of delay?

$160 million
$1.6 billion
$16 million
$160 thousand

2. What makes AI-generated load patterns more realistic than traditional uniform load?

They use more users
They include bursty traffic, varied think times, and weighted endpoint popularity based on production data
They run faster
They cost less

3. What ML technique is used for real-time performance anomaly detection?

Linear Regression
Isolation Forest (unsupervised anomaly detection)
K-Means Clustering
Naive Bayes

4. What should you analyze from production logs to create realistic load tests?

Only the number of users
Only response times
Endpoint popularity, think time distribution, hourly traffic patterns, and user journeys
Only error rates