Complete all tutorials to earn your free MLOps Engineer Certificate
Shareable on LinkedIn • Verified by AITutorials.site • No signup fee
Project Overview
Build an automated monitoring and retraining system that detects drift and maintains model quality:
Evidently AI
Automated drift detection for features and predictions
Monitoring Dashboard
Real-time drift reports and visualizations
Alerting System
Slack/email notifications when drift is detected
Automated Retraining
Airflow DAG triggered by drift alerts
Performance Tracking
Monitor accuracy degradation over time
Data Logging
Store predictions for drift analysis
System Architecture
Production API → Log Predictions to DB
        ↓
Daily Monitoring Job
        ↓
Evidently AI Analysis
        ↓
Generate Drift Report
        ↓
   ┌────┴──────────────┐
   ↓                   ↓
Drift detected      No drift
   ↓                   ↓
Send Alert          Continue Monitoring
   ↓
Trigger Airflow DAG
   ↓
Automated Retraining
   ↓
Deploy New Model
Prerequisites:
- Deployed model from Project 1 or 2
- PostgreSQL or SQLite for logging predictions
- Apache Airflow installed (Docker recommended)
- Slack webhook URL (optional, for alerts)
Step 1: Prediction Logging System
# logging_middleware.py
"""
Log predictions and features to database
"""
from datetime import datetime
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, Float, DateTime, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Declarative base for the ORM models defined in this module.
Base = declarative_base()
class Prediction(Base):
    """One logged prediction: input features, model output and optional label.

    Rows are written by ``log_prediction`` and read back by
    ``get_recent_predictions``. Columns use snake_case names; those helpers
    map them to/from the CamelCase feature names ('MedInc', 'HouseAge', ...).
    Column declaration order determines the column order of the created table.
    """
    __tablename__ = 'predictions'
    id = Column(Integer, primary_key=True)
    # Set at insert time from datetime.utcnow (naive datetime, UTC wall clock).
    timestamp = Column(DateTime, default=datetime.utcnow)
    # Features
    med_inc = Column(Float)
    house_age = Column(Float)
    ave_rooms = Column(Float)
    ave_bedrms = Column(Float)
    population = Column(Float)
    ave_occup = Column(Float)
    latitude = Column(Float)
    longitude = Column(Float)
    # Prediction
    prediction = Column(Float)
    # Free-form version tag supplied by the caller of log_prediction.
    model_version = Column(String)
    # Ground truth (when available). Nothing in this module writes it;
    # presumably backfilled once labels arrive — TODO confirm.
    actual_value = Column(Float, nullable=True)
# Database setup.
# PREDICTIONS_DB_URL selects the database (any SQLAlchemy URL, e.g.
# postgresql://user:pass@host/db); without it the local SQLite file
# predictions.db is used, exactly as before.
import os

DATABASE_URL = os.environ.get('PREDICTIONS_DB_URL', 'sqlite:///predictions.db')
engine = create_engine(DATABASE_URL)
# Create the predictions table if it does not exist yet.
Base.metadata.create_all(engine)
# Session factory; each helper opens and closes its own session.
Session = sessionmaker(bind=engine)
def log_prediction(features, prediction, model_version='1.0.0'):
    """Persist one prediction and its input features.

    Args:
        features: Mapping keyed by the CamelCase feature names ('MedInc',
            'HouseAge', 'AveRooms', 'AveBedrms', 'Population', 'AveOccup',
            'Latitude', 'Longitude'), e.g. ``HouseFeatures.dict()``.
        prediction: Model output to store.
        model_version: Version tag stored alongside the row.

    Raises:
        KeyError: if a feature is missing from ``features``.
        Any database error from the commit is re-raised after rollback.

    The session is always closed, so a failed insert cannot leak a
    connection from the pool.
    """
    session = Session()
    try:
        session.add(Prediction(
            med_inc=features['MedInc'],
            house_age=features['HouseAge'],
            ave_rooms=features['AveRooms'],
            ave_bedrms=features['AveBedrms'],
            population=features['Population'],
            ave_occup=features['AveOccup'],
            latitude=features['Latitude'],
            longitude=features['Longitude'],
            prediction=prediction,
            model_version=model_version,
        ))
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
def get_recent_predictions(days=7):
    """Return predictions logged during the last ``days`` days as a DataFrame.

    Columns: the CamelCase feature names, 'prediction' and 'timestamp'
    (datetime64). An empty window still yields these columns, so callers
    can select features or use ``.dt`` without a KeyError/AttributeError.
    """
    from datetime import timedelta  # only ``datetime`` is imported at module level

    columns = [
        'MedInc', 'HouseAge', 'AveRooms', 'AveBedrms', 'Population',
        'AveOccup', 'Latitude', 'Longitude', 'prediction', 'timestamp',
    ]
    # Stored timestamps come from datetime.utcnow, so compare in UTC too.
    cutoff = datetime.utcnow() - timedelta(days=days)

    session = Session()
    try:
        rows = session.query(Prediction).filter(
            Prediction.timestamp > cutoff
        ).all()
        records = [
            (p.med_inc, p.house_age, p.ave_rooms, p.ave_bedrms, p.population,
             p.ave_occup, p.latitude, p.longitude, p.prediction, p.timestamp)
            for p in rows
        ]
    finally:
        session.close()

    frame = pd.DataFrame(records, columns=columns)
    # Ensure a datetime dtype even when there are no rows.
    frame['timestamp'] = pd.to_datetime(frame['timestamp'])
    return frame
Update API with Logging
# Add to FastAPI app
from logging_middleware import log_prediction
@app.post("/predict")
def predict(features: HouseFeatures):
    """Predict a house value and log the request for drift monitoring.

    Declared with plain ``def`` rather than ``async def``: ``model.predict``
    and the synchronous SQLAlchemy write in ``log_prediction`` both block,
    and FastAPI runs plain-``def`` endpoints in its threadpool instead of on
    the event loop.
    """
    import logging

    feature_values = features.dict()
    # Same order as the drift detector's feature list; assumed to be the
    # order the model was trained on — TODO confirm against the training code.
    feature_order = [
        'MedInc', 'HouseAge', 'AveRooms', 'AveBedrms',
        'Population', 'AveOccup', 'Latitude', 'Longitude',
    ]
    feature_array = np.array([[feature_values[name] for name in feature_order]])
    prediction = float(model.predict(feature_array)[0])

    # Logging is best-effort: a database outage should not fail the request,
    # but it is recorded so missing monitoring data is noticed.
    try:
        log_prediction(
            features=feature_values,
            prediction=prediction,
            model_version='1.0.0',
        )
    except Exception:
        logging.getLogger(__name__).exception("Failed to log prediction")

    return {"prediction": prediction}
Step 2: Drift Detection with Evidently AI
# drift_detector.py
"""
Automated drift detection with Evidently AI
"""
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from datetime import datetime, timedelta
from logging_middleware import get_recent_predictions
class DriftDetector:
    """Compare recent production features against a reference dataset.

    Uses Evidently's ``DataDriftPreset`` (plus ``DataQualityPreset``), writes
    an HTML report per run and returns a summary dict used for alerting and
    retraining decisions.
    """

    # Model input features, in the order used for the reports.
    FEATURE_COLUMNS = (
        'MedInc', 'HouseAge', 'AveRooms', 'AveBedrms',
        'Population', 'AveOccup', 'Latitude', 'Longitude',
    )

    def __init__(self, reference_data_path='data/reference_data.csv',
                 drift_threshold=0.3, min_samples=100, window_days=7,
                 reports_dir='reports'):
        """
        Args:
            reference_data_path: CSV with the reference (training-time) data.
            drift_threshold: Share of drifted features above which retraining
                is recommended (0.3 = 30%).
            min_samples: Minimum number of recent predictions required to run
                the analysis.
            window_days: Look-back window for recent predictions.
            reports_dir: Directory the HTML reports are written to.
        """
        self.reference_data = pd.read_csv(reference_data_path)
        self.drift_threshold = drift_threshold
        self.min_samples = min_samples
        self.window_days = window_days
        self.reports_dir = reports_dir

    def check_drift(self):
        """Run an Evidently drift report on recent predictions.

        Returns:
            None when fewer than ``min_samples`` predictions were logged in
            the window; otherwise a dict with keys 'drift_detected',
            'drift_share', 'drifted_features', 'report_path', 'timestamp'
            and 'samples_analyzed'.
        """
        import os

        current_data = get_recent_predictions(days=self.window_days)
        if len(current_data) < self.min_samples:
            print("⚠️ Insufficient data for drift detection")
            return None

        print(f"🔍 Analyzing drift on {len(current_data)} recent predictions...")

        feature_columns = list(self.FEATURE_COLUMNS)
        report = Report(metrics=[
            DataDriftPreset(),
            DataQualityPreset(),
        ])
        report.run(
            reference_data=self.reference_data[feature_columns],
            current_data=current_data[feature_columns],
        )

        # Make sure the output directory exists before writing the report.
        os.makedirs(self.reports_dir, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        report_path = f'{self.reports_dir}/drift_report_{timestamp}.html'
        report.save_html(report_path)

        drift_detected, drift_share, drifted_features = self._summarize_drift(
            report.as_dict()
        )

        print("\n📊 Drift Analysis Results:")
        print(f" Drift detected: {drift_detected}")
        print(f" Drifted features: {drift_share:.1%}")
        if drifted_features:
            print(f" Features: {', '.join(drifted_features)}")
        print(f" Report saved: {report_path}")

        return {
            'drift_detected': drift_detected,
            'drift_share': drift_share,
            'drifted_features': drifted_features,
            'report_path': report_path,
            'timestamp': timestamp,
            'samples_analyzed': len(current_data),
        }

    @staticmethod
    def _summarize_drift(report_dict):
        """Extract (dataset_drift, share_of_drifted_columns, drifted_columns).

        In Evidently 0.4.x the DataDriftPreset emits a dataset-level metric
        (``dataset_drift`` / ``share_of_drifted_columns``) and a per-column
        table whose ``drift_by_columns`` maps each column to a dict holding
        ``drift_detected``. Metrics are located by key rather than by
        position. Per-column values are inspected via ``drift_detected``,
        because a non-empty dict is always truthy.

        Raises:
            ValueError: if no dataset-level drift result is present.
        """
        dataset_result = None
        drift_by_columns = {}
        for metric in report_dict.get('metrics', []):
            result = metric.get('result') or {}
            if dataset_result is None and 'dataset_drift' in result:
                dataset_result = result
            if not drift_by_columns and 'drift_by_columns' in result:
                drift_by_columns = result['drift_by_columns']

        if dataset_result is None:
            raise ValueError("Drift report contains no dataset drift metric")

        drifted_features = [
            column for column, info in drift_by_columns.items()
            if (info.get('drift_detected') if isinstance(info, dict) else info)
        ]
        return (
            dataset_result['dataset_drift'],
            dataset_result['share_of_drifted_columns'],
            drifted_features,
        )

    def should_trigger_retraining(self, drift_results):
        """Return True when the share of drifted features exceeds the threshold.

        ``drift_results`` is the dict returned by ``check_drift``, or None
        when drift could not be evaluated (never triggers retraining).
        """
        if drift_results is None:
            return False
        if drift_results['drift_share'] > self.drift_threshold:
            print(f"🚨 Drift threshold exceeded: {drift_results['drift_share']:.1%} > {self.drift_threshold:.1%}")
            return True
        return False
# Run drift detection
if __name__ == '__main__':
    # Manual one-off check. In the automated setup the drift_monitoring
    # Airflow DAG performs the actual retraining trigger.
    detector = DriftDetector()
    results = detector.check_drift()
    if detector.should_trigger_retraining(results):
        print("🔄 Triggering retraining pipeline...")
        # Trigger Airflow DAG (next step)
Step 3: Alerting System
# alerting.py
"""
Alert system for drift detection
"""
import requests
from email.message import EmailMessage
import smtplib
import json
class AlertManager:
    """Send drift notifications to Slack and/or email.

    A channel is used only if its configuration was provided. Delivery is
    best-effort: network/SMTP failures are printed rather than raised, so
    one broken channel does not stop the other or the calling pipeline.
    """

    def __init__(self, slack_webhook=None, email_config=None, timeout=10):
        """
        Args:
            slack_webhook: Slack incoming-webhook URL, or None to disable Slack.
            email_config: dict with 'from', 'to', 'smtp_server', 'username',
                'password' and optionally 'smtp_port' (default 465, SMTP over
                SSL), or None to disable email.
            timeout: Seconds to wait for the Slack webhook / SMTP server
                before giving up (prevents a hung alert from blocking).
        """
        self.slack_webhook = slack_webhook
        self.email_config = email_config
        self.timeout = timeout

    def send_drift_alert(self, drift_results):
        """Send drift alert via every configured channel.

        ``drift_results`` is the summary dict produced by drift detection
        (keys 'drift_share', 'drifted_features', 'samples_analyzed',
        'report_path', 'timestamp').
        """
        message = self._format_alert_message(drift_results)
        if self.slack_webhook:
            self._send_slack_alert(message, drift_results)
        if self.email_config:
            self._send_email_alert(message, drift_results)

    @staticmethod
    def _features_text(results):
        """Comma-separated drifted features, or 'none' if the list is empty."""
        return ', '.join(results['drifted_features']) or 'none'

    def _format_alert_message(self, results):
        """Build the plain-text/markdown alert body."""
        lines = [
            "🚨 **Model Drift Alert**",
            f"**Drift detected:** {results['drift_share']:.1%} of features drifted",
            f"**Drifted features:** {self._features_text(results)}",
            f"**Samples analyzed:** {results['samples_analyzed']}",
            f"**Report:** {results['report_path']}",
            "**Action Required:**",
            f"- Review drift report: {results['report_path']}",
            "- Automated retraining has been triggered",
            "- Monitor retraining progress in Airflow",
            f"**Timestamp:** {results['timestamp']}",
        ]
        return "\n".join(lines)

    def _send_slack_alert(self, message, results):
        """Post a Block Kit message to the Slack webhook (best-effort)."""
        payload = {
            "text": "🚨 Model Drift Alert",
            "blocks": [
                {
                    "type": "header",
                    "text": {"type": "plain_text", "text": "🚨 Model Drift Alert"},
                },
                {
                    "type": "section",
                    "fields": [
                        {"type": "mrkdwn",
                         "text": f"*Drift Share:*\n{results['drift_share']:.1%}"},
                        {"type": "mrkdwn",
                         "text": f"*Samples:*\n{results['samples_analyzed']}"},
                    ],
                },
                {
                    "type": "section",
                    "text": {"type": "mrkdwn",
                             "text": f"*Drifted Features:*\n{self._features_text(results)}"},
                },
                {
                    "type": "section",
                    "text": {"type": "mrkdwn",
                             "text": f"*Report:* `{results['report_path']}`"},
                },
            ],
        }
        try:
            # json= sets the Content-Type header; timeout avoids hanging forever.
            response = requests.post(self.slack_webhook, json=payload, timeout=self.timeout)
        except requests.RequestException as e:
            print(f"❌ Slack alert error: {e}")
            return
        if response.status_code == 200:
            print("✅ Slack alert sent")
        else:
            print(f"❌ Slack alert failed: {response.status_code}")

    def _send_email_alert(self, message, results):
        """Send the alert over SMTP-over-SSL (best-effort).

        Missing configuration keys raise KeyError (a setup error), while
        connection/authentication failures are printed.
        """
        config = self.email_config
        msg = EmailMessage()
        msg.set_content(message)
        msg['Subject'] = f"🚨 Model Drift Alert - {results['drift_share']:.1%} drift"
        msg['From'] = config['from']
        msg['To'] = config['to']
        server_host = config['smtp_server']
        server_port = config.get('smtp_port', 465)
        username, password = config['username'], config['password']
        try:
            with smtplib.SMTP_SSL(server_host, server_port, timeout=self.timeout) as server:
                server.login(username, password)
                server.send_message(msg)
        except (smtplib.SMTPException, OSError) as e:
            print(f"❌ Email alert error: {e}")
            return
        print("✅ Email alert sent")
# Usage: `python alerting.py` runs one drift check and alerts if drift was
# detected. Guarded so that `from alerting import AlertManager` (as the
# Airflow DAG does) does not execute it at import time.
if __name__ == '__main__':
    from drift_detector import DriftDetector

    alert_manager = AlertManager(
        slack_webhook='https://hooks.slack.com/services/YOUR/WEBHOOK/URL'
    )
    drift_results = DriftDetector().check_drift()
    if drift_results and drift_results['drift_detected']:
        alert_manager.send_drift_alert(drift_results)
Step 4: Automated Retraining with Airflow
# dags/drift_monitoring_dag.py
"""
Airflow DAG for drift monitoring and automated retraining
"""
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime, timedelta
import sys
sys.path.append('/path/to/project')
from drift_detector import DriftDetector
from alerting import AlertManager
# Defaults applied to every task in the drift_monitoring DAG.
default_args = dict(
    owner='mlops',
    depends_on_past=False,
    email=['team@company.com'],
    email_on_failure=True,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Cron '0 2 * * *': once a day at 02:00 (scheduler timezone); missed
# intervals are not backfilled.
dag = DAG(
    dag_id='drift_monitoring',
    description='Monitor for data drift and trigger retraining',
    default_args=default_args,
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def run_drift_detection(**context):
    """Airflow task: run one drift check and publish the summary to XCom.

    The summary (None when there was too little data) is pushed under the
    explicit key 'drift_results' for the branch and alert tasks, and is also
    returned.
    """
    drift_summary = DriftDetector().check_drift()
    task_instance = context['task_instance']
    task_instance.xcom_push(key='drift_results', value=drift_summary)
    return drift_summary
def check_if_drift_detected(**context):
    """Branch callable: return the id of the task to run next.

    'send_alert' when the drift summary exceeds the detector's threshold,
    otherwise 'skip_retraining' (including when no summary was produced).
    """
    drift_summary = context['task_instance'].xcom_pull(
        task_ids='detect_drift', key='drift_results'
    )
    needs_retraining = (
        drift_summary is not None
        and DriftDetector().should_trigger_retraining(drift_summary)
    )
    return 'send_alert' if needs_retraining else 'skip_retraining'
def send_drift_alert(**context):
    """Airflow task: send the drift summary from 'detect_drift' as an alert.

    The Slack webhook is read from the SLACK_WEBHOOK_URL environment variable
    so the secret need not live in DAG source. The original placeholder URL
    remains the fallback, so behavior is unchanged when the variable is unset.
    """
    import os

    results = context['task_instance'].xcom_pull(
        task_ids='detect_drift',
        key='drift_results'
    )
    slack_webhook = os.environ.get(
        'SLACK_WEBHOOK_URL',
        'https://hooks.slack.com/services/YOUR/WEBHOOK/URL',
    )
    alert_manager = AlertManager(slack_webhook=slack_webhook)
    alert_manager.send_drift_alert(results)
# Task definitions
# 1) Compute drift and push the summary to XCom.
detect_drift = PythonOperator(
    dag=dag,
    task_id='detect_drift',
    python_callable=run_drift_detection,
)
# 2) Branch: the callable returns 'send_alert' or 'skip_retraining'.
check_drift = BranchPythonOperator(
    dag=dag,
    task_id='check_drift',
    python_callable=check_if_drift_detected,
)
# 3a) Alert, then start the model_retraining DAG without waiting for it.
send_alert = PythonOperator(
    dag=dag,
    task_id='send_alert',
    python_callable=send_drift_alert,
)
trigger_retraining = TriggerDagRunOperator(
    dag=dag,
    task_id='trigger_retraining',
    trigger_dag_id='model_retraining',
    wait_for_completion=False,
)
# 3b) No-op branch when no retraining is needed.
skip_retraining = BashOperator(
    dag=dag,
    task_id='skip_retraining',
    bash_command='echo "No retraining needed"',
)

# Task dependencies: detect -> branch -> (alert -> retrain) | skip
detect_drift >> check_drift >> [send_alert, skip_retraining]
send_alert >> trigger_retraining
Retraining DAG
# dags/retraining_dag.py
"""
Automated model retraining DAG
"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# Defaults applied to every task in the model_retraining DAG.
# NOTE(review): Airflow's BaseOperator accepts email_on_failure and
# email_on_retry but, as far as I know, no email_on_success, so that key is
# presumably ignored. Success is announced by the notify_completion task
# instead — confirm.
default_args = dict(
    owner='mlops',
    email=['team@company.com'],
    email_on_failure=True,
    email_on_success=True,
)

# No schedule: this DAG runs only when triggered (the drift_monitoring DAG
# triggers it by id).
dag = DAG(
    dag_id='model_retraining',
    description='Automated model retraining pipeline',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def _project_script_task(task_id, script):
    """Return a BashOperator that runs ``python <script>`` from the project root."""
    return BashOperator(
        task_id=task_id,
        bash_command='cd /path/to/project && python ' + script,
        dag=dag,
    )


# Pipeline steps, one project script each.
fetch_data = _project_script_task('fetch_training_data', 'src/data/fetch_data.py')
preprocess = _project_script_task('preprocess_data', 'src/data/preprocess.py')
train = _project_script_task('train_model', 'src/models/train.py')
evaluate = _project_script_task('evaluate_model', 'src/models/evaluate.py')
# NOTE(review): deploy runs whenever evaluate_model succeeds; presumably
# evaluate.py must exit non-zero to block a worse model — confirm.
deploy = _project_script_task('deploy_model', 'src/models/deploy.py')
def send_completion_alert(**context):
    """Announce that the retraining pipeline finished.

    The message is always printed to the task log. When the
    SLACK_WEBHOOK_URL environment variable is set, it is also posted to
    Slack. Posting is best-effort: the model is already deployed at this
    point, so a Slack failure is reported but does not fail the task.
    """
    import os

    message = "✅ **Model Retraining Complete**\n\nNew model has been deployed to production."
    print(message)

    webhook = os.environ.get('SLACK_WEBHOOK_URL')
    if not webhook:
        return None

    import requests  # imported lazily: only needed when Slack is configured

    try:
        response = requests.post(webhook, json={'text': message}, timeout=10)
    except requests.RequestException as e:
        print(f"❌ Slack notification error: {e}")
        return None
    if response.status_code != 200:
        print(f"❌ Slack notification failed: {response.status_code}")
    return None
# Final step: report completion.
notify = PythonOperator(
    dag=dag,
    python_callable=send_completion_alert,
    task_id='notify_completion',
)

# Strictly sequential pipeline: each step is downstream of the previous one.
_pipeline = [fetch_data, preprocess, train, evaluate, deploy, notify]
for _upstream, _downstream in zip(_pipeline, _pipeline[1:]):
    _upstream >> _downstream
Step 5: Monitoring Dashboard
# dashboard.py
"""
Streamlit dashboard for drift monitoring
"""
import streamlit as st
import pandas as pd
from drift_detector import DriftDetector
from logging_middleware import get_recent_predictions
import plotly.express as px
st.set_page_config(page_title="ML Monitoring Dashboard", layout="wide")
st.title("📊 Model Monitoring Dashboard")

# Sidebar
st.sidebar.header("Settings")
days = st.sidebar.slider("Days to analyze", 1, 30, 7)

# How long (seconds) a query result is reused before the database is re-read.
CACHE_TTL_SECONDS = 300


# Load data
@st.cache_data(ttl=CACHE_TTL_SECONDS)
def load_data(days):
    """Load predictions logged during the last ``days`` days.

    Cached per ``days`` value. Without a TTL, st.cache_data keeps entries
    until the cache is cleared or the server restarts, so newly logged
    predictions would never appear. With the TTL they show up within
    CACHE_TTL_SECONDS.
    """
    return get_recent_predictions(days=days)


current_data = load_data(days)
st.sidebar.metric("Total Predictions", len(current_data))
import subprocess

# Tabs
tab1, tab2, tab3 = st.tabs(["🔍 Drift Analysis", "📈 Trends", "⚙️ Actions"])

with tab1:
    st.header("Data Drift Analysis")
    # NOTE: check_drift() applies DriftDetector's own look-back window; it
    # does not use the sidebar "Days to analyze" value.
    if st.button("Run Drift Detection"):
        with st.spinner("Analyzing drift..."):
            detector = DriftDetector()
            results = detector.check_drift()
        if results:
            col1, col2, col3 = st.columns(3)
            with col1:
                st.metric("Drift Detected", "Yes" if results['drift_detected'] else "No")
            with col2:
                st.metric("Drifted Features", f"{results['drift_share']:.1%}")
            with col3:
                st.metric("Samples Analyzed", results['samples_analyzed'])
            if results['drifted_features']:
                st.warning(f"**Drifted Features:** {', '.join(results['drifted_features'])}")
            # Display report
            st.info(f"Full report saved: {results['report_path']}")
        else:
            # check_drift() returns None when too few predictions were logged.
            st.warning("Not enough recent predictions to run drift detection.")

with tab2:
    st.header("Prediction Trends")
    if current_data.empty:
        # Nothing to plot; also avoids column/.dt errors on an empty frame.
        st.info("No predictions logged in the selected window yet.")
    else:
        # Prediction distribution
        fig = px.histogram(
            current_data,
            x='prediction',
            nbins=50,
            title='Prediction Distribution'
        )
        st.plotly_chart(fig, use_container_width=True)

        # Daily mean prediction; grouping by .dt.date keeps the key named
        # 'timestamp', so reset_index() produces a 'timestamp' column.
        daily_stats = current_data.groupby(
            current_data['timestamp'].dt.date
        )['prediction'].agg(['mean', 'std', 'count']).reset_index()
        fig = px.line(
            daily_stats,
            x='timestamp',
            y='mean',
            title='Average Prediction Over Time'
        )
        st.plotly_chart(fig, use_container_width=True)

with tab3:
    st.header("Actions")
    col1, col2 = st.columns(2)
    with col1:
        if st.button("🔄 Trigger Retraining"):
            # Same DAG id the drift_monitoring DAG triggers. Uses the Airflow
            # CLI, so the dashboard must run where `airflow` is installed and
            # configured.
            try:
                proc = subprocess.run(
                    ["airflow", "dags", "trigger", "model_retraining"],
                    capture_output=True, text=True, timeout=60,
                )
            except (OSError, subprocess.TimeoutExpired) as exc:
                st.error(f"Could not trigger retraining: {exc}")
            else:
                if proc.returncode == 0:
                    st.info("Retraining job triggered via Airflow")
                else:
                    st.error(f"Airflow rejected the trigger: {proc.stderr.strip()}")
    with col2:
        if st.button("📧 Send Test Alert"):
            # Not wired up yet: say so rather than falsely reporting success.
            st.warning("Test alerts are not configured in the dashboard yet; use AlertManager from alerting.py.")
# Run dashboard:
# streamlit run dashboard.py
Project Completion Checklist
- [ ] Prediction logging to database implemented
- [ ] Evidently AI drift detection configured
- [ ] Drift reports generated automatically
- [ ] Alerting system (Slack/Email) set up
- [ ] Airflow DAG for drift monitoring created
- [ ] Automated retraining pipeline integrated
- [ ] Streamlit monitoring dashboard deployed
- [ ] End-to-end automation tested
Congratulations!
You've built a production monitoring system with automated drift detection and retraining! It helps keep your model healthy and its performance stable over time.
Key Achievements
- Automated Monitoring: Daily drift checks without manual intervention
- Proactive Alerts: Team notified immediately when drift is detected
- Self-Healing: Automated retraining triggers on drift
- Transparency: Interactive dashboard for monitoring
- Complete Pipeline: Detection → Alert → Retrain → Deploy