
Orchestration & ML Pipelines

Automate your entire ML lifecycle with Apache Airflow, Kubeflow Pipelines, and MLflow. Build robust, reproducible workflows from data ingestion to model deployment.

📅 Tutorial 8 📊 Intermediate


🔄 The ML Pipeline Challenge

You've trained a model manually: downloaded data, cleaned it in a notebook, trained on your laptop, evaluated results, and deployed. It worked! But then:

  • New data arrives daily - you need to retrain weekly
  • A colleague asks "How did you preprocess the data?"
  • You need to run the same pipeline on 10 different datasets
  • The model performance degrades - which version was in production?
  • Training crashes at 3 AM - no one notices until morning

ML orchestration solves this with automated, reproducible workflows. Instead of running scripts manually, you define pipelines as code that execute automatically on schedules or triggers.

Data Ingestion → Validation → Preprocessing → Training → Evaluation → Deployment

💡 Benefits of ML Orchestration:

  • Reproducibility: Same inputs always produce same outputs
  • Automation: Run pipelines on schedules or triggers
  • Monitoring: Track pipeline health and failures
  • Scalability: Process large datasets in parallel
  • Versioning: Track every pipeline run and artifact

🛠️ Orchestration Framework Comparison

Framework       | Apache Airflow                 | Kubeflow Pipelines                 | MLflow Pipelines
Best For        | General workflow orchestration | ✅ ML-specific, Kubernetes-native   | Simple ML workflows
Learning Curve  | Moderate                       | Steep (needs K8s knowledge)        | ✅ Easy
UI Dashboard    | ✅ Excellent                    | ✅ Good                             | Basic
Scalability     | Good                           | ✅ Excellent (K8s)                  | Moderate
ML Features     | Via integrations               | ✅ Native (experiments, artifacts)  | ✅ Native
Community       | ✅ Largest                      | Large                              | Growing
Deployment      | Anywhere                       | Kubernetes only                    | Anywhere

🎯 Which to Choose?

  • Airflow: Best for complex workflows mixing ML and non-ML tasks
  • Kubeflow: Best for K8s environments with heavy ML workloads
  • MLflow Pipelines: Best for simple ML workflows and rapid prototyping

🌪️ Apache Airflow

What is Airflow?

Airflow is a platform to programmatically author, schedule, and monitor workflows. You define workflows as Directed Acyclic Graphs (DAGs) in Python.

Installation & Setup

# Install Airflow
pip install apache-airflow

# Initialize database
airflow db init

# Create admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

# Start webserver
airflow webserver --port 8080

# Start scheduler (in another terminal)
airflow scheduler

# Access UI at http://localhost:8080

Your First ML Pipeline DAG

"""
Simple ML training pipeline with Airflow
File: dags/ml_training_dag.py
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import joblib

# Default arguments
default_args = {
    'owner': 'mlops-team',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# Define DAG
dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='Train and evaluate ML model',
    schedule_interval='0 2 * * *',  # Run daily at 2 AM
    start_date=days_ago(1),
    catchup=False,
    tags=['ml', 'training'],
)

def load_data(**context):
    """Load and validate data"""
    df = pd.read_csv('/data/training_data.csv')
    
    # Data validation
    assert df.isnull().sum().sum() == 0, "Data contains null values"
    assert len(df) > 1000, "Insufficient data samples"
    
    print(f"Loaded {len(df)} samples")
    
    # Push to XCom for next task
    context['ti'].xcom_push(key='data_path', value='/data/training_data.csv')
    context['ti'].xcom_push(key='num_samples', value=len(df))

def preprocess_data(**context):
    """Preprocess and split data"""
    # Pull from XCom
    data_path = context['ti'].xcom_pull(key='data_path', task_ids='load_data')
    
    df = pd.read_csv(data_path)
    
    # Feature engineering
    X = df.drop('target', axis=1)
    y = df['target']
    
    # Split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    # Save splits
    X_train.to_csv('/data/X_train.csv', index=False)
    X_test.to_csv('/data/X_test.csv', index=False)
    y_train.to_csv('/data/y_train.csv', index=False)
    y_test.to_csv('/data/y_test.csv', index=False)
    
    print(f"Train size: {len(X_train)}, Test size: {len(X_test)}")

def train_model(**context):
    """Train ML model"""
    # Load processed data
    X_train = pd.read_csv('/data/X_train.csv')
    y_train = pd.read_csv('/data/y_train.csv').values.ravel()
    
    # Train model
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    
    # Save model
    model_path = f"/models/model_{context['ds']}.joblib"  # Date-stamped via the Airflow context (Jinja templates aren't rendered inside Python callables)
    joblib.dump(model, model_path)
    
    context['ti'].xcom_push(key='model_path', value=model_path)
    print(f"Model saved to {model_path}")

def evaluate_model(**context):
    """Evaluate model performance"""
    model_path = context['ti'].xcom_pull(key='model_path', task_ids='train_model')
    
    # Load model and test data
    model = joblib.load(model_path)
    X_test = pd.read_csv('/data/X_test.csv')
    y_test = pd.read_csv('/data/y_test.csv').values.ravel()
    
    # Evaluate
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    
    print(f"Model Accuracy: {accuracy:.4f}")
    
    # Quality gate
    if accuracy < 0.85:
        raise ValueError(f"Model accuracy {accuracy:.4f} below threshold 0.85")
    
    context['ti'].xcom_push(key='accuracy', value=accuracy)

def deploy_model(**context):
    """Deploy model to production"""
    model_path = context['ti'].xcom_pull(key='model_path', task_ids='train_model')
    accuracy = context['ti'].xcom_pull(key='accuracy', task_ids='evaluate_model')
    
    # Copy to production location
    import shutil
    shutil.copy(model_path, '/models/production/model.joblib')
    
    print(f"✅ Deployed model with accuracy {accuracy:.4f}")

# Define tasks
load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag,
)

preprocess_task = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_data,
    dag=dag,
)

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)

evaluate_task = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model,
    dag=dag,
)

deploy_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag,
)

# Define dependencies
load_task >> preprocess_task >> train_task >> evaluate_task >> deploy_task
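
The same pipeline can also be written with Airflow 2's TaskFlow API, which turns plain Python functions into tasks and passes their small return values between tasks through XCom automatically. A minimal sketch of the structure (the task bodies here are placeholders, not the full logic above):

"""
TaskFlow-style version of the training DAG (structure only)
"""
from datetime import datetime
from airflow.decorators import dag, task

@dag(
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'training'],
)
def ml_training_pipeline_taskflow():

    @task
    def load_data() -> str:
        # Load and validate, then hand the data path to the next task via XCom
        return '/data/training_data.csv'

    @task
    def preprocess_data(data_path: str) -> str:
        # Split and save train/test files, return the training split path
        return '/data/X_train.csv'

    @task
    def train_model(train_path: str) -> str:
        # Fit the model and return the saved model path
        return '/models/model.joblib'

    train_model(preprocess_data(load_data()))

ml_training_pipeline_taskflow()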

Advanced: Parallel Training with Dynamic Tasks

"""
Train multiple models in parallel
"""
from airflow.operators.python import PythonOperator
from airflow.models import Variable

# Models to train
MODELS = ['random_forest', 'xgboost', 'lightgbm', 'catboost']

def train_specific_model(model_name, **context):
    """Train a specific model"""
    X_train = pd.read_csv('/data/X_train.csv')
    y_train = pd.read_csv('/data/y_train.csv').values.ravel()
    
    if model_name == 'random_forest':
        from sklearn.ensemble import RandomForestClassifier
        model = RandomForestClassifier(n_estimators=100)
    elif model_name == 'xgboost':
        import xgboost as xgb
        model = xgb.XGBClassifier()
    # ... other models
    
    model.fit(X_train, y_train)
    
    # Evaluate
    X_test = pd.read_csv('/data/X_test.csv')
    y_test = pd.read_csv('/data/y_test.csv').values.ravel()
    accuracy = model.score(X_test, y_test)
    
    # Save
    model_path = f"/models/{model_name}_{context['ds']}.joblib"  # date-stamped via the Airflow context
    joblib.dump(model, model_path)
    
    return {'model': model_name, 'accuracy': accuracy, 'path': model_path}

# Create parallel training tasks
training_tasks = []
for model_name in MODELS:
    task = PythonOperator(
        task_id=f'train_{model_name}',
        python_callable=train_specific_model,
        op_kwargs={'model_name': model_name},
        dag=dag,
    )
    training_tasks.append(task)

def select_best_model(**context):
    """Select best performing model"""
    results = []
    for model_name in MODELS:
        result = context['ti'].xcom_pull(task_ids=f'train_{model_name}')
        results.append(result)
    
    # Find best
    best = max(results, key=lambda x: x['accuracy'])
    print(f"Best model: {best['model']} with accuracy {best['accuracy']:.4f}")
    
    # Deploy best model
    import shutil
    shutil.copy(best['path'], '/models/production/model.joblib')

select_task = PythonOperator(
    task_id='select_best_model',
    python_callable=select_best_model,
    dag=dag,
)

# All training tasks run in parallel, then selection
preprocess_task >> training_tasks >> select_task

⚠️ Airflow Best Practices:

  • Keep tasks idempotent (can run multiple times safely)
  • Don't put heavy computation at the top level of the DAG file (the scheduler re-parses DAG files continuously)
  • Use XCom sparingly (limited size, not for large data)
  • Implement proper error handling and retries
  • Use sensors for external dependencies (see the sketch after this list)
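
For example, a filesystem sensor can hold the training DAG until the day's data file actually exists before load_data runs. A minimal sketch, assuming the default filesystem connection and the /data path used above:

from airflow.sensors.filesystem import FileSensor

wait_for_data = FileSensor(
    task_id='wait_for_training_data',
    filepath='/data/training_data.csv',  # checked on the worker's filesystem
    fs_conn_id='fs_default',             # default filesystem connection
    poke_interval=300,                   # re-check every 5 minutes
    timeout=6 * 60 * 60,                 # fail after 6 hours of waiting
    mode='reschedule',                   # free the worker slot between checks
    dag=dag,
)

wait_for_data >> load_task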

☸️ Kubeflow Pipelines

What is Kubeflow?

Kubeflow Pipelines is a platform for building and deploying ML workflows on Kubernetes. Each step runs in a container, making pipelines portable and scalable.
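
Because every step is its own container, each component declares the image it runs in and the Python packages it needs. A minimal sketch with the kfp v1 SDK (the image tag and package list are illustrative):

from kfp.components import create_component_from_func

def validate_data(data_path: str) -> int:
    """Simple validation step: count the rows in the input file."""
    import pandas as pd
    df = pd.read_csv(data_path)
    return len(df)

validate_data_op = create_component_from_func(
    validate_data,
    base_image='python:3.10-slim',    # container image the step runs in
    packages_to_install=['pandas'],   # pip-installed into the container at runtime
)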

Installation

# Install Kubeflow Pipelines SDK
pip install kfp

# Deploy Kubeflow on Kubernetes cluster
# (Requires existing K8s cluster)
export PIPELINE_VERSION=1.8.5
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=$PIPELINE_VERSION"

# Access UI (port-forward)
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

Building a Kubeflow Pipeline

"""
Kubeflow ML pipeline with component-based design
"""
import kfp
from kfp import dsl
from kfp.components import create_component_from_func

# Define components as Python functions

@create_component_from_func
def load_data(data_path: str) -> str:
    """Load and validate data"""
    import pandas as pd
    
    df = pd.read_csv(data_path)
    print(f"Loaded {len(df)} samples")
    
    # Save processed data
    output_path = '/tmp/processed_data.csv'
    df.to_csv(output_path, index=False)
    
    return output_path

@create_component_from_func
def train_model(
    data_path: str,
    n_estimators: int = 100,
    max_depth: int = 10
) -> str:
    """Train model and return path"""
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    import joblib
    
    # Load data
    df = pd.read_csv(data_path)
    X = df.drop('target', axis=1)
    y = df['target']
    
    # Split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    # Train
    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=42
    )
    model.fit(X_train, y_train)
    
    # Evaluate
    accuracy = model.score(X_test, y_test)
    print(f"Model Accuracy: {accuracy:.4f}")
    
    # Save
    model_path = '/tmp/model.joblib'
    joblib.dump(model, model_path)
    
    return model_path

@create_component_from_func
def evaluate_model(model_path: str, data_path: str) -> float:
    """Evaluate model and return metrics"""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, f1_score
    from sklearn.model_selection import train_test_split
    
    # Load model and data
    model = joblib.load(model_path)
    df = pd.read_csv(data_path)
    X = df.drop('target', axis=1)
    y = df['target']
    
    _, X_test, _, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    # Predict
    y_pred = model.predict(X_test)
    
    # Metrics
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')
    
    print(f"Accuracy: {accuracy:.4f}, F1: {f1:.4f}")
    
    return accuracy

@create_component_from_func
def deploy_model(model_path: str, accuracy: float, threshold: float = 0.85):
    """Deploy model if meets quality threshold"""
    if accuracy < threshold:
        raise ValueError(f"Accuracy {accuracy:.4f} below threshold {threshold}")
    
    # In production, deploy to serving system
    print(f"✅ Deploying model with accuracy {accuracy:.4f}")

# Define pipeline
@dsl.pipeline(
    name='ML Training Pipeline',
    description='End-to-end ML training and deployment'
)
def ml_pipeline(
    data_path: str = 'gs://my-bucket/data.csv',
    n_estimators: int = 100,
    max_depth: int = 10,
    accuracy_threshold: float = 0.85
):
    """Complete ML pipeline"""
    
    # Step 1: Load data
    load_data_task = load_data(data_path=data_path)
    
    # Step 2: Train model
    train_task = train_model(
        data_path=load_data_task.output,
        n_estimators=n_estimators,
        max_depth=max_depth
    )
    
    # Step 3: Evaluate
    eval_task = evaluate_model(
        model_path=train_task.output,
        data_path=load_data_task.output
    )
    
    # Step 4: Deploy (conditional)
    deploy_task = deploy_model(
        model_path=train_task.output,
        accuracy=eval_task.output,
        threshold=accuracy_threshold
    )

# Compile and submit pipeline
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')
    
    # Submit to Kubeflow
    client = kfp.Client(host='http://localhost:8080')
    
    # Create experiment
    experiment = client.create_experiment(name='ml-training')
    
    # Run pipeline
    run = client.run_pipeline(
        experiment_id=experiment.id,
        job_name='ml-training-run',
        pipeline_package_path='ml_pipeline.yaml',
        params={
            'data_path': 'gs://my-bucket/data.csv',
            'n_estimators': 100,
            'accuracy_threshold': 0.85
        }
    )
    
    print(f"Pipeline submitted: {run.run_id}")
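
One caveat about the example above: because each component runs in its own container, a file written to /tmp in one step is not visible to the next. For real pipelines, kfp's InputPath and OutputPath parameters let the platform move files between steps as pipeline artifacts. A minimal sketch with the kfp v1 SDK:

from kfp.components import create_component_from_func, InputPath, OutputPath

def load_data(source_path: str, output_csv: OutputPath('CSV')):
    """Read the raw data and write it to a pipeline-managed output artifact."""
    import pandas as pd
    df = pd.read_csv(source_path)
    df.to_csv(output_csv, index=False)  # kfp uploads this file after the step finishes

def train_model(training_csv: InputPath('CSV'), model_output: OutputPath('Model')):
    """Train on the artifact produced by the previous step."""
    import pandas as pd
    import joblib
    from sklearn.ensemble import RandomForestClassifier
    df = pd.read_csv(training_csv)  # kfp downloads the upstream artifact to a local path
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(df.drop('target', axis=1), df['target'])
    joblib.dump(model, model_output)

load_data_op = create_component_from_func(load_data, packages_to_install=['pandas'])
train_op = create_component_from_func(train_model, packages_to_install=['pandas', 'scikit-learn'])

Inside a @dsl.pipeline function, the wiring then looks like train_op(training_csv=load_step.outputs['output_csv']), so the artifact flows from one container to the next.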

Hyperparameter Tuning with Parallel Runs

"""
Run multiple experiments with different hyperparameters
"""
# Hyperparameter grid
param_grid = [
    {'n_estimators': 50, 'max_depth': 5},
    {'n_estimators': 100, 'max_depth': 10},
    {'n_estimators': 200, 'max_depth': 15},
    {'n_estimators': 300, 'max_depth': 20},
]

# Submit multiple runs
client = kfp.Client(host='http://localhost:8080')
experiment = client.create_experiment(name='hyperparameter-tuning')

for params in param_grid:
    run = client.run_pipeline(
        experiment_id=experiment.id,
        job_name=f"train_n{params['n_estimators']}_d{params['max_depth']}",
        pipeline_package_path='ml_pipeline.yaml',
        params=params
    )
    print(f"Submitted run: {run.run_id}")

# Kubeflow will run all in parallel on K8s cluster!

Using Prebuilt Components

"""
Use Kubeflow's prebuilt components
"""
from kfp.components import load_component_from_url

# Load prebuilt components
train_sklearn = load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/sklearn/Train/component.yaml'
)

deploy_model = load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/deployer/component.yaml'
)

@dsl.pipeline(name='Quick ML Pipeline')
def quick_pipeline(data_path: str):
    # Use prebuilt training component
    train_task = train_sklearn(
        training_data=data_path,
        model_type='RandomForestClassifier'
    )
    
    # Use prebuilt deployment component
    deploy_task = deploy_model(
        model=train_task.outputs['model'],
        namespace='production'
    )

📦 MLflow Pipelines (Recipes)

Overview

MLflow Pipelines, since renamed to MLflow Recipes, provides predefined pipeline templates for common ML tasks with built-in best practices.

Installation & Setup

pip install mlflow[pipelines]

# Create a pipeline project from the official regression template repository
git clone https://github.com/mlflow/recipes-regression-template.git my_pipeline
cd my_pipeline

Pipeline Structure

# recipe.yaml - Configuration
recipe: "regression/v1"
target_col: "price"

steps:
  ingest:
    using: parquet
    location: "data/training_data.parquet"
  
  split:
    split_ratios: [0.7, 0.15, 0.15]  # train/val/test
    random_state: 42
  
  transform:
    using: custom
    transformer_method: transform
  
  train:
    using: sklearn
    estimator_method: estimator
  
  evaluate:
    validation_criteria:
      - metric: root_mean_squared_error
        threshold: 10000
      - metric: mean_absolute_percentage_error
        threshold: 0.15
  
  register:
    registry_uri: "sqlite:///mlflow.db"
    model_name: "price_predictor"

Custom Steps

# steps/transform.py
def transform(training_data):
    """Custom transformation logic"""
    from sklearn.preprocessing import StandardScaler
    
    # Feature engineering
    training_data['age'] = 2024 - training_data['year_built']
    training_data['rooms_per_sqft'] = training_data['bedrooms'] / training_data['sqft']  # avoid deriving features from the target column (price) to prevent leakage
    
    # Scale features
    scaler = StandardScaler()
    numeric_cols = ['sqft', 'bedrooms', 'age']
    training_data[numeric_cols] = scaler.fit_transform(training_data[numeric_cols])
    
    return training_data

# steps/train.py
def estimator():
    """Define model to train"""
    from sklearn.ensemble import GradientBoostingRegressor
    
    return GradientBoostingRegressor(
        n_estimators=200,
        learning_rate=0.1,
        max_depth=5,
        random_state=42
    )

Running Pipeline

# Run entire pipeline
mlflow recipes run

# Run specific step
mlflow recipes run --step train

# Inspect results
mlflow recipes inspect

# View artifacts
mlflow recipes inspect --step train

Integration with MLflow Tracking

"""
MLflow Pipelines automatically log to MLflow Tracking
"""
import mlflow

# View logged runs
client = mlflow.tracking.MlflowClient()
experiment = client.get_experiment_by_name("my_pipeline")
runs = client.search_runs([experiment.experiment_id])  # search_runs expects a list of experiment IDs

for run in runs:
    print(f"Run ID: {run.info.run_id}")
    print(f"Metrics: {run.data.metrics}")
    print(f"Params: {run.data.params}")
    print(f"Artifacts: {client.list_artifacts(run.info.run_id)}")
    print("---")
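
Because the register step writes the model to the registry configured in the recipe, downstream code can load it by name for batch scoring or serving. A minimal sketch, assuming the sqlite registry URI and model name from the configuration above (the version number and input file are illustrative):

import mlflow
import mlflow.sklearn
import pandas as pd

mlflow.set_tracking_uri("sqlite:///mlflow.db")  # same store used by the recipe

# Load a specific registered version of the price predictor
model = mlflow.sklearn.load_model("models:/price_predictor/1")

new_listings = pd.read_csv("data/new_listings.csv")  # hypothetical rows to score
predictions = model.predict(new_listings)
print(predictions[:5])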

🔄 CI/CD for ML Models

What is ML CI/CD?

Continuous Integration and Continuous Deployment for ML extends traditional software CI/CD with model training, validation, and deployment automation.

GitHub Actions ML Pipeline

# .github/workflows/ml-pipeline.yml
name: ML Training Pipeline

on:
  push:
    branches: [main]
  schedule:
    - cron: '0 2 * * *'  # Daily at 2 AM
  workflow_dispatch:  # Manual trigger

jobs:
  train:
    runs-on: ubuntu-latest
    
    steps:
      - uses: actions/checkout@v3
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      
      - name: Download training data
        run: |
          aws s3 cp s3://my-bucket/data.csv data/
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
      
      - name: Train model
        run: |
          python train.py --config config.yaml
      
      - name: Evaluate model
        id: evaluate
        run: |
          python evaluate.py --model model.pkl
          echo "accuracy=$(cat metrics.json | jq -r '.accuracy')" >> $GITHUB_OUTPUT
      
      - name: Quality gate
        run: |
          if (( $(echo "${{ steps.evaluate.outputs.accuracy }} < 0.85" | bc -l) )); then
            echo "Model accuracy below threshold"
            exit 1
          fi
      
      - name: Upload model artifact
        uses: actions/upload-artifact@v3
        with:
          name: trained-model
          path: model.pkl
      
      - name: Log to MLflow
        run: |
          python log_model.py
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
  
  deploy:
    needs: train
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    
    steps:
      - uses: actions/checkout@v3
      
      - name: Download model
        uses: actions/download-artifact@v3
        with:
          name: trained-model
      
      - name: Deploy to production
        run: |
          # Build Docker image
          docker build -t ml-model:${{ github.sha }} .
          
          # Push to registry
          docker push ml-model:${{ github.sha }}
          
          # Update K8s deployment
          kubectl set image deployment/ml-model \
            ml-model=ml-model:${{ github.sha }}
        env:
          KUBECONFIG: ${{ secrets.KUBECONFIG }}
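
The workflow above assumes an evaluate.py script that writes its metrics to metrics.json so the quality-gate step can read them with jq. A minimal sketch of what such a script might look like (the test-data path is an assumption; only the model path, metrics file, and accuracy key come from the workflow):

"""
evaluate.py - compute test metrics and write metrics.json for the CI quality gate
"""
import argparse
import json

import joblib
import pandas as pd
from sklearn.metrics import accuracy_score

parser = argparse.ArgumentParser()
parser.add_argument('--model', default='model.pkl')
parser.add_argument('--test-data', default='data/test.csv')  # illustrative path
args = parser.parse_args()

model = joblib.load(args.model)
test_df = pd.read_csv(args.test_data)

X_test = test_df.drop('target', axis=1)
y_test = test_df['target']

accuracy = accuracy_score(y_test, model.predict(X_test))

# The quality-gate step reads this with: jq -r '.accuracy' metrics.json
with open('metrics.json', 'w') as f:
    json.dump({'accuracy': float(accuracy)}, f)

print(f"accuracy={accuracy:.4f}")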

Model Testing in CI/CD

"""
tests/test_model.py - Model quality tests
"""
import pytest
import joblib
import pandas as pd
from sklearn.metrics import accuracy_score

@pytest.fixture
def model():
    """Load trained model"""
    return joblib.load('model.pkl')

@pytest.fixture
def test_data():
    """Load test dataset"""
    return pd.read_csv('data/test.csv')

def test_model_accuracy(model, test_data):
    """Test model meets accuracy threshold"""
    X_test = test_data.drop('target', axis=1)
    y_test = test_data['target']
    
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    
    assert accuracy >= 0.85, f"Accuracy {accuracy:.4f} below threshold 0.85"

def test_prediction_latency(model, test_data):
    """Test prediction speed"""
    import time
    
    X_sample = test_data.drop('target', axis=1).iloc[0:1]
    
    start = time.time()
    _ = model.predict(X_sample)
    duration = time.time() - start
    
    assert duration < 0.1, f"Prediction took {duration:.4f}s, exceeds 100ms limit"

def test_no_null_predictions(model, test_data):
    """Test model doesn't produce null predictions"""
    X_test = test_data.drop('target', axis=1)
    predictions = model.predict(X_test)
    
    assert not pd.isnull(predictions).any(), "Model produced null predictions"

def test_prediction_distribution(model, test_data):
    """Test predictions follow expected distribution"""
    X_test = test_data.drop('target', axis=1)
    predictions = model.predict(X_test)
    
    # Check class balance (for classification)
    unique, counts = np.unique(predictions, return_counts=True)
    min_class_ratio = counts.min() / counts.sum()
    
    assert min_class_ratio >= 0.05, "Severe class imbalance in predictions"

✅ ML CI/CD Best Practices:

  • Automate model training on code or data changes
  • Implement quality gates (accuracy thresholds)
  • Test model performance and latency
  • Version control data, code, and models
  • Use canary deployments for gradual rollout (see the sketch after this list)
  • Monitor model performance post-deployment
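
Canary rollout is usually handled by the serving layer (for example, weighted routing in Kubernetes), but the idea fits in a few lines of application code: send a small, configurable fraction of requests to the candidate model and the rest to the current production model, tagging each response so the two can be compared. A minimal, illustrative sketch (the candidate path and the 10% split are assumptions):

import random
import joblib

production_model = joblib.load('/models/production/model.joblib')
candidate_model = joblib.load('/models/candidate/model.joblib')  # newly trained model under evaluation
CANARY_FRACTION = 0.10  # route 10% of traffic to the candidate

def predict(features):
    """Route one request to the canary or the production model and tag the result."""
    if random.random() < CANARY_FRACTION:
        return {'model': 'candidate', 'prediction': candidate_model.predict([features])[0]}
    return {'model': 'production', 'prediction': production_model.predict([features])[0]}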

🎯 Summary

You've mastered ML orchestration and pipeline automation:

  • 🌪️ Apache Airflow: DAG-based orchestration for complex workflows mixing ML and non-ML tasks
  • ☸️ Kubeflow Pipelines: Kubernetes-native ML pipelines with containerized components
  • 📦 MLflow Pipelines: Template-based pipelines with built-in best practices
  • 🔄 ML CI/CD: Automated training, testing, and deployment workflows
  • 🧪 Model Testing: Quality gates and performance validation
  • 📊 Experiment Tracking: Version control for models, data, and metrics

Key Takeaways

  1. ML pipelines provide reproducibility, automation, and scalability
  2. Airflow excels at complex workflows with diverse tasks
  3. Kubeflow is ideal for Kubernetes-based ML infrastructure
  4. MLflow Pipelines offer quick setup with templates
  5. CI/CD for ML automates the entire model lifecycle
  6. Implement quality gates to prevent bad models reaching production
  7. Version everything: code, data, models, and configurations

🚀 Next Steps:

Your ML workflows are fully automated! Next, you'll learn Kubernetes for ML - deploying and scaling ML workloads on K8s with GPU scheduling, autoscaling, and production-grade infrastructure.

Test Your Knowledge

Q1: What is the main benefit of using an ML orchestration framework like Airflow?

It makes models more accurate
It reduces training time
It automates and makes ML workflows reproducible, with scheduling, monitoring, and error handling
It's required for deployment

Q2: What is a DAG in Apache Airflow?

A type of database
Directed Acyclic Graph - a workflow where tasks have dependencies but no cycles
A deployment strategy
A data format

Q3: When should you choose Kubeflow Pipelines over Apache Airflow?

When you're running on Kubernetes and need ML-specific features like experiment tracking and model versioning
Always, it's better
Never use Kubeflow
Only for simple workflows

Q4: What is a quality gate in ML CI/CD?

A security feature
A type of neural network layer
A deployment strategy
A checkpoint that validates model performance meets thresholds before deployment

Q5: What should you version control in an ML project?

Only the code
Only the trained models
Code, data, models, configurations, and pipeline definitions
Nothing, version control isn't needed