
Model Monitoring & Observability

Track model performance, detect data drift with Evidently AI and WhyLabs, implement alerting with Prometheus and Grafana, and keep production models healthy

📅 Tutorial 11 📊 Intermediate


📊 Why Monitor ML Models?

Your model deployed successfully! Accuracy was 95% in testing. Three months later:

  • Accuracy dropped to 70% but nobody noticed
  • Input data distribution changed (customers from new regions)
  • Model predictions became biased
  • Business metrics (revenue, conversions) declined
  • Response times increased from 50ms to 500ms

Model monitoring tracks model health in production, detecting issues before they impact business. Unlike traditional software, ML models silently degrade over time as data changes.

⚠️ Silent Model Failures:

  • Data Drift: Input feature distributions change over time (a minimal statistical check is sketched after this list)
  • Concept Drift: Relationship between features and target changes
  • Model Degradation: Performance slowly declines over time
  • Infrastructure Issues: Latency spikes, memory leaks
  • Data Quality: Missing values, outliers, corrupted data
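
To make these failure modes concrete, here is a minimal sketch of a per-feature drift check using a two-sample Kolmogorov-Smirnov test (assuming SciPy is installed; the 'sqft' column and the 0.05 threshold are illustrative). Dedicated tools such as Evidently, covered below, run similar tests across every feature automatically.

"""
Minimal per-feature drift check (illustrative sketch)
"""
import pandas as pd
from scipy.stats import ks_2samp

reference = pd.read_csv('training_data.csv')    # data the model was trained on
current = pd.read_csv('production_data.csv')    # recent production inputs

# Compare one numerical feature's distribution (column name is an example)
statistic, p_value = ks_2samp(reference['sqft'], current['sqft'])

# A small p-value suggests the two samples come from different distributions
if p_value < 0.05:
    print(f"Possible drift in 'sqft' (KS statistic={statistic:.3f}, p={p_value:.4f})")
else:
    print("No significant drift detected for 'sqft'")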

💡 What to Monitor:

  • Model Performance: Accuracy, precision, recall, F1
  • Data Quality: Missing values, out-of-range features
  • Data Drift: Feature distribution changes
  • Prediction Drift: Output distribution changes
  • System Metrics: Latency, throughput, error rates
  • Business Metrics: Conversion rate, revenue impact

🔍 Drift Detection with Evidently AI

What is Evidently?

Evidently is an open-source Python library for ML model monitoring. It detects data drift, model degradation, and data quality issues with interactive HTML reports and JSON outputs.

Installation

pip install evidently

Basic Drift Detection

"""
Detect data drift with Evidently
"""
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# Load reference data (training data)
reference_data = pd.read_csv('training_data.csv')

# Load current production data
current_data = pd.read_csv('production_data.csv')

# Create drift report
report = Report(metrics=[
    DataDriftPreset()
])

report.run(reference_data=reference_data, current_data=current_data)

# Save HTML report
report.save_html('drift_report.html')

# Get JSON results
results = report.as_dict()
print(f"Dataset drift detected: {results['metrics'][0]['result']['dataset_drift']}")
print(f"Number of drifted features: {results['metrics'][0]['result']['number_of_drifted_columns']}")

Comprehensive Monitoring Report

"""
Full monitoring report with multiple metrics
"""
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import (
    DataDriftPreset,
    DataQualityPreset,
    TargetDriftPreset,
    RegressionPreset
)

# Create comprehensive report
report = Report(metrics=[
    DataDriftPreset(),           # Feature drift
    DataQualityPreset(),         # Data quality issues
    TargetDriftPreset(),         # Target/prediction drift
    RegressionPreset()           # Model performance (if labels available)
])

report.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=ColumnMapping(
        target='price',
        prediction='predicted_price',
        numerical_features=['sqft', 'bedrooms', 'age'],
        categorical_features=['neighborhood', 'property_type']
    )
)

report.save_html('monitoring_report.html')
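
Besides the HTML report, the same Report object can be exported for automated checks; for example (method availability may vary slightly between Evidently versions):

# Programmatic exports for automated checks and CI artifacts
results = report.as_dict()                    # Python dict for thresholding in code
report.save_json('monitoring_report.json')    # JSON file for storage or pipelines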

Automated Monitoring Pipeline

"""
Automated drift monitoring with alerting
"""
import pandas as pd
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric
import smtplib
from email.message import EmailMessage

def check_drift(reference_data, current_data, threshold=0.5):
    """Check for data drift and send alerts"""
    
    # Create report
    report = Report(metrics=[DatasetDriftMetric()])
    report.run(reference_data=reference_data, current_data=current_data)
    
    # Extract results
    results = report.as_dict()
    drift_detected = results['metrics'][0]['result']['dataset_drift']
    drift_share = results['metrics'][0]['result']['share_of_drifted_columns']
    
    print(f"Drift detected: {drift_detected}")
    print(f"Drifted columns: {drift_share:.1%}")
    
    # Alert if significant drift
    if drift_share > threshold:
        send_alert(
            subject="⚠️ Model Drift Alert",
            message=f"Significant data drift detected: {drift_share:.1%} of features drifted"
        )
        
        # Trigger retraining
        trigger_retraining()
    
    return drift_detected, drift_share

def send_alert(subject, message):
    """Send email alert"""
    msg = EmailMessage()
    msg.set_content(message)
    msg['Subject'] = subject
    msg['From'] = 'alerts@mlops.com'
    msg['To'] = 'team@mlops.com'
    
    # Send email (in production, read SMTP credentials from a secret store or env vars)
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login('alerts@mlops.com', 'password')  # placeholder credentials
        server.send_message(msg)

def trigger_retraining():
    """Trigger model retraining pipeline"""
    import requests
    
    # Trigger Airflow DAG
    response = requests.post(
        'http://airflow:8080/api/v1/dags/ml_training/dagRuns',
        auth=('admin', 'password'),
        json={"conf": {"reason": "drift_detected"}}
    )
    print(f"Retraining triggered: {response.status_code}")

# Run monitoring daily
if __name__ == '__main__':
    reference = pd.read_csv('reference_data.csv')
    current = pd.read_csv('recent_predictions.csv')
    
    check_drift(reference, current, threshold=0.3)
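
To actually run this check on a schedule, wrap it in an orchestrator. A sketch, assuming Airflow 2.x (DAG id, schedule, and file paths are illustrative), that runs the drift check daily alongside the retraining DAG it can trigger:

"""
Illustrative Airflow DAG that runs the drift check daily
"""
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

def run_drift_check():
    # check_drift is the function defined above (imported from your monitoring module)
    reference = pd.read_csv('reference_data.csv')
    current = pd.read_csv('recent_predictions.csv')
    check_drift(reference, current, threshold=0.3)

with DAG(
    dag_id='ml_drift_monitoring',
    schedule_interval='@daily',      # use `schedule=` on Airflow >= 2.4
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    PythonOperator(task_id='check_drift', python_callable=run_drift_check)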

Real-Time Monitoring Dashboard

"""
Create monitoring dashboard with Evidently
"""
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
from evidently.ui.workspace import Workspace

# Create a local workspace (a directory that stores the generated reports)
workspace = Workspace.create("ml_monitoring_workspace")

# Add a project to group this model's reports
project = workspace.create_project("Production Model Monitoring")

# Generate reports over time (date_range and load_data_for_date are
# placeholders for your own batching logic)
for date in date_range:
    current_data = load_data_for_date(date)

    report = Report(metrics=[DataDriftPreset()])
    report.run(reference_data=reference_data, current_data=current_data)

    # Add the report to the project so it appears on the dashboard
    workspace.add_report(project.id, report)

# View the dashboard: launch the Evidently UI against the workspace directory
# (e.g. `evidently ui --workspace ml_monitoring_workspace`), then open
# http://localhost:8000 in a browser

📈 Production Monitoring with WhyLabs

What is WhyLabs?

WhyLabs is a production ML monitoring platform with data profiling, drift detection, and observability. It provides real-time monitoring, alerting, and integrations with MLOps tools.

Setup

pip install whylogs

"""
WhyLabs monitoring integration
"""
import os
import pandas as pd
import whylogs as why

# Configure WhyLabs (replace with credentials from your WhyLabs account)
os.environ["WHYLABS_DEFAULT_ORG_ID"] = "your-org-id"
os.environ["WHYLABS_API_KEY"] = "your-api-key"
os.environ["WHYLABS_DEFAULT_DATASET_ID"] = "model-1"

# Log predictions
def log_predictions(features, predictions):
    """Log model inputs and outputs to WhyLabs"""
    
    # Create DataFrame
    df = pd.DataFrame(features)
    df['prediction'] = predictions
    
    # Profile data
    results = why.log(df)
    
    # Write to WhyLabs
    results.writer("whylabs").write()

# Use in prediction endpoint (assumes an existing FastAPI `app` and loaded `model`)
@app.post("/predict")
async def predict(request: PredictRequest):
    features = request.features
    prediction = model.predict(features)
    
    # Log to WhyLabs
    log_predictions(features, prediction)
    
    return {"prediction": prediction}

Reference Profile and Drift Detection

"""
Create reference profile and detect drift
"""
import whylogs as why

# Create reference profile from training data
training_data = pd.read_csv('training_data.csv')
reference_profile = why.log(training_data).profile()

# Upload reference profile
reference_profile.writer("whylabs").write()

# Production: Log and compare
production_data = pd.read_csv('production_batch.csv')
production_profile = why.log(production_data).profile()

# Compare profiles
from whylogs.core.constraints import ConstraintsBuilder
from whylogs.core.constraints.factories import no_missing_values, column_is_in_range

# Define constraints
constraints = ConstraintsBuilder(dataset_profile=reference_profile) \
    .add_constraint(no_missing_values()) \
    .add_constraint(column_is_in_range("age", min_value=0, max_value=120)) \
    .build()

# Validate production data
report = production_profile.validate(constraints)
print(f"Validation passed: {report.passed}")

🎯 Metrics with Prometheus & Grafana

Prometheus Metrics

"""
Expose ML metrics for Prometheus
"""
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

# Define metrics
prediction_requests = Counter(
    'ml_prediction_requests_total',
    'Total prediction requests',
    ['model_version', 'status']
)

prediction_latency = Histogram(
    'ml_prediction_latency_seconds',
    'Prediction latency',
    ['model_version'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)

prediction_score = Histogram(
    'ml_prediction_score',
    'Prediction confidence scores',
    ['model_version'],
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
)

model_accuracy = Gauge(
    'ml_model_accuracy',
    'Current model accuracy',
    ['model_version']
)

data_drift_score = Gauge(
    'ml_data_drift_score',
    'Data drift score',
    ['feature']
)

# Instrument prediction endpoint (assumes an existing FastAPI `app` and loaded `model`)
@app.post("/predict")
async def predict(request: PredictRequest):
    start_time = time.time()
    model_version = "v2.1.0"
    
    try:
        # Make prediction
        prediction = model.predict(request.features)
        confidence = model.predict_proba(request.features).max()
        
        # Record metrics
        prediction_requests.labels(
            model_version=model_version,
            status='success'
        ).inc()
        
        duration = time.time() - start_time
        prediction_latency.labels(model_version=model_version).observe(duration)
        prediction_score.labels(model_version=model_version).observe(confidence)
        
        return {"prediction": prediction, "confidence": confidence}
    
    except Exception as e:
        prediction_requests.labels(
            model_version=model_version,
            status='error'
        ).inc()
        raise

# Start metrics server
start_http_server(9090)  # Metrics at :9090/metrics
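
start_http_server exposes the metrics on a separate port. If you prefer to serve them from the same FastAPI application, prometheus_client can be mounted as an ASGI sub-application (a minimal sketch; the /metrics path and the `app` object are assumptions carried over from the endpoint above):

# Alternative: expose /metrics from the FastAPI app itself instead of a second port
from prometheus_client import make_asgi_app

app.mount("/metrics", make_asgi_app())  # Prometheus scrapes the service's own port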

Update Metrics Periodically

"""
Background task to update model metrics
"""
import threading
import time

def update_model_metrics():
    """Update model accuracy and drift metrics periodically"""
    while True:
        try:
            # Calculate recent accuracy (if ground truth available)
            recent_predictions = get_recent_predictions()
            recent_labels = get_recent_labels()
            
            if len(recent_labels) > 0:
                accuracy = calculate_accuracy(recent_predictions, recent_labels)
                model_accuracy.labels(model_version='v2.1.0').set(accuracy)
            
            # Calculate drift scores
            reference_data = load_reference_data()
            current_data = get_recent_data()
            
            for feature in reference_data.columns:
                drift = calculate_drift(reference_data[feature], current_data[feature])
                data_drift_score.labels(feature=feature).set(drift)
            
        except Exception as e:
            print(f"Error updating metrics: {e}")
        
        # Update every hour
        time.sleep(3600)

# Start background thread
metrics_thread = threading.Thread(target=update_model_metrics, daemon=True)
metrics_thread.start()
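
The helpers above (get_recent_predictions, load_reference_data, calculate_drift, and so on) are placeholders for your own data access layer. For calculate_drift, one common choice is the Population Stability Index (PSI); a minimal numpy sketch (the bin count and the usual 0.1 / 0.25 interpretation thresholds are conventions, not requirements):

"""
Minimal PSI-based drift score for a single numerical feature (sketch)
"""
import numpy as np

def calculate_drift(reference, current, bins: int = 10) -> float:
    """Population Stability Index (PSI) between reference and current values."""
    # Equal-width bins from the reference data; open the outer edges so that
    # out-of-range production values are still counted
    edges = np.histogram_bin_edges(reference, bins=bins)
    edges[0], edges[-1] = -np.inf, np.inf

    ref_counts, _ = np.histogram(reference, bins=edges)
    cur_counts, _ = np.histogram(current, bins=edges)

    # Convert counts to proportions; epsilon avoids log(0) and division by zero
    eps = 1e-6
    ref_pct = np.clip(ref_counts / max(ref_counts.sum(), 1), eps, None)
    cur_pct = np.clip(cur_counts / max(cur_counts.sum(), 1), eps, None)

    # Rule of thumb: < 0.1 stable, 0.1-0.25 moderate shift, > 0.25 significant shift
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))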

Prometheus Configuration

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'ml-model'
    static_configs:
      - targets: ['ml-model-service:9090']
    
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true

Grafana Dashboard

{
  "dashboard": {
    "title": "ML Model Monitoring",
    "panels": [
      {
        "title": "Prediction Rate",
        "targets": [
          {
            "expr": "rate(ml_prediction_requests_total[5m])"
          }
        ]
      },
      {
        "title": "Prediction Latency (p95)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, ml_prediction_latency_seconds)"
          }
        ]
      },
      {
        "title": "Model Accuracy",
        "targets": [
          {
            "expr": "ml_model_accuracy"
          }
        ]
      },
      {
        "title": "Data Drift Score",
        "targets": [
          {
            "expr": "ml_data_drift_score"
          }
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(ml_prediction_requests_total{status='error'}[5m])"
          }
        ]
      }
    ]
  }
}

🚨 Alerting Strategies

Prometheus Alerting Rules

# alerts.yml
groups:
  - name: ml_model_alerts
    interval: 1m
    rules:
      # High error rate
      - alert: HighPredictionErrorRate
        expr: rate(ml_prediction_requests_total{status="error"}[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High prediction error rate"
          description: "Error rate is {{ $value }} per second"
      
      # High latency
      - alert: HighPredictionLatency
        expr: histogram_quantile(0.95, rate(ml_prediction_latency_seconds_bucket[5m])) > 1.0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High prediction latency"
          description: "P95 latency is {{ $value }}s"
      
      # Low accuracy
      - alert: ModelAccuracyDegraded
        expr: ml_model_accuracy < 0.85
        for: 1h
        labels:
          severity: critical
        annotations:
          summary: "Model accuracy degraded"
          description: "Current accuracy: {{ $value }}"
      
      # Data drift detected
      - alert: DataDriftDetected
        expr: ml_data_drift_score > 0.5
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "Data drift detected in {{ $labels.feature }}"
          description: "Drift score: {{ $value }}"
      
      # No predictions
      - alert: NoPredictions
        expr: rate(ml_prediction_requests_total[10m]) == 0
        for: 15m
        labels:
          severity: critical
        annotations:
          summary: "No predictions received"
          description: "Model has not received requests for 15 minutes"

Alertmanager Configuration

# alertmanager.yml
global:
  resolve_timeout: 5m

route:
  group_by: ['alertname', 'severity']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 12h
  receiver: 'team-slack'
  
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty-critical'
      continue: true
    
    - match:
        severity: warning
      receiver: 'team-slack'

receivers:
  - name: 'team-slack'
    slack_configs:
      - api_url: 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL'
        channel: '#ml-alerts'
        title: '{{ .GroupLabels.alertname }}'
        text: '{{ range .Alerts }}{{ .Annotations.description }}{{ end }}'
  
  - name: 'pagerduty-critical'
    pagerduty_configs:
      - service_key: 'your-pagerduty-key'
        description: '{{ .GroupLabels.alertname }}'

📝 Structured Logging

JSON Logging for ML

"""
Structured logging for ML predictions
"""
import json
import logging
import time
import uuid
from datetime import datetime

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
        }
        
        # Add extra fields
        if hasattr(record, 'prediction_id'):
            log_data['prediction_id'] = record.prediction_id
        if hasattr(record, 'model_version'):
            log_data['model_version'] = record.model_version
        if hasattr(record, 'latency'):
            log_data['latency_ms'] = record.latency
        if hasattr(record, 'features'):
            log_data['features'] = record.features
        if hasattr(record, 'prediction'):
            log_data['prediction'] = record.prediction
        
        return json.dumps(log_data)

# Configure logging
logger = logging.getLogger('ml_model')
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Log predictions (assumes the FastAPI `app` and `model` from earlier examples)
@app.post("/predict")
async def predict(request: PredictRequest):
    prediction_id = str(uuid.uuid4())
    start_time = time.time()
    
    try:
        prediction = model.predict(request.features)
        latency = (time.time() - start_time) * 1000
        
        # Structured logging
        logger.info(
            "Prediction successful",
            extra={
                'prediction_id': prediction_id,
                'model_version': 'v2.1.0',
                'latency': latency,
                'features': request.features,
                'prediction': prediction  # convert numpy outputs (e.g. .tolist()) so json.dumps succeeds
            }
        )
        
        return {"prediction": prediction, "id": prediction_id}
    
    except Exception as e:
        logger.error(
            f"Prediction failed: {str(e)}",
            extra={
                'prediction_id': prediction_id,
                'model_version': 'v2.1.0',
                'features': request.features
            }
        )
        raise
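
With this formatter, each prediction produces one machine-parseable JSON line that log shippers can index directly; an illustrative example (all values made up):

{"timestamp": "2024-05-01T12:34:56.789012", "level": "INFO", "message": "Prediction successful", "module": "main", "function": "predict", "prediction_id": "a1b2c3d4-...", "model_version": "v2.1.0", "latency_ms": 42.7, "features": {"sqft": 1850, "bedrooms": 3}, "prediction": 412500.0}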

Log Analysis with ELK Stack

# filebeat.yml - Ship logs to Elasticsearch
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/ml-model/*.log
    json.keys_under_root: true
    json.add_error_key: true

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "ml-model-logs-%{+yyyy.MM.dd}"

# Kibana dashboard for log analysis

🎯 Summary

You've mastered ML model monitoring:

  • 🔍 Evidently AI: Open-source drift detection and data quality monitoring
  • 📈 WhyLabs: Production monitoring platform with real-time observability
  • 🎯 Prometheus: Time-series metrics collection and alerting
  • 📊 Grafana: Visual dashboards for ML metrics and KPIs
  • 🚨 Alerting: Proactive notifications for model issues
  • 📝 Logging: Structured logs for debugging and analysis

Key Takeaways

  1. Monitor both model performance and data quality in production
  2. Detect data drift early before it impacts business metrics
  3. Use Evidently AI for comprehensive drift analysis
  4. Expose custom Prometheus metrics for model-specific monitoring
  5. Create Grafana dashboards for visual monitoring
  6. Implement alerting to catch issues proactively
  7. Use structured logging for better debugging and analysis

🚀 Next Steps:

Your models are monitored! Next, you'll learn model retraining and continuous learning - automating retraining when drift is detected and implementing online learning strategies.

Test Your Knowledge

Q1: What is data drift?

  • When code changes
  • When models retrain
  • When input feature distributions change over time, potentially degrading model performance
  • When servers crash

Q2: What does Evidently AI help detect?

  • Code bugs
  • Data drift, model degradation, and data quality issues
  • Security vulnerabilities
  • Network latency

Q3: Why use Prometheus for ML monitoring?

  • To collect time-series metrics like prediction latency, accuracy, and custom ML metrics
  • To train models
  • To store data
  • To deploy models

Q4: What should trigger an alert in production ML monitoring?

  • Every prediction
  • Successful deployments
  • Daily reports
  • High error rates, accuracy degradation, significant data drift, or high latency

Q5: What is structured logging?

  • Writing logs in paragraphs
  • Logging only errors
  • Logging in machine-readable format like JSON with consistent fields for easier analysis
  • Logging to multiple files