
šŸ” Project: Model Monitoring & Drift Detection

Build automated monitoring with Evidently AI drift detection, alerting system, and Airflow-triggered retraining

šŸŽÆ Project 3 šŸ“Š Intermediate ā±ļø 2 hours

šŸŽÆ Project Overview

Build an automated monitoring and retraining system that detects drift and maintains model quality:

  • šŸ” Evidently AI: Automated drift detection for features and predictions
  • šŸ“Š Monitoring Dashboard: Real-time drift reports and visualizations
  • 🚨 Alerting System: Slack/email notifications when drift detected
  • šŸ”„ Automated Retraining: Airflow DAG triggered by drift alerts
  • šŸ“ˆ Performance Tracking: Monitor accuracy degradation over time
  • šŸ’¾ Data Logging: Store predictions for drift analysis

System Architecture

Production API → Log Predictions to DB
                        ↓
            Daily Monitoring Job
                        ↓
              Evidently AI Analysis
                        ↓
              Generate Drift Report
                        ↓
            ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
            ↓                   ↓
    Drift Detected?        No Drift
            ↓                   ↓
    Send Alert          Continue Monitoring
            ↓
    Trigger Airflow DAG
            ↓
    Automated Retraining
            ↓
    Deploy New Model

šŸ’” Prerequisites:

  • Deployed model from Project 1 or 2
  • PostgreSQL or SQLite for logging predictions
  • Apache Airflow installed (Docker recommended)
  • Slack webhook URL (optional, for alerts)

šŸ’¾ Step 1: Prediction Logging System

# logging_middleware.py
"""
Log predictions and features to database
"""
from datetime import datetime, timedelta
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, Float, DateTime, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Prediction(Base):
    __tablename__ = 'predictions'
    
    id = Column(Integer, primary_key=True)
    timestamp = Column(DateTime, default=datetime.utcnow)
    
    # Features
    med_inc = Column(Float)
    house_age = Column(Float)
    ave_rooms = Column(Float)
    ave_bedrms = Column(Float)
    population = Column(Float)
    ave_occup = Column(Float)
    latitude = Column(Float)
    longitude = Column(Float)
    
    # Prediction
    prediction = Column(Float)
    model_version = Column(String)
    
    # Ground truth (when available)
    actual_value = Column(Float, nullable=True)

# Database setup
engine = create_engine('sqlite:///predictions.db')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

def log_prediction(features, prediction, model_version='1.0.0'):
    """Log prediction to database"""
    session = Session()
    
    pred_record = Prediction(
        med_inc=features['MedInc'],
        house_age=features['HouseAge'],
        ave_rooms=features['AveRooms'],
        ave_bedrms=features['AveBedrms'],
        population=features['Population'],
        ave_occup=features['AveOccup'],
        latitude=features['Latitude'],
        longitude=features['Longitude'],
        prediction=prediction,
        model_version=model_version
    )
    
    session.add(pred_record)
    session.commit()
    session.close()

def get_recent_predictions(days=7):
    """Retrieve recent predictions"""
    session = Session()
    
    cutoff = datetime.utcnow() - timedelta(days=days)
    predictions = session.query(Prediction).filter(
        Prediction.timestamp > cutoff
    ).all()
    
    # Convert to DataFrame
    data = [{
        'MedInc': p.med_inc,
        'HouseAge': p.house_age,
        'AveRooms': p.ave_rooms,
        'AveBedrms': p.ave_bedrms,
        'Population': p.population,
        'AveOccup': p.ave_occup,
        'Latitude': p.latitude,
        'Longitude': p.longitude,
        'prediction': p.prediction,
        'timestamp': p.timestamp
    } for p in predictions]
    
    session.close()
    return pd.DataFrame(data)
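
The actual_value column is left empty at prediction time. A small backfill helper makes it possible to attach ground truth later, which is what performance tracking over time ultimately relies on. This is a sketch: the record_actual_value name and id-based lookup are illustrative additions, not part of the original files.

# Optional addition to logging_middleware.py
def record_actual_value(prediction_id, actual):
    """Backfill ground truth for a logged prediction once it becomes known"""
    session = Session()
    record = session.query(Prediction).get(prediction_id)
    if record is not None:
        record.actual_value = actual
        session.commit()
    session.close()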

Update API with Logging

# Add to FastAPI app
from logging_middleware import log_prediction

@app.post("/predict")
async def predict(features: HouseFeatures):
    """Prediction with logging"""
    
    # Make prediction
    feature_array = np.array([[
        features.MedInc, features.HouseAge, features.AveRooms,
        features.AveBedrms, features.Population, features.AveOccup,
        features.Latitude, features.Longitude
    ]])
    prediction = model.predict(feature_array)[0]
    
    # Log to database
    log_prediction(
        features=features.dict(),
        prediction=float(prediction),
        model_version='1.0.0'
    )
    
    return {"prediction": float(prediction)}

šŸ” Step 2: Drift Detection with Evidently AI

# drift_detector.py
"""
Automated drift detection with Evidently AI
"""
import os
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
from datetime import datetime, timedelta
from logging_middleware import get_recent_predictions

class DriftDetector:
    def __init__(self, reference_data_path='data/reference_data.csv'):
        self.reference_data = pd.read_csv(reference_data_path)
        self.drift_threshold = 0.3  # 30% of features drifted
    
    def check_drift(self):
        """Check for data drift"""
        
        # Get recent production data
        current_data = get_recent_predictions(days=7)
        
        if len(current_data) < 100:
            print("āš ļø  Insufficient data for drift detection")
            return None
        
        print(f"šŸ” Analyzing drift on {len(current_data)} recent predictions...")
        
        # Create drift report
        report = Report(metrics=[
            DataDriftPreset(),
            DataQualityPreset()
        ])
        
        # Run analysis
        feature_columns = [
            'MedInc', 'HouseAge', 'AveRooms', 'AveBedrms',
            'Population', 'AveOccup', 'Latitude', 'Longitude'
        ]
        
        report.run(
            reference_data=self.reference_data[feature_columns],
            current_data=current_data[feature_columns]
        )
        
        # Save report (make sure the reports/ directory exists first)
        os.makedirs('reports', exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        report_path = f'reports/drift_report_{timestamp}.html'
        report.save_html(report_path)
        
        # Extract results: in the Report.as_dict() output, metrics[0] is the
        # dataset-level DatasetDriftMetric and metrics[1] is the per-column DataDriftTable
        results = report.as_dict()
        drift_detected = results['metrics'][0]['result']['dataset_drift']
        drift_share = results['metrics'][0]['result']['share_of_drifted_columns']
        
        # Get drifted columns from the per-column drift table
        drifted_features = [
            col for col, col_result in
            results['metrics'][1]['result']['drift_by_columns'].items()
            if col_result['drift_detected']
        ]
        
        print(f"\nšŸ“Š Drift Analysis Results:")
        print(f"  Drift detected: {drift_detected}")
        print(f"  Drifted features: {drift_share:.1%}")
        if drifted_features:
            print(f"  Features: {', '.join(drifted_features)}")
        print(f"  Report saved: {report_path}")
        
        return {
            'drift_detected': drift_detected,
            'drift_share': drift_share,
            'drifted_features': drifted_features,
            'report_path': report_path,
            'timestamp': timestamp,
            'samples_analyzed': len(current_data)
        }
    
    def should_trigger_retraining(self, drift_results):
        """Decide if retraining should be triggered"""
        
        if drift_results is None:
            return False
        
        if drift_results['drift_share'] > self.drift_threshold:
            print(f"🚨 Drift threshold exceeded: {drift_results['drift_share']:.1%} > {self.drift_threshold:.1%}")
            return True
        
        return False

# Run drift detection
if __name__ == '__main__':
    detector = DriftDetector()
    results = detector.check_drift()
    
    if detector.should_trigger_retraining(results):
        print("šŸ”„ Triggering retraining pipeline...")
        # Trigger Airflow DAG (next step)
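
Before wiring up alerting, it helps to confirm the detector actually fires. A rough test sketch that logs deliberately shifted synthetic traffic through the same logging path, then lets you re-run the check (the shift sizes and sample count are arbitrary, for testing only):

# simulate_drift.py
"""
Log synthetic, shifted predictions so check_drift() has something to flag
"""
import numpy as np
from logging_middleware import log_prediction

rng = np.random.default_rng(42)

for _ in range(200):
    features = {
        'MedInc': float(rng.normal(12.0, 1.0)),   # pushed well above typical training values
        'HouseAge': float(rng.uniform(1, 50)),
        'AveRooms': float(rng.normal(5.0, 1.0)),
        'AveBedrms': float(rng.normal(1.1, 0.1)),
        'Population': float(rng.normal(1400, 300)),
        'AveOccup': float(rng.normal(3.0, 0.5)),
        'Latitude': float(rng.uniform(32, 42)),
        'Longitude': float(rng.uniform(-124, -114)),
    }
    log_prediction(features, prediction=float(rng.normal(4.5, 0.5)))

print("Logged 200 synthetic predictions; run drift_detector.py and the MedInc shift should be flagged")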

🚨 Step 3: Alerting System

# alerting.py
"""
Alert system for drift detection
"""
import requests
from email.message import EmailMessage
import smtplib
import json

class AlertManager:
    def __init__(self, slack_webhook=None, email_config=None):
        self.slack_webhook = slack_webhook
        self.email_config = email_config
    
    def send_drift_alert(self, drift_results):
        """Send drift alert via Slack and email"""
        
        message = self._format_alert_message(drift_results)
        
        # Slack notification
        if self.slack_webhook:
            self._send_slack_alert(message, drift_results)
        
        # Email notification
        if self.email_config:
            self._send_email_alert(message, drift_results)
    
    def _format_alert_message(self, results):
        """Format alert message"""
        return f"""
🚨 **Model Drift Alert**

**Drift detected:** {results['drift_share']:.1%} of features drifted
**Drifted features:** {', '.join(results['drifted_features'])}
**Samples analyzed:** {results['samples_analyzed']}
**Report:** {results['report_path']}

**Action Required:**
- Review drift report: {results['report_path']}
- Automated retraining has been triggered
- Monitor retraining progress in Airflow

**Timestamp:** {results['timestamp']}
        """.strip()
    
    def _send_slack_alert(self, message, results):
        """Send Slack notification"""
        
        payload = {
            "text": "🚨 Model Drift Alert",
            "blocks": [
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": "🚨 Model Drift Alert"
                    }
                },
                {
                    "type": "section",
                    "fields": [
                        {
                            "type": "mrkdwn",
                            "text": f"*Drift Share:*\n{results['drift_share']:.1%}"
                        },
                        {
                            "type": "mrkdwn",
                            "text": f"*Samples:*\n{results['samples_analyzed']}"
                        }
                    ]
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Drifted Features:*\n{', '.join(results['drifted_features'])}"
                    }
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Report:* `{results['report_path']}`"
                    }
                }
            ]
        }
        
        try:
            response = requests.post(
                self.slack_webhook,
                json=payload,
                headers={'Content-Type': 'application/json'}
            )
            
            if response.status_code == 200:
                print("āœ… Slack alert sent")
            else:
                print(f"āŒ Slack alert failed: {response.status_code}")
        
        except Exception as e:
            print(f"āŒ Slack alert error: {e}")
    
    def _send_email_alert(self, message, results):
        """Send email notification"""
        
        msg = EmailMessage()
        msg.set_content(message)
        msg['Subject'] = f"🚨 Model Drift Alert - {results['drift_share']:.1%} drift"
        msg['From'] = self.email_config['from']
        msg['To'] = self.email_config['to']
        
        try:
            with smtplib.SMTP_SSL(self.email_config['smtp_server'], 465) as server:
                server.login(self.email_config['username'], self.email_config['password'])
                server.send_message(msg)
            
            print("āœ… Email alert sent")
        
        except Exception as e:
            print(f"āŒ Email alert error: {e}")

# Usage
alert_manager = AlertManager(
    slack_webhook='https://hooks.slack.com/services/YOUR/WEBHOOK/URL'
)

if drift_results and drift_results['drift_detected']:
    alert_manager.send_drift_alert(drift_results)
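
Hardcoding the webhook URL is fine for a first test, but in practice the secrets belong in the environment. A small configuration sketch (the SLACK_WEBHOOK_URL, ALERT_EMAIL_*, and SMTP_* variable names are assumptions):

# Reading alert configuration from environment variables
import os

alert_manager = AlertManager(
    slack_webhook=os.environ.get('SLACK_WEBHOOK_URL'),
    email_config={
        'from': os.environ.get('ALERT_EMAIL_FROM'),
        'to': os.environ.get('ALERT_EMAIL_TO'),
        'smtp_server': os.environ.get('SMTP_SERVER'),
        'username': os.environ.get('SMTP_USERNAME'),
        'password': os.environ.get('SMTP_PASSWORD'),
    } if os.environ.get('SMTP_SERVER') else None,
)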

šŸ”„ Step 4: Automated Retraining with Airflow

# dags/drift_monitoring_dag.py
"""
Airflow DAG for drift monitoring and automated retraining
"""
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime, timedelta
import sys
sys.path.append('/path/to/project')

from drift_detector import DriftDetector
from alerting import AlertManager

default_args = {
    'owner': 'mlops',
    'depends_on_past': False,
    'email': ['team@company.com'],
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'drift_monitoring',
    default_args=default_args,
    description='Monitor for data drift and trigger retraining',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

def run_drift_detection(**context):
    """Run drift detection"""
    detector = DriftDetector()
    results = detector.check_drift()
    
    # Store results in XCom
    context['task_instance'].xcom_push(key='drift_results', value=results)
    
    return results

def check_if_drift_detected(**context):
    """Decide whether to trigger retraining"""
    results = context['task_instance'].xcom_pull(
        task_ids='detect_drift',
        key='drift_results'
    )
    
    if results is None:
        return 'skip_retraining'
    
    detector = DriftDetector()
    if detector.should_trigger_retraining(results):
        return 'send_alert'
    else:
        return 'skip_retraining'

def send_drift_alert(**context):
    """Send alert notification"""
    results = context['task_instance'].xcom_pull(
        task_ids='detect_drift',
        key='drift_results'
    )
    
    alert_manager = AlertManager(
        slack_webhook='https://hooks.slack.com/services/YOUR/WEBHOOK/URL'
    )
    alert_manager.send_drift_alert(results)

# Task definitions
detect_drift = PythonOperator(
    task_id='detect_drift',
    python_callable=run_drift_detection,
    dag=dag,
)

check_drift = BranchPythonOperator(
    task_id='check_drift',
    python_callable=check_if_drift_detected,
    dag=dag,
)

send_alert = PythonOperator(
    task_id='send_alert',
    python_callable=send_drift_alert,
    dag=dag,
)

trigger_retraining = TriggerDagRunOperator(
    task_id='trigger_retraining',
    trigger_dag_id='model_retraining',
    wait_for_completion=False,
    dag=dag,
)

skip_retraining = BashOperator(
    task_id='skip_retraining',
    bash_command='echo "No retraining needed"',
    dag=dag,
)

# Task dependencies
detect_drift >> check_drift
check_drift >> send_alert >> trigger_retraining
check_drift >> skip_retraining

Retraining DAG

# dags/retraining_dag.py
"""
Automated model retraining DAG
"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'mlops',
    'email': ['team@company.com'],
    'email_on_failure': True,
    'email_on_success': True,
}

dag = DAG(
    'model_retraining',
    default_args=default_args,
    description='Automated model retraining pipeline',
    schedule_interval=None,  # Triggered by drift_monitoring DAG
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

fetch_data = BashOperator(
    task_id='fetch_training_data',
    bash_command='cd /path/to/project && python src/data/fetch_data.py',
    dag=dag,
)

preprocess = BashOperator(
    task_id='preprocess_data',
    bash_command='cd /path/to/project && python src/data/preprocess.py',
    dag=dag,
)

train = BashOperator(
    task_id='train_model',
    bash_command='cd /path/to/project && python src/models/train.py',
    dag=dag,
)

evaluate = BashOperator(
    task_id='evaluate_model',
    bash_command='cd /path/to/project && python src/models/evaluate.py',
    dag=dag,
)

deploy = BashOperator(
    task_id='deploy_model',
    bash_command='cd /path/to/project && python src/models/deploy.py',
    dag=dag,
)

def send_completion_alert(**context):
    """Send retraining completion alert"""
    import requests
    
    message = "āœ… **Model Retraining Complete**\n\nNew model has been deployed to production."
    
    # Post a plain-text notification to the Slack incoming webhook
    requests.post(
        'https://hooks.slack.com/services/YOUR/WEBHOOK/URL',
        json={'text': message}
    )
    print(message)

notify = PythonOperator(
    task_id='notify_completion',
    python_callable=send_completion_alert,
    dag=dag,
)

fetch_data >> preprocess >> train >> evaluate >> deploy >> notify
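
The deploy_model task assumes a src/models/deploy.py from the earlier projects. If you still need one, a minimal gated-promotion sketch looks like this (the file paths, the metrics JSON layout, and the RMSE comparison rule are assumptions, not the tutorial's actual script):

# src/models/deploy.py (sketch)
"""
Promote the newly trained model only if it beats the current production model
"""
import json
import shutil

with open('metrics/new_model.json') as f:
    new_metrics = json.load(f)
with open('metrics/production_model.json') as f:
    prod_metrics = json.load(f)

# Lower RMSE on the held-out set wins
if new_metrics['rmse'] < prod_metrics['rmse']:
    shutil.copy('models/new_model.pkl', 'models/production_model.pkl')
    shutil.copy('metrics/new_model.json', 'metrics/production_model.json')
    print(f"āœ… Promoted new model (RMSE {new_metrics['rmse']:.3f} < {prod_metrics['rmse']:.3f})")
else:
    raise SystemExit("āŒ New model did not improve on production; keeping current model")

Exiting non-zero makes the deploy_model task fail, so the failure email from default_args fires instead of the completion notification.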

šŸ“ˆ Step 5: Monitoring Dashboard

# dashboard.py
"""
Streamlit dashboard for drift monitoring
"""
import streamlit as st
import pandas as pd
from drift_detector import DriftDetector
from logging_middleware import get_recent_predictions
import plotly.express as px

st.set_page_config(page_title="ML Monitoring Dashboard", layout="wide")

st.title("šŸ” Model Monitoring Dashboard")

# Sidebar
st.sidebar.header("Settings")
days = st.sidebar.slider("Days to analyze", 1, 30, 7)

# Load data
@st.cache_data
def load_data(days):
    return get_recent_predictions(days=days)

current_data = load_data(days)
st.sidebar.metric("Total Predictions", len(current_data))

# Tabs
tab1, tab2, tab3 = st.tabs(["šŸ“Š Drift Analysis", "šŸ“ˆ Trends", "āš™ļø Actions"])

with tab1:
    st.header("Data Drift Analysis")
    
    if st.button("Run Drift Detection"):
        with st.spinner("Analyzing drift..."):
            detector = DriftDetector()
            results = detector.check_drift()
            
            if results:
                col1, col2, col3 = st.columns(3)
                
                with col1:
                    st.metric("Drift Detected", "Yes" if results['drift_detected'] else "No")
                
                with col2:
                    st.metric("Drifted Features", f"{results['drift_share']:.1%}")
                
                with col3:
                    st.metric("Samples Analyzed", results['samples_analyzed'])
                
                if results['drifted_features']:
                    st.warning(f"**Drifted Features:** {', '.join(results['drifted_features'])}")
                
                # Display report
                st.info(f"Full report saved: {results['report_path']}")

with tab2:
    st.header("Prediction Trends")
    
    # Prediction distribution over time
    fig = px.histogram(
        current_data,
        x='prediction',
        nbins=50,
        title='Prediction Distribution'
    )
    st.plotly_chart(fig, use_container_width=True)
    
    # Predictions over time
    daily_stats = current_data.groupby(
        current_data['timestamp'].dt.date
    )['prediction'].agg(['mean', 'std', 'count']).reset_index()
    
    fig = px.line(
        daily_stats,
        x='timestamp',
        y='mean',
        title='Average Prediction Over Time'
    )
    st.plotly_chart(fig, use_container_width=True)

with tab3:
    st.header("Actions")
    
    col1, col2 = st.columns(2)
    
    with col1:
        if st.button("šŸ”„ Trigger Retraining"):
            st.info("Retraining job triggered via Airflow")
            # Trigger Airflow DAG
    
    with col2:
        if st.button("šŸ“§ Send Test Alert"):
            st.info("Test alert sent to Slack/Email")
            # Send test alert

# Run dashboard:
# streamlit run dashboard.py
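
The "šŸ”„ Trigger Retraining" button above is a stub. One way to back it is the Airflow 2.x stable REST API; here is a sketch under the assumption that the API is enabled with basic auth and the webserver runs on localhost:8080:

# airflow_client.py (sketch)
"""
Trigger the retraining DAG through the Airflow stable REST API
"""
import os
import requests

def trigger_retraining_dag(dag_id='model_retraining'):
    """POST a new DAG run and return the created run's metadata"""
    response = requests.post(
        f"http://localhost:8080/api/v1/dags/{dag_id}/dagRuns",
        json={'conf': {'triggered_by': 'dashboard'}},
        auth=(os.environ.get('AIRFLOW_USER', 'airflow'),
              os.environ.get('AIRFLOW_PASSWORD', 'airflow')),
    )
    response.raise_for_status()
    return response.json()

Call trigger_retraining_dag() inside the button handler in dashboard.py in place of the placeholder comment.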

šŸŽÆ Project Completion Checklist

  • āœ… Prediction logging to database implemented
  • āœ… Evidently AI drift detection configured
  • āœ… Drift reports generated automatically
  • āœ… Alerting system (Slack/Email) setup
  • āœ… Airflow DAG for drift monitoring created
  • āœ… Automated retraining pipeline integrated
  • āœ… Streamlit monitoring dashboard deployed
  • āœ… End-to-end automation tested

šŸŽ‰ Congratulations!

You've built a production monitoring system with automated drift detection and retraining! This ensures your model stays healthy and maintains performance over time.

Key Achievements

  • Automated Monitoring: Daily drift checks without manual intervention
  • Proactive Alerts: Team notified immediately when drift detected
  • Self-Healing: Automated retraining triggers on drift
  • Transparency: Interactive dashboard for monitoring
  • Complete Pipeline: Detection → Alert → Retrain → Deploy