
Production ML Best Practices

Feature stores with Feast and Tecton, model governance, compliance, cost optimization, security, and documentation for enterprise ML

📅 Tutorial 13 🎓 Advanced


🏢 Enterprise ML Challenges

Your ML system is deployed and working. But as you scale:

  • Features computed differently in training vs serving (training-serving skew)
  • Multiple teams building duplicate feature pipelines
  • No visibility into which models use which data
  • Compliance requirements: explainability, fairness, data privacy
  • Cloud costs spiraling out of control
  • Security vulnerabilities in model APIs
  • No documentation: only the original developer understands the system

Production best practices solve these enterprise challenges with feature stores, governance frameworks, cost optimization, and systematic documentation.

💡 Production ML Pillars:

  • Feature Stores: Centralized feature management
  • Model Governance: Lineage, versioning, approvals
  • Compliance: Explainability, fairness, privacy
  • Cost Optimization: Resource management
  • Security: Authentication, authorization, encryption
  • Documentation: Model cards, data sheets, runbooks

🏪 Feature Stores

What is a Feature Store?

A feature store is a centralized repository for ML features that:

  • Stores feature definitions and transformations
  • Serves features consistently for training and inference
  • Prevents training-serving skew
  • Enables feature reuse across teams
  • Provides point-in-time correct features for training
  • Tracks feature lineage and usage

Feast (Open Source Feature Store)

Installation

pip install feast

Define Feature Repository

"""
Feature definitions with Feast
"""
# feature_repo/features.py
from datetime import timedelta
from feast import Entity, Feature, FeatureView, ValueType
from feast.data_source import FileSource

# Define entity (unique identifier)
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="User ID"
)

# Data source
user_features_source = FileSource(
    path="data/user_features.parquet",
    event_timestamp_column="event_timestamp",
)

# Feature view
user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="age", dtype=ValueType.INT64),
        Feature(name="total_purchases", dtype=ValueType.INT64),
        Feature(name="avg_purchase_amount", dtype=ValueType.FLOAT),
        Feature(name="days_since_last_purchase", dtype=ValueType.INT64),
    ],
    online=True,
    source=user_features_source,
    tags={"team": "ml", "project": "churn_prediction"},
)

Initialize Feature Store

# feature_repo/feature_store.yaml
project: ml_project
registry: data/registry.db
provider: local
online_store:
  type: sqlite
  path: data/online_store.db

# Apply feature definitions
cd feature_repo
feast apply

Materialize Features (Training)

"""
Get historical features for training
"""
from feast import FeatureStore
from datetime import datetime
import pandas as pd

store = FeatureStore(repo_path="feature_repo")

# Entity dataframe (users and timestamps)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002, 1003],
    "event_timestamp": [
        datetime(2024, 1, 1),
        datetime(2024, 1, 2),
        datetime(2024, 1, 3),
    ]
})

# Get historical features (point-in-time correct)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:age",
        "user_features:total_purchases",
        "user_features:avg_purchase_amount",
        "user_features:days_since_last_purchase",
    ],
).to_df()

print(training_df)

Serve Features (Inference)

"""
Get online features for real-time prediction
"""
# Materialize features to online store
from datetime import datetime, timedelta

store.materialize(
    start_date=datetime.now() - timedelta(days=7),
    end_date=datetime.now()
)

# Get online features (low latency)
# Assumes a FastAPI `app` and a loaded `model` are defined elsewhere in the service
@app.post("/predict")
async def predict(user_id: int):
    """Prediction endpoint with Feast features"""
    
    # Fetch features from online store
    features = store.get_online_features(
        features=[
            "user_features:age",
            "user_features:total_purchases",
            "user_features:avg_purchase_amount",
            "user_features:days_since_last_purchase",
        ],
        entity_rows=[{"user_id": user_id}],
    ).to_dict()
    
    # Convert to model input
    feature_vector = [
        features['age'][0],
        features['total_purchases'][0],
        features['avg_purchase_amount'][0],
        features['days_since_last_purchase'][0],
    ]
    
    # Predict
    prediction = model.predict([feature_vector])[0]
    
    return {"user_id": user_id, "churn_probability": float(prediction)}

Tecton (Enterprise Feature Store)

"""
Tecton feature definitions with transformations
"""
from tecton import Entity, Feature, FeatureView, transformation
from tecton.types import Field, String, Int64, Float64
from datetime import timedelta

# Define entity
user = Entity(
    name="user",
    join_keys=[Field("user_id", Int64)],
    description="User entity"
)

# Feature transformation
@transformation(mode="pandas")
def user_transaction_stats(transactions):
    """Compute user transaction statistics"""
    return transactions.groupby('user_id').agg({
        'amount': ['count', 'mean', 'sum'],
        'timestamp': 'max'
    }).reset_index()

# Feature view with transformation
user_transaction_features = FeatureView(
    name="user_transaction_features",
    entities=[user],
    features=[
        Feature("total_transactions", Int64),
        Feature("avg_transaction_amount", Float64),
        Feature("total_spend", Float64),
    ],
    transformation=user_transaction_stats,
    ttl=timedelta(days=30),
    online=True,
    offline=True,
    feature_start_time=datetime(2024, 1, 1),
)

# Real-time features
@transformation(mode="python", input=RequestDataSource())
def user_context_features(request):
    """Real-time request features"""
    return {
        'hour_of_day': request['timestamp'].hour,
        'day_of_week': request['timestamp'].weekday(),
        'device_type': request['device'],
    }

user_context = FeatureView(
    name="user_context",
    entities=[user],
    features=[
        Feature("hour_of_day", Int64),
        Feature("day_of_week", Int64),
        Feature("device_type", String),
    ],
    transformation=user_context_features,
    online=True,
)

Feature Store Benefits

  • 🎯 Consistency: Same features for training and serving
  • ♻️ Reusability: Share features across teams and models
  • ⏱️ Point-in-Time: Correct historical features for training
  • ⚡ Low Latency: Fast online feature serving

📋 Model Governance

What is Model Governance?

Model governance ensures models are developed responsibly, meet regulatory requirements, and maintain quality standards throughout their lifecycle.

Model Registry with Metadata

"""
Comprehensive model metadata tracking
"""
import mlflow
from mlflow.tracking import MlflowClient

def register_model_with_governance(model, metadata):
    """Register model with governance metadata"""
    
    with mlflow.start_run() as run:
        # Log model
        mlflow.sklearn.log_model(model, "model")
        
        # Log governance metadata
        mlflow.log_params({
            "model_owner": metadata["owner"],
            "model_purpose": metadata["purpose"],
            "training_data_version": metadata["data_version"],
            "compliance_status": metadata["compliance"],
        })
        
        # Log metrics
        mlflow.log_metrics({
            "accuracy": metadata["accuracy"],
            "fairness_score": metadata["fairness"],
            "explainability_score": metadata["explainability"],
        })
        
        # Log artifacts
        mlflow.log_dict(metadata["data_lineage"], "data_lineage.json")
        mlflow.log_dict(metadata["model_card"], "model_card.json")
        
        # Register
        model_uri = f"runs:/{run.info.run_id}/model"
        registered = mlflow.register_model(model_uri, metadata["model_name"])
        
        # Add governance tags
        client = MlflowClient()
        client.set_model_version_tag(
            name=metadata["model_name"],
            version=registered.version,
            key="approved_by",
            value=metadata["approver"]
        )
        client.set_model_version_tag(
            name=metadata["model_name"],
            version=registered.version,
            key="compliance_reviewed",
            value="true"
        )
        
        return registered

# Usage
metadata = {
    "model_name": "credit_risk_model",
    "owner": "data-science-team",
    "purpose": "Credit risk assessment",
    "data_version": "v2024.03",
    "compliance": "GDPR_compliant",
    "accuracy": 0.92,
    "fairness": 0.88,
    "explainability": 0.85,
    "approver": "john.doe@company.com",
    "data_lineage": {
        "sources": ["customer_db", "transaction_db"],
        "transformations": ["feature_engineering_v3"],
        "timestamp": "2024-03-15T10:00:00Z"
    },
    "model_card": {
        "intended_use": "Credit approval decisions",
        "limitations": "Not suitable for customers under 18",
        "ethical_considerations": "Bias mitigation applied"
    }
}

register_model_with_governance(model, metadata)

Approval Workflow

"""
Model approval workflow
"""
class ModelApprovalWorkflow:
    def __init__(self, model_name, version):
        self.model_name = model_name
        self.version = version
        self.client = MlflowClient()
    
    def request_approval(self, requester):
        """Request model approval"""
        
        # Transition to staging
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=self.version,
            stage="Staging"
        )
        
        # Add approval request tag
        self.client.set_model_version_tag(
            name=self.model_name,
            version=self.version,
            key="approval_status",
            value="pending"
        )
        self.client.set_model_version_tag(
            name=self.model_name,
            version=self.version,
            key="requested_by",
            value=requester
        )
        
        # Notify approvers (send_approval_request is a placeholder notification hook, e.g. Slack or email)
        send_approval_request(self.model_name, self.version, requester)
    
    def approve(self, approver, comments=""):
        """Approve model for production"""
        
        # Validation checks
        checks = self.run_validation_checks()
        
        if not all(checks.values()):
            raise ValueError(f"Validation failed: {checks}")
        
        # Record approval
        self.client.set_model_version_tag(
            name=self.model_name,
            version=self.version,
            key="approval_status",
            value="approved"
        )
        self.client.set_model_version_tag(
            name=self.model_name,
            version=self.version,
            key="approved_by",
            value=approver
        )
        self.client.set_model_version_tag(
            name=self.model_name,
            version=self.version,
            key="approval_date",
            value=datetime.now().isoformat()
        )
        
        # Promote to production
        self.client.transition_model_version_stage(
            name=self.model_name,
            version=self.version,
            stage="Production"
        )
        
        print(f"✅ Model v{self.version} approved and promoted to production")
    
    def run_validation_checks(self):
        """Run governance validation checks"""
        return {
            "accuracy_threshold": self.check_accuracy(),
            "fairness_check": self.check_fairness(),
            "explainability": self.check_explainability(),
            "data_quality": self.check_data_quality(),
            "compliance": self.check_compliance(),
        }

# Usage
workflow = ModelApprovalWorkflow("credit_risk_model", version=5)
workflow.request_approval(requester="data-scientist@company.com")

# Later, after review
workflow.approve(approver="ml-lead@company.com")
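The validation helpers called in run_validation_checks (check_accuracy, check_fairness, and so on) are assumed to be implemented elsewhere. A minimal sketch of one of them as a ModelApprovalWorkflow method, assuming an accuracy metric was logged with mlflow.log_metrics() on the run behind the model version (the 0.85 threshold is an illustrative policy value):

# Illustrative governance threshold
ACCURACY_THRESHOLD = 0.85

# Method of ModelApprovalWorkflow
def check_accuracy(self):
    """Compare the accuracy logged on the model version's source run against the threshold"""
    version = self.client.get_model_version(self.model_name, str(self.version))
    run = self.client.get_run(version.run_id)
    accuracy = run.data.metrics.get("accuracy", 0.0)
    return accuracy >= ACCURACY_THRESHOLD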

⚖️ Compliance & Responsible AI

Model Explainability with SHAP

"""
Add explainability to models
"""
import shap
import matplotlib.pyplot as plt

def explain_prediction(model, X_test, instance_idx=0):
    """Generate SHAP explanations"""
    
    # Create explainer
    explainer = shap.TreeExplainer(model)
    
    # Calculate SHAP values
    shap_values = explainer.shap_values(X_test)
    
    # Explain single prediction
    shap.initjs()
    shap.force_plot(
        explainer.expected_value,
        shap_values[instance_idx],
        X_test.iloc[instance_idx],
        matplotlib=True,
        show=False
    )
    plt.savefig('explanation.png')
    
    # Feature importance
    shap.summary_plot(shap_values, X_test, show=False)
    plt.savefig('feature_importance.png')
    
    # Log to MLflow
    mlflow.log_artifact('explanation.png')
    mlflow.log_artifact('feature_importance.png')
    
    return shap_values

# Add to prediction endpoint
@app.post("/predict_with_explanation")
async def predict_explainable(request: PredictRequest):
    """Prediction with explanation"""
    
    features_df = pd.DataFrame([request.features])
    prediction = model.predict(features_df)[0]
    
    # Generate explanation
    # (build the explainer once at startup in production rather than per request)
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(features_df)[0]
    
    # Top contributing features
    feature_importance = dict(zip(
        features_df.columns,
        shap_values
    ))
    top_features = sorted(
        feature_importance.items(),
        key=lambda x: abs(x[1]),
        reverse=True
    )[:5]
    
    return {
        "prediction": float(prediction),
        "explanation": {
            "top_features": [
                {"feature": f, "contribution": float(c)}
                for f, c in top_features
            ]
        }
    }

Fairness Evaluation

"""
Evaluate model fairness across demographic groups
"""
from fairlearn.metrics import (
    demographic_parity_difference,
    equalized_odds_difference
)

def evaluate_fairness(model, X_test, y_test, sensitive_feature):
    """Assess model fairness"""
    
    predictions = model.predict(X_test)
    
    # Demographic parity
    dp_diff = demographic_parity_difference(
        y_true=y_test,
        y_pred=predictions,
        sensitive_features=sensitive_feature
    )
    
    # Equalized odds
    eo_diff = equalized_odds_difference(
        y_true=y_test,
        y_pred=predictions,
        sensitive_features=sensitive_feature
    )
    
    print(f"Demographic Parity Difference: {dp_diff:.4f}")
    print(f"Equalized Odds Difference: {eo_diff:.4f}")
    
    # Log fairness metrics
    mlflow.log_metrics({
        "demographic_parity_diff": dp_diff,
        "equalized_odds_diff": eo_diff,
    })
    
    # Fail if unfair
    if abs(dp_diff) > 0.1 or abs(eo_diff) > 0.1:
        raise ValueError("Model fails fairness criteria")
    
    return dp_diff, eo_diff
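A usage sketch, assuming the test set carries a gender column that acts as the sensitive attribute (the column name is illustrative):

# 'gender' is an illustrative sensitive-attribute column assumed to exist in X_test
dp_diff, eo_diff = evaluate_fairness(
    model,
    X_test,
    y_test,
    sensitive_feature=X_test["gender"]
)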

Data Privacy (Differential Privacy)

"""
Train model with differential privacy
"""
from diffprivlib.models import LogisticRegression

# Train with privacy guarantees
private_model = LogisticRegression(
    epsilon=1.0,  # Privacy budget
    data_norm=5.0
)

private_model.fit(X_train, y_train)

print(f"Model trained with ε={private_model.epsilon} differential privacy")

💰 Cost Optimization

Resource Right-Sizing

"""
Monitor and optimize resource usage
"""
import psutil
import time

class ResourceMonitor:
    def __init__(self):
        self.metrics = []
    
    def track_prediction(self, prediction_func):
        """Track resource usage during prediction"""
        
        @functools.wraps(prediction_func)  # preserve the endpoint's signature for FastAPI
        async def wrapper(*args, **kwargs):
            # Before
            cpu_before = psutil.cpu_percent()
            memory_before = psutil.Process().memory_info().rss / 1024**2
            start_time = time.time()
            
            # Execute (the wrapped endpoint is async, so await it)
            result = await prediction_func(*args, **kwargs)
            
            # After
            duration = time.time() - start_time
            cpu_after = psutil.cpu_percent()
            memory_after = psutil.Process().memory_info().rss / 1024**2
            
            self.metrics.append({
                'duration': duration,
                'cpu_usage': cpu_after - cpu_before,
                'memory_mb': memory_after - memory_before,
            })
            
            return result
        
        return wrapper
    
    def analyze(self):
        """Analyze resource usage and recommend sizing"""
        df = pd.DataFrame(self.metrics)
        
        avg_duration = df['duration'].mean()
        p95_duration = df['duration'].quantile(0.95)
        avg_memory = df['memory_mb'].mean()
        
        print(f"Avg latency: {avg_duration:.3f}s")
        print(f"P95 latency: {p95_duration:.3f}s")
        print(f"Avg memory: {avg_memory:.1f}MB")
        
        # Recommendations
        if avg_memory < 512:
            print("💡 Consider downsizing to 512MB instance")
        if p95_duration < 0.1:
            print("💡 Model is fast - consider batching requests")

monitor = ResourceMonitor()

@app.post("/predict")
@monitor.track_prediction
async def predict(request: PredictRequest):
    prediction = model.predict([request.features])[0]
    return {"prediction": float(prediction)}

Batch Prediction for Cost Efficiency

"""
Batch predictions to reduce costs
"""
# Instead of real-time predictions for all use cases
# Use scheduled batch predictions for non-critical paths

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def batch_predict():
    """Run batch predictions on accumulated requests"""
    
    # Fetch pending requests (fetch_pending_requests and store_prediction are placeholder data-access helpers)
    requests = fetch_pending_requests()
    
    # Batch predict (much faster than individual)
    features = [r['features'] for r in requests]
    predictions = model.predict(features)
    
    # Store results
    for request, prediction in zip(requests, predictions):
        store_prediction(request['id'], prediction)
    
    print(f"Processed {len(requests)} predictions")

dag = DAG(
    'batch_predictions',
    start_date=datetime(2024, 1, 1),
    schedule_interval='*/15 * * * *',  # Every 15 minutes
    catchup=False,
)

PythonOperator(
    task_id='batch_predict',
    python_callable=batch_predict,
    dag=dag
)

Spot Instances for Training

# Use spot instances for training (often 60-90% cheaper than on-demand capacity)
# Kubernetes Pod pinned to spot nodes; the nodeSelector label and taint key are
# provider/cluster specific (EKS label shown, GKE uses cloud.google.com/gke-spot: "true")
apiVersion: v1
kind: Pod
metadata:
  name: ml-training
spec:
  nodeSelector:
    eks.amazonaws.com/capacityType: SPOT
  tolerations:
    - key: "spot"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule"
  containers:
    - name: trainer
      image: ml-trainer:latest
      resources:
        requests:
          memory: "8Gi"
          cpu: "4"

📝 Documentation Best Practices

Model Cards

# model_card.yaml
model_details:
  name: "Customer Churn Prediction Model"
  version: "2.1.0"
  date: "2024-03-15"
  owner: "Data Science Team"
  contact: "ds-team@company.com"

intended_use:
  primary_use: "Identify customers at risk of churning"
  users: "Customer success teams, marketing"
  out_of_scope: "Not for individual customer targeting without human review"

training_data:
  sources:
    - "Customer database (2022-2024)"
    - "Transaction history"
    - "Support tickets"
  size: "1.2M customers"
  preprocessing: "Feature engineering pipeline v3"

metrics:
  accuracy: 0.89
  precision: 0.87
  recall: 0.85
  auc_roc: 0.93

ethical_considerations:
  fairness: "Tested across demographic groups - no significant bias detected"
  privacy: "PII removed, differential privacy applied"
  limitations: "Performance degrades for new product lines"

recommendations:
  monitoring: "Monitor weekly, retrain monthly"
  thresholds: "Confidence > 0.7 for automated actions"
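To keep the card discoverable, attach it to the same MLflow run as the model. A small sketch, assuming the YAML above is saved as model_card.yaml next to the training code:

import mlflow
import yaml

# Check the card parses as valid YAML, then attach it to the training run
with open("model_card.yaml") as f:
    yaml.safe_load(f)

with mlflow.start_run():
    mlflow.log_artifact("model_card.yaml")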

Runbooks

# Model Deployment Runbook

## Pre-Deployment Checklist
- [ ] Model passes all validation tests
- [ ] Fairness metrics within acceptable range
- [ ] Performance validated on hold-out test set
- [ ] Model card completed and reviewed
- [ ] Approval obtained from ML lead

## Deployment Steps
1. **Register Model**
   ```bash
   # The MLflow CLI has no register command, so use the Python API
   python -c "import mlflow; mlflow.register_model('runs:/abc123/model', 'customer_churn')"
   ```

2. **Deploy to Staging**
   ```bash
   kubectl apply -f k8s/staging-deployment.yaml
   ```

3. **Run Shadow Mode** (48 hours)
   - Monitor shadow vs production agreement
   - Check for performance degradation

4. **Promote to Production**
   ```bash
   kubectl apply -f k8s/production-deployment.yaml
   ```

## Monitoring
- Dashboard: http://grafana.company.com/ml-models
- Alerts: #ml-alerts Slack channel
- On-call: data-science-oncall@company.com

## Rollback Procedure
If accuracy drops below 85% or error rate exceeds 5%:
```bash
# Roll back by re-promoting the previous version (MLflow Python API; the CLI has no stage-transition command)
python -c "
from mlflow.tracking import MlflowClient
MlflowClient().transition_model_version_stage(
    name='customer_churn', version=14, stage='Production')
"
```

## Common Issues
- **High latency**: Check feature store connection
- **Low accuracy**: Verify input data quality
- **Fairness violations**: Trigger retraining with balanced data

🎯 Summary

You've mastered production ML best practices:

  • 🏪 Feature Stores: Centralized feature management with Feast and Tecton
  • 📋 Governance: Model registry, lineage, and approval workflows
  • ⚖️ Compliance: Explainability, fairness, and privacy guarantees
  • 💰 Cost Optimization: Resource monitoring and efficient infrastructure
  • 🔒 Security: Authentication, authorization, and data protection
  • 📝 Documentation: Model cards, runbooks, and knowledge sharing

Key Takeaways

  1. Use feature stores to prevent training-serving skew and enable feature reuse
  2. Implement model governance with registries, approvals, and lineage tracking
  3. Ensure compliance with explainability, fairness, and privacy safeguards
  4. Optimize costs through right-sizing, batching, and spot instances
  5. Document models thoroughly with model cards and runbooks
  6. Monitor production systems continuously and maintain rollback capabilities
  7. Build responsible AI systems that are transparent, fair, and secure

🎉 Congratulations on Completing the Core Tutorials!

You've learned all the foundational MLOps concepts! Now put them into practice with hands-on projects where you'll build complete end-to-end ML systems integrating everything you've learned.

Test Your Knowledge

Q1: What problem do feature stores solve?

  • Slow training
  • Model deployment
  • Training-serving skew, feature consistency, and reusability across teams
  • Data storage

Q2: What is model governance?

  • Ensuring models meet quality, regulatory, and ethical standards with approvals and lineage tracking
  • Training models faster
  • Monitoring model accuracy
  • Deploying models

Q3: Why use SHAP for explainability?

  • To train models
  • To explain individual predictions by showing feature contributions
  • To deploy models
  • To store data

Q4: Cost optimization strategy for ML training?

  • Always use largest instances
  • Train continuously
  • Never monitor resources
  • Use spot instances for training, batch predictions, and right-size resources

Q5: What should a model card include?

  • Only accuracy metrics
  • Code implementation
  • Intended use, training data, metrics, ethical considerations, and limitations
  • Nothing, documentation isn't needed