πŸš€ HANDS-ON PROJECT
Home β†’ AI Agents β†’ Business Process Automation Project

πŸ€– Build a Business Process Automation Agent

Create an intelligent agent that automates email triage, calendar management, CRM updates, and task orchestration

🎯 Advanced ⏱️ 2-3 hours πŸ’» Python + LangChain + APIs πŸ”§ Production-Ready Project

🎯 Project Overview

Business professionals spend 28% of their workweek on email, 23% on searching for information, and countless hours on repetitive tasks. What if an AI agent could handle this automatically? In this project, you'll build an enterprise-grade automation agent that saves 10-15 hours per week!

Real-World Impact

πŸ“Š Time Savings: 10-15 hours/week per employee
πŸ’° ROI: $50,000-$100,000 annually for a 10-person team
⚑ Response Time: 90% faster email triage and responses
πŸ“ˆ Accuracy: 95%+ correct classification and routing

What You'll Automate

πŸ“§ Email Management

  • Classify incoming emails by urgency and category
  • Draft responses to common inquiries
  • Escalate urgent issues to humans
  • Archive, label, and organize automatically

πŸ“… Calendar Intelligence

  • Schedule meetings by analyzing email requests
  • Find optimal meeting times across participants
  • Send calendar invites and confirmations
  • Handle rescheduling and cancellations

πŸ’Ό CRM Automation

  • Update contact records from email interactions
  • Log calls, meetings, and conversations
  • Create follow-up tasks automatically
  • Generate activity reports

βœ… Task Orchestration

  • Create tasks from emails and Slack messages
  • Assign priorities based on context
  • Send reminders for deadlines
  • Update task status across platforms

πŸš€ Enterprise Value: This is exactly what companies like Salesforce Einstein, Microsoft Copilot, and startups like Adept are building. You'll create a production-ready version!

System Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚            Input Sources (Triggers)                 β”‚
β”‚  πŸ“§ Email (Gmail/Outlook) | πŸ’¬ Slack | πŸ“ž Calendar  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                   β”‚
                   β–Ό
        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
        β”‚  Business Agent      β”‚
        β”‚  (LangChain + GPT-4) β”‚
        β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                   β”‚
          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”
          β”‚        β”‚        β”‚
          β–Ό        β–Ό        β–Ό
      β”Œβ”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”
      β”‚Emailβ”‚  β”‚Cal  β”‚  β”‚CRM  β”‚  ← Action Tools
      β”‚Tool β”‚  β”‚Tool β”‚  β”‚Tool β”‚
      β””β”€β”€β”¬β”€β”€β”˜  β””β”€β”€β”¬β”€β”€β”˜  β””β”€β”€β”¬β”€β”€β”˜
          β”‚        β”‚        β”‚
          β””β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”˜
                   β”‚
        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
        β”‚  External Services   β”‚
        β”‚  Gmail | GCal | Hub  β”‚
        β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                   β”‚
                   β–Ό
        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
        β”‚  Execution Log       β”‚
        β”‚  (MongoDB/PostgreSQL)β”‚
        β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                    

πŸ› οΈ Setup & Dependencies

1 Install Dependencies

# Core AI framework
pip install langchain langchain-openai langchain-community
pip install openai python-dotenv

# Email integration
pip install google-auth google-auth-oauthlib google-auth-httplib2
pip install google-api-python-client

# Calendar integration
pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

# CRM (HubSpot example)
pip install hubspot-api-client

# Database
pip install pymongo  # or psycopg2 for PostgreSQL

# Task management
pip install todoist-api-python  # or asana, trello, etc.

# Utilities
pip install pydantic python-dateutil pytz

2 Configure API Keys

Create .env file:

# AI Model
OPENAI_API_KEY=your_openai_key_here

# Google Workspace (Gmail + Calendar)
GOOGLE_CLIENT_ID=your_google_client_id
GOOGLE_CLIENT_SECRET=your_google_client_secret
GOOGLE_REFRESH_TOKEN=your_refresh_token

# CRM (HubSpot example)
HUBSPOT_API_KEY=your_hubspot_key

# Task Management (Todoist example)
TODOIST_API_TOKEN=your_todoist_token

# Database
MONGODB_URI=mongodb://localhost:27017/business_agent

πŸ’‘ Getting API Credentials:

  • Google: Google Cloud Console β†’ Enable Gmail & Calendar APIs
  • HubSpot: Settings β†’ Integrations β†’ API Key
  • Todoist: Settings β†’ Integrations β†’ API token

3 Project Structure

business-automation-agent/
β”œβ”€β”€ .env                        # API keys
β”œβ”€β”€ agent.py                    # Main agent
β”œβ”€β”€ tools/
β”‚   β”œβ”€β”€ email_tools.py          # Gmail integration
β”‚   β”œβ”€β”€ calendar_tools.py       # Google Calendar
β”‚   β”œβ”€β”€ crm_tools.py            # HubSpot/Salesforce
β”‚   └── task_tools.py           # Todoist/Asana
β”œβ”€β”€ workflows/
β”‚   β”œβ”€β”€ email_workflow.py       # Email automation
β”‚   β”œβ”€β”€ calendar_workflow.py    # Meeting scheduling
β”‚   └── crm_workflow.py         # CRM updates
β”œβ”€β”€ database/
β”‚   └── models.py               # Data models
β”œβ”€β”€ logs/                       # Execution logs
└── requirements.txt

πŸ’» Building the Automation Agent

Step 1: Email Tools

tools/email_tools.py
"""
Gmail integration tools
"""
import os
import base64
from email.mime.text import MIMEText
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from langchain.tools import Tool
from typing import List, Dict

class EmailTools:
    """Tools for Gmail automation"""
    
    def __init__(self):
        # Initialize Gmail API
        creds = Credentials(
            token=None,
            refresh_token=os.getenv("GOOGLE_REFRESH_TOKEN"),
            token_uri="https://oauth2.googleapis.com/token",
            client_id=os.getenv("GOOGLE_CLIENT_ID"),
            client_secret=os.getenv("GOOGLE_CLIENT_SECRET")
        )
        
        self.service = build('gmail', 'v1', credentials=creds)
    
    def read_unread_emails(self, max_results: int = 10) -> str:
        """Read unread emails from inbox"""
        try:
            # Get unread emails
            results = self.service.users().messages().list(
                userId='me',
                q='is:unread',
                maxResults=max_results
            ).execute()
            
            messages = results.get('messages', [])
            
            if not messages:
                return "No unread emails found."
            
            emails_summary = []
            for msg in messages:
                # Get full message
                message = self.service.users().messages().get(
                    userId='me',
                    id=msg['id'],
                    format='full'
                ).execute()
                
                # Extract details
                headers = message['payload']['headers']
                subject = next((h['value'] for h in headers if h['name'] == 'Subject'), 'No subject')
                from_email = next((h['value'] for h in headers if h['name'] == 'From'), 'Unknown')
                date = next((h['value'] for h in headers if h['name'] == 'Date'), 'Unknown')
                
                # Get body
                body = self._get_email_body(message)
                
                emails_summary.append({
                    'id': msg['id'],
                    'subject': subject,
                    'from': from_email,
                    'date': date,
                    'snippet': body[:200]  # First 200 chars
                })
            
            # Format for agent
            result = f"Found {len(emails_summary)} unread emails:\n\n"
            for i, email in enumerate(emails_summary, 1):
                result += f"{i}. From: {email['from']}\n"
                result += f"   Subject: {email['subject']}\n"
                result += f"   Date: {email['date']}\n"
                result += f"   Preview: {email['snippet']}...\n"
                result += f"   ID: {email['id']}\n\n"
            
            return result
            
        except Exception as e:
            return f"Error reading emails: {str(e)}"
    
    def _get_email_body(self, message: dict) -> str:
        """Extract email body from message"""
        try:
            if 'parts' in message['payload']:
                # Multipart email
                for part in message['payload']['parts']:
                    if part['mimeType'] == 'text/plain':
                        data = part['body'].get('data', '')
                        if data:
                            return base64.urlsafe_b64decode(data).decode('utf-8')
            else:
                # Simple email
                data = message['payload']['body'].get('data', '')
                if data:
                    return base64.urlsafe_b64decode(data).decode('utf-8')
            
            return message.get('snippet', '')
        except:
            return message.get('snippet', '')
    
    def send_email(self, to: str, subject: str, body: str) -> str:
        """Send an email"""
        try:
            message = MIMEText(body)
            message['to'] = to
            message['subject'] = subject
            
            raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
            
            self.service.users().messages().send(
                userId='me',
                body={'raw': raw}
            ).execute()
            
            return f"Email sent successfully to {to}"
            
        except Exception as e:
            return f"Error sending email: {str(e)}"
    
    def label_email(self, email_id: str, label: str) -> str:
        """Add label to email"""
        try:
            # Get or create label
            labels = self.service.users().labels().list(userId='me').execute()
            label_id = None
            
            for lbl in labels.get('labels', []):
                if lbl['name'].lower() == label.lower():
                    label_id = lbl['id']
                    break
            
            if not label_id:
                # Create label
                label_obj = self.service.users().labels().create(
                    userId='me',
                    body={'name': label}
                ).execute()
                label_id = label_obj['id']
            
            # Add label to email
            self.service.users().messages().modify(
                userId='me',
                id=email_id,
                body={'addLabelIds': [label_id]}
            ).execute()
            
            return f"Added label '{label}' to email {email_id}"
            
        except Exception as e:
            return f"Error labeling email: {str(e)}"
    
    def mark_as_read(self, email_id: str) -> str:
        """Mark email as read"""
        try:
            self.service.users().messages().modify(
                userId='me',
                id=email_id,
                body={'removeLabelIds': ['UNREAD']}
            ).execute()
            
            return f"Marked email {email_id} as read"
            
        except Exception as e:
            return f"Error marking email as read: {str(e)}"
    
    def get_tools(self) -> List[Tool]:
        """Get email tools for agent"""
        return [
            Tool(
                name="Read Unread Emails",
                func=self.read_unread_emails,
                description="Read unread emails from inbox. Returns list of emails with subject, sender, and preview."
            ),
            Tool(
                name="Send Email",
                func=lambda x: self.send_email(
                    to=x.split('|')[0],
                    subject=x.split('|')[1],
                    body=x.split('|')[2]
                ),
                description="Send an email. Input format: 'to|subject|body'"
            ),
            Tool(
                name="Label Email",
                func=lambda x: self.label_email(
                    email_id=x.split('|')[0],
                    label=x.split('|')[1]
                ),
                description="Add label to email. Input format: 'email_id|label_name'"
            ),
            Tool(
                name="Mark Email Read",
                func=self.mark_as_read,
                description="Mark an email as read. Input: email_id"
            )
        ]

Step 2: Calendar Tools

tools/calendar_tools.py
"""
Google Calendar integration tools
"""
import os
from datetime import datetime, timedelta
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from langchain.tools import Tool
from typing import List
import pytz

class CalendarTools:
    """Tools for Google Calendar automation"""
    
    def __init__(self):
        # Initialize Calendar API
        creds = Credentials(
            token=None,
            refresh_token=os.getenv("GOOGLE_REFRESH_TOKEN"),
            token_uri="https://oauth2.googleapis.com/token",
            client_id=os.getenv("GOOGLE_CLIENT_ID"),
            client_secret=os.getenv("GOOGLE_CLIENT_SECRET")
        )
        
        self.service = build('calendar', 'v3', credentials=creds)
        self.timezone = pytz.timezone('America/New_York')  # Adjust as needed
    
    def check_availability(self, date_str: str, start_time: str, duration_minutes: int = 60) -> str:
        """Check if a time slot is available"""
        try:
            # Parse date and time
            date_obj = datetime.strptime(date_str, '%Y-%m-%d')
            time_obj = datetime.strptime(start_time, '%H:%M').time()
            
            start_datetime = datetime.combine(date_obj, time_obj)
            start_datetime = self.timezone.localize(start_datetime)
            
            end_datetime = start_datetime + timedelta(minutes=duration_minutes)
            
            # Check for conflicts
            events_result = self.service.events().list(
                calendarId='primary',
                timeMin=start_datetime.isoformat(),
                timeMax=end_datetime.isoformat(),
                singleEvents=True,
                orderBy='startTime'
            ).execute()
            
            events = events_result.get('items', [])
            
            if events:
                conflicts = []
                for event in events:
                    conflicts.append(f"- {event.get('summary', 'Busy')} at {event['start'].get('dateTime', '')}")
                
                return f"Time slot NOT available. Conflicts:\n" + "\n".join(conflicts)
            else:
                return f"Time slot AVAILABLE: {date_str} at {start_time} for {duration_minutes} minutes"
            
        except Exception as e:
            return f"Error checking availability: {str(e)}"
    
    def create_meeting(self, summary: str, date_str: str, start_time: str, 
                      duration_minutes: int, attendees: str) -> str:
        """Create a calendar meeting"""
        try:
            # Parse datetime
            date_obj = datetime.strptime(date_str, '%Y-%m-%d')
            time_obj = datetime.strptime(start_time, '%H:%M').time()
            
            start_datetime = datetime.combine(date_obj, time_obj)
            start_datetime = self.timezone.localize(start_datetime)
            
            end_datetime = start_datetime + timedelta(minutes=duration_minutes)
            
            # Parse attendees
            attendee_list = [{'email': email.strip()} for email in attendees.split(',')]
            
            # Create event
            event = {
                'summary': summary,
                'start': {
                    'dateTime': start_datetime.isoformat(),
                    'timeZone': str(self.timezone)
                },
                'end': {
                    'dateTime': end_datetime.isoformat(),
                    'timeZone': str(self.timezone)
                },
                'attendees': attendee_list,
                'reminders': {
                    'useDefault': False,
                    'overrides': [
                        {'method': 'email', 'minutes': 24 * 60},
                        {'method': 'popup', 'minutes': 30}
                    ]
                }
            }
            
            created_event = self.service.events().insert(
                calendarId='primary',
                body=event,
                sendUpdates='all'
            ).execute()
            
            return f"Meeting created: {summary} on {date_str} at {start_time}. Link: {created_event.get('htmlLink')}"
            
        except Exception as e:
            return f"Error creating meeting: {str(e)}"
    
    def find_free_slots(self, date_str: str, num_slots: int = 3) -> str:
        """Find available time slots on a given day"""
        try:
            date_obj = datetime.strptime(date_str, '%Y-%m-%d')
            
            # Define business hours (9 AM - 5 PM)
            start_of_day = datetime.combine(date_obj, datetime.min.time().replace(hour=9))
            start_of_day = self.timezone.localize(start_of_day)
            
            end_of_day = datetime.combine(date_obj, datetime.min.time().replace(hour=17))
            end_of_day = self.timezone.localize(end_of_day)
            
            # Get existing events
            events_result = self.service.events().list(
                calendarId='primary',
                timeMin=start_of_day.isoformat(),
                timeMax=end_of_day.isoformat(),
                singleEvents=True,
                orderBy='startTime'
            ).execute()
            
            events = events_result.get('items', [])
            
            # Find gaps
            free_slots = []
            current_time = start_of_day
            
            for event in events:
                event_start = datetime.fromisoformat(event['start'].get('dateTime'))
                
                # If there's a gap before this event
                if (event_start - current_time).total_seconds() >= 3600:  # At least 1 hour
                    free_slots.append({
                        'start': current_time.strftime('%H:%M'),
                        'end': event_start.strftime('%H:%M')
                    })
                
                event_end = datetime.fromisoformat(event['end'].get('dateTime'))
                current_time = max(current_time, event_end)
            
            # Check if there's time after last event
            if (end_of_day - current_time).total_seconds() >= 3600:
                free_slots.append({
                    'start': current_time.strftime('%H:%M'),
                    'end': end_of_day.strftime('%H:%M')
                })
            
            if not free_slots:
                return f"No free slots available on {date_str}"
            
            # Return top N slots
            result = f"Available time slots on {date_str}:\n"
            for i, slot in enumerate(free_slots[:num_slots], 1):
                result += f"{i}. {slot['start']} - {slot['end']}\n"
            
            return result
            
        except Exception as e:
            return f"Error finding free slots: {str(e)}"
    
    def get_tools(self) -> List[Tool]:
        """Get calendar tools for agent"""
        return [
            Tool(
                name="Check Availability",
                func=lambda x: self.check_availability(
                    date_str=x.split('|')[0],
                    start_time=x.split('|')[1],
                    duration_minutes=int(x.split('|')[2]) if len(x.split('|')) > 2 else 60
                ),
                description="Check if a time slot is available. Input: 'YYYY-MM-DD|HH:MM|duration_minutes'"
            ),
            Tool(
                name="Create Meeting",
                func=lambda x: self.create_meeting(
                    summary=x.split('|')[0],
                    date_str=x.split('|')[1],
                    start_time=x.split('|')[2],
                    duration_minutes=int(x.split('|')[3]),
                    attendees=x.split('|')[4]
                ),
                description="Create a calendar meeting. Input: 'summary|YYYY-MM-DD|HH:MM|duration|email1,email2'"
            ),
            Tool(
                name="Find Free Slots",
                func=self.find_free_slots,
                description="Find available time slots on a date. Input: 'YYYY-MM-DD'"
            )
        ]

Step 3: CRM Tools

tools/crm_tools.py
"""
CRM integration tools (HubSpot example)
"""
import os
from hubspot import HubSpot
from langchain.tools import Tool
from typing import List
from datetime import datetime

class CRMTools:
    """Tools for CRM automation"""
    
    def __init__(self):
        # Initialize HubSpot client
        self.client = HubSpot(access_token=os.getenv("HUBSPOT_API_KEY"))
    
    def search_contact(self, email: str) -> str:
        """Search for a contact by email"""
        try:
            # Search contacts
            response = self.client.crm.contacts.search_api.do_search(
                public_object_search_request={
                    "filterGroups": [{
                        "filters": [{
                            "propertyName": "email",
                            "operator": "EQ",
                            "value": email
                        }]
                    }]
                }
            )
            
            if response.results:
                contact = response.results[0]
                props = contact.properties
                
                return f"""Contact found:
- Name: {props.get('firstname', '')} {props.get('lastname', '')}
- Email: {props.get('email', '')}
- Company: {props.get('company', 'N/A')}
- Phone: {props.get('phone', 'N/A')}
- Last Contact: {props.get('lastmodifieddate', 'N/A')}
- Contact ID: {contact.id}"""
            else:
                return f"No contact found with email: {email}"
                
        except Exception as e:
            return f"Error searching contact: {str(e)}"
    
    def create_contact(self, email: str, firstname: str, lastname: str, company: str = "") -> str:
        """Create a new contact"""
        try:
            contact_data = {
                "properties": {
                    "email": email,
                    "firstname": firstname,
                    "lastname": lastname,
                    "company": company
                }
            }
            
            contact = self.client.crm.contacts.basic_api.create(
                simple_public_object_input_for_create=contact_data
            )
            
            return f"Contact created: {firstname} {lastname} ({email}). ID: {contact.id}"
            
        except Exception as e:
            return f"Error creating contact: {str(e)}"
    
    def log_activity(self, contact_id: str, activity_type: str, notes: str) -> str:
        """Log an activity/interaction"""
        try:
            # Create note/engagement
            engagement_data = {
                "properties": {
                    "hs_timestamp": datetime.now().isoformat(),
                    "hs_note_body": f"[{activity_type}] {notes}"
                }
            }
            
            # For simplicity, creating as a note
            # In production, use appropriate engagement type
            note = self.client.crm.objects.notes.basic_api.create(
                simple_public_object_input_for_create=engagement_data
            )
            
            # Associate with contact
            self.client.crm.objects.notes.associations_api.create(
                note_id=note.id,
                to_object_type="contacts",
                to_object_id=contact_id,
                association_type="note_to_contact"
            )
            
            return f"Activity logged for contact {contact_id}: {activity_type}"
            
        except Exception as e:
            return f"Error logging activity: {str(e)}"
    
    def create_task(self, subject: str, due_date: str, contact_id: str = None) -> str:
        """Create a follow-up task"""
        try:
            task_data = {
                "properties": {
                    "hs_task_subject": subject,
                    "hs_task_status": "NOT_STARTED",
                    "hs_timestamp": datetime.now().isoformat(),
                    "hs_task_priority": "MEDIUM"
                }
            }
            
            if due_date:
                task_data["properties"]["hs_task_due_date"] = due_date
            
            task = self.client.crm.objects.tasks.basic_api.create(
                simple_public_object_input_for_create=task_data
            )
            
            # Associate with contact if provided
            if contact_id:
                self.client.crm.objects.tasks.associations_api.create(
                    task_id=task.id,
                    to_object_type="contacts",
                    to_object_id=contact_id,
                    association_type="task_to_contact"
                )
            
            return f"Task created: {subject}. Due: {due_date}. Task ID: {task.id}"
            
        except Exception as e:
            return f"Error creating task: {str(e)}"
    
    def get_tools(self) -> List[Tool]:
        """Get CRM tools for agent"""
        return [
            Tool(
                name="Search Contact",
                func=self.search_contact,
                description="Search for a contact by email address. Returns contact details."
            ),
            Tool(
                name="Create Contact",
                func=lambda x: self.create_contact(
                    email=x.split('|')[0],
                    firstname=x.split('|')[1],
                    lastname=x.split('|')[2],
                    company=x.split('|')[3] if len(x.split('|')) > 3 else ""
                ),
                description="Create new contact. Input: 'email|firstname|lastname|company'"
            ),
            Tool(
                name="Log Activity",
                func=lambda x: self.log_activity(
                    contact_id=x.split('|')[0],
                    activity_type=x.split('|')[1],
                    notes=x.split('|')[2]
                ),
                description="Log activity for contact. Input: 'contact_id|type|notes'"
            ),
            Tool(
                name="Create Task",
                func=lambda x: self.create_task(
                    subject=x.split('|')[0],
                    due_date=x.split('|')[1],
                    contact_id=x.split('|')[2] if len(x.split('|')) > 2 else None
                ),
                description="Create follow-up task. Input: 'subject|YYYY-MM-DD|contact_id'"
            )
        ]

Step 4: Build the Main Agent

agent.py
"""
Business Process Automation Agent
"""
import os
from dotenv import load_dotenv
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferMemory

from tools.email_tools import EmailTools
from tools.calendar_tools import CalendarTools
from tools.crm_tools import CRMTools

load_dotenv()

class BusinessAutomationAgent:
    """AI agent for business process automation"""
    
    def __init__(self, model: str = "gpt-4-turbo-preview"):
        self.model = model
        
        # Initialize LLM
        self.llm = ChatOpenAI(
            model=model,
            temperature=0.3,
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )
        
        # Initialize tools
        self.email_tools = EmailTools()
        self.calendar_tools = CalendarTools()
        self.crm_tools = CRMTools()
        
        # Combine all tools
        self.tools = (
            self.email_tools.get_tools() +
            self.calendar_tools.get_tools() +
            self.crm_tools.get_tools()
        )
        
        # Create memory
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        
        # Create agent
        self.agent_executor = self._create_agent()
    
    def _create_agent(self) -> AgentExecutor:
        """Create the automation agent"""
        
        system_prompt = """You are an expert business automation assistant. Your role is to help professionals automate their workflows including:

**Email Management:**
- Read and classify emails by urgency and category
- Draft responses to common inquiries
- Label and organize emails
- Escalate urgent matters

**Calendar Management:**
- Check availability and find free time slots
- Schedule meetings based on email requests
- Send meeting invites
- Handle rescheduling

**CRM Updates:**
- Search and update contact information
- Log activities and interactions
- Create follow-up tasks
- Maintain accurate records

**Decision Making:**
- Always check for conflicts before scheduling
- Prioritize urgent emails (keywords: urgent, asap, important, deadline)
- Confirm actions before executing (when appropriate)
- Provide clear summaries of actions taken

**Communication Style:**
- Be professional and concise
- Explain your reasoning
- Ask for clarification when needed
- Confirm successful completion

When processing requests:
1. Understand the user's intent
2. Gather necessary information using available tools
3. Execute the appropriate actions
4. Confirm completion with summary
"""
        
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad")
        ])
        
        agent = create_openai_tools_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=prompt
        )
        
        return AgentExecutor(
            agent=agent,
            tools=self.tools,
            memory=self.memory,
            verbose=True,
            max_iterations=15,
            handle_parsing_errors=True
        )
    
    def process(self, request: str) -> dict:
        """Process a business automation request"""
        
        print(f"\nπŸ€– Processing: {request}\n")
        print("=" * 60)
        
        try:
            result = self.agent_executor.invoke({"input": request})
            
            print("\n" + "=" * 60)
            print("βœ… Request completed!")
            
            return {
                "success": True,
                "request": request,
                "output": result["output"]
            }
            
        except Exception as e:
            print(f"\n❌ Error: {str(e)}")
            return {
                "success": False,
                "request": request,
                "error": str(e)
            }

# Example usage
if __name__ == "__main__":
    agent = BusinessAutomationAgent()
    
    # Example automation requests
    examples = [
        "Check my unread emails and label any urgent ones as 'Urgent'",
        
        "I received an email from john@example.com asking to schedule a meeting next Tuesday. Find available slots and create a 1-hour meeting at the earliest available time.",
        
        "Search for the contact with email sarah@company.com in the CRM and log that I had a call with them today discussing the Q1 contract.",
        
        "Create a follow-up task to send proposal to the contact I just logged the call with, due in 3 days."
    ]
    
    # Process first example
    result = agent.process(examples[0])
    
    if result["success"]:
        print("\n" + "=" * 60)
        print("RESULT")
        print("=" * 60)
        print(result["output"])

βœ… Checkpoint: Test the Agent

Test with a simple workflow:

python agent.py

The agent should:

  • Connect to your Gmail, Calendar, and CRM
  • Read unread emails
  • Classify and label them
  • Schedule meetings when requested
  • Update CRM records

πŸš€ Advanced Features

1. Automated Email Response

Auto-reply to common inquiries
def auto_respond_to_common_emails(agent):
    """Automatically respond to common email types"""
    
    # Define response templates
    templates = {
        "meeting_request": "Thank you for reaching out. I've checked my calendar and found some available slots. Would any of these work for you?\n\n{slots}\n\nPlease let me know your preference.",
        
        "support_request": "Thank you for contacting support. I've logged your request (Ticket #{ticket_id}) and our team will respond within 24 hours.\n\nIn the meantime, you might find these resources helpful:\n{resources}",
        
        "pricing_inquiry": "Thank you for your interest! I've sent you detailed pricing information to this email. If you have specific questions, feel free to reply or schedule a call: {calendar_link}"
    }
    
    # Process unread emails
    emails = agent.email_tools.read_unread_emails()
    
    # Classify and respond
    classification_prompt = f"""
Classify these emails by type:
{emails}

For each email, determine if it's:
- meeting_request: Someone asking to schedule a meeting
- support_request: Customer support inquiry
- pricing_inquiry: Asking about pricing
- other: Doesn't fit categories

Return as JSON: {{"email_id": "type"}}
"""
    
    # Get classifications from LLM
    # Then auto-respond based on template

2. Smart Meeting Scheduler

Find optimal meeting times
def find_optimal_meeting_time(agent, participants: list, duration: int, preferred_days: list):
    """Find the best meeting time across multiple participants"""
    
    # Check each participant's availability
    availability = {}
    
    for participant in participants:
        # Get their free/busy info
        slots = agent.calendar_tools.find_free_slots(preferred_days[0])
        availability[participant] = slots
    
    # Find common free slots
    common_slots = find_intersection(availability)
    
    # Rank by preference (e.g., avoid early morning/late afternoon)
    ranked_slots = rank_time_preferences(common_slots)
    
    return ranked_slots[0]  # Best option

3. CRM Intelligence

Smart CRM updates from email context
def intelligent_crm_update(agent, email_content: str, sender_email: str):
    """Extract insights from email and update CRM"""
    
    # Search for contact
    contact = agent.crm_tools.search_contact(sender_email)
    
    # Use LLM to extract key information
    extraction_prompt = f"""
Analyze this email and extract:
1. Mentioned products/services
2. Stage of sales process (interest, evaluation, negotiation, etc.)
3. Next steps discussed
4. Any concerns or objections
5. Sentiment (positive, neutral, negative)

Email:
{email_content}

Return as structured JSON.
"""
    
    # Update CRM with insights
    # Create follow-up tasks
    # Set reminders

4. Workflow Automation

Chain multiple automations
class WorkflowAutomation:
    """Chain multiple automation steps"""
    
    def __init__(self, agent):
        self.agent = agent
    
    def new_customer_onboarding(self, customer_email: str, customer_name: str):
        """Automate new customer onboarding"""
        
        steps = [
            # 1. Create CRM contact
            f"Create contact: {customer_email}|{customer_name.split()[0]}|{customer_name.split()[1]}|",
            
            # 2. Send welcome email
            f"Send welcome email with onboarding guide",
            
            # 3. Schedule kickoff call
            f"Find free slots next week and create 30-min kickoff meeting",
            
            # 4. Create onboarding tasks
            f"Create tasks: Send product guide, Setup account, Training session",
            
            # 5. Log in CRM
            f"Log activity: Customer onboarding started"
        ]
        
        results = []
        for step in steps:
            result = self.agent.process(step)
            results.append(result)
            
            if not result["success"]:
                print(f"Step failed: {step}")
                break
        
        return results
    
    def deal_won_workflow(self, customer_email: str):
        """Automate post-sale workflow"""
        steps = [
            "Send congratulations email",
            "Schedule implementation kickoff",
            "Create implementation tasks",
            "Notify account management team",
            "Update CRM deal stage to 'Won'"
        ]
        # Execute workflow...
    
    def support_ticket_workflow(self, ticket_email: str):
        """Automate support ticket handling"""
        steps = [
            "Classify ticket urgency and category",
            "Search knowledge base for solutions",
            "Draft response or escalate if needed",
            "Log in support system",
            "Set follow-up reminder"
        ]
        # Execute workflow...

πŸš€ Production Deployment

1. Run as Background Service

# scheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
from agent import BusinessAutomationAgent

def process_emails():
    """Run email processing every 15 minutes"""
    agent = BusinessAutomationAgent()
    agent.process("Check and process my unread emails")

def daily_summary():
    """Send daily summary"""
    agent = BusinessAutomationAgent()
    agent.process("Summarize today's activities and send me a report")

# Create scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(process_emails, 'interval', minutes=15)
scheduler.add_job(daily_summary, 'cron', hour=17, minute=0)  # 5 PM daily

scheduler.start()

# Keep running
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()

2. Add Monitoring & Logging

import logging
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'logs/agent_{datetime.now().strftime("%Y%m%d")}.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger("BusinessAgent")

# Log all actions
def log_action(action_type: str, details: dict):
    logger.info(f"Action: {action_type} | Details: {details}")

# Track metrics
metrics = {
    "emails_processed": 0,
    "meetings_scheduled": 0,
    "crm_updates": 0,
    "errors": 0
}

3. Error Handling & Retries

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
def robust_agent_process(agent, request):
    """Process with automatic retries"""
    try:
        result = agent.process(request)
        if not result["success"]:
            raise Exception(result.get("error"))
        return result
    except Exception as e:
        logger.error(f"Agent process failed: {e}")
        raise

⚠️ Security Considerations:

  • OAuth Security: Store refresh tokens securely (environment variables, secret managers)
  • API Rate Limits: Respect Gmail (250 emails/day), Calendar, and CRM limits
  • Data Privacy: Don't log sensitive email content or customer data
  • Access Control: Limit agent permissions to minimum necessary
  • Audit Trail: Log all actions for compliance

πŸ’° Calculate Your ROI

"""
Calculate time and cost savings
"""
class ROICalculator:
    def __init__(self, 
                 hourly_rate: float = 50,
                 employees: int = 10):
        self.hourly_rate = hourly_rate
        self.employees = employees
    
    def calculate_savings(self):
        # Time saved per week per employee
        time_saved = {
            "email_processing": 5,      # hours
            "calendar_management": 2,
            "crm_updates": 3,
            "follow_ups": 2
        }
        
        total_hours_saved = sum(time_saved.values())
        
        # Annual calculations
        weeks_per_year = 50  # Accounting for vacation
        annual_hours_saved = total_hours_saved * weeks_per_year * self.employees
        annual_cost_savings = annual_hours_saved * self.hourly_rate
        
        return {
            "hours_saved_per_week_per_employee": total_hours_saved,
            "annual_hours_saved_total": annual_hours_saved,
            "annual_cost_savings": f"${annual_cost_savings:,.2f}",
            "roi_multiple": annual_cost_savings / 5000  # Assuming $5k setup cost
        }

# Example
calc = ROICalculator(hourly_rate=75, employees=10)
print(calc.calculate_savings())

# Output:
# {
#   "hours_saved_per_week_per_employee": 12,
#   "annual_hours_saved_total": 6000,
#   "annual_cost_savings": "$450,000.00",
#   "roi_multiple": 90  # 90x ROI!
# }

🎯 Key Takeaways

  • Integration is Key: Connect to real business systems (Gmail, Calendar, CRM)
  • Context Matters: Use memory to maintain conversation context
  • Error Handling: Production systems need robust error handling and retries
  • Incremental Automation: Start with simple tasks, expand gradually
  • Human Oversight: Critical decisions should have human confirmation
  • Massive ROI: 10-15 hours saved per week = $50k-$450k annually

πŸ“š Next Steps

πŸ’‘ Real-World Applications:

  • Sales Teams: Automate lead qualification, follow-ups, CRM updates
  • Customer Success: Handle onboarding, check-ins, renewal reminders
  • Recruiting: Screen resumes, schedule interviews, send updates
  • Operations: Process invoices, expense reports, approvals

πŸŽ‰ Congratulations!

You've built an enterprise-grade business automation agent that can save 10-15 hours per week and generate massive ROI!

← Back to AI Agents Course