Project Overview
Build an advanced customer churn prediction system with automated feature engineering using Featuretools, time-series features, dimensionality reduction, and a production-ready deployment pipeline. This project combines everything you've learned in the Feature Engineering course.
Learning Objectives
- Use Featuretools for automated deep feature synthesis
- Create sophisticated time-based behavioral features
- Apply PCA for dimensionality reduction
- Build production-ready sklearn pipelines
- Deploy with model versioning and monitoring
- Handle real-time feature computation
Step 1: Generate Synthetic Telecom Data
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
np.random.seed(42)
# Generate customer data
n_customers = 5000
n_transactions_per_customer = np.random.randint(10, 100, n_customers)
customers = pd.DataFrame({
'customer_id': range(1, n_customers + 1),
'signup_date': pd.date_range('2022-01-01', periods=n_customers, freq='1H'),
'age': np.random.randint(18, 75, n_customers),
'monthly_subscription': np.random.choice([29.99, 49.99, 79.99], n_customers),
'contract_type': np.random.choice(['Monthly', '1-Year', '2-Year'], n_customers),
'payment_method': np.random.choice(['Credit Card', 'Bank Transfer', 'PayPal'], n_customers),
'region': np.random.choice(['North', 'South', 'East', 'West'], n_customers)
})
# Generate transaction/usage data
transactions = []
for cust_id in range(1, n_customers + 1):
n_trans = n_transactions_per_customer[cust_id - 1]
start_date = customers[customers['customer_id'] == cust_id]['signup_date'].values[0]
for i in range(n_trans):
trans_date = pd.Timestamp(start_date) + timedelta(days=np.random.randint(0, 365))
transactions.append({
'transaction_id': len(transactions) + 1,
'customer_id': cust_id,
'transaction_date': trans_date,
'data_usage_gb': np.random.exponential(5),
'call_minutes': np.random.exponential(100),
'sms_count': np.random.poisson(50),
'support_tickets': np.random.poisson(0.5)
})
transactions = pd.DataFrame(transactions)
# Generate churn labels (based on customer behavior)
customers['churned'] = 0
# Churn logic: customers with low usage, high support tickets, monthly contracts
for idx, row in customers.iterrows():
cust_trans = transactions[transactions['customer_id'] == row['customer_id']]
churn_prob = 0.05 # Base churn rate
if row['contract_type'] == 'Monthly': churn_prob += 0.15
if cust_trans['support_tickets'].mean() > 1: churn_prob += 0.20
if cust_trans['data_usage_gb'].mean() < 2: churn_prob += 0.15
customers.at[idx, 'churned'] = 1 if np.random.random() < churn_prob else 0
print(f"Customers: {len(customers)}")
print(f"Transactions: {len(transactions)}")
print(f"Churn rate: {customers['churned'].mean():.2%}")
print("\nCustomer data:")
print(customers.head())
print("\nTransaction data:")
print(transactions.head())
Step 2: Automated Feature Engineering with Featuretools
# Install: pip install featuretools
import featuretools as ft
# Create EntitySet
es = ft.EntitySet(id='customer_churn')
# Add customers dataframe
es = es.add_dataframe(
dataframe_name='customers',
dataframe=customers,
index='customer_id',
time_index='signup_date'
)
# Add transactions dataframe
es = es.add_dataframe(
dataframe_name='transactions',
dataframe=transactions,
index='transaction_id',
time_index='transaction_date'
)
# Define relationship
es = es.add_relationship('customers', 'customer_id',
'transactions', 'customer_id')
print("EntitySet created:")
print(es)
# Deep Feature Synthesis
feature_matrix, feature_defs = ft.dfs(
entityset=es,
target_dataframe_name='customers',
agg_primitives=['mean', 'sum', 'std', 'max', 'min', 'count', 'trend'],
trans_primitives=['month', 'weekday', 'day'],
max_depth=2,
verbose=True
)
print(f"\nGenerated {len(feature_defs)} features automatically!")
print(f"Feature matrix shape: {feature_matrix.shape}")
print("\nSample features:")
print(feature_matrix.columns.tolist()[:20])
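Note that because churned is a column of the customers dataframe, the label is carried through into the DFS output, and many generated features contain missing values. The snippet below is an illustrative addition (the dfs_X/dfs_y names are not part of the walkthrough) showing one way to turn the matrix into model-ready inputs; it keeps only numeric columns for simplicity.
# Illustrative cleanup of the DFS output (not used by the later steps)
dfs_X = feature_matrix.drop(columns=['churned'], errors='ignore')
dfs_X = dfs_X.select_dtypes(include=[np.number]).fillna(0)
dfs_y = customers.set_index('customer_id')['churned']  # label aligned on the customer_id index
print(f"Model-ready DFS matrix: {dfs_X.shape}")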
Step 3: Manual Time-Based Feature Engineering
# Calculate time-based features manually for more control
customer_features = customers.copy()
# Reference ("snapshot") date for recency: the most recent transaction in the dataset
snapshot_date = transactions['transaction_date'].max()
for cust_id in customers['customer_id']:
cust_trans = transactions[transactions['customer_id'] == cust_id].copy()
if len(cust_trans) == 0:
continue
    # Recency: days since last transaction, relative to the snapshot date
    last_trans = cust_trans['transaction_date'].max()
    customer_features.loc[customer_features['customer_id'] == cust_id, 'recency_days'] = \
        (snapshot_date - last_trans).days
# Frequency: transactions per month
tenure_days = (last_trans - cust_trans['transaction_date'].min()).days + 1
trans_per_month = (len(cust_trans) / tenure_days) * 30
customer_features.loc[customer_features['customer_id'] == cust_id, 'frequency_monthly'] = \
trans_per_month
# Monetary: average spending
customer_features.loc[customer_features['customer_id'] == cust_id, 'avg_data_usage'] = \
cust_trans['data_usage_gb'].mean()
customer_features.loc[customer_features['customer_id'] == cust_id, 'avg_call_minutes'] = \
cust_trans['call_minutes'].mean()
# Trend: usage over time
if len(cust_trans) > 1:
cust_trans = cust_trans.sort_values('transaction_date')
recent_usage = cust_trans.tail(5)['data_usage_gb'].mean()
early_usage = cust_trans.head(5)['data_usage_gb'].mean()
usage_trend = (recent_usage - early_usage) / (early_usage + 1e-5)
customer_features.loc[customer_features['customer_id'] == cust_id, 'usage_trend'] = \
usage_trend
# Variability
customer_features.loc[customer_features['customer_id'] == cust_id, 'usage_std'] = \
cust_trans['data_usage_gb'].std()
# Fill NaN values
customer_features = customer_features.fillna(0)
print("Time-based features created:")
print(customer_features[['customer_id', 'recency_days', 'frequency_monthly',
'usage_trend', 'avg_data_usage']].head())
Step 4: Feature Selection & Dimensionality Reduction
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif
# Prepare features
feature_cols = ['age', 'monthly_subscription', 'recency_days', 'frequency_monthly',
'avg_data_usage', 'avg_call_minutes', 'usage_trend', 'usage_std']
# Encode categorical variables
le_contract = LabelEncoder()
le_payment = LabelEncoder()
le_region = LabelEncoder()
customer_features['contract_encoded'] = le_contract.fit_transform(customer_features['contract_type'])
customer_features['payment_encoded'] = le_payment.fit_transform(customer_features['payment_method'])
customer_features['region_encoded'] = le_region.fit_transform(customer_features['region'])
feature_cols.extend(['contract_encoded', 'payment_encoded', 'region_encoded'])
X = customer_features[feature_cols]
y = customer_features['churned']
# Train-test split
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# PCA for dimensionality reduction
pca = PCA(n_components=0.95) # Keep 95% variance
X_train_pca = pca.fit_transform(X_train_scaled)
X_test_pca = pca.transform(X_test_scaled)
print(f"Original features: {X_train_scaled.shape[1]}")
print(f"PCA components: {X_train_pca.shape[1]}")
print(f"Explained variance: {pca.explained_variance_ratio_.sum():.2%}")
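The SelectKBest and f_classif imports above are not used in the main flow; as a sketch of how univariate feature selection could complement (or replace) PCA, the snippet below keeps the top-scoring features. The choice of k=8 is arbitrary and purely illustrative.
# Univariate feature selection with ANOVA F-scores (k=8 is an illustrative choice)
selector = SelectKBest(score_func=f_classif, k=8)
X_train_selected = selector.fit_transform(X_train_scaled, y_train)
X_test_selected = selector.transform(X_test_scaled)
selected_features = [f for f, keep in zip(feature_cols, selector.get_support()) if keep]
print(f"Selected features: {selected_features}")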
Step 5: Build Production Pipeline
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
import joblib
# Define feature types
numeric_features = ['age', 'monthly_subscription', 'recency_days', 'frequency_monthly',
'avg_data_usage', 'avg_call_minutes', 'usage_trend', 'usage_std']
categorical_features = ['contract_type', 'payment_method', 'region']
# Preprocessing
numeric_transformer = Pipeline([
('scaler', StandardScaler())
])
categorical_transformer = Pipeline([
('onehot', OneHotEncoder(drop='first', handle_unknown='ignore'))
])
preprocessor = ColumnTransformer([
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
# Complete pipeline with PCA
pipeline = Pipeline([
('preprocessor', preprocessor),
('pca', PCA(n_components=0.95)),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
# Prepare data with original categorical columns
X_original = customer_features[numeric_features + categorical_features]
y = customer_features['churned']
X_train, X_test, y_train, y_test = train_test_split(
X_original, y, test_size=0.2, random_state=42, stratify=y
)
# Train pipeline
pipeline.fit(X_train, y_train)
# Evaluate
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix
y_pred = pipeline.predict(X_test)
y_pred_proba = pipeline.predict_proba(X_test)[:, 1]
print("Model Performance:")
print(classification_report(y_test, y_pred))
print(f"\nROC-AUC: {roc_auc_score(y_test, y_pred_proba):.4f}")
# Save pipeline
joblib.dump(pipeline, 'churn_prediction_pipeline.pkl')
print("\nPipeline saved to 'churn_prediction_pipeline.pkl'")
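The learning objectives also mention model versioning. One minimal way to version the artifact is to save it under a timestamped name together with a small metadata file; the models/ directory and file naming below are illustrative assumptions, not a required layout.
# Illustrative versioned save: timestamped artifact plus metadata (paths are assumptions)
import json, os
version = datetime.now().strftime('%Y%m%d_%H%M%S')  # datetime was imported in Step 1
os.makedirs('models', exist_ok=True)
versioned_path = f'models/churn_pipeline_{version}.pkl'
joblib.dump(pipeline, versioned_path)
with open(f'models/churn_pipeline_{version}.json', 'w') as f:
    json.dump({
        'version': version,
        'roc_auc': float(roc_auc_score(y_test, y_pred_proba)),
        'features': numeric_features + categorical_features
    }, f, indent=2)
print(f"Versioned pipeline saved to {versioned_path}")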
Step 6: Real-Time Feature Computation
class ChurnPredictor:
"""Production-ready churn predictor with feature computation"""
def __init__(self, pipeline_path='churn_prediction_pipeline.pkl'):
self.pipeline = joblib.load(pipeline_path)
self.feature_cols = numeric_features + categorical_features
def compute_features(self, customer_data, transaction_data):
"""Compute features from raw customer and transaction data"""
features = {}
# Basic customer features
features['age'] = customer_data['age']
features['monthly_subscription'] = customer_data['monthly_subscription']
features['contract_type'] = customer_data['contract_type']
features['payment_method'] = customer_data['payment_method']
features['region'] = customer_data['region']
# Compute time-based features from transactions
if len(transaction_data) > 0:
# Recency
last_trans = transaction_data['transaction_date'].max()
features['recency_days'] = (pd.Timestamp('now') - last_trans).days
# Frequency
first_trans = transaction_data['transaction_date'].min()
tenure_days = (last_trans - first_trans).days + 1
features['frequency_monthly'] = (len(transaction_data) / tenure_days) * 30
# Average usage
features['avg_data_usage'] = transaction_data['data_usage_gb'].mean()
features['avg_call_minutes'] = transaction_data['call_minutes'].mean()
# Trend
if len(transaction_data) > 1:
sorted_trans = transaction_data.sort_values('transaction_date')
recent = sorted_trans.tail(5)['data_usage_gb'].mean()
early = sorted_trans.head(5)['data_usage_gb'].mean()
features['usage_trend'] = (recent - early) / (early + 1e-5)
else:
features['usage_trend'] = 0
            # Variability (std of a single transaction is NaN, so fall back to 0)
            features['usage_std'] = transaction_data['data_usage_gb'].std() if len(transaction_data) > 1 else 0.0
else:
# Default values for new customers
features.update({
'recency_days': 0,
'frequency_monthly': 0,
'avg_data_usage': 0,
'avg_call_minutes': 0,
'usage_trend': 0,
'usage_std': 0
})
return pd.DataFrame([features])
def predict(self, customer_data, transaction_data):
"""Predict churn probability"""
features = self.compute_features(customer_data, transaction_data)
churn_prob = self.pipeline.predict_proba(features)[0, 1]
return churn_prob
def predict_batch(self, customers_df, transactions_df):
"""Predict for multiple customers"""
predictions = []
for cust_id in customers_df['customer_id']:
cust_data = customers_df[customers_df['customer_id'] == cust_id].iloc[0]
cust_trans = transactions_df[transactions_df['customer_id'] == cust_id]
churn_prob = self.predict(cust_data, cust_trans)
predictions.append({
'customer_id': cust_id,
'churn_probability': churn_prob,
'risk_level': 'High' if churn_prob > 0.7 else ('Medium' if churn_prob > 0.4 else 'Low')
})
return pd.DataFrame(predictions)
# Example usage
predictor = ChurnPredictor('churn_prediction_pipeline.pkl')
# Test on sample customer
sample_customer = customers.iloc[0]
sample_transactions = transactions[transactions['customer_id'] == sample_customer['customer_id']]
churn_prob = predictor.predict(sample_customer, sample_transactions)
print(f"Customer {sample_customer['customer_id']} Churn Probability: {churn_prob:.2%}")
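The predict_batch method is defined above but not exercised; here is a short usage example scoring a slice of customers (the slice size of 100 is arbitrary).
# Batch scoring example (first 100 customers; slice size is arbitrary)
batch_results = predictor.predict_batch(customers.head(100), transactions)
print(batch_results.sort_values('churn_probability', ascending=False).head(10))
print(batch_results['risk_level'].value_counts())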
Step 7: Model Monitoring & Drift Detection
from scipy import stats
class ModelMonitor:
"""Monitor feature and prediction drift"""
    def __init__(self, reference_data):
        # Keep the raw reference data for KS tests as well as its summary statistics
        self.reference_data = reference_data
        self.reference_stats = self._compute_stats(reference_data)
def _compute_stats(self, data):
stats_dict = {}
for col in data.select_dtypes(include=[np.number]).columns:
stats_dict[col] = {
'mean': data[col].mean(),
'std': data[col].std(),
'min': data[col].min(),
'max': data[col].max()
}
return stats_dict
def detect_drift(self, new_data, threshold=0.05):
"""Detect feature drift using KS test"""
drift_report = {}
for col in self.reference_stats.keys():
if col not in new_data.columns:
continue
            # Kolmogorov-Smirnov test against the stored reference distribution
            ks_stat, p_value = stats.ks_2samp(
                self.reference_data[col],
                new_data[col]
            )
drift_report[col] = {
'drift_detected': p_value < threshold,
'p_value': p_value,
'ks_statistic': ks_stat
}
return drift_report
def summary(self, new_data):
"""Generate monitoring summary"""
        print("Model Monitoring Summary")
print("=" * 50)
drift = self.detect_drift(new_data)
drifted_features = [k for k, v in drift.items() if v['drift_detected']]
if drifted_features:
            print(f"WARNING: Drift detected in {len(drifted_features)} features:")
for feat in drifted_features:
print(f" - {feat}: p-value = {drift[feat]['p_value']:.4f}")
        else:
            print("No significant drift detected")
return drift
# Initialize monitor with training data
reference_data = X_train  # the training features used to fit the pipeline in Step 5
monitor = ModelMonitor(reference_data)
# Check drift on new data
drift_report = monitor.summary(X_test)
Challenge Exercises
Exercise 1: Advanced Featuretools
Create custom aggregation primitives for domain-specific features (e.g., "AvgWeekendUsage", "PeakHourCallRatio").
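As a starting point, here is one possible shape for a custom aggregation primitive, assuming the featuretools 1.x primitive API (AggregationPrimitive with Woodwork column schemas). Treat it as a sketch to adapt, not a drop-in solution.
# Sketch of a custom aggregation primitive (assumes featuretools 1.x / woodwork)
import pandas as pd
from featuretools.primitives import AggregationPrimitive
from woodwork.column_schema import ColumnSchema
from woodwork.logical_types import Datetime

class AvgWeekendUsage(AggregationPrimitive):
    """Mean of a numeric column over weekend (Saturday/Sunday) transactions."""
    name = "avg_weekend_usage"
    input_types = [ColumnSchema(semantic_tags={"numeric"}), ColumnSchema(logical_type=Datetime)]
    return_type = ColumnSchema(semantic_tags={"numeric"})

    def get_function(self):
        def avg_weekend_usage(values, datetimes):
            is_weekend = pd.DatetimeIndex(datetimes).dayofweek >= 5  # 5 = Saturday, 6 = Sunday
            return values[is_weekend].mean()
        return avg_weekend_usage

# The custom primitive can then be passed to ft.dfs, e.g. agg_primitives=['mean', AvgWeekendUsage]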
Exercise 2: Real-Time API
Build a Flask/FastAPI endpoint that accepts customer data and returns churn prediction in real-time.
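A minimal FastAPI sketch is shown below; the endpoint path, payload fields, and the assumption that the ChurnPredictor class and saved pipeline from Steps 5-6 are available are all illustrative.
# Minimal FastAPI sketch (run with: uvicorn churn_api:app); names and paths are assumptions
from fastapi import FastAPI
from pydantic import BaseModel
import pandas as pd

app = FastAPI()
predictor = ChurnPredictor('churn_prediction_pipeline.pkl')  # class from Step 6

class Transaction(BaseModel):
    transaction_date: str
    data_usage_gb: float
    call_minutes: float

class CustomerPayload(BaseModel):
    age: int
    monthly_subscription: float
    contract_type: str
    payment_method: str
    region: str
    transactions: list[Transaction] = []

@app.post("/predict")
def predict_churn(payload: CustomerPayload):
    customer = payload.dict(exclude={'transactions'})
    trans_df = pd.DataFrame([t.dict() for t in payload.transactions])
    if len(trans_df) > 0:
        trans_df['transaction_date'] = pd.to_datetime(trans_df['transaction_date'])
    prob = predictor.predict(customer, trans_df)
    risk = 'High' if prob > 0.7 else ('Medium' if prob > 0.4 else 'Low')
    return {'churn_probability': float(prob), 'risk_level': risk}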
Exercise 3: A/B Testing Framework
Implement feature versioning to compare model performance with different feature sets.
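One simple starting point is to keep named feature sets ("versions") and compare them with cross-validation on the same model; the set names below are illustrative.
# Compare feature-set "versions" with cross-validation (set names are illustrative)
from sklearn.model_selection import cross_val_score

feature_sets = {
    'v1_static': ['age', 'monthly_subscription', 'contract_encoded', 'payment_encoded', 'region_encoded'],
    'v2_behavioral': feature_cols,  # the full set built in Step 4
}
for name, cols in feature_sets.items():
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    scores = cross_val_score(model, customer_features[cols], customer_features['churned'],
                             cv=5, scoring='roc_auc')
    print(f"{name}: ROC-AUC = {scores.mean():.4f} (+/- {scores.std():.4f})")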
Exercise 4: Explainability Dashboard
Create SHAP visualizations showing which features drive churn predictions for individual customers.
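A short SHAP sketch is shown below (assuming the shap package is installed). Because the production pipeline applies PCA, the sketch explains a model trained directly on the interpretable Step 4 features instead; the return shape of shap_values differs between shap versions, so the indexing may need adjusting.
# SHAP sketch on the interpretable Step 4 features (pip install shap)
import shap

rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train_scaled, y_train)

explainer = shap.TreeExplainer(rf)
shap_values = explainer.shap_values(X_test_scaled)
# Older shap versions return a list [class0, class1] for binary classifiers
churn_shap = shap_values[1] if isinstance(shap_values, list) else shap_values

# Global view of which features drive churn predictions
shap.summary_plot(churn_shap, X_test_scaled, feature_names=feature_cols)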
Project Summary
What You've Built
- Automated feature engineering with Featuretools
- Sophisticated time-based behavioral features
- PCA dimensionality reduction integrated in pipeline
- Production-ready sklearn pipeline with versioning
- Real-time feature computation class
- Model monitoring and drift detection system
- Batch prediction capabilities
Key Takeaways
- Automated feature engineering accelerates development
- Time-based features are crucial for churn prediction
- Always build pipelines to ensure reproducibility
- Monitor feature distributions for drift in production
- Real-time feature computation requires careful design
Deployment Checklist
- Versioned pipeline artifact saved (joblib) together with its feature list and evaluation metrics
- Real-time feature computation (ChurnPredictor) matches the training-time feature logic
- Drift monitoring (ModelMonitor) wired to incoming feature data
- Single-customer and batch prediction paths tested
- Retraining trigger defined for when drift or performance degradation is detected
Advanced Project Complete!
You've mastered production-level feature engineering!