📊 Why Monitor ML Models?
Your model deployed successfully! Accuracy was 95% in testing. Three months later:
- Accuracy dropped to 70% but nobody noticed
- Input data distribution changed (customers from new regions)
- Model predictions became biased
- Business metrics (revenue, conversions) declined
- Response times increased from 50ms to 500ms
Model monitoring tracks model health in production, detecting issues before they impact business. Unlike traditional software, ML models silently degrade over time as data changes.
⚠️ Silent Model Failures:
- Data Drift: Input features change distribution
- Concept Drift: Relationship between features and target changes
- Model Degradation: Performance slowly declines over time
- Infrastructure Issues: Latency spikes, memory leaks
- Data Quality: Missing values, outliers, corrupted data
💡 What to Monitor:
- Model Performance: Accuracy, precision, recall, F1
- Data Quality: Missing values, out-of-range features
- Data Drift: Feature distribution changes (see the sketch after this list)
- Prediction Drift: Output distribution changes
- System Metrics: Latency, throughput, error rates
- Business Metrics: Conversion rate, revenue impact
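To make data and prediction drift concrete before looking at dedicated tools, here is a minimal, illustrative sketch that compares one feature's reference and production distributions with the Population Stability Index (PSI). The helper name, the 'age' column, and the 0.2 rule of thumb are just example choices; the libraries below compute drift per feature automatically.
"""
Minimal PSI drift check for a single feature (illustrative sketch)
"""
import numpy as np

def population_stability_index(reference, current, bins=10):
    """PSI = sum((p - q) * ln(p / q)) over shared histogram bins."""
    # Use bin edges derived from the reference distribution for both datasets;
    # values outside the reference range are ignored in this simple version
    edges = np.histogram_bin_edges(reference, bins=bins)
    p, _ = np.histogram(reference, bins=edges)
    q, _ = np.histogram(current, bins=edges)
    # Convert counts to proportions; clip to avoid log(0) and division by zero
    p = np.clip(p / p.sum(), 1e-6, None)
    q = np.clip(q / q.sum(), 1e-6, None)
    return float(np.sum((p - q) * np.log(p / q)))

# Example usage with a hypothetical 'age' feature:
# psi = population_stability_index(reference_df['age'], current_df['age'])
# A common rule of thumb: PSI > 0.2 signals noticeable drift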
🔍 Drift Detection with Evidently AI
What is Evidently?
Evidently is an open-source Python library for ML model monitoring. It detects data drift, model degradation, and data quality issues with interactive HTML reports and JSON outputs.
Installation
pip install evidently
Basic Drift Detection
"""
Detect data drift with Evidently
"""
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
# Load reference data (training data)
reference_data = pd.read_csv('training_data.csv')
# Load current production data
current_data = pd.read_csv('production_data.csv')
# Create drift report
report = Report(metrics=[
    DataDriftPreset()
])
report.run(reference_data=reference_data, current_data=current_data)
# Save HTML report
report.save_html('drift_report.html')
# Get JSON results
results = report.as_dict()
print(f"Dataset drift detected: {results['metrics'][0]['result']['dataset_drift']}")
print(f"Number of drifted features: {results['metrics'][0]['result']['number_of_drifted_columns']}")
Comprehensive Monitoring Report
"""
Full monitoring report with multiple metrics
"""
from evidently.report import Report
from evidently.metric_preset import (
    DataDriftPreset,
    DataQualityPreset,
    TargetDriftPreset,
    RegressionPreset
)
from evidently import ColumnMapping

# Describe the dataset so Evidently knows which columns are which
column_mapping = ColumnMapping(
    target='price',
    prediction='predicted_price',
    numerical_features=['sqft', 'bedrooms', 'age'],
    categorical_features=['neighborhood', 'property_type']
)

# Create comprehensive report
report = Report(metrics=[
    DataDriftPreset(),      # Feature drift
    DataQualityPreset(),    # Data quality issues
    TargetDriftPreset(),    # Target/prediction drift
    RegressionPreset()      # Model performance (if labels available)
])

report.run(
    reference_data=reference_data,
    current_data=current_data,
    column_mapping=column_mapping
)

report.save_html('monitoring_report.html')
Automated Monitoring Pipeline
"""
Automated drift monitoring with alerting
"""
import smtplib
from email.message import EmailMessage

import pandas as pd
from evidently.report import Report
from evidently.metrics import DatasetDriftMetric

def check_drift(reference_data, current_data, threshold=0.5):
    """Check for data drift and send alerts"""
    # Create report
    report = Report(metrics=[DatasetDriftMetric()])
    report.run(reference_data=reference_data, current_data=current_data)

    # Extract results
    results = report.as_dict()
    drift_detected = results['metrics'][0]['result']['dataset_drift']
    drift_share = results['metrics'][0]['result']['share_of_drifted_columns']

    print(f"Drift detected: {drift_detected}")
    print(f"Drifted columns: {drift_share:.1%}")

    # Alert if significant drift
    if drift_share > threshold:
        send_alert(
            subject="⚠️ Model Drift Alert",
            message=f"Significant data drift detected: {drift_share:.1%} of features drifted"
        )
        # Trigger retraining
        trigger_retraining()

    return drift_detected, drift_share

def send_alert(subject, message):
    """Send email alert"""
    msg = EmailMessage()
    msg.set_content(message)
    msg['Subject'] = subject
    msg['From'] = 'alerts@mlops.com'
    msg['To'] = 'team@mlops.com'

    # Send email (use an app password or secrets manager rather than a hard-coded password)
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login('alerts@mlops.com', 'password')
        server.send_message(msg)

def trigger_retraining():
    """Trigger model retraining pipeline"""
    import requests

    # Trigger Airflow DAG via the REST API
    response = requests.post(
        'http://airflow:8080/api/v1/dags/ml_training/dagRuns',
        auth=('admin', 'password'),
        json={"conf": {"reason": "drift_detected"}}
    )
    print(f"Retraining triggered: {response.status_code}")

# Run a single monitoring pass (see the scheduling sketch below for running it daily)
if __name__ == '__main__':
    reference = pd.read_csv('reference_data.csv')
    current = pd.read_csv('recent_predictions.csv')
    check_drift(reference, current, threshold=0.3)
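The block above performs one drift check. To run it daily you would normally use cron, an Airflow DAG, or a Kubernetes CronJob calling the same script; purely as a minimal, stdlib-only sketch (assuming check_drift from above is in scope, with the same illustrative file names and a 24-hour interval), it could also be looped directly:
"""
Minimal stdlib-only scheduler for the daily drift check (sketch)
"""
import time
import pandas as pd

if __name__ == '__main__':
    while True:
        # Reload data each cycle so new production batches are picked up
        reference = pd.read_csv('reference_data.csv')
        current = pd.read_csv('recent_predictions.csv')
        check_drift(reference, current, threshold=0.3)
        # Sleep for 24 hours between checks
        time.sleep(24 * 60 * 60)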
Real-Time Monitoring Dashboard
"""
Create monitoring dashboard with Evidently
"""
from evidently.ui.workspace import Workspace
from evidently.ui.dashboards import DashboardPanelCounter, DashboardPanelPlot
from evidently.renderers.html_widgets import WidgetSize
# Create workspace
workspace = Workspace.create("ml_monitoring_workspace")
# Add project
project = workspace.create_project("Production Model Monitoring")
# Generate reports over time
# (date_range and load_data_for_date are placeholders for your own batching logic)
for date in date_range:
    current_data = load_data_for_date(date)
    report = Report(metrics=[DataDriftPreset()])
    report.run(reference_data=reference_data, current_data=current_data)
    # Add each report as a snapshot in the project
    workspace.add_report(project.id, report)

# View the dashboard: start the Evidently UI service
# (e.g. `evidently ui --workspace ml_monitoring_workspace`) and open http://localhost:8000 in a browser
📈 Production Monitoring with WhyLabs
What is WhyLabs?
WhyLabs is a production ML monitoring platform with data profiling, drift detection, and observability. It provides real-time monitoring, alerting, and integrations with MLOps tools.
Setup
pip install whylogs
"""
WhyLabs monitoring integration
"""
import whylogs as why
from whylogs.api.writer.whylabs import WhyLabsWriter
import pandas as pd
# Configure WhyLabs
import os
os.environ["WHYLABS_DEFAULT_ORG_ID"] = "your-org-id"
os.environ["WHYLABS_API_KEY"] = "your-api-key"
os.environ["WHYLABS_DEFAULT_DATASET_ID"] = "model-1"
# Log predictions
def log_predictions(features, predictions):
    """Log model inputs and outputs to WhyLabs"""
    # Create DataFrame from the incoming features and attach the predictions
    df = pd.DataFrame(features)
    df['prediction'] = predictions
    # Profile data
    results = why.log(df)
    # Write to WhyLabs
    results.writer("whylabs").write()

# Use in prediction endpoint
@app.post("/predict")
async def predict(request: PredictRequest):
    features = request.features
    prediction = model.predict(features)
    # Log to WhyLabs
    log_predictions(features, prediction)
    return {"prediction": prediction}
Reference Profile and Drift Detection
"""
Create reference profile and detect drift
"""
import pandas as pd
import whylogs as why
from whylogs.core.constraints import ConstraintsBuilder
from whylogs.core.constraints.factories import no_missing_values, greater_than_number, smaller_than_number

# Create reference profile from training data and upload it to WhyLabs
training_data = pd.read_csv('training_data.csv')
reference_results = why.log(training_data)
reference_results.writer("whylabs").write()

# Production: profile the latest batch
production_data = pd.read_csv('production_batch.csv')
production_view = why.log(production_data).view()

# Define constraints and validate the production batch against them
builder = ConstraintsBuilder(dataset_profile_view=production_view)
builder.add_constraint(no_missing_values(column_name="age"))
builder.add_constraint(greater_than_number(column_name="age", number=0))
builder.add_constraint(smaller_than_number(column_name="age", number=120))
constraints = builder.build()

print(f"Validation passed: {constraints.validate()}")
🎯 Metrics with Prometheus & Grafana
Prometheus Metrics
"""
Expose ML metrics for Prometheus
"""
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
# Define metrics
prediction_requests = Counter(
    'ml_prediction_requests_total',
    'Total prediction requests',
    ['model_version', 'status']
)

prediction_latency = Histogram(
    'ml_prediction_latency_seconds',
    'Prediction latency',
    ['model_version'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)

prediction_score = Histogram(
    'ml_prediction_score',
    'Prediction confidence scores',
    ['model_version'],
    buckets=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
)

model_accuracy = Gauge(
    'ml_model_accuracy',
    'Current model accuracy',
    ['model_version']
)

data_drift_score = Gauge(
    'ml_data_drift_score',
    'Data drift score',
    ['feature']
)
# Instrument prediction endpoint
@app.post("/predict")
async def predict(request: PredictRequest):
    start_time = time.time()
    model_version = "v2.1.0"
    try:
        # Make prediction
        prediction = model.predict(request.features)
        confidence = model.predict_proba(request.features).max()

        # Record metrics
        prediction_requests.labels(
            model_version=model_version,
            status='success'
        ).inc()

        duration = time.time() - start_time
        prediction_latency.labels(model_version=model_version).observe(duration)
        prediction_score.labels(model_version=model_version).observe(confidence)

        return {"prediction": prediction, "confidence": confidence}
    except Exception:
        prediction_requests.labels(
            model_version=model_version,
            status='error'
        ).inc()
        raise
# Start metrics server
start_http_server(9090) # Metrics at :9090/metrics
Update Metrics Periodically
"""
Background task to update model metrics
"""
import threading
import time
def update_model_metrics():
    """Update model accuracy and drift metrics periodically"""
    # get_recent_predictions, get_recent_labels, calculate_accuracy, load_reference_data,
    # get_recent_data and calculate_drift are placeholders for your own data access and
    # metric code (one possible calculate_drift is sketched below)
    while True:
        try:
            # Calculate recent accuracy (if ground truth available)
            recent_predictions = get_recent_predictions()
            recent_labels = get_recent_labels()
            if len(recent_labels) > 0:
                accuracy = calculate_accuracy(recent_predictions, recent_labels)
                model_accuracy.labels(model_version='v2.1.0').set(accuracy)

            # Calculate per-feature drift scores
            reference_data = load_reference_data()
            current_data = get_recent_data()
            for feature in reference_data.columns:
                drift = calculate_drift(reference_data[feature], current_data[feature])
                data_drift_score.labels(feature=feature).set(drift)
        except Exception as e:
            print(f"Error updating metrics: {e}")

        # Update every hour
        time.sleep(3600)

# Start background thread
metrics_thread = threading.Thread(target=update_model_metrics, daemon=True)
metrics_thread.start()
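As one illustrative option for the calculate_drift placeholder above, a two-sample Kolmogorov-Smirnov statistic from SciPy works for numeric features (categorical features would need a different test, such as chi-squared); this is a sketch, not the only choice:
"""
One possible calculate_drift implementation (illustrative sketch using SciPy)
"""
from scipy.stats import ks_2samp

def calculate_drift(reference_values, current_values):
    """Two-sample KS statistic: 0 = identical distributions, 1 = fully separated."""
    statistic, _p_value = ks_2samp(reference_values, current_values)
    return float(statistic)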
Prometheus Configuration
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'ml-model'
    static_configs:
      - targets: ['ml-model-service:9090']

  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true
Grafana Dashboard
{
  "dashboard": {
    "title": "ML Model Monitoring",
    "panels": [
      {
        "title": "Prediction Rate",
        "targets": [
          {
            "expr": "rate(ml_prediction_requests_total[5m])"
          }
        ]
      },
      {
        "title": "Prediction Latency (p95)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(ml_prediction_latency_seconds_bucket[5m]))"
          }
        ]
      },
      {
        "title": "Model Accuracy",
        "targets": [
          {
            "expr": "ml_model_accuracy"
          }
        ]
      },
      {
        "title": "Data Drift Score",
        "targets": [
          {
            "expr": "ml_data_drift_score"
          }
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(ml_prediction_requests_total{status='error'}[5m])"
          }
        ]
      }
    ]
  }
}
🚨 Alerting Strategies
Prometheus Alerting Rules
# alerts.yml
groups:
  - name: ml_model_alerts
    interval: 1m
    rules:
      # High error rate
      - alert: HighPredictionErrorRate
        expr: rate(ml_prediction_requests_total{status="error"}[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High prediction error rate"
          description: "Error rate is {{ $value }} per second"

      # High latency
      - alert: HighPredictionLatency
        expr: histogram_quantile(0.95, rate(ml_prediction_latency_seconds_bucket[5m])) > 1.0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High prediction latency"
          description: "P95 latency is {{ $value }}s"

      # Low accuracy
      - alert: ModelAccuracyDegraded
        expr: ml_model_accuracy < 0.85
        for: 1h
        labels:
          severity: critical
        annotations:
          summary: "Model accuracy degraded"
          description: "Current accuracy: {{ $value }}"

      # Data drift detected
      - alert: DataDriftDetected
        expr: ml_data_drift_score > 0.5
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "Data drift detected in {{ $labels.feature }}"
          description: "Drift score: {{ $value }}"

      # No predictions
      - alert: NoPredictions
        expr: rate(ml_prediction_requests_total[10m]) == 0
        for: 15m
        labels:
          severity: critical
        annotations:
          summary: "No predictions received"
          description: "Model has not received requests for 15 minutes"
Alertmanager Configuration
# alertmanager.yml
global:
  resolve_timeout: 5m

route:
  group_by: ['alertname', 'severity']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 12h
  receiver: 'team-slack'
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty-critical'
      continue: true
    - match:
        severity: warning
      receiver: 'team-slack'

receivers:
  - name: 'team-slack'
    slack_configs:
      - api_url: 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL'
        channel: '#ml-alerts'
        title: '{{ .GroupLabels.alertname }}'
        text: '{{ range .Alerts }}{{ .Annotations.description }}{{ end }}'
  - name: 'pagerduty-critical'
    pagerduty_configs:
      - service_key: 'your-pagerduty-key'
        description: '{{ .GroupLabels.alertname }}'
📝 Structured Logging
JSON Logging for ML
"""
Structured logging for ML predictions
"""
import logging
import json
import time
import uuid
from datetime import datetime
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
        }
        # Add extra fields
        if hasattr(record, 'prediction_id'):
            log_data['prediction_id'] = record.prediction_id
        if hasattr(record, 'model_version'):
            log_data['model_version'] = record.model_version
        if hasattr(record, 'latency'):
            log_data['latency_ms'] = record.latency
        if hasattr(record, 'features'):
            log_data['features'] = record.features
        if hasattr(record, 'prediction'):
            log_data['prediction'] = record.prediction
        # default=str keeps non-JSON-serializable values (e.g. numpy types) from breaking logging
        return json.dumps(log_data, default=str)
# Configure logging
logger = logging.getLogger('ml_model')
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Log predictions
@app.post("/predict")
async def predict(request: PredictRequest):
    prediction_id = str(uuid.uuid4())
    start_time = time.time()
    try:
        prediction = model.predict(request.features)
        latency = (time.time() - start_time) * 1000

        # Structured logging
        logger.info(
            "Prediction successful",
            extra={
                'prediction_id': prediction_id,
                'model_version': 'v2.1.0',
                'latency': latency,
                'features': request.features,
                'prediction': prediction
            }
        )
        return {"prediction": prediction, "id": prediction_id}
    except Exception as e:
        logger.error(
            f"Prediction failed: {str(e)}",
            extra={
                'prediction_id': prediction_id,
                'model_version': 'v2.1.0',
                'features': request.features
            }
        )
        raise
Log Analysis with ELK Stack
# filebeat.yml - Ship logs to Elasticsearch
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/ml-model/*.log
    json.keys_under_root: true
    json.add_error_key: true

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "ml-model-logs-%{+yyyy.MM.dd}"

# Kibana dashboard for log analysis
🎯 Summary
You've mastered ML model monitoring:
- Evidently AI: Open-source drift detection and data quality monitoring
- WhyLabs: Production monitoring platform with real-time observability
- Prometheus: Time-series metrics collection and alerting
- Grafana: Visual dashboards for ML metrics and KPIs
- Alerting: Proactive notifications for model issues
- Logging: Structured logs for debugging and analysis
Key Takeaways
- Monitor both model performance and data quality in production
- Detect data drift early before it impacts business metrics
- Use Evidently AI for comprehensive drift analysis
- Expose custom Prometheus metrics for model-specific monitoring
- Create Grafana dashboards for visual monitoring
- Implement alerting to catch issues proactively
- Use structured logging for better debugging and analysis
🚀 Next Steps:
Your models are monitored! Next, you'll learn model retraining and continuous learning - automating retraining when drift is detected and implementing online learning strategies.
Test Your Knowledge
Q1: What is data drift?
Q2: What does Evidently AI help detect?
Q3: Why use Prometheus for ML monitoring?
Q4: What should trigger an alert in production ML monitoring?
Q5: What is structured logging?