🚀 Project: Build an End-to-End ML Pipeline

Complete hands-on project: data ingestion → training → deployment with MLflow, Docker, FastAPI, and monitoring

🎯 Project 1 📊 Intermediate ⏱️ 2-3 hours

🎯 Project Overview

Build a production-ready ML pipeline for house price prediction that includes:

  • 📊 Data Pipeline: Automated data ingestion, validation, and preprocessing
  • 🤖 Training Pipeline: MLflow experiment tracking with hyperparameter tuning
  • 🐳 Containerization: Docker packaging for consistent deployment
  • 🚀 API Deployment: FastAPI REST endpoint with validation
  • 📈 Monitoring: Prometheus metrics and dashboard
  • ✅ Testing: Unit tests and integration tests

What You'll Build

A complete ML system that:

  • Trains a regression model to predict house prices
  • Tracks experiments with MLflow
  • Packages the model in a Docker container
  • Deploys a FastAPI endpoint for predictions
  • Monitors performance with Prometheus and Grafana
  • Includes automated tests for quality assurance

💡 Prerequisites:

  • Python 3.9+, Docker Desktop installed
  • Completed tutorials 1-13
  • Basic understanding of regression models
  • Familiarity with command line

📁 Step 1: Project Setup

Create Project Structure

# Create project directory
mkdir ml-pipeline-project
cd ml-pipeline-project

# Create structure (the {a,b} brace expansion requires bash or zsh)
mkdir -p {data,models,notebooks,src/{data,models,api},tests,config}
touch README.md requirements.txt Dockerfile docker-compose.yml .gitignore

# Project structure:
# ml-pipeline-project/
# ├── data/               # Raw and processed data
# ├── models/             # Saved models
# ├── notebooks/          # Exploratory notebooks
# ├── src/
# │   ├── data/          # Data processing
# │   ├── models/        # Model training
# │   └── api/           # FastAPI app
# ├── tests/             # Unit and integration tests
# ├── config/            # Configuration files
# ├── requirements.txt
# ├── Dockerfile
# └── docker-compose.yml
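
The brace expansion in the `mkdir` command above is a bash/zsh feature and does not work in PowerShell or cmd. As a rough cross-platform alternative, the same skeleton could be created from Python. The function name `scaffold` is a hypothetical helper and not part of the tutorial; the directory and file names are copied from the commands above.

```python
from pathlib import Path

# Directory and file names copied from the shell commands in this step.
DIRS = [
    "data", "models", "notebooks",
    "src/data", "src/models", "src/api",
    "tests", "config",
]
FILES = ["README.md", "requirements.txt", "Dockerfile", "docker-compose.yml", ".gitignore"]


def scaffold(root):
    """Create the project skeleton under `root` and return it as a Path.

    Safe to call repeatedly: existing directories and files are left alone.
    """
    root = Path(root)
    for d in DIRS:
        (root / d).mkdir(parents=True, exist_ok=True)
    for f in FILES:
        (root / f).touch(exist_ok=True)
    return root
```

Calling `scaffold("ml-pipeline-project")` from the parent directory should give the same layout as the shell commands.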

Install Dependencies

# requirements.txt
pandas==2.0.3
scikit-learn==1.3.0
mlflow==2.7.1
fastapi==0.103.1
uvicorn[standard]==0.23.2
pydantic==2.3.0
prometheus-client==0.17.1
pytest==7.4.2
requests==2.31.0
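python-dotenv==1.0.0
httpx==0.24.1  # needed by fastapi.testclient.TestClient in Step 6

# Install (run in the shell; this line is not part of requirements.txt)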
pip install -r requirements.txt

📊 Step 2: Data Pipeline

Download Dataset

# src/data/load_data.py
"""
Download and load house price dataset
"""
import pandas as pd
from sklearn.datasets import fetch_california_housing
import os

def load_raw_data():
    """Load California housing dataset"""
    print("Loading dataset...")
    data = fetch_california_housing(as_frame=True)
    df = data.frame
    
    # Save raw data
    os.makedirs('data/raw', exist_ok=True)
    df.to_csv('data/raw/housing.csv', index=False)
    
    print(f"Loaded {len(df)} samples with {len(df.columns)} features")
    return df

if __name__ == '__main__':
    load_raw_data()

Data Validation & Preprocessing

# src/data/preprocess.py
"""
Data validation and preprocessing
"""
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import joblib
import os

def validate_data(df):
    """Validate data quality"""
    print("\n🔍 Validating data...")
    
    # Check missing values
    missing = df.isnull().sum()
    if missing.any():
        print(f"⚠️  Missing values found:\n{missing[missing > 0]}")
        raise ValueError("Dataset contains missing values")
    
    # Check data types
    numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns
    if len(numeric_cols) != len(df.columns):
        print("⚠️  Non-numeric columns found")
    
    # Check for outliers (simple check)
    for col in numeric_cols:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        outliers = ((df[col] < q1 - 3*iqr) | (df[col] > q3 + 3*iqr)).sum()
        if outliers > 0:
            print(f"  {col}: {outliers} outliers detected")
    
    print("✅ Data validation passed")

def preprocess_data(input_path='data/raw/housing.csv', test_size=0.2):
    """Preprocess and split data"""
    print("\n📊 Preprocessing data...")
    
    # Load
    df = pd.read_csv(input_path)
    
    # Validate
    validate_data(df)
    
    # Split features and target
    X = df.drop('MedHouseVal', axis=1)
    y = df['MedHouseVal']
    
    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42
    )
    
    # Scale features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # Save preprocessed data
    os.makedirs('data/processed', exist_ok=True)
    pd.DataFrame(X_train_scaled, columns=X.columns).to_csv(
        'data/processed/X_train.csv', index=False
    )
    pd.DataFrame(X_test_scaled, columns=X.columns).to_csv(
        'data/processed/X_test.csv', index=False
    )
    pd.DataFrame(y_train).to_csv('data/processed/y_train.csv', index=False)
    pd.DataFrame(y_test).to_csv('data/processed/y_test.csv', index=False)
    
    # Save scaler
    os.makedirs('models/artifacts', exist_ok=True)
    joblib.dump(scaler, 'models/artifacts/scaler.pkl')
    
    print(f"✅ Data preprocessed: {len(X_train)} train, {len(X_test)} test samples")
    return X_train_scaled, X_test_scaled, y_train, y_test

if __name__ == '__main__':
    preprocess_data()
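
The outlier check in `validate_data` flags values more than 3 × IQR beyond the quartiles, which is a deliberately loose rule (the more common convention is 1.5 × IQR). The sketch below restates that rule with only the standard library so the arithmetic is easy to follow. The helper name `iqr_outliers` and the sample values are made up for illustration.

```python
import statistics


def iqr_outliers(values, k=3.0):
    """Return the values lying outside [q1 - k*iqr, q3 + k*iqr]."""
    # method="inclusive" interpolates linearly between data points,
    # which gives the same quartiles as pandas' default quantile().
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    low, high = q1 - k * iqr, q3 + k * iqr
    return [v for v in values if v < low or v > high]


# Made-up sample: quartiles are 3.0 and 7.0, so the fences are -9.0 and 19.0.
sample = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 100.0]
print(iqr_outliers(sample))  # [100.0]
```

Note that `validate_data` only reports these counts; it does not drop or clip the flagged rows.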

🤖 Step 3: Training Pipeline with MLflow

# src/models/train.py
"""
Model training with MLflow tracking
"""
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import pandas as pd
import numpy as np
import joblib
import os

def train_model(n_estimators=100, max_depth=10, experiment_name="house_price_prediction"):
    """Train model with MLflow tracking"""
    
    # Set MLflow experiment
    mlflow.set_experiment(experiment_name)
    
    # Load data
    print("📊 Loading preprocessed data...")
    X_train = pd.read_csv('data/processed/X_train.csv')
    X_test = pd.read_csv('data/processed/X_test.csv')
    y_train = pd.read_csv('data/processed/y_train.csv').values.ravel()
    y_test = pd.read_csv('data/processed/y_test.csv').values.ravel()
    
    # Start MLflow run
    with mlflow.start_run():
        print(f"\n🤖 Training RandomForest model...")
        
        # Train model
        model = RandomForestRegressor(
            n_estimators=n_estimators,
            max_depth=max_depth,
            random_state=42,
            n_jobs=-1
        )
        model.fit(X_train, y_train)
        
        # Predictions
        y_pred_train = model.predict(X_train)
        y_pred_test = model.predict(X_test)
        
        # Metrics
        train_rmse = np.sqrt(mean_squared_error(y_train, y_pred_train))
        test_rmse = np.sqrt(mean_squared_error(y_test, y_pred_test))
        train_r2 = r2_score(y_train, y_pred_train)
        test_r2 = r2_score(y_test, y_pred_test)
        test_mae = mean_absolute_error(y_test, y_pred_test)
        
        print(f"\n📈 Training Metrics:")
        print(f"  Train RMSE: {train_rmse:.4f}")
        print(f"  Train R²:   {train_r2:.4f}")
        print(f"\n📈 Test Metrics:")
        print(f"  Test RMSE:  {test_rmse:.4f}")
        print(f"  Test R²:    {test_r2:.4f}")
        print(f"  Test MAE:   {test_mae:.4f}")
        
        # Log parameters
        mlflow.log_param("n_estimators", n_estimators)
        mlflow.log_param("max_depth", max_depth)
        mlflow.log_param("model_type", "RandomForestRegressor")
        
        # Log metrics
        mlflow.log_metric("train_rmse", train_rmse)
        mlflow.log_metric("test_rmse", test_rmse)
        mlflow.log_metric("train_r2", train_r2)
        mlflow.log_metric("test_r2", test_r2)
        mlflow.log_metric("test_mae", test_mae)
        
        # Log model
        mlflow.sklearn.log_model(model, "model")
        
        # Save model locally
        os.makedirs('models/production', exist_ok=True)
        joblib.dump(model, 'models/production/model.pkl')
        
        print(f"\n✅ Model saved to models/production/model.pkl")
        print(f"   MLflow run: {mlflow.active_run().info.run_id}")
        
        return model, test_rmse, test_r2

if __name__ == '__main__':
    train_model()
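
To make the logged metrics easier to interpret, here is a small standard-library sketch of what RMSE, MAE, and R² compute. It is not how scikit-learn implements them, only the same formulas written out; `regression_metrics` and the four data points are invented for the example. Keep in mind that `MedHouseVal` is expressed in units of $100,000, so an RMSE of 0.5 corresponds to roughly $50,000.

```python
import math


def regression_metrics(y_true, y_pred):
    """Compute RMSE, MAE and R^2 for two equal-length sequences."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)               # residual sum of squares
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    return {
        "rmse": math.sqrt(ss_res / n),
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1.0 - ss_res / ss_tot,
    }


# Invented values, chosen so the results are easy to check by hand.
y_true = [1.0, 2.0, 3.0, 4.0]
y_pred = [1.5, 2.0, 2.5, 4.0]
metrics = regression_metrics(y_true, y_pred)
print(metrics)
```

A large gap between the train and test values of these metrics in the MLflow UI is the usual sign of overfitting.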

Hyperparameter Tuning

# src/models/tune.py
"""
Hyperparameter tuning with MLflow
"""
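import mlflow
# Run as `python src/models/tune.py` from the project root: the script's own
# directory is then on sys.path (so `train` imports) and the relative data/
# paths used by train_model still resolve.
from train import train_model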

def hyperparameter_search():
    """Grid search with MLflow tracking"""
    
    param_grid = {
        'n_estimators': [50, 100, 200],
        'max_depth': [10, 20, None]
    }
    
    best_rmse = float('inf')
    best_params = {}
    
    print("🔍 Starting hyperparameter search...\n")
    
    for n_est in param_grid['n_estimators']:
        for depth in param_grid['max_depth']:
            print(f"Testing: n_estimators={n_est}, max_depth={depth}")
            
            model, rmse, r2 = train_model(
                n_estimators=n_est,
                max_depth=depth
            )
            
            if rmse < best_rmse:
                best_rmse = rmse
                best_params = {'n_estimators': n_est, 'max_depth': depth}
    
    print(f"\n🏆 Best parameters: {best_params}")
    print(f"   Best RMSE: {best_rmse:.4f}")

if __name__ == '__main__':
    hyperparameter_search()
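
The nested loops above grow by one level for every hyperparameter added. One possible way to keep the search generic is `itertools.product`, sketched below. `grid_search` and `fake_rmse` are hypothetical names; `fake_rmse` is a stand-in scorer so the example runs without training anything, and its numbers are not real results.

```python
from itertools import product

# Same grid as in tune.py.
param_grid = {
    "n_estimators": [50, 100, 200],
    "max_depth": [10, 20, None],
}


def grid_search(param_grid, evaluate):
    """Try every combination and return (best_params, best_score).

    `evaluate` is called with the parameters as keyword arguments and must
    return a score where lower is better (for example an RMSE).
    """
    names = list(param_grid)
    best_params, best_score = None, float("inf")
    for values in product(*(param_grid[name] for name in names)):
        params = dict(zip(names, values))
        score = evaluate(**params)
        if score < best_score:
            best_params, best_score = params, score
    return best_params, best_score


def fake_rmse(n_estimators, max_depth):
    """Stand-in scorer for illustration only; NOT a real model evaluation."""
    depth_penalty = 0.0 if max_depth is None else 1.0 / max_depth
    return 1.0 / n_estimators + depth_penalty


best_params, best_score = grid_search(param_grid, fake_rmse)
print(best_params, best_score)
```

With the real pipeline, the scorer could be `lambda **p: train_model(**p)[1]`, since `train_model` returns `(model, test_rmse, test_r2)`. Because the search then picks the winner on the test set, the winning test RMSE is somewhat optimistic; a separate validation split would give a cleaner estimate.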

🚀 Step 4: FastAPI Deployment

# src/api/main.py
"""
FastAPI application for house price prediction
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from prometheus_client import Counter, Histogram, generate_latest
from fastapi.responses import Response
import joblib
import numpy as np
import time

# Load model and scaler
model = joblib.load('models/production/model.pkl')
scaler = joblib.load('models/artifacts/scaler.pkl')

app = FastAPI(
    title="House Price Prediction API",
    description="Predict California house prices",
    version="1.0.0"
)

# Prometheus metrics
prediction_counter = Counter(
    'predictions_total',
    'Total number of predictions'
)
prediction_latency = Histogram(
    'prediction_latency_seconds',
    'Prediction latency'
)
prediction_errors = Counter(
    'prediction_errors_total',
    'Total prediction errors'
)

# Request model
class HouseFeatures(BaseModel):
    MedInc: float = Field(..., description="Median income", ge=0)
    HouseAge: float = Field(..., description="House age", ge=0, le=100)
    AveRooms: float = Field(..., description="Average rooms", ge=0)
    AveBedrms: float = Field(..., description="Average bedrooms", ge=0)
    Population: float = Field(..., description="Population", ge=0)
    AveOccup: float = Field(..., description="Average occupancy", ge=0)
    Latitude: float = Field(..., description="Latitude", ge=32, le=42)
    Longitude: float = Field(..., description="Longitude", ge=-125, le=-114)

class PredictionResponse(BaseModel):
    predicted_price: float
    model_version: str = "1.0.0"

@app.get("/")
async def root():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "service": "House Price Prediction API",
        "version": "1.0.0"
    }

@app.post("/predict", response_model=PredictionResponse)
async def predict(features: HouseFeatures):
    """Make house price prediction"""
    
    start_time = time.time()
    
    try:
        # Convert to array
        feature_array = np.array([[
            features.MedInc,
            features.HouseAge,
            features.AveRooms,
            features.AveBedrms,
            features.Population,
            features.AveOccup,
            features.Latitude,
            features.Longitude
        ]])
        
        # Scale features
        features_scaled = scaler.transform(feature_array)
        
        # Predict
        prediction = model.predict(features_scaled)[0]
        
        # Metrics
        prediction_counter.inc()
        latency = time.time() - start_time
        prediction_latency.observe(latency)
        
        return PredictionResponse(predicted_price=float(prediction))
    
    except Exception as e:
        prediction_errors.inc()
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    return Response(content=generate_latest(), media_type="text/plain")

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
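
The scaler and model were fitted on columns in the order they appear in the training CSV, and the endpoint rebuilds that order by hand when it creates `feature_array`. If a field were ever listed in a different position, predictions would be silently wrong. One way to guard against that, sketched here under the assumption that the column order matches the `HouseFeatures` fields above, is to keep the order in a single constant. `FEATURE_ORDER` and `to_feature_row` are hypothetical names, not part of the tutorial code.

```python
# Column order as used for training (same order as the HouseFeatures fields).
FEATURE_ORDER = [
    "MedInc", "HouseAge", "AveRooms", "AveBedrms",
    "Population", "AveOccup", "Latitude", "Longitude",
]


def to_feature_row(payload):
    """Turn a {feature name: value} mapping into a list in training order."""
    missing = [name for name in FEATURE_ORDER if name not in payload]
    if missing:
        raise KeyError(f"missing features: {missing}")
    return [float(payload[name]) for name in FEATURE_ORDER]


# Same sample values as the curl test in Step 5.
example = {
    "MedInc": 8.3, "HouseAge": 41, "AveRooms": 6.98, "AveBedrms": 1.02,
    "Population": 322, "AveOccup": 2.55, "Latitude": 37.88, "Longitude": -122.23,
}
row = to_feature_row(example)
print(row)
```

Inside the endpoint, the equivalent call would be `to_feature_row(features.model_dump())`, wrapped in a list to form the 2-D input that `scaler.transform` expects.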

🐳 Step 5: Containerization

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY src/ ./src/
COPY models/ ./models/
COPY data/processed/ ./data/processed/

# Expose port
EXPOSE 8000

# Run application
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]

# Build and run (shell commands; run Steps 2-3 first so that models/ and data/processed/ exist)
docker build -t house-price-api:v1 .
docker run -p 8000:8000 house-price-api:v1

# Test
curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{
    "MedInc": 8.3,
    "HouseAge": 41,
    "AveRooms": 6.98,
    "AveBedrms": 1.02,
    "Population": 322,
    "AveOccup": 2.55,
    "Latitude": 37.88,
    "Longitude": -122.23
  }'
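
The same request can be sent from Python. The sketch below uses only the standard library so it runs without extra packages; the `requests` library from requirements.txt would work just as well. `build_predict_request` and `predict` are hypothetical helper names, and the base URL assumes the container from the previous command is listening on localhost:8000.

```python
import json
import urllib.request


def build_predict_request(features, base_url="http://localhost:8000"):
    """Build (but do not send) a POST request for the /predict endpoint."""
    body = json.dumps(features).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/predict",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict(features, base_url="http://localhost:8000", timeout=5.0):
    """Send the request and return the decoded JSON response as a dict."""
    req = build_predict_request(features, base_url)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Same payload as the curl example.
sample = {
    "MedInc": 8.3, "HouseAge": 41, "AveRooms": 6.98, "AveBedrms": 1.02,
    "Population": 322, "AveOccup": 2.55, "Latitude": 37.88, "Longitude": -122.23,
}
request = build_predict_request(sample)
print(request.get_method(), request.full_url)
```

With the API running, `predict(sample)` should return a dict containing `predicted_price` and `model_version`. A payload that violates the field constraints makes `urlopen` raise `urllib.error.HTTPError` with status 422.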

✅ Step 6: Automated Testing

# tests/test_api.py
"""
API integration tests
"""
from fastapi.testclient import TestClient
from src.api.main import app

client = TestClient(app)

def test_health_check():
    """Test health endpoint"""
    response = client.get("/")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"

def test_prediction():
    """Test prediction endpoint"""
    payload = {
        "MedInc": 8.3,
        "HouseAge": 41,
        "AveRooms": 6.98,
        "AveBedrms": 1.02,
        "Population": 322,
        "AveOccup": 2.55,
        "Latitude": 37.88,
        "Longitude": -122.23
    }
    
    response = client.post("/predict", json=payload)
    assert response.status_code == 200
    
    data = response.json()
    assert "predicted_price" in data
    assert isinstance(data["predicted_price"], float)
    assert data["predicted_price"] > 0

def test_invalid_input():
    """Test validation"""
    payload = {
        "MedInc": -1,  # Invalid: negative income
        "HouseAge": 41,
        "AveRooms": 6.98,
        "AveBedrms": 1.02,
        "Population": 322,
        "AveOccup": 2.55,
        "Latitude": 37.88,
        "Longitude": -122.23
    }
    
    response = client.post("/predict", json=payload)
    assert response.status_code == 422  # Validation error

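# Run tests from the project root, after Steps 2-3 so the model files exist.
# `python -m pytest` puts the project root on sys.path, which lets
# `from src.api.main import app` resolve:
# python -m pytest tests/test_api.py -v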

📈 Step 7: Monitoring Dashboard

# docker-compose.yml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000
  
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./config/prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
  
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
  
  mlflow:
    image: ghcr.io/mlflow/mlflow
    ports:
      - "5000:5000"
    command: mlflow server --host 0.0.0.0 --port 5000

# config/prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'house_price_api'
    static_configs:
      - targets: ['api:8000']

# Start all services
docker-compose up -d

# Access:
# API: http://localhost:8000/docs
# Prometheus: http://localhost:9090
# Grafana: http://localhost:3000 (log in as admin / admin, then add http://prometheus:9090 as a Prometheus data source)
# MLflow: http://localhost:5000
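
Before building a Grafana panel, it can help to sanity-check what the `/metrics` endpoint exposes. The sketch below parses the Prometheus text format and derives two numbers from the counters and histogram defined in Step 4. `parse_samples` is a hypothetical helper, the `EXAMPLE` text uses invented values rather than real output, and the parser assumes sample lines carry no trailing timestamp (which is the default for `prometheus_client`).

```python
def parse_samples(text):
    """Map each sample name (including any {labels}) to its float value."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # skip blanks, HELP and TYPE lines
            continue
        name, _, value = line.rpartition(" ")
        samples[name] = float(value)
    return samples


# Invented values in the shape the Step 4 metrics would take.
EXAMPLE = """\
# HELP predictions_total Total number of predictions
# TYPE predictions_total counter
predictions_total 40.0
# HELP prediction_errors_total Total prediction errors
# TYPE prediction_errors_total counter
prediction_errors_total 2.0
prediction_latency_seconds_count 40.0
prediction_latency_seconds_sum 0.8
"""

samples = parse_samples(EXAMPLE)

# The API increments predictions_total only on success, so the total number
# of requests is successes plus errors.
total = samples["predictions_total"] + samples["prediction_errors_total"]
error_rate = samples["prediction_errors_total"] / total
mean_latency = (
    samples["prediction_latency_seconds_sum"]
    / samples["prediction_latency_seconds_count"]
)
print(f"error rate: {error_rate:.3f}, mean latency: {mean_latency:.3f}s")
```

In Prometheus itself, the same quantities are normally computed over a time window with `rate()` on these series instead of on the raw totals.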

🎯 Project Completion Checklist

  • ✅ Data pipeline with validation and preprocessing
  • ✅ Model training with MLflow experiment tracking
  • ✅ Hyperparameter tuning logged to MLflow
  • ✅ FastAPI REST endpoint with validation
  • ✅ Prometheus metrics integration
  • ✅ Docker containerization
  • ✅ Automated unit and integration tests
  • ✅ Docker Compose orchestration
  • ✅ Grafana running and ready for a monitoring dashboard

🎉 Congratulations!

You've built a production-ready end-to-end ML pipeline! This project demonstrates data engineering, model training, deployment, and monitoring: the core MLOps skills.

Next Steps

  • Enhance: Add A/B testing, feature store integration
  • Scale: Deploy to Kubernetes with autoscaling
  • Monitor: Add drift detection with Evidently AI
  • Automate: Create CI/CD pipeline with GitHub Actions