🎯 Project Overview
Build a production-ready ML pipeline for house price prediction that includes:
- Data Pipeline: automated data ingestion, validation, and preprocessing
- Training Pipeline: MLflow experiment tracking with hyperparameter tuning
- Containerization: Docker packaging for consistent deployment
- API Deployment: FastAPI REST endpoint with input validation
- Monitoring: Prometheus metrics and a Grafana dashboard
- Testing: unit tests and integration tests
What You'll Build
A complete ML system that:
- Trains a regression model to predict house prices
- Tracks experiments with MLflow
- Packages the model in a Docker container
- Deploys a FastAPI endpoint for predictions
- Monitors performance with Prometheus and Grafana
- Includes automated tests for quality assurance
💡 Prerequisites:
- Python 3.9+, Docker Desktop installed
- Completed tutorials 1-13
- Basic understanding of regression models
- Familiarity with command line
📁 Step 1: Project Setup
Create Project Structure
# Create project directory
mkdir ml-pipeline-project
cd ml-pipeline-project
# Create structure
mkdir -p {data,models,notebooks,src/{data,models,api},tests,config}
touch README.md requirements.txt Dockerfile docker-compose.yml .gitignore
# Project structure:
# ml-pipeline-project/
# ├── data/                # Raw and processed data
# ├── models/              # Saved models
# ├── notebooks/           # Exploratory notebooks
# ├── src/
# │   ├── data/            # Data processing
# │   ├── models/          # Model training
# │   └── api/             # FastAPI app
# ├── tests/               # Unit and integration tests
# ├── config/              # Configuration files
# ├── requirements.txt
# ├── Dockerfile
# └── docker-compose.yml
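The setup also touches an empty .gitignore. A minimal starting point (these entries are suggestions, not part of the original project) keeps generated data, MLflow runs, and environment files out of version control:
# .gitignore (suggested starting point)
__pycache__/
*.pyc
.venv/
.env
mlruns/
data/raw/
data/processed/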
Install Dependencies
# requirements.txt
pandas==2.0.3
scikit-learn==1.3.0
mlflow==2.7.1
fastapi==0.103.1
uvicorn[standard]==0.23.2
pydantic==2.3.0
prometheus-client==0.17.1
pytest==7.4.2
requests==2.31.0
python-dotenv==1.0.0
# Install
pip install -r requirements.txt
📊 Step 2: Data Pipeline
Download Dataset
# src/data/load_data.py
"""
Download and load house price dataset
"""
import pandas as pd
from sklearn.datasets import fetch_california_housing
import os

def load_raw_data():
    """Load California housing dataset"""
    print("Loading dataset...")
    data = fetch_california_housing(as_frame=True)
    df = data.frame

    # Save raw data
    os.makedirs('data/raw', exist_ok=True)
    df.to_csv('data/raw/housing.csv', index=False)

    print(f"Loaded {len(df)} samples with {len(df.columns)} features")
    return df

if __name__ == '__main__':
    load_raw_data()
Data Validation & Preprocessing
# src/data/preprocess.py
"""
Data validation and preprocessing
"""
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import joblib
import os

def validate_data(df):
    """Validate data quality"""
    print("\n🔍 Validating data...")

    # Check missing values
    missing = df.isnull().sum()
    if missing.any():
        print(f"⚠️ Missing values found:\n{missing[missing > 0]}")
        raise ValueError("Dataset contains missing values")

    # Check data types
    numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns
    if len(numeric_cols) != len(df.columns):
        print("⚠️ Non-numeric columns found")

    # Check for outliers (simple IQR check)
    for col in numeric_cols:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        outliers = ((df[col] < q1 - 3*iqr) | (df[col] > q3 + 3*iqr)).sum()
        if outliers > 0:
            print(f"  {col}: {outliers} outliers detected")

    print("✅ Data validation passed")

def preprocess_data(input_path='data/raw/housing.csv', test_size=0.2):
    """Preprocess and split data"""
    print("\n📊 Preprocessing data...")

    # Load
    df = pd.read_csv(input_path)

    # Validate
    validate_data(df)

    # Split features and target
    X = df.drop('MedHouseVal', axis=1)
    y = df['MedHouseVal']

    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42
    )

    # Scale features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Save preprocessed data
    os.makedirs('data/processed', exist_ok=True)
    pd.DataFrame(X_train_scaled, columns=X.columns).to_csv(
        'data/processed/X_train.csv', index=False
    )
    pd.DataFrame(X_test_scaled, columns=X.columns).to_csv(
        'data/processed/X_test.csv', index=False
    )
    pd.DataFrame(y_train).to_csv('data/processed/y_train.csv', index=False)
    pd.DataFrame(y_test).to_csv('data/processed/y_test.csv', index=False)

    # Save scaler
    os.makedirs('models/artifacts', exist_ok=True)
    joblib.dump(scaler, 'models/artifacts/scaler.pkl')

    print(f"✅ Data preprocessed: {len(X_train)} train, {len(X_test)} test samples")
    return X_train_scaled, X_test_scaled, y_train, y_test

if __name__ == '__main__':
    preprocess_data()
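To run the data step end to end, a small driver can chain the two modules. This is a sketch: the file name run_data_pipeline.py is an assumption, and it expects to be executed from the project root so the relative data/ and models/ paths resolve.
# run_data_pipeline.py (hypothetical helper, run from the project root)
from src.data.load_data import load_raw_data
from src.data.preprocess import preprocess_data

if __name__ == '__main__':
    load_raw_data()      # download the raw CSV into data/raw/
    preprocess_data()    # validate, split, scale, and save artifacts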
🤖 Step 3: Training Pipeline with MLflow
# src/models/train.py
"""
Model training with MLflow tracking
"""
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import pandas as pd
import numpy as np
import joblib
import os

def train_model(n_estimators=100, max_depth=10, experiment_name="house_price_prediction"):
    """Train model with MLflow tracking"""
    # Set MLflow experiment
    mlflow.set_experiment(experiment_name)

    # Load data
    print("📊 Loading preprocessed data...")
    X_train = pd.read_csv('data/processed/X_train.csv')
    X_test = pd.read_csv('data/processed/X_test.csv')
    y_train = pd.read_csv('data/processed/y_train.csv').values.ravel()
    y_test = pd.read_csv('data/processed/y_test.csv').values.ravel()

    # Start MLflow run
    with mlflow.start_run():
        print("\n🤖 Training RandomForest model...")

        # Train model
        model = RandomForestRegressor(
            n_estimators=n_estimators,
            max_depth=max_depth,
            random_state=42,
            n_jobs=-1
        )
        model.fit(X_train, y_train)

        # Predictions
        y_pred_train = model.predict(X_train)
        y_pred_test = model.predict(X_test)

        # Metrics
        train_rmse = np.sqrt(mean_squared_error(y_train, y_pred_train))
        test_rmse = np.sqrt(mean_squared_error(y_test, y_pred_test))
        train_r2 = r2_score(y_train, y_pred_train)
        test_r2 = r2_score(y_test, y_pred_test)
        test_mae = mean_absolute_error(y_test, y_pred_test)

        print("\n📈 Training Metrics:")
        print(f"  Train RMSE: {train_rmse:.4f}")
        print(f"  Train R²: {train_r2:.4f}")
        print("\n📈 Test Metrics:")
        print(f"  Test RMSE: {test_rmse:.4f}")
        print(f"  Test R²: {test_r2:.4f}")
        print(f"  Test MAE: {test_mae:.4f}")

        # Log parameters
        mlflow.log_param("n_estimators", n_estimators)
        mlflow.log_param("max_depth", max_depth)
        mlflow.log_param("model_type", "RandomForestRegressor")

        # Log metrics
        mlflow.log_metric("train_rmse", train_rmse)
        mlflow.log_metric("test_rmse", test_rmse)
        mlflow.log_metric("train_r2", train_r2)
        mlflow.log_metric("test_r2", test_r2)
        mlflow.log_metric("test_mae", test_mae)

        # Log model
        mlflow.sklearn.log_model(model, "model")

        # Save model locally
        os.makedirs('models/production', exist_ok=True)
        joblib.dump(model, 'models/production/model.pkl')

        print("\n✅ Model saved to models/production/model.pkl")
        print(f"  MLflow run: {mlflow.active_run().info.run_id}")

    return model, test_rmse, test_r2

if __name__ == '__main__':
    train_model()
Hyperparameter Tuning
# src/models/tune.py
"""
Hyperparameter tuning with MLflow
"""
import mlflow
from train import train_model

def hyperparameter_search():
    """Grid search with MLflow tracking"""
    param_grid = {
        'n_estimators': [50, 100, 200],
        'max_depth': [10, 20, None]
    }

    best_rmse = float('inf')
    best_params = {}

    print("🔍 Starting hyperparameter search...\n")

    for n_est in param_grid['n_estimators']:
        for depth in param_grid['max_depth']:
            print(f"Testing: n_estimators={n_est}, max_depth={depth}")
            model, rmse, r2 = train_model(
                n_estimators=n_est,
                max_depth=depth
            )
            if rmse < best_rmse:
                best_rmse = rmse
                best_params = {'n_estimators': n_est, 'max_depth': depth}

    print(f"\n🏆 Best parameters: {best_params}")
    print(f"  Best RMSE: {best_rmse:.4f}")

if __name__ == '__main__':
    hyperparameter_search()
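Once the search has run, compare the logged runs in the MLflow UI. With the default local file-based tracking used here, it reads the ./mlruns directory in the project root:
# Launch the MLflow tracking UI (default: http://localhost:5000)
mlflow ui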
🚀 Step 4: FastAPI Deployment
# src/api/main.py
"""
FastAPI application for house price prediction
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from prometheus_client import Counter, Histogram, generate_latest
from fastapi.responses import Response
import joblib
import numpy as np
import time

# Load model and scaler
model = joblib.load('models/production/model.pkl')
scaler = joblib.load('models/artifacts/scaler.pkl')

app = FastAPI(
    title="House Price Prediction API",
    description="Predict California house prices",
    version="1.0.0"
)

# Prometheus metrics
prediction_counter = Counter(
    'predictions_total',
    'Total number of predictions'
)
prediction_latency = Histogram(
    'prediction_latency_seconds',
    'Prediction latency'
)
prediction_errors = Counter(
    'prediction_errors_total',
    'Total prediction errors'
)

# Request model
class HouseFeatures(BaseModel):
    MedInc: float = Field(..., description="Median income", ge=0)
    HouseAge: float = Field(..., description="House age", ge=0, le=100)
    AveRooms: float = Field(..., description="Average rooms", ge=0)
    AveBedrms: float = Field(..., description="Average bedrooms", ge=0)
    Population: float = Field(..., description="Population", ge=0)
    AveOccup: float = Field(..., description="Average occupancy", ge=0)
    Latitude: float = Field(..., description="Latitude", ge=32, le=42)
    Longitude: float = Field(..., description="Longitude", ge=-125, le=-114)

class PredictionResponse(BaseModel):
    predicted_price: float
    model_version: str = "1.0.0"

@app.get("/")
async def root():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "service": "House Price Prediction API",
        "version": "1.0.0"
    }

@app.post("/predict", response_model=PredictionResponse)
async def predict(features: HouseFeatures):
    """Make house price prediction"""
    start_time = time.time()
    try:
        # Convert to array (column order must match the training data)
        feature_array = np.array([[
            features.MedInc,
            features.HouseAge,
            features.AveRooms,
            features.AveBedrms,
            features.Population,
            features.AveOccup,
            features.Latitude,
            features.Longitude
        ]])

        # Scale features
        features_scaled = scaler.transform(feature_array)

        # Predict
        prediction = model.predict(features_scaled)[0]

        # Metrics
        prediction_counter.inc()
        latency = time.time() - start_time
        prediction_latency.observe(latency)

        return PredictionResponse(predicted_price=float(prediction))
    except Exception as e:
        prediction_errors.inc()
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    return Response(content=generate_latest(), media_type="text/plain")

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
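Before containerizing, you can run the API locally with uvicorn's auto-reload. This assumes training has already produced models/production/model.pkl and models/artifacts/scaler.pkl, since both are loaded at import time:
# Run the API locally from the project root
uvicorn src.api.main:app --reload --port 8000
# Interactive docs: http://localhost:8000/docs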
🐳 Step 5: Containerization
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY src/ ./src/
COPY models/ ./models/
COPY data/processed/ ./data/processed/
# Expose port
EXPOSE 8000
# Run application
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]
# Build and run
docker build -t house-price-api:v1 .
docker run -p 8000:8000 house-price-api:v1
# Test
curl -X POST http://localhost:8000/predict \
  -H "Content-Type: application/json" \
  -d '{
    "MedInc": 8.3,
    "HouseAge": 41,
    "AveRooms": 6.98,
    "AveBedrms": 1.02,
    "Population": 322,
    "AveOccup": 2.55,
    "Latitude": 37.88,
    "Longitude": -122.23
  }'
✅ Step 6: Automated Testing
# tests/test_api.py
"""
API integration tests
"""
from fastapi.testclient import TestClient
from src.api.main import app

client = TestClient(app)

def test_health_check():
    """Test health endpoint"""
    response = client.get("/")
    assert response.status_code == 200
    assert response.json()["status"] == "healthy"

def test_prediction():
    """Test prediction endpoint"""
    payload = {
        "MedInc": 8.3,
        "HouseAge": 41,
        "AveRooms": 6.98,
        "AveBedrms": 1.02,
        "Population": 322,
        "AveOccup": 2.55,
        "Latitude": 37.88,
        "Longitude": -122.23
    }
    response = client.post("/predict", json=payload)
    assert response.status_code == 200
    data = response.json()
    assert "predicted_price" in data
    assert isinstance(data["predicted_price"], float)
    assert data["predicted_price"] > 0

def test_invalid_input():
    """Test validation"""
    payload = {
        "MedInc": -1,  # Invalid: negative income
        "HouseAge": 41,
        "AveRooms": 6.98,
        "AveBedrms": 1.02,
        "Population": 322,
        "AveOccup": 2.55,
        "Latitude": 37.88,
        "Longitude": -122.23
    }
    response = client.post("/predict", json=payload)
    assert response.status_code == 422  # Validation error

# Run tests
# pytest tests/test_api.py -v
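The API tests above are integration tests; the data layer deserves unit tests as well. A minimal sketch (the file name tests/test_data.py and the tiny synthetic frames are assumptions) that exercises validate_data:
# tests/test_data.py (sketch: unit tests for the validation logic)
import pandas as pd
import pytest

from src.data.preprocess import validate_data

def test_validate_data_passes_on_clean_frame():
    """An all-numeric frame with no missing values should pass."""
    df = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [4.0, 5.0, 6.0]})
    validate_data(df)  # should not raise

def test_validate_data_rejects_missing_values():
    """A frame containing NaNs should raise ValueError."""
    df = pd.DataFrame({"a": [1.0, None, 3.0], "b": [4.0, 5.0, 6.0]})
    with pytest.raises(ValueError):
        validate_data(df)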
📈 Step 7: Monitoring Dashboard
# docker-compose.yml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MLFLOW_TRACKING_URI=http://mlflow:5000

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./config/prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

  mlflow:
    image: ghcr.io/mlflow/mlflow
    ports:
      - "5000:5000"
    command: mlflow server --host 0.0.0.0 --port 5000
# config/prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'house_price_api'
    static_configs:
      - targets: ['api:8000']
# Start all services
docker-compose up -d
# Access:
# API: http://localhost:8000/docs
# Prometheus: http://localhost:9090
# Grafana: http://localhost:3000
# MLflow: http://localhost:5000
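In Grafana, add Prometheus as a data source (http://prometheus:9090 from inside the Compose network) and build panels from the metrics the API exposes. Example PromQL queries, assuming the metric names produced by the prometheus-client definitions above:
# Requests per second over the last 5 minutes
rate(predictions_total[5m])
# Average prediction latency over the last 5 minutes
rate(prediction_latency_seconds_sum[5m]) / rate(prediction_latency_seconds_count[5m])
# Error rate
rate(prediction_errors_total[5m])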
🎯 Project Completion Checklist
- ✅ Data pipeline with validation and preprocessing
- ✅ Model training with MLflow experiment tracking
- ✅ Hyperparameter tuning logged to MLflow
- ✅ FastAPI REST endpoint with validation
- ✅ Prometheus metrics integration
- ✅ Docker containerization
- ✅ Automated unit and integration tests
- ✅ Docker Compose orchestration
- ✅ Grafana monitoring dashboard
🎉 Congratulations!
You've built a production-ready, end-to-end ML pipeline! This project demonstrates data engineering, model training, deployment, and monitoring: core MLOps skills.
Next Steps
- Enhance: Add A/B testing and feature store integration
- Scale: Deploy to Kubernetes with autoscaling
- Monitor: Add drift detection with Evidently AI
- Automate: Create a CI/CD pipeline with GitHub Actions (see the sketch below)
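As a starting point for the last item, here is a minimal sketch of a GitHub Actions workflow (file path .github/workflows/ci.yml; step ordering and action versions are assumptions) that rebuilds the data, trains the model, and runs the test suite on every push:
# .github/workflows/ci.yml (sketch, adjust to your repository)
name: ci
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.9"
      - run: pip install -r requirements.txt
      - run: python src/data/load_data.py
      - run: python src/data/preprocess.py
      - run: python src/models/train.py
      - run: pytest tests/ -v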