🔁 The ML Pipeline Challenge
You've trained a model manually: downloaded data, cleaned it in a notebook, trained on your laptop, evaluated results, and deployed. It worked! But then:
- New data arrives daily - you need to retrain weekly
- A colleague asks "How did you preprocess the data?"
- You need to run the same pipeline on 10 different datasets
- The model performance degrades - which version was in production?
- Training crashes at 3 AM - no one notices until morning
ML orchestration solves this with automated, reproducible workflows. Instead of running scripts manually, you define pipelines as code that execute automatically on schedules or triggers.
💡 Benefits of ML Orchestration:
- Reproducibility: Same inputs always produce same outputs
- Automation: Run pipelines on schedules or triggers
- Monitoring: Track pipeline health and failures
- Scalability: Process large datasets in parallel
- Versioning: Track every pipeline run and artifact
🛠️ Orchestration Framework Comparison
| | Apache Airflow | Kubeflow Pipelines | MLflow Pipelines |
|---|---|---|---|
| Best For | General workflow orchestration | ✅ ML-specific, Kubernetes-native | Simple ML workflows |
| Learning Curve | Moderate | Steep (needs K8s knowledge) | ✅ Easy |
| UI Dashboard | ✅ Excellent | ✅ Good | Basic |
| Scalability | Good | ✅ Excellent (K8s) | Moderate |
| ML Features | Via integrations | ✅ Native (experiments, artifacts) | ✅ Native |
| Community | ✅ Largest | Large | Growing |
| Deployment | Anywhere | Kubernetes only | Anywhere |
🎯 Which to Choose?
- Airflow: Best for complex workflows mixing ML and non-ML tasks
- Kubeflow: Best for K8s environments with heavy ML workloads
- MLflow Pipelines: Best for simple ML workflows and rapid prototyping
🌪️ Apache Airflow
What is Airflow?
Airflow is a platform to programmatically author, schedule, and monitor workflows. You define workflows as Directed Acyclic Graphs (DAGs) in Python.
Installation & Setup
# Install Airflow
pip install apache-airflow
# Initialize database
airflow db init
# Create admin user
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
# Start webserver
airflow webserver --port 8080
# Start scheduler (in another terminal)
airflow scheduler
# Access UI at http://localhost:8080
Your First ML Pipeline DAG
"""
Simple ML training pipeline with Airflow
File: dags/ml_training_dag.py
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import joblib
# Default arguments
default_args = {
'owner': 'mlops-team',
'depends_on_past': False,
'email': ['alerts@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
# Define DAG
dag = DAG(
'ml_training_pipeline',
default_args=default_args,
description='Train and evaluate ML model',
schedule_interval='0 2 * * *', # Run daily at 2 AM
start_date=days_ago(1),
catchup=False,
tags=['ml', 'training'],
)
def load_data(**context):
"""Load and validate data"""
df = pd.read_csv('/data/training_data.csv')
# Data validation
assert df.isnull().sum().sum() == 0, "Data contains null values"
assert len(df) > 1000, "Insufficient data samples"
print(f"Loaded {len(df)} samples")
# Push to XCom for next task
context['ti'].xcom_push(key='data_path', value='/data/training_data.csv')
context['ti'].xcom_push(key='num_samples', value=len(df))
def preprocess_data(**context):
"""Preprocess and split data"""
# Pull from XCom
data_path = context['ti'].xcom_pull(key='data_path', task_ids='load_data')
df = pd.read_csv(data_path)
# Feature engineering
X = df.drop('target', axis=1)
y = df['target']
# Split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Save splits
X_train.to_csv('/data/X_train.csv', index=False)
X_test.to_csv('/data/X_test.csv', index=False)
y_train.to_csv('/data/y_train.csv', index=False)
y_test.to_csv('/data/y_test.csv', index=False)
print(f"Train size: {len(X_train)}, Test size: {len(X_test)}")
def train_model(**context):
"""Train ML model"""
# Load processed data
X_train = pd.read_csv('/data/X_train.csv')
y_train = pd.read_csv('/data/y_train.csv').values.ravel()
# Train model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Save model
model_path = f"/models/model_{context['ds']}.joblib"  # Date-stamped with the run's logical date
joblib.dump(model, model_path)
context['ti'].xcom_push(key='model_path', value=model_path)
print(f"Model saved to {model_path}")
def evaluate_model(**context):
"""Evaluate model performance"""
model_path = context['ti'].xcom_pull(key='model_path', task_ids='train_model')
# Load model and test data
model = joblib.load(model_path)
X_test = pd.read_csv('/data/X_test.csv')
y_test = pd.read_csv('/data/y_test.csv').values.ravel()
# Evaluate
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Model Accuracy: {accuracy:.4f}")
# Quality gate
if accuracy < 0.85:
raise ValueError(f"Model accuracy {accuracy:.4f} below threshold 0.85")
context['ti'].xcom_push(key='accuracy', value=accuracy)
def deploy_model(**context):
"""Deploy model to production"""
model_path = context['ti'].xcom_pull(key='model_path', task_ids='train_model')
accuracy = context['ti'].xcom_pull(key='accuracy', task_ids='evaluate_model')
# Copy to production location
import shutil
shutil.copy(model_path, '/models/production/model.joblib')
print(f"β
Deployed model with accuracy {accuracy:.4f}")
# Define tasks
load_task = PythonOperator(
task_id='load_data',
python_callable=load_data,
dag=dag,
)
preprocess_task = PythonOperator(
task_id='preprocess_data',
python_callable=preprocess_data,
dag=dag,
)
train_task = PythonOperator(
task_id='train_model',
python_callable=train_model,
dag=dag,
)
evaluate_task = PythonOperator(
task_id='evaluate_model',
python_callable=evaluate_model,
dag=dag,
)
deploy_task = PythonOperator(
task_id='deploy_model',
python_callable=deploy_model,
dag=dag,
)
# Define dependencies
load_task >> preprocess_task >> train_task >> evaluate_task >> deploy_task
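If you're on Airflow 2.5 or newer, you can also exercise the whole DAG locally without a scheduler via dag.test(). A minimal sketch (assuming Airflow 2.5+); append it to the bottom of dags/ml_training_dag.py:
# Optional: run the full DAG in a single local process for debugging (Airflow 2.5+)
if __name__ == '__main__':
    dag.test()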
Advanced: Parallel Training with Dynamic Tasks
"""
Train multiple models in parallel
"""
from airflow.operators.python import PythonOperator
from airflow.models import Variable
# Models to train
MODELS = ['random_forest', 'xgboost', 'lightgbm', 'catboost']
def train_specific_model(model_name, **context):
"""Train a specific model"""
X_train = pd.read_csv('/data/X_train.csv')
y_train = pd.read_csv('/data/y_train.csv').values.ravel()
if model_name == 'random_forest':
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(n_estimators=100)
elif model_name == 'xgboost':
import xgboost as xgb
model = xgb.XGBClassifier()
# ... other models
model.fit(X_train, y_train)
# Evaluate
X_test = pd.read_csv('/data/X_test.csv')
y_test = pd.read_csv('/data/y_test.csv').values.ravel()
accuracy = model.score(X_test, y_test)
# Save
model_path = f"/models/{model_name}_{context['ds']}.joblib"  # Date-stamped with the run's logical date
joblib.dump(model, model_path)
return {'model': model_name, 'accuracy': accuracy, 'path': model_path}
# Create parallel training tasks
training_tasks = []
for model_name in MODELS:
task = PythonOperator(
task_id=f'train_{model_name}',
python_callable=train_specific_model,
op_kwargs={'model_name': model_name},
dag=dag,
)
training_tasks.append(task)
def select_best_model(**context):
"""Select best performing model"""
results = []
for model_name in MODELS:
result = context['ti'].xcom_pull(task_ids=f'train_{model_name}')
results.append(result)
# Find best
best = max(results, key=lambda x: x['accuracy'])
print(f"Best model: {best['model']} with accuracy {best['accuracy']:.4f}")
# Deploy best model
import shutil
shutil.copy(best['path'], '/models/production/model.joblib')
select_task = PythonOperator(
task_id='select_best_model',
python_callable=select_best_model,
dag=dag,
)
# All training tasks run in parallel, then selection
preprocess_task >> training_tasks >> select_task
⚠️ Airflow Best Practices:
- Keep tasks idempotent (can run multiple times safely)
- Don't put heavy computation in DAG definition (runs every heartbeat)
- Use XCom sparingly (limited size, not for large data)
- Implement proper error handling and retries
- Use sensors for external dependencies (see the sketch below)
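For example, rather than assuming /data/training_data.csv is already in place when the DAG fires, a sensor can hold the pipeline until the file actually lands. A minimal sketch using Airflow's built-in FileSensor; the poke interval, timeout, and filesystem connection are illustrative assumptions:
"""
Gate the training pipeline on the daily data drop
"""
from airflow.sensors.filesystem import FileSensor

wait_for_data = FileSensor(
    task_id='wait_for_training_data',
    filepath='/data/training_data.csv',   # same path load_data reads above
    poke_interval=300,                    # check every 5 minutes
    timeout=6 * 60 * 60,                  # give up after 6 hours
    mode='reschedule',                    # release the worker slot between checks
    dag=dag,
)

# Run the sensor before the existing pipeline
wait_for_data >> load_task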
☸️ Kubeflow Pipelines
What is Kubeflow?
Kubeflow Pipelines is a platform for building and deploying ML workflows on Kubernetes. Each step runs in a container, making pipelines portable and scalable.
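Because each step is its own container, you can pin the image and Python dependencies per component instead of per cluster. A minimal sketch with the kfp v1 SDK; the base image tag and package version are assumptions:
from kfp.components import create_component_from_func

def validate_data(data_path: str) -> int:
    """Tiny example step: count the rows in the input file"""
    import pandas as pd
    return len(pd.read_csv(data_path))

# The component's container image and pip packages are declared alongside the function
validate_data_op = create_component_from_func(
    validate_data,
    base_image='python:3.10-slim',           # assumed base image
    packages_to_install=['pandas==2.0.3'],   # installed inside the step's container at runtime
)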
Installation
# Install Kubeflow Pipelines SDK
pip install kfp
# Deploy Kubeflow on Kubernetes cluster
# (Requires existing K8s cluster)
export PIPELINE_VERSION=1.8.5
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"
# Access UI (port-forward)
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
Building a Kubeflow Pipeline
"""
Kubeflow ML pipeline with component-based design
"""
import kfp
from kfp import dsl
from kfp.components import create_component_from_func
# Define components as Python functions
@create_component_from_func
def load_data(data_path: str) -> str:
"""Load and validate data"""
import pandas as pd
df = pd.read_csv(data_path)
print(f"Loaded {len(df)} samples")
# Save processed data
# NOTE: each component runs in its own container, so a local /tmp path only
# works for single-node demos - in production, write to shared storage
# (e.g. GCS/S3) or use kfp's file-passing artifacts (InputPath/OutputPath)
output_path = '/tmp/processed_data.csv'
df.to_csv(output_path, index=False)
return output_path
@create_component_from_func
def train_model(
data_path: str,
n_estimators: int = 100,
max_depth: int = 10
) -> str:
"""Train model and return path"""
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import joblib
# Load data
df = pd.read_csv(data_path)
X = df.drop('target', axis=1)
y = df['target']
# Split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Train
model = RandomForestClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
random_state=42
)
model.fit(X_train, y_train)
# Evaluate
accuracy = model.score(X_test, y_test)
print(f"Model Accuracy: {accuracy:.4f}")
# Save
model_path = '/tmp/model.joblib'
joblib.dump(model, model_path)
return model_path
@create_component_from_func
def evaluate_model(model_path: str, data_path: str) -> float:
"""Evaluate model and return metrics"""
import pandas as pd
import joblib
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
# Load model and data
model = joblib.load(model_path)
df = pd.read_csv(data_path)
X = df.drop('target', axis=1)
y = df['target']
_, X_test, _, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Predict
y_pred = model.predict(X_test)
# Metrics
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred, average='weighted')
print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}")
return accuracy
@create_component_from_func
def deploy_model(model_path: str, accuracy: float, threshold: float = 0.85):
"""Deploy model if meets quality threshold"""
if accuracy < threshold:
raise ValueError(f"Accuracy {accuracy:.4f} below threshold {threshold}")
# In production, deploy to serving system
print(f"β
Deploying model with accuracy {accuracy:.4f}")
# Define pipeline
@dsl.pipeline(
name='ML Training Pipeline',
description='End-to-end ML training and deployment'
)
def ml_pipeline(
data_path: str = 'gs://my-bucket/data.csv',
n_estimators: int = 100,
max_depth: int = 10,
accuracy_threshold: float = 0.85
):
"""Complete ML pipeline"""
# Step 1: Load data
load_data_task = load_data(data_path=data_path)
# Step 2: Train model
train_task = train_model(
data_path=load_data_task.output,
n_estimators=n_estimators,
max_depth=max_depth
)
# Step 3: Evaluate
eval_task = evaluate_model(
model_path=train_task.output,
data_path=load_data_task.output
)
# Step 4: Deploy (conditional)
deploy_task = deploy_model(
model_path=train_task.output,
accuracy=eval_task.output,
threshold=accuracy_threshold
)
# Compile and submit pipeline
if __name__ == '__main__':
kfp.compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')
# Submit to Kubeflow
client = kfp.Client(host='http://localhost:8080')
# Create experiment
experiment = client.create_experiment(name='ml-training')
# Run pipeline
run = client.run_pipeline(
experiment_id=experiment.id,
job_name='ml-training-run',
pipeline_package_path='ml_pipeline.yaml',
params={
'data_path': 'gs://my-bucket/data.csv',
'n_estimators': 100,
'accuracy_threshold': 0.85
}
)
print(f"Pipeline submitted: {run.run_id}")
Hyperparameter Tuning with Parallel Runs
"""
Run multiple experiments with different hyperparameters
"""
# Hyperparameter grid
param_grid = [
{'n_estimators': 50, 'max_depth': 5},
{'n_estimators': 100, 'max_depth': 10},
{'n_estimators': 200, 'max_depth': 15},
{'n_estimators': 300, 'max_depth': 20},
]
# Submit multiple runs
client = kfp.Client(host='http://localhost:8080')
experiment = client.create_experiment(name='hyperparameter-tuning')
for params in param_grid:
run = client.run_pipeline(
experiment_id=experiment.id,
job_name=f"train_n{params['n_estimators']}_d{params['max_depth']}",
pipeline_package_path='ml_pipeline.yaml',
params=params
)
print(f"Submitted run: {run.run_id}")
# Kubeflow will run all in parallel on K8s cluster!
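If you keep the handles returned by run_pipeline, you can also block on the submitted runs and report their final state from the same client. A minimal sketch with the kfp v1 SDK; the one-hour timeout is an assumption:
"""
Wait for a submitted run and report its final status
"""
def wait_and_report(client, run_id, timeout=3600):
    result = client.wait_for_run_completion(run_id, timeout=timeout)
    status = result.run.status   # e.g. 'Succeeded' or 'Failed'
    print(f"Run {run_id} finished with status: {status}")
    return status

# Example: wait for the most recently submitted run from the loop above
wait_and_report(client, run.run_id)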
Using Prebuilt Components
"""
Use Kubeflow's prebuilt components
"""
from kfp.components import load_component_from_url
# Load prebuilt components
train_sklearn = load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/sklearn/Train/component.yaml'
)
deploy_model = load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/deployer/component.yaml'
)
@dsl.pipeline(name='Quick ML Pipeline')
def quick_pipeline(data_path: str):
# Use prebuilt training component
train_task = train_sklearn(
training_data=data_path,
model_type='RandomForestClassifier'
)
# Use prebuilt deployment component
deploy_task = deploy_model(
model=train_task.outputs['model'],
namespace='production'
)
📦 MLflow Pipelines (Recipes)
Overview
MLflow Recipes (formerly MLflow Pipelines) provides predefined pipeline templates for common ML tasks with built-in best practices.
Installation & Setup
pip install mlflow
# Create a pipeline project from the official regression recipe template
git clone https://github.com/mlflow/recipes-regression-template.git my_pipeline
cd my_pipeline
Pipeline Structure
# recipe.yaml - Configuration
recipe: "regression/v1"
target_col: "price"
steps:
ingest:
using: parquet
location: "data/training_data.parquet"
split:
split_ratios: [0.7, 0.15, 0.15] # train/val/test
random_state: 42
transform:
using: custom
transformer_method: transform
train:
using: sklearn
estimator_method: estimator
evaluate:
validation_criteria:
- metric: root_mean_squared_error
threshold: 10000
- metric: mean_absolute_percentage_error
threshold: 0.15
register:
registry_uri: "sqlite:///mlflow.db"
model_name: "price_predictor"
Custom Steps
# steps/transform.py
def transform(training_data):
"""Custom transformation logic"""
from sklearn.preprocessing import StandardScaler
# Feature engineering
training_data['age'] = 2024 - training_data['year_built']
training_data['sqft_per_bedroom'] = training_data['sqft'] / training_data['bedrooms']  # avoid features derived from the target (price)
# Scale features
scaler = StandardScaler()
numeric_cols = ['sqft', 'bedrooms', 'age']
training_data[numeric_cols] = scaler.fit_transform(training_data[numeric_cols])
return training_data
# steps/train.py
def estimator():
"""Define model to train"""
from sklearn.ensemble import GradientBoostingRegressor
return GradientBoostingRegressor(
n_estimators=200,
learning_rate=0.1,
max_depth=5,
random_state=42
)
Running Pipeline
# Run entire pipeline
mlflow recipes run
# Run specific step
mlflow recipes run --step train
# Inspect results
mlflow recipes inspect
# View artifacts
mlflow recipes inspect --step train
Integration with MLflow Tracking
"""
MLflow Recipes runs are automatically logged to MLflow Tracking
"""
import mlflow
# View logged runs
client = mlflow.tracking.MlflowClient()
experiment = client.get_experiment_by_name("my_pipeline")
runs = client.search_runs([experiment.experiment_id])
for run in runs:
print(f"Run ID: {run.info.run_id}")
print(f"Metrics: {run.data.metrics}")
print(f"Params: {run.data.params}")
print(f"Artifacts: {client.list_artifacts(run.info.run_id)}")
print("---")
🔄 CI/CD for ML Models
What is ML CI/CD?
Continuous Integration and Continuous Deployment for ML extends traditional software CI/CD with model training, validation, and deployment automation.
GitHub Actions ML Pipeline
# .github/workflows/ml-pipeline.yml
name: ML Training Pipeline
on:
push:
branches: [main]
schedule:
- cron: '0 2 * * *' # Daily at 2 AM
workflow_dispatch: # Manual trigger
jobs:
train:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
pip install -r requirements.txt
- name: Download training data
run: |
aws s3 cp s3://my-bucket/data.csv data/
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Train model
run: |
python train.py --config config.yaml
- name: Evaluate model
id: evaluate
run: |
python evaluate.py --model model.pkl
echo "accuracy=$(cat metrics.json | jq -r '.accuracy')" >> $GITHUB_OUTPUT
- name: Quality gate
run: |
if (( $(echo "${{ steps.evaluate.outputs.accuracy }} < 0.85" | bc -l) )); then
echo "Model accuracy below threshold"
exit 1
fi
- name: Upload model artifact
uses: actions/upload-artifact@v3
with:
name: trained-model
path: model.pkl
- name: Log to MLflow
run: |
python log_model.py
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
deploy:
needs: train
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v3
- name: Download model
uses: actions/download-artifact@v3
with:
name: trained-model
- name: Deploy to production
run: |
# Build Docker image
docker build -t ml-model:${{ github.sha }} .
# Push to registry
docker push ml-model:${{ github.sha }}
# Update K8s deployment
kubectl set image deployment/ml-model \
ml-model=ml-model:${{ github.sha }}
env:
KUBECONFIG: ${{ secrets.KUBECONFIG }}
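The workflow above shells out to train.py and evaluate.py and expects evaluate.py to write metrics.json, which the quality-gate step reads with jq. A minimal sketch of what that evaluation script could look like; the script itself and the test-data path are assumptions, not part of the workflow:
"""
evaluate.py - writes metrics.json for the CI quality gate
"""
import argparse
import json

import joblib
import pandas as pd
from sklearn.metrics import accuracy_score

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model', default='model.pkl')
    parser.add_argument('--test-data', default='data/test.csv')   # assumed location
    args = parser.parse_args()

    model = joblib.load(args.model)
    df = pd.read_csv(args.test_data)
    X_test, y_test = df.drop('target', axis=1), df['target']

    accuracy = accuracy_score(y_test, model.predict(X_test))

    # The "Quality gate" step parses this file with: jq -r '.accuracy'
    with open('metrics.json', 'w') as f:
        json.dump({'accuracy': float(accuracy)}, f)

    print(f"accuracy={accuracy:.4f}")

if __name__ == '__main__':
    main()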
Model Testing in CI/CD
"""
tests/test_model.py - Model quality tests
"""
import pytest
import joblib
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
@pytest.fixture
def model():
"""Load trained model"""
return joblib.load('model.pkl')
@pytest.fixture
def test_data():
"""Load test dataset"""
return pd.read_csv('data/test.csv')
def test_model_accuracy(model, test_data):
"""Test model meets accuracy threshold"""
X_test = test_data.drop('target', axis=1)
y_test = test_data['target']
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
assert accuracy >= 0.85, f"Accuracy {accuracy:.4f} below threshold 0.85"
def test_prediction_latency(model, test_data):
"""Test prediction speed"""
import time
X_sample = test_data.drop('target', axis=1).iloc[0:1]
start = time.time()
_ = model.predict(X_sample)
duration = time.time() - start
assert duration < 0.1, f"Prediction took {duration:.4f}s, exceeds 100ms limit"
def test_no_null_predictions(model, test_data):
"""Test model doesn't produce null predictions"""
X_test = test_data.drop('target', axis=1)
predictions = model.predict(X_test)
assert not pd.isnull(predictions).any(), "Model produced null predictions"
def test_prediction_distribution(model, test_data):
"""Test predictions follow expected distribution"""
X_test = test_data.drop('target', axis=1)
predictions = model.predict(X_test)
# Check class balance (for classification)
unique, counts = np.unique(predictions, return_counts=True)
min_class_ratio = counts.min() / counts.sum()
assert min_class_ratio >= 0.05, "Severe class imbalance in predictions"
✅ ML CI/CD Best Practices:
- Automate model training on code or data changes
- Implement quality gates (accuracy thresholds)
- Test model performance and latency
- Version control data, code, and models
- Use canary deployments for gradual rollout
- Monitor model performance post-deployment (see the sketch below)
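To make the last point concrete, here is a minimal post-deployment health check; where the live metrics come from and how alerts are raised are assumptions that would normally be wired into your monitoring stack:
"""
Post-deployment check: compare live accuracy against the offline baseline
"""
import json

def check_model_health(live_metrics_path='live_metrics.json',
                       baseline_accuracy=0.85,
                       max_drop=0.05):
    """Fail loudly if live accuracy falls too far below the training baseline"""
    with open(live_metrics_path) as f:
        live = json.load(f)

    drop = baseline_accuracy - live['accuracy']
    if drop > max_drop:
        # In production this would page on-call or trigger a rollback instead of raising
        raise RuntimeError(
            f"Live accuracy {live['accuracy']:.3f} is {drop:.3f} below the "
            f"baseline {baseline_accuracy:.3f} - consider rolling back or retraining"
        )
    print(f"Model healthy: live accuracy {live['accuracy']:.3f}")

if __name__ == '__main__':
    check_model_health()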
🎯 Summary
You've mastered ML orchestration and pipeline automation:
- Apache Airflow: DAG-based orchestration for complex workflows mixing ML and non-ML tasks
- Kubeflow Pipelines: Kubernetes-native ML pipelines with containerized components
- MLflow Pipelines: Template-based pipelines with built-in best practices
- ML CI/CD: Automated training, testing, and deployment workflows
- Model Testing: Quality gates and performance validation
- Experiment Tracking: Version control for models, data, and metrics
Key Takeaways
- ML pipelines provide reproducibility, automation, and scalability
- Airflow excels at complex workflows with diverse tasks
- Kubeflow is ideal for Kubernetes-based ML infrastructure
- MLflow Pipelines offer quick setup with templates
- CI/CD for ML automates the entire model lifecycle
- Implement quality gates to prevent bad models reaching production
- Version everything: code, data, models, and configurations
🚀 Next Steps:
Your ML workflows are fully automated! Next, you'll learn Kubernetes for ML - deploying and scaling ML workloads on K8s with GPU scheduling, autoscaling, and production-grade infrastructure.
Test Your Knowledge
Q1: What is the main benefit of using an ML orchestration framework like Airflow?
Q2: What is a DAG in Apache Airflow?
Q3: When should you choose Kubeflow Pipelines over Apache Airflow?
Q4: What is a quality gate in ML CI/CD?
Q5: What should you version control in an ML project?