Train ML models to predict where bugs will occur and prioritize testing efforts based on risk
Imagine knowing where bugs will appear before they happen. No more random test selection. No more wondering "Did we test the right things?" Machine learning can analyze your codebase, historical defects, and development patterns to predict which files and modules are most likely to contain bugs, before your users find them.
In this tutorial, you'll build a bug prediction system using scikit-learn, train it on historical defect data, and use it to prioritize your testing efforts. By the end, you'll have a working model that ranks files by estimated defect risk, so you can decide where to focus your QA resources.
Traditional testing approaches test everything equally, or worse, test randomly based on what's convenient. But not all code is equal:
Real Impact: Microsoft Research found that bug prediction models helped reduce defects by 20-30% by focusing testing on high-risk areas. Google uses ML-based risk analysis to prioritize billions of test executions daily.
Bug prediction is a classification problem in machine learning. Given features about a code module, predict:
ML models learn from features: measurable characteristics that correlate with defects.
| Feature Category | Examples | Why It Matters |
|---|---|---|
| Code Complexity | Cyclomatic complexity, lines of code, nesting depth | Complex code is harder to understand and test |
| Change Metrics | Number of commits, lines changed, churn rate | Frequent changes introduce instability |
| Developer Metrics | Number of contributors, experience level | Too many cooks spoil the code |
| Historical Defects | Past bugs in this file, bug fix frequency | Buggy code tends to stay buggy |
| Code Dependencies | Coupling, number of dependencies | Highly coupled code spreads bugs |
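To make the table concrete, here is a minimal sketch of how one row of model input might be represented. The class name, field names, and the `churn_per_commit` helper are illustrative assumptions, not part of the tutorial's pipeline.

```python
from dataclasses import dataclass, astuple


@dataclass
class ModuleFeatures:
    """One row of model input; all field names are hypothetical."""
    avg_complexity: float    # Code Complexity
    lines_of_code: int       # Code Complexity
    commits: int             # Change Metrics
    lines_changed: int       # Change Metrics
    num_developers: int      # Developer Metrics
    past_bugs: int           # Historical Defects
    num_dependencies: int    # Code Dependencies

    def churn_per_commit(self) -> float:
        """Average lines changed per commit (0.0 when there are no commits)."""
        return self.lines_changed / self.commits if self.commits else 0.0

    def to_vector(self) -> list:
        """Flatten to the numeric list a classifier would consume."""
        return [float(v) for v in astuple(self)] + [self.churn_per_commit()]


example = ModuleFeatures(
    avg_complexity=12.5, lines_of_code=480, commits=40,
    lines_changed=1200, num_developers=6, past_bugs=3, num_dependencies=9,
)
print(example.to_vector())
```

Derived values such as churn per commit are often more informative than raw counts, because they normalize for how old or how active a file is.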
First, we need historical data about which files had bugs. Let's extract this from Git and bug tracking systems:
import pandas as pd
import subprocess
import json
from collections import defaultdict


class DefectDataCollector:
    """
    Collect historical defect data from a Git repository
    and prepare it for machine learning
    """

    def __init__(self, repo_path):
        self.repo_path = repo_path
        self.defect_data = defaultdict(lambda: {
            'bug_count': 0,
            'commits': 0,
            'lines_changed': 0,
            'developers': set(),
            'last_modified': None
        })

    def extract_git_history(self):
        """
        Extract commit history from Git
        Identify bug-fixing commits by keywords
        """
        # Get all commits with per-file added/deleted line counts
        cmd = [
            'git', 'log',
            '--all',
            '--numstat',
            '--pretty=format:%H|%an|%ad|%s',
            '--date=short'
        ]
        result = subprocess.run(
            cmd,
            cwd=self.repo_path,
            capture_output=True,
            text=True,
            check=True  # Fail loudly if git exits with an error
        )
        commits = result.stdout.split('\n\n')
        for commit_block in commits:
            if not commit_block.strip():
                continue
            lines = commit_block.strip().split('\n')
            # Parse commit metadata; maxsplit keeps any '|' in the subject intact
            metadata = lines[0].split('|', 3)
            if len(metadata) < 4:
                continue
            commit_hash, author, date, message = metadata
            # Identify if this is a bug fix
            is_bug_fix = self._is_bug_fix_commit(message)
            # Parse file changes
            for line in lines[1:]:
                parts = line.split('\t')
                if len(parts) != 3:
                    continue
                added, deleted, filepath = parts
                # Skip non-code files
                if not self._is_code_file(filepath):
                    continue
                # Update file statistics (binary files report '-' for counts)
                try:
                    lines_changed = int(added) + int(deleted)
                except ValueError:
                    lines_changed = 0
                stats = self.defect_data[filepath]
                stats['commits'] += 1
                stats['lines_changed'] += lines_changed
                stats['developers'].add(author)
                # git log lists newest commits first, so keep the latest date seen
                # (YYYY-MM-DD strings compare correctly as text)
                stats['last_modified'] = max(stats['last_modified'] or date, date)
                if is_bug_fix:
                    stats['bug_count'] += 1
        print(f"Extracted data for {len(self.defect_data)} files")
        return self.defect_data

    def _is_bug_fix_commit(self, message):
        """Identify bug fix commits by keywords (simple substring match)"""
        bug_keywords = [
            'fix', 'bug', 'issue', 'defect', 'error',
            'crash', 'problem', 'resolve', 'patch'
        ]
        message_lower = message.lower()
        return any(keyword in message_lower for keyword in bug_keywords)

    def _is_code_file(self, filepath):
        """Filter for actual code files"""
        code_extensions = ('.py', '.java', '.js', '.ts', '.cpp', '.c', '.go', '.rb')
        return filepath.endswith(code_extensions)

    def to_dataframe(self):
        """Convert collected data to pandas DataFrame"""
        rows = []
        for filepath, data in self.defect_data.items():
            rows.append({
                'file': filepath,
                'bug_count': data['bug_count'],
                'commits': data['commits'],
                'lines_changed': data['lines_changed'],
                'num_developers': len(data['developers']),
                'has_bug': 1 if data['bug_count'] > 0 else 0  # Target variable
            })
        df = pd.DataFrame(rows)
        print(f"\nDataset shape: {df.shape}")
        print(f"Files with bugs: {df['has_bug'].sum()} ({df['has_bug'].mean()*100:.1f}%)")
        return df


# Usage
collector = DefectDataCollector('/path/to/your/repo')
collector.extract_git_history()
df = collector.to_dataframe()
print(df.head())
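The keyword check in `_is_bug_fix_commit` matches substrings, so a message such as "Add prefix option" counts as a bug fix because it contains "fix". One possible refinement, sketched below, matches whole words only. The pattern, its narrower keyword list, and the function name `is_bug_fix_message` are assumptions for illustration; tune them to your team's commit conventions.

```python
import re

# Hypothetical whole-word pattern; extend the alternatives to suit your repo.
BUG_FIX_PATTERN = re.compile(
    r"\b(?:(?:bug|hot)?fix(?:es|ed|ing)?|bugs?|defects?|crash(?:es|ed|ing)?)\b",
    re.IGNORECASE,
)


def is_bug_fix_message(message: str) -> bool:
    """Return True when the commit subject contains a bug-fix keyword as a whole word."""
    return BUG_FIX_PATTERN.search(message) is not None


for subject in [
    "Fix null pointer in parser",
    "bugfix: handle empty input",
    "Add prefix option to CLI",
    "Refactor fixture loading",
]:
    print(subject, "->", is_bug_fix_message(subject))
```

If your project links commits to a tracker, matching issue IDs against tickets labeled as bugs is usually a more reliable label source than keywords alone.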
Now let's add code complexity features using the radon library:
# Install radon for complexity analysis (run in a shell):
#   pip install radon
from radon.complexity import cc_visit
from radon.metrics import mi_visit
import os
import pandas as pd


class ComplexityAnalyzer:
    """
    Calculate code complexity metrics for prediction.
    radon parses Python source only; files in other languages
    fall back to the default values below.
    """

    def __init__(self, repo_path):
        self.repo_path = repo_path

    def calculate_cyclomatic_complexity(self, filepath):
        """
        Calculate cyclomatic complexity (CC)
        Higher CC = more complex = tends to correlate with more bugs
        """
        empty = {'avg_complexity': 0, 'max_complexity': 0, 'num_functions': 0}
        try:
            with open(os.path.join(self.repo_path, filepath), 'r') as f:
                code = f.read()
            # Calculate complexity for each function
            complexity_list = cc_visit(code)
            if not complexity_list:
                return empty  # Same shape as the normal result
            avg_complexity = sum(c.complexity for c in complexity_list) / len(complexity_list)
            max_complexity = max(c.complexity for c in complexity_list)
            return {
                'avg_complexity': avg_complexity,
                'max_complexity': max_complexity,
                'num_functions': len(complexity_list)
            }
        except Exception:
            # Missing, unreadable, or non-Python file
            return empty

    def calculate_maintainability_index(self, filepath):
        """
        Calculate maintainability index (0-100)
        Lower MI = harder to maintain = tends to correlate with more bugs
        """
        try:
            with open(os.path.join(self.repo_path, filepath), 'r') as f:
                code = f.read()
            # mi_visit returns a single float score for the whole file
            return mi_visit(code, multi=True)
        except Exception:
            return 100  # Default when the file cannot be analyzed

    def get_file_metrics(self, filepath):
        """Get lines of code"""
        try:
            with open(os.path.join(self.repo_path, filepath), 'r') as f:
                lines = f.readlines()
            # Count non-empty lines that are not '#' comments
            loc = sum(1 for line in lines if line.strip() and not line.strip().startswith('#'))
            return {
                'lines_of_code': loc,
                'total_lines': len(lines)
            }
        except Exception:
            return {
                'lines_of_code': 0,
                'total_lines': 0
            }

    def enrich_dataframe(self, df):
        """Add complexity metrics to existing DataFrame"""
        complexity_metrics = []
        for i, filepath in enumerate(df['file']):
            # Calculate all metrics
            cc = self.calculate_cyclomatic_complexity(filepath)
            mi = self.calculate_maintainability_index(filepath)
            loc = self.get_file_metrics(filepath)
            complexity_metrics.append({
                'avg_complexity': cc['avg_complexity'],
                'max_complexity': cc['max_complexity'],
                'num_functions': cc['num_functions'],
                'maintainability_index': mi,
                'lines_of_code': loc['lines_of_code'],
                'total_lines': loc['total_lines']
            })
            if i % 100 == 0:
                print(f"Processed {i}/{len(df)} files...")
        # Merge with original DataFrame; reset the index so rows align by position
        complexity_df = pd.DataFrame(complexity_metrics)
        enriched_df = pd.concat([df.reset_index(drop=True), complexity_df], axis=1)
        print(f"Added {len(complexity_df.columns)} complexity features")
        return enriched_df


# Usage
analyzer = ComplexityAnalyzer('/path/to/your/repo')
enriched_df = analyzer.enrich_dataframe(df)
print(enriched_df.head())
Cyclomatic Complexity: Measures the number of linearly independent paths through code, roughly the number of decision points plus one. As a rule of thumb, CC > 10 is considered risky and CC > 20 very risky. Studies show files with CC > 15 have 2-3x more bugs.
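To show what the metric counts, the sketch below approximates cyclomatic complexity with Python's standard `ast` module: start at 1 and add one for each branching construct. This is a simplified illustration, not radon's implementation; radon's counting rules differ in some details, and the function name and sample code are assumptions.

```python
import ast

# Node types treated as one extra decision point each (a simplifying assumption).
_DECISION_NODES = (
    ast.If, ast.For, ast.While, ast.ExceptHandler,
    ast.IfExp, ast.Assert, ast.comprehension,
)


def approx_cyclomatic_complexity(source: str) -> int:
    """Approximate CC of a code snippet: 1 + number of decision points."""
    complexity = 1
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, _DECISION_NODES):
            complexity += 1
        elif isinstance(node, ast.BoolOp):
            # 'a and b and c' adds two extra branches
            complexity += len(node.values) - 1
    return complexity


SAMPLE = '''
def classify(n):
    if n < 0 and n % 2:
        return "negative odd"
    for i in range(n):
        if i == 3:
            return "has three"
    return "other"
'''

print(approx_cyclomatic_complexity(SAMPLE))  # 5: base 1 + if + and + for + if
```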
Now let's train a Random Forest classifier to predict bugs:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd


class BugPredictor:
    """
    Machine learning model to predict bug-prone files
    """

    def __init__(self):
        self.model = None
        self.scaler = StandardScaler()
        self.feature_names = None

    def prepare_features(self, df):
        """
        Prepare features for training
        """
        # Select feature columns
        feature_cols = [
            'commits',
            'lines_changed',
            'num_developers',
            'avg_complexity',
            'max_complexity',
            'num_functions',
            'maintainability_index',
            'lines_of_code'
        ]
        self.feature_names = feature_cols
        # Extract features; treat inf as missing, then fill missing values with 0
        X = df[feature_cols].replace([np.inf, -np.inf], np.nan).fillna(0)
        y = df['has_bug']
        return X, y

    def train(self, df, test_size=0.3):
        """
        Train the bug prediction model
        """
        # Prepare data
        X, y = self.prepare_features(df)
        # Split into training and testing sets
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42, stratify=y
        )
        # Scale features (not required for tree models, but harmless)
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        print(f"Training set: {len(X_train)} samples")
        print(f"Test set: {len(X_test)} samples")
        print(f"Positive class ratio: {y_train.mean()*100:.1f}%\n")
        # Train Random Forest
        self.model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            min_samples_split=5,
            min_samples_leaf=2,
            random_state=42,
            class_weight='balanced'  # Handle imbalanced data
        )
        print("Training Random Forest model...")
        self.model.fit(X_train_scaled, y_train)
        # Evaluate on test set
        y_pred = self.model.predict(X_test_scaled)
        y_pred_proba = self.model.predict_proba(X_test_scaled)[:, 1]
        print("\n" + "="*60)
        print("MODEL PERFORMANCE")
        print("="*60)
        print("\nClassification Report:")
        print(classification_report(y_test, y_pred, target_names=['No Bug', 'Bug']))
        print("\nConfusion Matrix:")
        cm = confusion_matrix(y_test, y_pred)
        print(cm)
        print(f"True Negatives: {cm[0,0]}, False Positives: {cm[0,1]}")
        print(f"False Negatives: {cm[1,0]}, True Positives: {cm[1,1]}")
        print(f"\nROC-AUC Score: {roc_auc_score(y_test, y_pred_proba):.3f}")
        # Feature importance
        self._print_feature_importance()
        return {
            'X_test': X_test_scaled,
            'y_test': y_test,
            'y_pred': y_pred,
            'y_pred_proba': y_pred_proba
        }

    def _print_feature_importance(self):
        """Show which features matter most"""
        if self.model is None:
            return
        importances = self.model.feature_importances_
        indices = np.argsort(importances)[::-1]
        print("\n" + "="*60)
        print("FEATURE IMPORTANCE")
        print("="*60)
        for i, idx in enumerate(indices):
            print(f"{i+1}. {self.feature_names[idx]}: {importances[idx]:.4f}")

    def predict_bug_probability(self, file_features):
        """
        Predict probability that a file contains bugs
        Args:
            file_features: dict with keys matching feature_names
        Returns:
            float: Probability of bug (0.0 to 1.0)
        """
        if self.model is None:
            raise ValueError("Model not trained yet!")
        # Build a one-row DataFrame so column names match those seen by the scaler
        X = pd.DataFrame(
            [[file_features[f] for f in self.feature_names]],
            columns=self.feature_names
        )
        X_scaled = self.scaler.transform(X)
        # Predict probability
        return float(self.model.predict_proba(X_scaled)[0, 1])

    def prioritize_testing(self, df, top_n=20):
        """
        Rank files by bug probability for test prioritization
        """
        if self.model is None:
            raise ValueError("Model not trained yet!")
        X, _ = self.prepare_features(df)
        X_scaled = self.scaler.transform(X)
        # Get bug probabilities
        bug_probs = self.model.predict_proba(X_scaled)[:, 1]
        # Create results DataFrame
        results = df.copy()
        results['bug_probability'] = bug_probs
        results['risk_level'] = pd.cut(
            bug_probs,
            bins=[0, 0.3, 0.6, 1.0],
            labels=['Low', 'Medium', 'High'],
            include_lowest=True  # Otherwise a probability of exactly 0 gets no label
        )
        # Sort by probability
        results = results.sort_values('bug_probability', ascending=False)
        print("\n" + "="*60)
        print(f"TOP {top_n} HIGH-RISK FILES")
        print("="*60)
        for idx, row in results.head(top_n).iterrows():
            print(f"\n{row['file']}")
            print(f"  Risk: {row['risk_level']} ({row['bug_probability']*100:.1f}% probability)")
            print(f"  Complexity: {row['avg_complexity']:.1f}, LOC: {row['lines_of_code']}")
            print(f"  Changes: {row['commits']} commits, {row['num_developers']} developers")
        return results


# Complete workflow
print("Starting bug prediction pipeline...\n")
# Train model
predictor = BugPredictor()
results = predictor.train(enriched_df)
# Prioritize testing
# Note: this ranks every file, including those the model was trained on,
# so scores for training files will look more confident than for unseen files.
priority_df = predictor.prioritize_testing(enriched_df, top_n=20)
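Model Output: You now have a ranked list of files by bug probability! Defects tend to cluster, so a common heuristic is that roughly 20% of files account for about 80% of bugs. Start testing with the highest-risk files, and check how well that ratio holds for your own codebase.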
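Whether the top 20% of files really captures most defects is something you can measure. The sketch below computes the share of known-buggy files that fall within the top fraction of the ranking. The function name and the sample numbers are illustrative assumptions; for a fair estimate, feed it probabilities and labels from held-out data such as the `y_pred_proba` and `y_test` values returned by `train`.

```python
import math


def defect_capture_rate(bug_probs, has_bug, top_fraction=0.2):
    """Share of buggy files found in the top `top_fraction` of files ranked by risk."""
    if len(bug_probs) != len(has_bug):
        raise ValueError("bug_probs and has_bug must have the same length")
    total_buggy = sum(has_bug)
    if total_buggy == 0:
        return 0.0
    # Number of files in the inspected slice (at least one)
    k = max(1, math.ceil(len(bug_probs) * top_fraction))
    ranked = sorted(zip(bug_probs, has_bug), key=lambda pair: pair[0], reverse=True)
    captured = sum(label for _, label in ranked[:k])
    return captured / total_buggy


# Made-up scores for ten files, three of which are known to be buggy
probs = [0.9, 0.8, 0.4, 0.3, 0.2, 0.1, 0.05, 0.6, 0.7, 0.15]
labels = [1, 1, 0, 0, 0, 0, 0, 1, 0, 0]
print(f"Top 20% captures {defect_capture_rate(probs, labels) * 100:.0f}% of buggy files")
```

A capture rate well above the inspected fraction suggests the ranking is useful; a rate close to it means the model does little better than random selection.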
Understanding what the model tells you:
import json
import textwrap


def create_test_strategy(priority_df):
    """
    Generate actionable test strategy based on predictions
    """
    high_risk = priority_df[priority_df['risk_level'] == 'High']
    medium_risk = priority_df[priority_df['risk_level'] == 'Medium']
    low_risk = priority_df[priority_df['risk_level'] == 'Low']
    strategy = textwrap.dedent(f"""
        TEST PRIORITIZATION STRATEGY
        {'=' * 60}
        HIGH RISK FILES ({len(high_risk)} files)
        - Allocate 50% of testing effort here
        - Full test coverage (unit + integration + E2E)
        - Mandatory code review
        - Consider refactoring if complexity > 20
        MEDIUM RISK FILES ({len(medium_risk)} files)
        - Allocate 30% of testing effort
        - Focus on integration and critical paths
        - Standard code review
        LOW RISK FILES ({len(low_risk)} files)
        - Allocate 20% of testing effort
        - Smoke tests and basic validation
        - Optional code review
        TOTAL EFFORT DISTRIBUTION:
        Test Budget: Assume 100 testing hours
        High Risk: 50 hours across {len(high_risk)} files
        Medium Risk: 30 hours across {len(medium_risk)} files
        Low Risk: 20 hours across {len(low_risk)} files
    """)
    print(strategy)
    # Export high-risk files for CI/CD
    high_risk_files = high_risk['file'].tolist()
    with open('high_risk_files.json', 'w') as f:
        json.dump(high_risk_files, f, indent=2)
    print(f"Saved {len(high_risk_files)} high-risk files to high_risk_files.json")
    return high_risk_files


high_risk_files = create_test_strategy(priority_df)
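The 50/30/20 split is easier to judge once it is expressed per file. The sketch below divides a testing budget across risk tiers and reports hours per file. The function name `hours_per_file` and the example file counts are assumptions for illustration; the shares mirror the strategy text above.

```python
def hours_per_file(tier_counts, budget_hours=100.0, tier_share=None):
    """Return average testing hours per file for each risk tier."""
    if tier_share is None:
        # Shares taken from the strategy above: 50% / 30% / 20%
        tier_share = {"High": 0.5, "Medium": 0.3, "Low": 0.2}
    result = {}
    for tier, share in tier_share.items():
        count = tier_counts.get(tier, 0)
        # An empty tier gets no per-file allocation
        result[tier] = budget_hours * share / count if count else 0.0
    return result


# Hypothetical counts: 10 high-risk, 30 medium-risk, 200 low-risk files
allocation = hours_per_file({"High": 10, "Medium": 30, "Low": 200})
for tier, hours in allocation.items():
    print(f"{tier}: {hours:.2f} hours per file")
```

With these example counts, a high-risk file receives far more attention than a low-risk one, which is the intended effect of risk-based prioritization.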
Bug prediction isn't a one-time task. Retrain as your codebase evolves:
import schedule  # Third-party package: pip install schedule
import time


def retrain_model():
    """
    Automated retraining pipeline
    Run weekly or after major releases
    """
    print("Starting automated model retraining...")
    # 1. Collect latest data
    collector = DefectDataCollector('/path/to/repo')
    collector.extract_git_history()
    df = collector.to_dataframe()
    # 2. Calculate complexity
    analyzer = ComplexityAnalyzer('/path/to/repo')
    enriched_df = analyzer.enrich_dataframe(df)
    # 3. Retrain model
    predictor = BugPredictor()
    predictor.train(enriched_df)
    # 4. Update prioritization
    priority_df = predictor.prioritize_testing(enriched_df)
    # 5. Send alerts for new high-risk files
    # (Integrate with Slack, email, etc.)
    print("Model retraining complete!")


# Schedule weekly retraining
schedule.every().monday.at("02:00").do(retrain_model)

# Or trigger on new releases
# retrain_model()  # Call after deploy

# Scheduled jobs only run while this loop polls for them
while True:
    schedule.run_pending()
    time.sleep(60)
Integrate bug prediction into your CI/CD pipeline:
# .github/workflows/bug-prediction.yml
name: Bug Risk Analysis
on:
  pull_request:
    branches: [ main ]
permissions:
  contents: read
  pull-requests: write  # Needed to post the PR comment
jobs:
  analyze-risk:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0  # Full history for analysis
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install pandas scikit-learn radon
      - name: Run bug prediction
        run: |
          python bug_predictor.py --analyze-pr
      - name: Comment on PR
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(fs.readFileSync('risk_analysis.json', 'utf8'));
            let comment = '## AI Bug Risk Analysis\n\n';
            if (results.high_risk_files.length > 0) {
              comment += '**High Risk Files Detected:**\n';
              results.high_risk_files.forEach(file => {
                comment += `- \`${file.name}\` (${(file.probability * 100).toFixed(1)}% bug probability)\n`;
              });
              comment += '\n**Recommendation:** Add comprehensive tests for these files.\n';
            } else {
              comment += 'No high-risk files detected in this PR.\n';
            }
            await github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
            });
CI/CD Integration: Now every PR automatically gets a risk assessment, helping developers know where to focus testing!
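The workflow reads `risk_analysis.json` and expects a `high_risk_files` list whose entries have `name` and `probability` fields. The tutorial does not show how `bug_predictor.py --analyze-pr` produces that file, so the following is only a sketch of one way to write it. The function names and the 0.6 threshold (matching the "High" bin used earlier) are assumptions.

```python
import json


def build_risk_report(file_probs, threshold=0.6):
    """Build the report structure from a {filepath: probability} mapping."""
    ranked = sorted(file_probs.items(), key=lambda item: item[1], reverse=True)
    high = [
        {"name": name, "probability": round(float(prob), 4)}
        for name, prob in ranked
        if prob > threshold
    ]
    return {"high_risk_files": high}


def write_risk_report(file_probs, path="risk_analysis.json", threshold=0.6):
    """Write the report as JSON for the CI step to read."""
    report = build_risk_report(file_probs, threshold)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2)
    return report


# Hypothetical probabilities for the files changed in a pull request
sample_report = build_risk_report({"a.py": 0.9, "b.py": 0.2, "c.py": 0.75})
print(json.dumps(sample_report, indent=2))
```

In a real run, the mapping would come from calling `predict_bug_probability` on each file the pull request touches.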
Avoid Overfitting: Don't train on too few samples (<100 files) or the model will memorize specific files rather than learn patterns. Collect at least 6-12 months of history.
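A simple guard before training can catch datasets that are too small to learn from. The sketch below checks the sample count against the 100-file guideline and also checks that both classes are reasonably represented. The function name and the `min_minority` value of 10 are assumptions, not thresholds from the tutorial.

```python
def check_training_data(labels, min_samples=100, min_minority=10):
    """Return a list of warnings about the dataset; an empty list means it looks usable."""
    problems = []
    n = len(labels)
    positives = sum(1 for y in labels if y == 1)
    minority = min(positives, n - positives)
    if n < min_samples:
        problems.append(f"only {n} files; at least {min_samples} recommended")
    if minority < min_minority:
        problems.append(f"smallest class has {minority} files; at least {min_minority} recommended")
    return problems


# Example: 50 files with 5 buggy ones fails both checks
for warning in check_training_data([1] * 5 + [0] * 45):
    print("WARNING:", warning)
```

Calling this on `df['has_bug'].tolist()` before `train` gives an early signal that more history is needed.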
Challenge: Build a complete bug prediction system for an open-source project:
Bonus: Add time-series features (bug trends over time) and ensemble multiple models!
In the next tutorial, Visual AI Testing & Computer Vision, you'll learn to use AI for layout testing, screenshot comparison, and detecting visual regressions. You'll explore:
Tutorial Complete! You can now predict bugs before they happen and prioritize testing like data-driven QA teams at Google and Microsoft!
Check your understanding of ML-based defect prediction
1. What percentage of defects typically occur in what percentage of code modules?
2. What is cyclomatic complexity and why does it matter for bug prediction?
3. Which ML algorithm is commonly used for bug prediction in this tutorial?
4. What is the recommended approach for integrating bug prediction into development workflow?