Intermediate

Bug Prediction with Machine Learning

Train ML models to predict where bugs will occur and prioritize testing efforts based on risk

Imagine knowing where bugs will appear before they happen. No more random test selection. No more wondering "Did we test the right things?" Machine learning can analyze your codebase, historical defects, and development patterns to predict which files and modules are most likely to contain bugs, before your users find them.

In this tutorial, you'll build a real bug prediction system using scikit-learn, train it on historical defect data, and use it to prioritize your testing efforts. By the end, you'll have a working model that ranks files by predicted bug risk, so you know where to focus your QA resources first.

Why Predict Bugs?

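Traditional testing approaches test everything equally or, worse, test randomly based on what's convenient. But not all code is equal: defects tend to cluster in a small share of modules, often summarized as the 80/20 rule (roughly 80% of defects in 20% of modules).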

💡 Real Impact: Microsoft Research found that bug prediction models helped reduce defects by 20-30% by focusing testing on high-risk areas. Google uses ML-based risk analysis to prioritize billions of test executions daily.

Understanding Defect Prediction

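Bug prediction is a classification problem in machine learning. Given features about a code module, predict whether that module is likely to contain a bug (a binary label) and, ideally, a probability you can use to rank modules by risk.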

What Features Predict Bugs?

ML models learn from features, measurable characteristics that correlate with defects:

| Feature Category | Examples | Why It Matters |
| --- | --- | --- |
| Code Complexity | Cyclomatic complexity, lines of code, nesting depth | Complex code is harder to understand and test |
| Change Metrics | Number of commits, lines changed, churn rate | Frequent changes introduce instability |
| Developer Metrics | Number of contributors, experience level | Too many cooks spoil the code |
| Historical Defects | Past bugs in this file, bug fix frequency | Buggy code tends to stay buggy |
| Code Dependencies | Coupling, number of dependencies | Highly coupled code spreads bugs |
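To make the change-metric row concrete, here is a minimal sketch that derives two ratio features from raw per-file counts. The function `derive_change_features` and the feature names `churn_rate` and `commits_per_developer` are illustrative assumptions, not part of any library or of the classes used later in this tutorial.

```python
def derive_change_features(commits, lines_changed, num_developers, lines_of_code):
    """Derive ratio features from raw per-file counts (illustrative only)."""
    # Churn rate: lines changed over the file's history relative to its
    # current size. Guard against empty files to avoid division by zero.
    churn_rate = lines_changed / lines_of_code if lines_of_code else 0.0
    # Average number of commits each contributor made to this file
    commits_per_developer = commits / num_developers if num_developers else 0.0
    return {
        'churn_rate': churn_rate,
        'commits_per_developer': commits_per_developer,
    }


features = derive_change_features(
    commits=12, lines_changed=600, num_developers=3, lines_of_code=200
)
print(features)
```

Ratios like these can be easier for a model to compare across files of very different sizes than raw counts alone.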

Collecting Training Data

First, we need historical data about which files had bugs. Let's extract this from Git and bug tracking systems:

import pandas as pd
import subprocess
import json
from collections import defaultdict

class DefectDataCollector:
    """
    Collect historical defect data from Git repository
    and prepare it for machine learning
    """
    
    def __init__(self, repo_path):
        self.repo_path = repo_path
        self.defect_data = defaultdict(lambda: {
            'bug_count': 0,
            'commits': 0,
            'lines_changed': 0,
            'developers': set(),
            'last_modified': None
        })
    
    def extract_git_history(self):
        """
        Extract commit history from Git
        Identify bug-fixing commits by keywords
        """
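        # Get all commits with file changes. Each commit header starts with
        # an ASCII record separator (%x1e) and its fields are joined with a
        # unit separator (%x1f), so commit messages containing '|' and
        # commits without file changes (such as merges) parse cleanly.
        cmd = [
            'git', 'log',
            '--all',
            '--numstat',
            '--pretty=format:%x1e%H%x1f%an%x1f%ad%x1f%s',
            '--date=short'
        ]
        
        result = subprocess.run(
            cmd,
            cwd=self.repo_path,
            capture_output=True,
            text=True,
            check=True  # Raise if git fails (e.g. not a repository)
        )
        
        commits = result.stdout.split('\x1e')
        
        for commit_block in commits:
            if not commit_block.strip('\n'):
                continue
            
            lines = commit_block.strip('\n').split('\n')
            
            # Parse commit metadata (maxsplit keeps the subject line intact)
            metadata = lines[0].split('\x1f', 3)
            if len(metadata) < 4:
                continue
            
            commit_hash, author, date, message = metadata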
            
            # Identify if this is a bug fix
            is_bug_fix = self._is_bug_fix_commit(message)
            
            # Parse file changes
            for i in range(1, len(lines)):
                parts = lines[i].split('\t')
                if len(parts) != 3:
                    continue
                
                added, deleted, filepath = parts
                
                # Skip non-code files
                if not self._is_code_file(filepath):
                    continue
                
                # Update file statistics
                try:
                    lines_changed = int(added) + int(deleted)
                except ValueError:
                    lines_changed = 0
                
                self.defect_data[filepath]['commits'] += 1
                self.defect_data[filepath]['lines_changed'] += lines_changed
                self.defect_data[filepath]['developers'].add(author)
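                # git log lists newest commits first, so only the first
                # date seen for a file is its most recent modification
                if self.defect_data[filepath]['last_modified'] is None:
                    self.defect_data[filepath]['last_modified'] = date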
                
                if is_bug_fix:
                    self.defect_data[filepath]['bug_count'] += 1
        
        print(f"✅ Extracted data for {len(self.defect_data)} files")
        return self.defect_data
    
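    def _is_bug_fix_commit(self, message):
        """Heuristically flag bug-fix commits by keyword.

        Matches whole words plus common suffixes ("fixes", "resolved"),
        so "prefix" no longer counts as "fix". Expect some noise either way.
        """
        bug_keywords = (
            'fix', 'bugfix', 'hotfix', 'bug', 'issue', 'defect', 'error',
            'crash', 'problem', 'resolve', 'patch'
        )
        suffixes = ('', 's', 'es', 'd', 'ed', 'ing')
        # Split the message into lowercase alphanumeric words
        words = ''.join(
            ch if ch.isalnum() else ' ' for ch in message.lower()
        ).split()
        return any(
            word == keyword + suffix
            for word in words
            for keyword in bug_keywords
            for suffix in suffixes
        )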
    
    def _is_code_file(self, filepath):
        """Filter for actual code files"""
        code_extensions = ['.py', '.java', '.js', '.ts', '.cpp', '.c', '.go', '.rb']
        return any(filepath.endswith(ext) for ext in code_extensions)
    
    def to_dataframe(self):
        """Convert collected data to pandas DataFrame"""
        rows = []
        
        for filepath, data in self.defect_data.items():
            rows.append({
                'file': filepath,
                'bug_count': data['bug_count'],
                'commits': data['commits'],
                'lines_changed': data['lines_changed'],
                'num_developers': len(data['developers']),
                'has_bug': 1 if data['bug_count'] > 0 else 0  # Target variable
            })
        
        df = pd.DataFrame(rows)
        print(f"\n📊 Dataset shape: {df.shape}")
        print(f"Files with bugs: {df['has_bug'].sum()} ({df['has_bug'].mean()*100:.1f}%)")
        
        return df

# Usage
collector = DefectDataCollector('/path/to/your/repo')
collector.extract_git_history()
df = collector.to_dataframe()
print(df.head())
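One caveat with the collector above: the features and the `has_bug` label are computed from the same commits, so a file with many commits is more likely to be labeled buggy almost by construction. A common remedy is to compute features from commits before a cutoff date and labels from bug-fix commits after it. The sketch below assumes commit records have already been parsed into dicts with the hypothetical keys `file`, `date`, and `is_bug_fix`; `split_by_cutoff` is an illustrative helper, not part of the classes in this tutorial.

```python
from collections import defaultdict
from datetime import date


def split_by_cutoff(records, cutoff):
    """Build per-file features from commits before `cutoff` and
    labels from bug-fix commits on or after it."""
    features = defaultdict(lambda: {'commits': 0, 'past_bug_fixes': 0})
    labels = defaultdict(int)

    for rec in records:
        if rec['date'] < cutoff:
            # Feature window: only history the model is allowed to see
            features[rec['file']]['commits'] += 1
            if rec['is_bug_fix']:
                features[rec['file']]['past_bug_fixes'] += 1
        elif rec['is_bug_fix']:
            # Label window: a later bug fix marks the file as buggy
            labels[rec['file']] = 1

    # Only files seen in the feature window get a training row
    return [
        {'file': path, **feats, 'has_bug': labels.get(path, 0)}
        for path, feats in features.items()
    ]


# Made-up records for illustration
records = [
    {'file': 'a.py', 'date': date(2023, 1, 10), 'is_bug_fix': False},
    {'file': 'a.py', 'date': date(2023, 3, 5), 'is_bug_fix': True},
    {'file': 'a.py', 'date': date(2023, 8, 1), 'is_bug_fix': True},
    {'file': 'b.py', 'date': date(2023, 2, 2), 'is_bug_fix': False},
]
rows = split_by_cutoff(records, cutoff=date(2023, 6, 1))
for row in rows:
    print(row)
```

With this split, the model is evaluated on whether past signals predict later bugs, which is closer to how the predictions would be used in practice.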

Calculating Code Complexity Metrics

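Now let's add code complexity features using the radon library. Note that radon only analyzes Python source, so files in other languages fall back to the default values in the code below: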

# Install radon for complexity analysis (run in your shell, not in Python):
#   pip install radon
import os

import pandas as pd
from radon.complexity import cc_visit
from radon.metrics import mi_visit

class ComplexityAnalyzer:
    """
    Calculate code complexity metrics for prediction
    """
    
    def __init__(self, repo_path):
        self.repo_path = repo_path
    
    def calculate_cyclomatic_complexity(self, filepath):
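        """
        Calculate cyclomatic complexity (CC) per function.
        Higher CC means more paths to test, which tends to
        correlate with more bugs.
        """
        try:
            with open(os.path.join(self.repo_path, filepath), 'r') as f:
                code = f.read()
            
            # Calculate complexity for each function, method, and class
            complexity_list = cc_visit(code)
            
            if not complexity_list:
                # Return the same dict shape as the normal path so that
                # callers can always index the result by key
                return {
                    'avg_complexity': 0,
                    'max_complexity': 0,
                    'num_functions': 0
                }
            
            # Average and maximum complexity across all blocks
            avg_complexity = sum(c.complexity for c in complexity_list) / len(complexity_list)
            max_complexity = max(c.complexity for c in complexity_list)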
            
            return {
                'avg_complexity': avg_complexity,
                'max_complexity': max_complexity,
                'num_functions': len(complexity_list)
            }
        
        except Exception as e:
            return {
                'avg_complexity': 0,
                'max_complexity': 0,
                'num_functions': 0
            }
    
    def calculate_maintainability_index(self, filepath):
        """
        Calculate maintainability index (0-100)
        Lower MI = harder to maintain = more bugs
        """
        try:
            with open(os.path.join(self.repo_path, filepath), 'r') as f:
                code = f.read()
            
            # mi_visit returns a single float for the whole file;
            # multi=True counts multi-line strings as comments
            return mi_visit(code, multi=True)
        
        except Exception:
            # Unreadable or non-Python file: fall back to the maximum score
            return 100
    
    def get_file_metrics(self, filepath):
        """Get lines of code"""
        try:
            with open(os.path.join(self.repo_path, filepath), 'r') as f:
                lines = f.readlines()
            
            # Count non-empty, non-comment lines
            loc = sum(1 for line in lines if line.strip() and not line.strip().startswith('#'))
            
            return {
                'lines_of_code': loc,
                'total_lines': len(lines)
            }
        except Exception:
            return {
                'lines_of_code': 0,
                'total_lines': 0
            }
    
    def enrich_dataframe(self, df):
        """Add complexity metrics to existing DataFrame"""
        
        complexity_metrics = []
        
        for idx, row in df.iterrows():
            filepath = row['file']
            
            # Calculate all metrics
            cc = self.calculate_cyclomatic_complexity(filepath)
            mi = self.calculate_maintainability_index(filepath)
            loc = self.get_file_metrics(filepath)
            
            complexity_metrics.append({
                'avg_complexity': cc['avg_complexity'],
                'max_complexity': cc['max_complexity'],
                'num_functions': cc['num_functions'],
                'maintainability_index': mi,
                'lines_of_code': loc['lines_of_code'],
                'total_lines': loc['total_lines']
            })
            
            if idx % 100 == 0:
                print(f"Processed {idx}/{len(df)} files...")
        
        # Merge with original DataFrame; reuse df's index so rows line up
        # even if df was filtered or re-sorted beforehand
        complexity_df = pd.DataFrame(complexity_metrics, index=df.index)
        enriched_df = pd.concat([df, complexity_df], axis=1)
        
        print(f"✅ Added {len(complexity_df.columns)} complexity features")
        return enriched_df

# Usage
analyzer = ComplexityAnalyzer('/path/to/your/repo')
enriched_df = analyzer.enrich_dataframe(df)
print(enriched_df.head())

💡 Cyclomatic Complexity: Measures the number of independent paths through code. As a rule of thumb, CC > 10 is considered risky and CC > 20 very risky. Some studies report that files with CC > 15 have 2-3x more bugs, though the strength of this correlation varies by project.
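To see what "independent paths" means in practice, the sketch below approximates cyclomatic complexity as one plus the number of decision points found with Python's built-in `ast` module. It is a simplified illustration, not radon's algorithm, so its numbers may differ from what `cc_visit` reports. The name `approximate_complexity` is hypothetical.

```python
import ast

# Node types that add a branch to the control flow
DECISION_NODES = (ast.If, ast.For, ast.While, ast.ExceptHandler, ast.IfExp)


def approximate_complexity(source):
    """Return 1 + the number of decision points in the given source code."""
    tree = ast.parse(source)
    decisions = 0
    for node in ast.walk(tree):
        if isinstance(node, DECISION_NODES):
            decisions += 1
        elif isinstance(node, ast.BoolOp):
            # "a and b and c" adds two extra conditions
            decisions += len(node.values) - 1
    return decisions + 1


sample = '''
def classify(n):
    if n < 0:
        return "negative"
    for d in range(2, n):
        if n % d == 0 and d > 1:
            return "composite"
    return "other"
'''
print(approximate_complexity(sample))  # 2 ifs + 1 for + 1 "and" + 1 = 5
```

Each extra branch is another path a test suite has to exercise, which is one reason complexity is a popular input feature for defect models.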

Training a Bug Prediction Model

Now let's train a Random Forest classifier to predict bugs:

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.preprocessing import StandardScaler
import numpy as np

class BugPredictor:
    """
    Machine learning model to predict bug-prone files
    """
    
    def __init__(self):
        self.model = None
        self.scaler = StandardScaler()
        self.feature_names = None
    
    def prepare_features(self, df):
        """
        Prepare features for training
        """
        # Select feature columns
        feature_cols = [
            'commits',
            'lines_changed',
            'num_developers',
            'avg_complexity',
            'max_complexity',
            'num_functions',
            'maintainability_index',
            'lines_of_code'
        ]
        
        self.feature_names = feature_cols
        
        # Extract features and target
        X = df[feature_cols].fillna(0)
        y = df['has_bug']
        
        # Handle any remaining NaN or inf values
        X = X.replace([np.inf, -np.inf], np.nan).fillna(0)
        
        return X, y
    
    def train(self, df, test_size=0.3):
        """
        Train the bug prediction model
        """
        # Prepare data
        X, y = self.prepare_features(df)
        
        # Split into training and testing sets
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42, stratify=y
        )
        
        # Scale features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)
        
        print(f"📊 Training set: {len(X_train)} samples")
        print(f"📊 Test set: {len(X_test)} samples")
        print(f"📊 Positive class ratio: {y_train.mean()*100:.1f}%\n")
        
        # Train Random Forest
        self.model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            min_samples_split=5,
            min_samples_leaf=2,
            random_state=42,
            class_weight='balanced'  # Handle imbalanced data
        )
        
        print("🤖 Training Random Forest model...")
        self.model.fit(X_train_scaled, y_train)
        
        # Evaluate on test set
        y_pred = self.model.predict(X_test_scaled)
        y_pred_proba = self.model.predict_proba(X_test_scaled)[:, 1]
        
        print("\n" + "="*60)
        print("MODEL PERFORMANCE")
        print("="*60)
        print("\nClassification Report:")
        print(classification_report(y_test, y_pred, target_names=['No Bug', 'Bug']))
        
        print("\nConfusion Matrix:")
        cm = confusion_matrix(y_test, y_pred)
        print(cm)
        print(f"True Negatives: {cm[0,0]}, False Positives: {cm[0,1]}")
        print(f"False Negatives: {cm[1,0]}, True Positives: {cm[1,1]}")
        
        print(f"\nROC-AUC Score: {roc_auc_score(y_test, y_pred_proba):.3f}")
        
        # Feature importance
        self._print_feature_importance()
        
        return {
            'X_test': X_test_scaled,
            'y_test': y_test,
            'y_pred': y_pred,
            'y_pred_proba': y_pred_proba
        }
    
    def _print_feature_importance(self):
        """Show which features matter most"""
        if self.model is None:
            return
        
        importances = self.model.feature_importances_
        indices = np.argsort(importances)[::-1]
        
        print("\n" + "="*60)
        print("FEATURE IMPORTANCE")
        print("="*60)
        
        for i, idx in enumerate(indices):
            print(f"{i+1}. {self.feature_names[idx]}: {importances[idx]:.4f}")
    
    def predict_bug_probability(self, file_features):
        """
        Predict probability that a file contains bugs
        
        Args:
            file_features: dict with keys matching feature_names
        
        Returns:
            float: Probability of bug (0.0 to 1.0)
        """
        if self.model is None:
            raise ValueError("Model not trained yet!")
        
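        # Create a one-row DataFrame so the column names match those the
        # scaler saw during fit (avoids scikit-learn's feature-name warning)
        X = pd.DataFrame(
            [[file_features[f] for f in self.feature_names]],
            columns=self.feature_names
        )
        X_scaled = self.scaler.transform(X)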
        
        # Predict probability
        prob = self.model.predict_proba(X_scaled)[0, 1]
        
        return prob
    
    def prioritize_testing(self, df, top_n=20):
        """
        Rank files by bug probability for test prioritization
        """
        X, _ = self.prepare_features(df)
        X_scaled = self.scaler.transform(X)
        
        # Get bug probabilities
        bug_probs = self.model.predict_proba(X_scaled)[:, 1]
        
        # Create results DataFrame
        results = df.copy()
        results['bug_probability'] = bug_probs
        results['risk_level'] = pd.cut(
            bug_probs,
            bins=[0, 0.3, 0.6, 1.0],
            labels=['Low', 'Medium', 'High'],
            include_lowest=True  # Otherwise a probability of exactly 0.0 becomes NaN
        )
        
        # Sort by probability
        results = results.sort_values('bug_probability', ascending=False)
        
        print("\n" + "="*60)
        print(f"TOP {top_n} HIGH-RISK FILES")
        print("="*60)
        
        for idx, row in results.head(top_n).iterrows():
            print(f"\n📁 {row['file']}")
            print(f"   Risk: {row['risk_level']} ({row['bug_probability']*100:.1f}% probability)")
            print(f"   Complexity: {row['avg_complexity']:.1f}, LOC: {row['lines_of_code']}")
            print(f"   Changes: {row['commits']} commits, {row['num_developers']} developers")
        
        return results

# Complete workflow
print("🚀 Starting bug prediction pipeline...\n")

# Train model
predictor = BugPredictor()
results = predictor.train(enriched_df)

# Prioritize testing
priority_df = predictor.prioritize_testing(enriched_df, top_n=20)

✅ Model Output: You now have a ranked list of files by bug probability. A common heuristic (the 80/20 rule) suggests that focusing on the top 20% highest-risk files catches a large share of bugs; check how well that holds on your own project.
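Whether the 80/20 pattern holds for a given project is an empirical question. The sketch below measures what share of known buggy files falls in the top fraction of a ranking. `bug_capture_rate` is a hypothetical helper; it takes plain lists so it runs without pandas, but the same idea applies to the `bug_probability` and `has_bug` columns. The sample numbers are made up.

```python
def bug_capture_rate(probabilities, has_bug, top_fraction=0.2):
    """Share of buggy files found in the top `top_fraction` of files
    when ranked by predicted probability (highest first)."""
    total_bugs = sum(has_bug)
    if total_bugs == 0:
        return 0.0
    ranked = sorted(zip(probabilities, has_bug), key=lambda p: p[0], reverse=True)
    # Always inspect at least one file
    top_n = max(1, int(len(ranked) * top_fraction))
    captured = sum(label for _, label in ranked[:top_n])
    return captured / total_bugs


probs = [0.9, 0.8, 0.4, 0.3, 0.2, 0.1, 0.1, 0.05, 0.05, 0.01]
labels = [1, 1, 0, 1, 0, 0, 0, 0, 0, 0]
rate = bug_capture_rate(probs, labels, top_fraction=0.2)
print(f"Top 20% of files contain {rate:.0%} of known bugs")
```

For an honest estimate, compute this on files the model did not see during training, such as the held-out test split.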

Interpreting Model Results

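Understanding what the model tells you: the classification report shows precision and recall for the "Bug" class, the confusion matrix counts false positives (files flagged but clean) and false negatives (buggy files missed), and ROC-AUC summarizes how well the model ranks buggy files above clean ones.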
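As a small worked example of reading the confusion matrix, the sketch below turns its four counts into precision, recall, and F1 for the "Bug" class. The helper name `summarize_confusion` and the counts are made up for illustration; scikit-learn's `classification_report` already prints the same quantities.

```python
def summarize_confusion(tn, fp, fn, tp):
    """Turn confusion-matrix counts into precision, recall, and F1
    for the positive ("Bug") class."""
    # Precision: of the files flagged as buggy, how many really were
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    # Recall: of the truly buggy files, how many were flagged
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return {'precision': precision, 'recall': recall, 'f1': f1}


# Example counts (made up for illustration)
metrics = summarize_confusion(tn=70, fp=10, fn=5, tp=15)
for name, value in metrics.items():
    print(f"{name}: {value:.3f}")
```

Low precision means wasted testing effort on clean files; low recall means buggy files slip through. Which one matters more depends on how much testing capacity you have.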

Risk-Based Test Strategy

def create_test_strategy(priority_df):
    """
    Generate actionable test strategy based on predictions
    """
    high_risk = priority_df[priority_df['risk_level'] == 'High']
    medium_risk = priority_df[priority_df['risk_level'] == 'Medium']
    low_risk = priority_df[priority_df['risk_level'] == 'Low']
    
    strategy = f"""
    
TEST PRIORITIZATION STRATEGY
{"="*60}

🔴 HIGH RISK FILES ({len(high_risk)} files)
   - Allocate 50% of testing effort here
   - Full test coverage (unit + integration + E2E)
   - Mandatory code review
   - Consider refactoring if complexity > 20
   
🟡 MEDIUM RISK FILES ({len(medium_risk)} files)
   - Allocate 30% of testing effort
   - Focus on integration and critical paths
   - Standard code review
   
🟢 LOW RISK FILES ({len(low_risk)} files)
   - Allocate 20% of testing effort
   - Smoke tests and basic validation
   - Optional code review
   
TOTAL EFFORT DISTRIBUTION:
   Test Budget: Assume 100 testing hours
   High Risk: 50 hours across {len(high_risk)} files
   Medium Risk: 30 hours across {len(medium_risk)} files
   Low Risk: 20 hours across {len(low_risk)} files
    """
    
    print(strategy)
    
    # Export high-risk files for CI/CD
    high_risk_files = high_risk['file'].tolist()
    
    with open('high_risk_files.json', 'w') as f:
        json.dump(high_risk_files, f, indent=2)
    
    print(f"✅ Saved {len(high_risk_files)} high-risk files to high_risk_files.json")
    
    return high_risk_files

high_risk_files = create_test_strategy(priority_df)
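The strategy above assigns a fixed share of hours to each tier. Within a tier, one possible refinement is to split the hours in proportion to each file's predicted probability. The sketch below shows that idea; `allocate_hours`, the file names, and the probabilities are all hypothetical.

```python
def allocate_hours(file_probs, budget_hours):
    """Split a testing budget across files in proportion to bug probability."""
    total = sum(file_probs.values())
    if total == 0:
        # No signal at all: fall back to an even split
        even = budget_hours / len(file_probs) if file_probs else 0.0
        return {path: even for path in file_probs}
    return {path: budget_hours * p / total for path, p in file_probs.items()}


# Hypothetical high-risk tier with a 50-hour budget
high_risk = {'core/payment.py': 0.9, 'core/auth.py': 0.6, 'api/orders.py': 0.5}
plan = allocate_hours(high_risk, budget_hours=50)
for path, hours in plan.items():
    print(f"{path}: {hours:.1f} hours")
```

Proportional allocation is only one option; teams may prefer a minimum number of hours per file or weighting by business impact as well.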

Continuous Monitoring

Bug prediction isn't one-time; retrain as your codebase evolves:

import schedule  # Third-party package: pip install schedule
import time

def retrain_model():
    """
    Automated retraining pipeline
    Run weekly or after major releases
    """
    print("🔄 Starting automated model retraining...")
    
    # 1. Collect latest data
    collector = DefectDataCollector('/path/to/repo')
    collector.extract_git_history()
    df = collector.to_dataframe()
    
    # 2. Calculate complexity
    analyzer = ComplexityAnalyzer('/path/to/repo')
    enriched_df = analyzer.enrich_dataframe(df)
    
    # 3. Retrain model
    predictor = BugPredictor()
    predictor.train(enriched_df)
    
    # 4. Update prioritization
    priority_df = predictor.prioritize_testing(enriched_df)
    
    # 5. Send alerts for new high-risk files
    # (Integrate with Slack, email, etc.)
    
    print("✅ Model retraining complete!")

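# Schedule weekly retraining
schedule.every().monday.at("02:00").do(retrain_model)

# Or trigger on new releases
# retrain_model()  # Call after deploy

# Keep the process alive so scheduled jobs actually run
while True:
    schedule.run_pending()
    time.sleep(60)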
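Step 5 of the pipeline (alerting on new high-risk files) is left as a comment. One way to approach it, sketched below under assumptions, is to keep a JSON snapshot of the previous high-risk list and report only files that were not on it. `find_new_high_risk` and the snapshot file name are hypothetical; sending the actual Slack or email message is out of scope here.

```python
import json
import tempfile
from pathlib import Path


def find_new_high_risk(current_files, snapshot_path):
    """Return files that are high-risk now but were not in the last
    snapshot, then overwrite the snapshot with the current list."""
    path = Path(snapshot_path)
    previous = set(json.loads(path.read_text())) if path.exists() else set()
    new_files = sorted(set(current_files) - previous)
    path.write_text(json.dumps(sorted(current_files), indent=2))
    return new_files


# Demo in a temporary directory so no real files are touched
with tempfile.TemporaryDirectory() as tmp:
    snapshot = Path(tmp) / 'previous_high_risk.json'
    first = find_new_high_risk(['a.py', 'b.py'], snapshot)
    second = find_new_high_risk(['b.py', 'c.py'], snapshot)

print(first)   # First run: everything is new
print(second)  # Second run: only files not seen last time
```

Alerting only on the difference keeps notifications short and avoids repeating the same list after every retraining run.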

Real-World Integration

Integrate bug prediction into your CI/CD pipeline:

# .github/workflows/bug-prediction.yml
name: Bug Risk Analysis

on:
  pull_request:
    branches: [ main ]

jobs:
  analyze-risk:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      pull-requests: write  # Lets the workflow post the PR comment
    
    steps:
    - uses: actions/checkout@v4
      with:
        fetch-depth: 0  # Full history for analysis
    
    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: '3.9'
    
    - name: Install dependencies
      run: |
        pip install pandas scikit-learn radon
    
    - name: Run bug prediction
      # bug_predictor.py is your own script; the next step expects it to
      # write risk_analysis.json
      run: |
        python bug_predictor.py --analyze-pr
    
    - name: Comment on PR
      uses: actions/github-script@v7
      with:
        script: |
          const fs = require('fs');
          const results = JSON.parse(fs.readFileSync('risk_analysis.json', 'utf8'));
          
          let comment = '## 🤖 AI Bug Risk Analysis\n\n';
          
          if (results.high_risk_files.length > 0) {
            comment += '⚠️ **High Risk Files Detected:**\n';
            results.high_risk_files.forEach(file => {
              comment += `- \`${file.name}\` (${(file.probability * 100).toFixed(1)}% bug probability)\n`;
            });
            comment += '\n**Recommendation:** Add comprehensive tests for these files.\n';
          } else {
            comment += '✅ No high-risk files detected in this PR.\n';
          }
          
          await github.rest.issues.createComment({
            issue_number: context.issue.number,
            owner: context.repo.owner,
            repo: context.repo.repo,
            body: comment
          });

✅ CI/CD Integration: Now every PR automatically gets a risk assessment, helping developers know where to focus testing!
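The workflow reads a `risk_analysis.json` file whose entries have `name` and `probability` fields, but the script that writes it is not shown. The sketch below is one possible shape for that step, under assumptions: `build_risk_report` is a hypothetical helper, the 0.6 threshold mirrors the "High" bin used earlier, and the file names and probabilities are made up.

```python
import json


def build_risk_report(file_probabilities, changed_files, threshold=0.6):
    """Build the dict that the workflow's comment step reads.

    file_probabilities: mapping of file path -> predicted bug probability
    changed_files: paths touched by the pull request
    """
    high_risk = [
        {'name': path, 'probability': round(file_probabilities[path], 4)}
        for path in changed_files
        # Files the model has never seen are treated as probability 0.0
        if file_probabilities.get(path, 0.0) > threshold
    ]
    high_risk.sort(key=lambda item: item['probability'], reverse=True)
    return {'high_risk_files': high_risk}


probabilities = {
    'core/payment.py': 0.82,
    'utils/strings.py': 0.12,
    'api/orders.py': 0.67,
}
changed = ['utils/strings.py', 'api/orders.py', 'core/payment.py', 'docs/new_file.py']
report = build_risk_report(probabilities, changed)

# In CI you would write this to risk_analysis.json instead of printing it
print(json.dumps(report, indent=2))
```

Treating unseen files as zero-risk is a simplification; a real script might instead compute features for new files and score them with the trained model.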

Best Practices

  1. Start Simple: Begin with basic metrics (complexity, churn, history), add sophistication later
  2. Validate Predictions: Track whether predicted high-risk files actually had bugs
  3. Update Regularly: Retrain on a fixed schedule (weekly or monthly) or after major releases
  4. Combine with Human Judgment: Use predictions to inform, not replace, QA expertise
  5. Monitor Model Drift: If accuracy drops, investigate changes in development patterns
  6. Balance Precision/Recall: Tune thresholds based on testing capacity
  7. Explain Predictions: Use SHAP or LIME to show why a file is risky

⚠️ Avoid Overfitting: Don't train on too few samples (<100 files) or the model will memorize specific files rather than learn patterns. Collect at least 6-12 months of history.
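For the drift-monitoring practice, a minimal sketch is to record the ROC-AUC from each retraining run and flag a run whose score falls well below the recent average. The function `detect_drift`, its default tolerance and window, and the sample scores are illustrative assumptions rather than recommended values.

```python
def detect_drift(auc_history, tolerance=0.05, window=3):
    """Flag drift when the latest ROC-AUC falls more than `tolerance`
    below the average of the previous `window` retraining runs."""
    if len(auc_history) <= window:
        return False  # Not enough history to compare against
    baseline = sum(auc_history[-window - 1:-1]) / window
    return (baseline - auc_history[-1]) > tolerance


stable = [0.82, 0.81, 0.83, 0.80]
dropped = [0.82, 0.81, 0.83, 0.70]
print(detect_drift(stable))   # False: latest run is close to the baseline
print(detect_drift(dropped))  # True: latest run fell well below it
```

A flagged run is a prompt to investigate (new team members, a large refactor, a changed commit-message convention), not proof that the model is broken.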

Practice Exercise

Challenge: Build a complete bug prediction system for an open-source project:

  1. Clone a GitHub repo (e.g., a popular Python project with >1000 commits)
  2. Extract Git history and calculate complexity metrics
  3. Train a Random Forest model
  4. Generate a risk report ranking the top 50 highest-risk files
  5. Validate predictions by checking if recent bugs occurred in predicted high-risk files

Bonus: Add time-series features (bug trends over time) and ensemble multiple models!
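As a starting point for the time-series bonus, the sketch below counts bug-fix commits for one file in the most recent window and in the window before it, then reports the difference as a trend. `bug_trend_features`, the 90-day window, and the sample dates are assumptions for illustration.

```python
from datetime import date, timedelta


def bug_trend_features(bug_fix_dates, as_of, window_days=90):
    """Count bug fixes in the most recent window and the one before it."""
    recent_start = as_of - timedelta(days=window_days)
    previous_start = as_of - timedelta(days=2 * window_days)
    recent = sum(1 for d in bug_fix_dates if recent_start < d <= as_of)
    previous = sum(1 for d in bug_fix_dates if previous_start < d <= recent_start)
    return {
        'recent_bug_fixes': recent,
        'previous_bug_fixes': previous,
        # Positive trend means bug fixes are becoming more frequent
        'bug_fix_trend': recent - previous,
    }


# Made-up bug-fix dates for a single file
fix_dates = [
    date(2024, 6, 1), date(2024, 5, 15), date(2024, 4, 20),
    date(2024, 2, 10), date(2023, 10, 1),
]
trend = bug_trend_features(fix_dates, as_of=date(2024, 6, 30))
print(trend)
```

These three values can be added as extra columns alongside the existing features before training.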

Key Takeaways

What's Next?

In the next tutorial, Visual AI Testing & Computer Vision, you'll learn to use AI for layout testing, screenshot comparison, and detecting visual regressions.

✅ Tutorial Complete! You can now predict bugs before they happen and prioritize testing like data-driven QA teams at Google and Microsoft!

🎯 Test Your Knowledge: Bug Prediction

Check your understanding of ML-based defect prediction

1. What percentage of defects typically occur in what percentage of code modules?

50% of defects in 50% of modules
80% of defects in 20% of modules
90% of defects in 10% of modules
Defects are evenly distributed

2. What is cyclomatic complexity and why does it matter for bug prediction?

The number of lines in a file
The number of independent paths through code; higher complexity correlates with more bugs
The number of functions in a module
The depth of inheritance in classes

3. Which ML algorithm is commonly used for bug prediction in this tutorial?

Linear Regression
Neural Networks
Random Forest Classifier
K-Means Clustering

4. What is the recommended approach for integrating bug prediction into development workflow?

Run predictions manually once per year
Replace all human QA with ML models
Integrate into CI/CD to analyze PRs automatically and prioritize testing
Only use predictions for legacy code