📜 Free Certificate Upon Completion - Earn a verified certificate when you complete all 7 modules in the AI Agents course.

Agent Tools & Actions

📚 Tutorial 3 🟡 Intermediate

Equip agents with capabilities: build tools, integrations, and actions to interact with the world


What Makes Agents Powerful?

An agent that can only think is limited. A truly powerful agent can do things. It can:

  • Query databases and retrieve information
  • Call APIs and integrate with external services
  • Execute code and run calculations
  • Read and write files
  • Send emails, messages, and notifications
  • Control systems and devices

These capabilities are implemented through tools and actions. This module shows how to build tools, compose them into workflows, and put safety constraints around them.

Key Insight: The set of tools available to an agent directly determines what it can accomplish. A well-designed tool set is as important as a good reasoning strategy.
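
To make the idea of a tool set concrete, here is a minimal framework-free sketch of a tool registry. The names `ToolSpec`, `REGISTRY`, `register`, and `call_tool` are illustrative assumptions and not part of LangChain or any other library.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class ToolSpec:
    """Hypothetical container: what the agent sees (name, description) plus the callable."""
    name: str
    description: str
    func: Callable[..., str]


REGISTRY: Dict[str, ToolSpec] = {}


def register(name: str, description: str):
    """Decorator that adds a function to the registry under `name`."""
    def decorator(func: Callable[..., str]) -> Callable[..., str]:
        REGISTRY[name] = ToolSpec(name, description, func)
        return func
    return decorator


@register("Add", "Add two numbers and return the sum as text.")
def add(a: float, b: float) -> str:
    return str(a + b)


def call_tool(name: str, **kwargs) -> str:
    """Dispatch a tool call by name; unknown names produce a readable error."""
    spec = REGISTRY.get(name)
    if spec is None:
        return f"Error: unknown tool '{name}'. Available: {sorted(REGISTRY)}"
    return spec.func(**kwargs)


print(call_tool("Add", a=2, b=3))
print(call_tool("Delete", path="/tmp/x"))
```

Whatever is not in the registry is simply out of reach for the agent, which is why the choice of tools bounds what it can do.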

Types of Agent Tools

Different tools enable different capabilities:

🌐 API Tools

Call external APIs: weather services, stock data, translation APIs, search engines. Integrates with the broader internet.

💾 Database Tools

Query and update databases. Retrieve customer records, product info, inventory data. Execute SQL or NoSQL queries.

🔍 Search Tools

Web search, semantic search, vector database search. Find relevant information from large knowledge bases.

📊 Computation Tools

Execute code, run calculations, analyze data. Python interpreter, mathematical computations, data transformations.

📧 Communication Tools

Send emails, messages, notifications. Notify humans, trigger workflows, escalate issues.

⚙️ System Tools

Control systems and workflows. Run commands, trigger jobs, manage resources, orchestrate processes.

Building Custom Tools

Most agent applications require custom tools specific to your domain. LangChain makes this straightforward:

Simple Tool Example

from langchain.tools import Tool
from datetime import datetime

# Define a simple tool function
def get_current_time(format: str = "%Y-%m-%d %H:%M:%S") -> str:
    """Get current date and time"""
    return datetime.now().strftime(format)

# Wrap it as a Tool
time_tool = Tool(
    name="GetCurrentTime",
    func=get_current_time,
    description="Returns current date and time. Useful for scheduling and time-based queries.",
    args_schema={
        "type": "object",
        "properties": {
            "format": {
                "type": "string",
                "description": "Time format string (Python strftime format)"
            }
        }
    }
)

# Pass the tool to the agent when you construct it, for example in its tools list
tools = [time_tool]

Database Query Tool

from langchain.tools import Tool
import sqlite3

def query_customer_database(customer_id: int) -> str:
    """Look up customer information"""
    conn = sqlite3.connect("customers.db")
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT id, name, email, status FROM customers WHERE id = ?",
            (customer_id,)
        )
        result = cursor.fetchone()
    finally:
        # Close the connection even if the query raises
        conn.close()
    
    if result:
        return f"Customer {result[1]}: {result[2]} (Status: {result[3]})"
    return "Customer not found"

db_tool = Tool(
    name="QueryCustomerDB",
    func=query_customer_database,
    description="Look up customer information by ID",
    args_schema={
        "type": "object",
        "properties": {
            "customer_id": {
                "type": "integer",
                "description": "The customer ID to look up"
            }
        },
        "required": ["customer_id"]
    }
)

API Integration Tool

from langchain.tools import Tool
import requests

def search_web(query: str, max_results: int = 5) -> str:
    """Search the web for information"""
    # Uses the DuckDuckGo Instant Answer API, which returns topic summaries
    # rather than a full list of web search results
    params = {
        "q": query,
        "format": "json",
    }
    try:
        response = requests.get(
            "https://api.duckduckgo.com/",
            params=params,
            timeout=10
        )
        response.raise_for_status()
        results = response.json()
        
        # Format results; entries under "RelatedTopics" carry "Text" and "FirstURL"
        topics = [t for t in results.get("RelatedTopics", []) if "Text" in t]
        formatted = "Search Results:\n"
        for i, topic in enumerate(topics[:max_results]):
            formatted += f"{i+1}. {topic['Text']}\n"
            formatted += f"   {topic.get('FirstURL', '')}\n"
        return formatted
    except Exception as e:
        return f"Search failed: {str(e)}"

search_tool = Tool(
    name="WebSearch",
    func=search_web,
    description="Search the web for current information",
    args_schema={
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "What to search for"
            },
            "max_results": {
                "type": "integer",
                "description": "Maximum number of results (default 5)"
            }
        },
        "required": ["query"]
    }
)

Tool Best Practices:
  • Write clear descriptions agents can understand
  • Define schemas so agents know what parameters to pass
  • Handle errors gracefully and return meaningful messages
  • Keep tools focused (do one thing well)
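
The practices above can be sketched in one small tool function. This is an illustrative example only: `convert_temperature` is a hypothetical tool, chosen because it needs no external service.

```python
def convert_temperature(value: float, from_unit: str, to_unit: str) -> str:
    """Convert a temperature between 'C', 'F', and 'K'.

    Returns the converted value as text, or a message starting with 'Error:'
    that tells the agent what to change.
    """
    units = {"C", "F", "K"}
    from_unit, to_unit = from_unit.upper(), to_unit.upper()
    if from_unit not in units or to_unit not in units:
        return f"Error: units must be one of {sorted(units)}"
    try:
        value = float(value)
    except (TypeError, ValueError):
        return "Error: value must be a number"

    # Normalize to Celsius first, then convert to the target unit
    celsius = {"C": value, "F": (value - 32) * 5 / 9, "K": value - 273.15}[from_unit]
    result = {"C": celsius, "F": celsius * 9 / 5 + 32, "K": celsius + 273.15}[to_unit]
    return f"{result:.2f} {to_unit}"


print(convert_temperature(100, "C", "F"))
print(convert_temperature(20, "C", "X"))
```

The function does one thing, documents its parameters in the docstring, and reports bad input as a message the agent can act on instead of raising.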

Composing Tool Chains

Often, complex actions require chaining tools together:

Example: Complete Customer Support Workflow

from langchain.tools import Tool
from langchain.agents import AgentType, initialize_agent

# Tool 1: Find customer
lookup_customer = Tool(name="LookupCustomer", func=lookup_fn, ...)

# Tool 2: Check order history
check_orders = Tool(name="CheckOrders", func=check_fn, ...)

# Tool 3: Process refund
process_refund = Tool(name="ProcessRefund", func=refund_fn, ...)

# Tool 4: Send confirmation email
send_email = Tool(name="SendEmail", func=email_fn, ...)

# Agent can orchestrate these
# (`llm` is a language model created elsewhere; initialize_agent is LangChain's legacy constructor)
tools = [lookup_customer, check_orders, process_refund, send_email]
agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION)

# Now agent can handle complex requests:
# "A customer wants a refund. Look them up, check their orders,
#  process the refund, and send confirmation."
# The agent chooses a tool at each step, guided by the tool descriptions and earlier results

The agent decides which tools to use and in what order to accomplish the goal. This is the power of agentic workflows.
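
The decision loop behind this can be sketched without any framework. In the sketch below, `scripted_planner` is a hard-coded stand-in for the language model, and the two tools return made-up values; all names are assumptions for illustration.

```python
from typing import Callable, Dict, List, Tuple


# Hypothetical tools; each takes and returns plain strings
def lookup_customer(email: str) -> str:
    return "customer_id=42"


def check_orders(customer_id: str) -> str:
    return "order_id=1001"


TOOLS: Dict[str, Callable[[str], str]] = {
    "LookupCustomer": lookup_customer,
    "CheckOrders": check_orders,
}


def scripted_planner(goal: str, history: List[Tuple[str, str]]) -> Tuple[str, str]:
    """Stand-in for the LLM: returns (tool_name, tool_input) or ("FINISH", answer)."""
    if not history:
        return "LookupCustomer", "user@example.com"
    if len(history) == 1:
        return "CheckOrders", history[0][1].split("=")[1]
    return "FINISH", f"Found {history[-1][1]}"


def run_agent(goal: str, max_steps: int = 5) -> str:
    """Ask the planner for the next action, run the tool, and feed the result back."""
    history: List[Tuple[str, str]] = []
    for _ in range(max_steps):
        tool_name, tool_input = scripted_planner(goal, history)
        if tool_name == "FINISH":
            return tool_input
        observation = TOOLS[tool_name](tool_input)
        history.append((tool_name, observation))
    return "Stopped: step limit reached"


print(run_agent("Find the customer's latest order"))
```

A real agent replaces `scripted_planner` with a model call, but the structure stays the same: choose, act, observe, repeat, with a step limit as a safety net.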

Tool Safety & Constraints

As agents become more powerful, safety becomes critical. Tools should have safeguards:

Rate Limiting

from functools import wraps
from datetime import datetime, timedelta

def rate_limit(max_calls: int, time_window: int):
    """Limit tool calls to max_calls per time_window seconds"""
    def decorator(func):
        calls = []
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = datetime.now()
            # Remove old calls outside time window
            calls[:] = [t for t in calls if now - t < timedelta(seconds=time_window)]
            
            if len(calls) >= max_calls:
                raise Exception(f"Rate limit exceeded: {max_calls} calls per {time_window}s")
            
            calls.append(now)
            return func(*args, **kwargs)
        
        return wrapper
    return decorator

@rate_limit(max_calls=10, time_window=60)  # Max 10 calls per minute
def query_expensive_api(query: str) -> str:
    # This tool can only be called 10 times per minute
    pass
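
The decorator above keeps its call list in a plain Python list, so concurrent callers can race on it. As one possible variant, the sketch below guards the window with a lock and uses a monotonic clock so that wall-clock adjustments do not affect it. `SlidingWindowLimiter` and its `clock` parameter are hypothetical names introduced here.

```python
import threading
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most `max_calls` within any `window` seconds (thread-safe)."""

    def __init__(self, max_calls: int, window: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_calls = max_calls
        self.window = window
        self.clock = clock  # injectable so tests can control time
        self._calls: Deque[float] = deque()
        self._lock = threading.Lock()

    def try_acquire(self) -> bool:
        """Record a call and return True, or return False if the limit is hit."""
        now = self.clock()
        with self._lock:
            # Drop timestamps that have left the window
            while self._calls and now - self._calls[0] >= self.window:
                self._calls.popleft()
            if len(self._calls) >= self.max_calls:
                return False
            self._calls.append(now)
            return True


limiter = SlidingWindowLimiter(max_calls=2, window=60)
print([limiter.try_acquire() for _ in range(3)])
```

Returning a boolean instead of raising lets the tool decide how to report the limit, for example as an error message the agent can read.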

Input Validation

def transfer_funds(from_account: str, to_account: str, amount: float) -> str:
    """Transfer funds between accounts"""
    
    # Validate inputs
    if not isinstance(amount, (int, float)) or amount != amount:  # amount != amount is True only for NaN
        return "Error: Amount must be numeric"
    
    if amount <= 0:
        return "Error: Amount must be positive"
    
    if amount > 10000:
        return "Error: Cannot transfer more than $10,000 without approval"
    
    if len(from_account) != 10 or len(to_account) != 10:
        return "Error: Invalid account format"
    
    # Proceed safely
    return do_transfer(from_account, to_account, amount)
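
Monetary amounts are often handled as `Decimal` values rather than floats, because floats cannot represent most decimal fractions exactly and also admit special values such as NaN. The sketch below shows one way to parse and validate an amount under that assumption; `parse_amount` and `MAX_TRANSFER` are hypothetical names, and the limit mirrors the $10,000 threshold used above.

```python
from decimal import Decimal, InvalidOperation

MAX_TRANSFER = Decimal("10000")  # mirrors the $10,000 limit used above


def parse_amount(raw) -> Decimal:
    """Parse an amount into a Decimal with at most two decimal places, or raise ValueError."""
    try:
        amount = Decimal(str(raw))
    except InvalidOperation:
        raise ValueError("Amount must be numeric")
    if not amount.is_finite():  # rejects NaN and infinity
        raise ValueError("Amount must be finite")
    if amount <= 0:
        raise ValueError("Amount must be positive")
    if amount > MAX_TRANSFER:
        raise ValueError("Amount exceeds the $10,000 limit")
    if amount != amount.quantize(Decimal("0.01")):
        raise ValueError("Amount must have at most two decimal places")
    return amount


print(parse_amount("25.50"))
```

A tool wrapper can catch the `ValueError` and return its message to the agent in the same "Error: ..." style as the checks above.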

Permission Checks

from enum import Enum

class Permission(Enum):
    READONLY = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

def delete_database_record(table: str, record_id: int, user: str) -> str:
    """Delete a record (requires admin permission)"""
    
    # Check if user has permission
    user_permissions = get_user_permissions(user)
    if Permission.ADMIN not in user_permissions:
        return "Error: Insufficient permissions to delete records"
    
    # Log the action
    audit_log.write(f"User {user} deleted {table}.{record_id}")
    
    # Execute deletion
    return do_delete(table, record_id)

⚠️ Tool Safety Principle: Every tool should validate inputs, check permissions, implement rate limits, log actions, and have clear error messages. Never let an agent have tools without safety constraints.

Advanced Tool Implementations

Production tools need robustness. Here's how to build enterprise-grade tools:

Tool with Retry Logic

Resilient API Tool
from typing import Optional
import time
import requests
from functools import wraps

def with_retry(max_retries=3, backoff_factor=2):
    """Decorator to add retry logic to tool functions"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        wait_time = backoff_factor ** attempt
                        print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
                        time.sleep(wait_time)
            
            raise last_exception
        return wrapper
    return decorator

@with_retry(max_retries=3)
def get_stock_price(symbol: str) -> str:
    """Get current stock price with automatic retries"""
    response = requests.get(
        f"https://api.example.com/stocks/{symbol}",
        timeout=10
    )
    response.raise_for_status()
    data = response.json()
    return f"{symbol}: ${data['price']:.2f}"

# Create the tool
from langchain.tools import Tool

stock_tool = Tool(
    name="GetStockPrice",
    func=get_stock_price,
    description="Get current stock price for a symbol. Automatically retries on failure."
)
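
The decorator above retries on every exception, including errors that a retry cannot fix, such as invalid input. A possible refinement, sketched below, retries only on exception types you list and adds random jitter so that many clients do not retry in lockstep. `retry_transient` and its parameters are illustrative names; with `requests` you would likely pass `requests.exceptions.ConnectionError` and `requests.exceptions.Timeout` as `retry_on`.

```python
import random
import time
from functools import wraps
from typing import Callable, Tuple, Type


def retry_transient(
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    max_retries: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
):
    """Retry only on the listed exception types, with jittered exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == max_retries - 1:
                        raise  # out of attempts: surface the original error
                    # Full jitter: wait a random time up to the exponential cap
                    sleep(random.uniform(0, base_delay * 2 ** attempt))
        return wrapper
    return decorator


attempts = {"n": 0}


@retry_transient(max_retries=3, sleep=lambda s: None)  # no real waiting in the demo
def flaky() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("temporary network failure")
    return "ok"


print(flaky(), attempts["n"])
```

Exceptions outside `retry_on` propagate on the first attempt, so the agent sees permanent failures immediately.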

Caching Tool for Expensive Operations

Tool with Result Caching
import hashlib
import json
import time
from datetime import datetime
from typing import Dict, Any, Optional

class CachedTool:
    """Base class for tools with caching"""
    
    def __init__(self, cache_ttl_seconds: int = 300):
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.cache_ttl = cache_ttl_seconds
    
    def _cache_key(self, *args, **kwargs) -> str:
        """Generate cache key from arguments"""
        key_data = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
        return hashlib.md5(key_data.encode()).hexdigest()
    
    def _get_cached(self, key: str) -> Optional[Any]:
        """Get cached result if still valid"""
        if key in self.cache:
            entry = self.cache[key]
            age = (datetime.now() - entry['timestamp']).total_seconds()
            if age < self.cache_ttl:
                print(f"Cache hit! (age: {age:.1f}s)")
                return entry['result']
            else:
                # Expired
                del self.cache[key]
        return None
    
    def _set_cached(self, key: str, result: Any):
        """Store result in cache"""
        self.cache[key] = {
            'result': result,
            'timestamp': datetime.now()
        }
    
    def __call__(self, *args, **kwargs):
        key = self._cache_key(*args, **kwargs)
        
        # Check cache
        cached = self._get_cached(key)
        if cached is not None:
            return cached
        
        # Execute and cache
        result = self.execute(*args, **kwargs)
        self._set_cached(key, result)
        return result
    
    def execute(self, *args, **kwargs):
        """Override this with actual tool logic"""
        raise NotImplementedError

# Example: Weather tool with 5-minute cache
class WeatherTool(CachedTool):
    def execute(self, city: str) -> str:
        """Get weather for city (expensive API call)"""
        print(f"Calling weather API for {city}...")
        # Simulate expensive API call
        time.sleep(1)
        return f"{city}: 72°F, Sunny"

weather_tool = WeatherTool(cache_ttl_seconds=300)

# First call: hits API
print(weather_tool(city="Boston"))  # "Calling weather API..."

# Second call within 5 minutes: uses cache
print(weather_tool(city="Boston"))  # "Cache hit!"

Tool with Comprehensive Logging

Production-Ready Tool with Logging
import logging
from datetime import datetime
from typing import Any, Callable, Dict
import json

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

class LoggedTool:
    """Tool wrapper that logs all calls"""
    
    def __init__(self, name: str, func: Callable, description: str):
        self.name = name
        self.func = func
        self.description = description
        self.logger = logging.getLogger(f"Tool.{name}")
        self.call_count = 0
        self.total_duration = 0
    
    def __call__(self, *args, **kwargs) -> Any:
        self.call_count += 1
        start_time = datetime.now()
        
        # Log call
        self.logger.info(f"Tool called: {self.name}")
        self.logger.debug(f"Arguments: args={args}, kwargs={kwargs}")
        
        try:
            # Execute
            result = self.func(*args, **kwargs)
            
            # Log success
            duration = (datetime.now() - start_time).total_seconds()
            self.total_duration += duration
            self.logger.info(f"Tool succeeded: {self.name} (duration: {duration:.2f}s)")
            self.logger.debug(f"Result: {str(result)[:200]}")
            
            return result
            
        except Exception as e:
            # Log failure
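            duration = (datetime.now() - start_time).total_seconds()
            self.total_duration += duration  # count failed calls too, so avg_duration stays consistent with call_count
            self.logger.error(f"Tool failed: {self.name} (duration: {duration:.2f}s)")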
            self.logger.error(f"Error: {str(e)}", exc_info=True)
            raise
    
    def get_stats(self) -> Dict[str, Any]:
        """Get tool usage statistics"""
        avg_duration = self.total_duration / self.call_count if self.call_count > 0 else 0
        return {
            "name": self.name,
            "call_count": self.call_count,
            "total_duration": self.total_duration,
            "avg_duration": avg_duration
        }

# Example usage
def send_email_impl(to: str, subject: str, body: str) -> str:
    """Send email (mock)"""
    print(f"Sending email to {to}...")
    return f"Email sent to {to}"

email_tool = LoggedTool(
    name="SendEmail",
    func=send_email_impl,
    description="Send email to user"
)

# Now all calls are logged
email_tool(to="user@example.com", subject="Hello", body="Test")

# Check stats
print(json.dumps(email_tool.get_stats(), indent=2))

Tool Composition Patterns

Complex actions often require multiple tools working together:

Sequential Tool Chain

Multi-Step Workflow
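from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from langchain.tools import Tool  # used only as a type hint; any callable with a .name attribute works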

@dataclass
class ToolResult:
    """Result from a tool execution"""
    success: bool
    data: Any
    error: Optional[str] = None

class ToolChain:
    """Execute tools sequentially, passing results forward"""
    
    def __init__(self, tools: List[Tool]):
        self.tools = tools
        self.results = []
    
    def execute(self, initial_input: Dict[str, Any]) -> ToolResult:
        """Execute tool chain"""
        current_data = initial_input
        
        for i, tool in enumerate(self.tools):
            try:
                # Execute tool with current data
                result = tool(**current_data)
                
                # Store result
                self.results.append({
                    'tool': tool.name,
                    'input': current_data,
                    'output': result,
                    'success': True
                })
                
                # Update data for next tool
                current_data = self._extract_data(result)
                
            except Exception as e:
                self.results.append({
                    'tool': tool.name,
                    'input': current_data,
                    'error': str(e),
                    'success': False
                })
                return ToolResult(success=False, data=None, error=str(e))
        
        return ToolResult(success=True, data=current_data)
    
    def _extract_data(self, result: Any) -> Dict[str, Any]:
        """Extract structured data from tool result"""
        if isinstance(result, dict):
            return result
        elif isinstance(result, str):
            return {"result": result}
        return {"data": result}

# Example: Customer onboarding workflow
lookup_customer = LoggedTool("LookupCustomer", lookup_fn, "Find customer")
create_account = LoggedTool("CreateAccount", create_fn, "Create account")
send_welcome = LoggedTool("SendWelcome", email_fn, "Send welcome email")
setup_billing = LoggedTool("SetupBilling", billing_fn, "Setup billing")

chain = ToolChain([lookup_customer, create_account, send_welcome, setup_billing])

result = chain.execute({"email": "newuser@example.com"})
if result.success:
    print("Onboarding complete!")
else:
    print(f"Failed at: {result.error}")

Conditional Tool Execution

Conditional Workflow
from typing import Callable, Any
from enum import Enum

class Condition(Enum):
    """Condition operators"""
    EQUALS = "=="
    NOT_EQUALS = "!="
    GREATER_THAN = ">"
    LESS_THAN = "<"
    CONTAINS = "contains"

class ConditionalTool:
    """Tool that executes based on conditions"""
    
    def __init__(self, name: str):
        self.name = name
        self.conditions = []
        self.tools = {}
        self.default_tool = None
    
    def when(self, field: str, condition: Condition, value: Any, tool: Callable):
        """Register conditional tool"""
        self.conditions.append({
            'field': field,
            'condition': condition,
            'value': value,
            'tool': tool
        })
        return self
    
    def otherwise(self, tool: Callable):
        """Set default tool"""
        self.default_tool = tool
        return self
    
    def execute(self, **kwargs) -> Any:
        """Execute based on conditions"""
        
        # Check conditions
        for cond in self.conditions:
            field_value = kwargs.get(cond['field'])
            
            if self._check_condition(field_value, cond['condition'], cond['value']):
                print(f"Condition matched: {cond['field']} {cond['condition'].value} {cond['value']}")
                return cond['tool'](**kwargs)
        
        # No condition matched, use default
        if self.default_tool:
            print("No conditions matched, using default")
            return self.default_tool(**kwargs)
        
        raise ValueError("No matching condition and no default tool")
    
    def _check_condition(self, field_value: Any, condition: Condition, target: Any) -> bool:
        """Check if condition is met"""
        if condition == Condition.EQUALS:
            return field_value == target
        elif condition == Condition.NOT_EQUALS:
            return field_value != target
        elif condition == Condition.GREATER_THAN:
            return field_value > target
        elif condition == Condition.LESS_THAN:
            return field_value < target
        elif condition == Condition.CONTAINS:
            return target in str(field_value)
        return False

# Example: Customer support routing
def handle_urgent(issue: str, priority: str) -> str:
    return f"URGENT: Escalating {issue} to supervisor"

def handle_billing(issue: str, priority: str) -> str:
    return f"Routing {issue} to billing department"

def handle_general(issue: str, priority: str) -> str:
    return f"Creating ticket for {issue}"

routing_tool = ConditionalTool("RouteIssue")
routing_tool \
    .when("priority", Condition.EQUALS, "urgent", handle_urgent) \
    .when("issue", Condition.CONTAINS, "billing", handle_billing) \
    .otherwise(handle_general)

# Usage
print(routing_tool.execute(issue="Payment failed", priority="urgent"))
# Output: "URGENT: Escalating Payment failed to supervisor"

print(routing_tool.execute(issue="Can't login", priority="normal"))
# Output: "Creating ticket for Can't login"

Parallel Tool Execution

import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any

class ParallelToolExecutor:
    """Execute multiple tools in parallel"""
    
    def __init__(self, max_workers: int = 5):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
    
    def execute_parallel(self, tools: List[Tool], inputs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute tools in parallel and return all results"""
        if len(tools) != len(inputs):
            raise ValueError("Number of tools must match number of inputs")
        
        futures = []
        for tool, input_data in zip(tools, inputs):
            future = self.executor.submit(tool, **input_data)
            futures.append((tool.name, future))
        
        # Collect results
        results = {}
        for tool_name, future in futures:
            try:
                result = future.result(timeout=30)
                results[tool_name] = {"success": True, "data": result}
            except Exception as e:
                results[tool_name] = {"success": False, "error": str(e)}
        
        return results
    
    async def execute_parallel_async(self, tools: List[Tool], inputs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute tools in parallel using asyncio"""
        
        async def run_tool(tool: Tool, input_data: Dict[str, Any]):
            loop = asyncio.get_running_loop()
            try:
                result = await loop.run_in_executor(None, lambda: tool(**input_data))
                return {"success": True, "data": result}
            except Exception as e:
                return {"success": False, "error": str(e)}
        
        tasks = [run_tool(tool, input_data) for tool, input_data in zip(tools, inputs)]
        results = await asyncio.gather(*tasks)
        
        return {tool.name: result for tool, result in zip(tools, results)}

# Example: Gather information from multiple sources
parallel_executor = ParallelToolExecutor(max_workers=3)

# Run multiple searches simultaneously
results = parallel_executor.execute_parallel(
    tools=[web_search_tool, database_tool, api_tool],
    inputs=[
        {"query": "Python tutorials"},
        {"table": "users", "id": 123},
        {"endpoint": "/products/latest"}
    ]
)

print(results)
# All tools ran in parallel, saving time!
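
Because `web_search_tool`, `database_tool`, and `api_tool` are not defined in this tutorial, here is a small self-contained sketch that shows the time saving directly. `slow_lookup` is a hypothetical I/O-bound tool that sleeps to simulate network latency. Threads help here because the work is waiting, not computing.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_lookup(source: str) -> str:
    """Hypothetical I/O-bound tool: sleeps to simulate network latency."""
    time.sleep(0.2)
    return f"{source}: done"


sources = ["web", "database", "api"]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    # map() returns results in the same order as the inputs
    results = list(pool.map(slow_lookup, sources))
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s (sequential would take about 0.6s)")
```

With three workers, the three 0.2-second waits overlap, so the total is close to the duration of a single call.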

Tool Security and Sandboxing

Production tools need security layers to prevent misuse:

Sandboxed Code Execution

Safe Python Code Execution
import ast
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr
import signal
from typing import Dict, Any, Optional

class TimeoutException(Exception):
    pass

def timeout_handler(signum, frame):
    raise TimeoutException("Code execution timed out")

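class SafeCodeExecutor:
    """Execute Python code with best-effort restrictions.

    The AST checks and reduced builtins filter obvious misuse but are not a
    hardened sandbox. The timeout relies on signal.SIGALRM, which is only
    available on Unix and only works in the main thread.
    """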
    
    def __init__(self, timeout_seconds: int = 5):
        self.timeout = timeout_seconds
        self.allowed_imports = {
            'math', 'random', 'datetime', 'json', 
            'collections', 'itertools', 're'
        }
    
    def execute(self, code: str) -> Dict[str, Any]:
        """Execute code safely with restrictions"""
        
        # Validate code
        validation_error = self._validate_code(code)
        if validation_error:
            return {"success": False, "error": validation_error}
        
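        # Prepare restricted environment
        def restricted_import(name, *args, **kwargs):
            # `import x` looks up __import__ in builtins, so provide one
            # that only loads modules from the allowlist
            if name.split('.')[0] not in self.allowed_imports:
                raise ImportError(f"Import not allowed: {name}")
            return __import__(name, *args, **kwargs)
        
        safe_globals = {
            '__builtins__': {
                '__import__': restricted_import,
                'print': print,
                'len': len,
                'range': range,
                'str': str,
                'int': int,
                'float': float,
                'list': list,
                'dict': dict,
                'sum': sum,
                'min': min,
                'max': max,
            }
        }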
        
        # Capture output
        stdout_capture = StringIO()
        stderr_capture = StringIO()
        
        try:
            # Set timeout
            signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(self.timeout)
            
            # Execute with output capture
            with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
                exec(code, safe_globals)
            
            # Cancel timeout
            signal.alarm(0)
            
            return {
                "success": True,
                "stdout": stdout_capture.getvalue(),
                "stderr": stderr_capture.getvalue(),
                "globals": {k: v for k, v in safe_globals.items() if not k.startswith('__')}
            }
            
        except TimeoutException:
            signal.alarm(0)
            return {"success": False, "error": "Execution timed out"}
        except Exception as e:
            signal.alarm(0)
            return {"success": False, "error": str(e)}
    
    def _validate_code(self, code: str) -> Optional[str]:
        """Validate code before execution"""
        
        # Check for dangerous imports
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            return f"Syntax error: {str(e)}"
        
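        for node in ast.walk(tree):
            # Block imports outside the allowlist (both `import x` and `from x import y`)
            if isinstance(node, ast.Import):
                for alias in node.names:
                    if alias.name.split('.')[0] not in self.allowed_imports:
                        return f"Import not allowed: {alias.name}"
            elif isinstance(node, ast.ImportFrom):
                module = node.module or ""
                if module.split('.')[0] not in self.allowed_imports:
                    return f"Import not allowed: {module}"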
            
            # Block file operations
            if isinstance(node, ast.Call):
                if isinstance(node.func, ast.Name):
                    if node.func.id in ['open', 'eval', 'exec', '__import__']:
                        return f"Function not allowed: {node.func.id}"
        
        return None

# Usage
executor = SafeCodeExecutor(timeout_seconds=5)

# Safe code
result = executor.execute("""
import math
result = math.sqrt(144)
print(f"Square root of 144 is {result}")
""")
print(result)  # Success!

# Dangerous code
result = executor.execute("""
import os
os.system('rm -rf /')
""")
print(result)  # Error: Import not allowed: os
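
In-process filtering like the class above is hard to make airtight, so a common complementary step is to run untrusted code in a separate process with a timeout. The sketch below is one minimal way to do that with the standard library; `run_in_subprocess` is a hypothetical helper. A subprocess on its own does not restrict file or network access, so this assumes OS-level controls (containers, resource limits) are added where real isolation is needed.

```python
import subprocess
import sys
from typing import Any, Dict


def run_in_subprocess(code: str, timeout_seconds: float = 5.0) -> Dict[str, Any]:
    """Run `code` in a separate Python process and capture its output."""
    try:
        completed = subprocess.run(
            [sys.executable, "-I", "-c", code],  # -I: isolated mode, ignores user site and env vars
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child before raising TimeoutExpired
        return {"success": False, "error": "Execution timed out"}
    return {
        "success": completed.returncode == 0,
        "stdout": completed.stdout,
        "stderr": completed.stderr,
    }


print(run_in_subprocess("print(2 + 2)"))
print(run_in_subprocess("while True: pass", timeout_seconds=1))
```

Unlike the signal-based timeout, this approach also works on Windows and outside the main thread, and a crash in the executed code cannot take down the agent process.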

Permission System for Tools

Role-Based Access Control
from datetime import datetime
from enum import Enum
from typing import Any, Callable
from dataclasses import dataclass

class Role(Enum):
    """User roles"""
    VIEWER = "viewer"
    EDITOR = "editor"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"

class Permission(Enum):
    """Permission types"""
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    EXECUTE = "execute"
    ADMIN = "admin"

# Role permissions mapping
ROLE_PERMISSIONS = {
    Role.VIEWER: {Permission.READ},
    Role.EDITOR: {Permission.READ, Permission.WRITE},
    Role.ADMIN: {Permission.READ, Permission.WRITE, Permission.DELETE, Permission.EXECUTE},
    Role.SUPER_ADMIN: {Permission.READ, Permission.WRITE, Permission.DELETE, Permission.EXECUTE, Permission.ADMIN}
}

@dataclass
class User:
    """User with role"""
    username: str
    role: Role
    
    def has_permission(self, permission: Permission) -> bool:
        return permission in ROLE_PERMISSIONS[self.role]

class ProtectedTool:
    """Tool with permission requirements"""
    
    def __init__(self, name: str, func: Callable, required_permission: Permission):
        self.name = name
        self.func = func
        self.required_permission = required_permission
        self.audit_log = []
    
    def execute(self, user: User, *args, **kwargs) -> Any:
        """Execute tool if user has permission"""
        
        # Check permission
        if not user.has_permission(self.required_permission):
            error = f"Permission denied: {user.username} lacks {self.required_permission.value}"
            self._audit(user, "DENIED", error)
            raise PermissionError(error)
        
        # Execute
        try:
            result = self.func(*args, **kwargs)
            self._audit(user, "SUCCESS", f"Args: {args}, Kwargs: {kwargs}")
            return result
        except Exception as e:
            self._audit(user, "FAILED", str(e))
            raise
    
    def _audit(self, user: User, status: str, details: str):
        """Log tool usage"""
        self.audit_log.append({
            'timestamp': datetime.now(),
            'user': user.username,
            'role': user.role.value,
            'tool': self.name,
            'status': status,
            'details': details
        })

# Example: Database delete tool (requires DELETE permission, which the ADMIN role grants)
def delete_user_impl(user_id: int) -> str:
    return f"Deleted user {user_id}"

delete_user_tool = ProtectedTool(
    name="DeleteUser",
    func=delete_user_impl,
    required_permission=Permission.DELETE
)

# Create users
viewer = User("alice", Role.VIEWER)
admin = User("bob", Role.ADMIN)

# Try to delete
try:
    delete_user_tool.execute(viewer, user_id=123)
except PermissionError as e:
    print(f"Alice denied: {e}")

delete_user_tool.execute(admin, user_id=123)
print("Bob succeeded!")

# Check audit log
for entry in delete_user_tool.audit_log:
    print(f"{entry['timestamp']}: {entry['user']} - {entry['status']}")

Common Tool Integration Patterns

Pattern        Use Case                          Example
Sequential     Tools used in order               Lookup → Check → Approve → Execute
Conditional    Tools used based on conditions    If urgent → escalate, else → handle
Parallel       Multiple tools simultaneously     Search DB + call API + get cache
Feedback Loop  Tools that refine results         Search → evaluate → refine search → try again
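
The feedback loop pattern is the only one in the table without code earlier in this module, so here is a minimal sketch of it. The document list, `search`, and `refine` are hypothetical stand-ins: a real system would call a search tool and might ask the model to rewrite the query.

```python
from typing import List

DOCS = [
    "Refund policy: refunds are issued within 14 days",
    "Shipping policy: orders ship in 2 business days",
]


def search(query: str) -> List[str]:
    """Hypothetical search tool: return documents containing every query word."""
    words = query.lower().split()
    return [d for d in DOCS if all(w in d.lower() for w in words)]


def refine(query: str) -> str:
    """Hypothetical refinement step: drop the last word to broaden the query."""
    return " ".join(query.split()[:-1])


def search_with_feedback(query: str, max_rounds: int = 3) -> List[str]:
    """Search, evaluate the results, and refine the query until something is found."""
    for _ in range(max_rounds):
        results = search(query)
        if results:            # evaluate: good enough, stop
            return results
        query = refine(query)  # refine and try again
        if not query:
            break
    return []


print(search_with_feedback("refund policy details"))
```

The first query finds nothing because no document contains "details"; after one refinement the broader query "refund policy" matches the refund document. The `max_rounds` cap keeps the loop from running indefinitely.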

Tool Monitoring and Observability

In production, you need visibility into how tools are being used:

Tool Usage Analytics

Tool Analytics System
from collections import defaultdict, Counter
from datetime import datetime, timedelta
from typing import Dict, List, Any
import statistics

class ToolAnalytics:
    """Track and analyze tool usage"""
    
    def __init__(self):
        self.calls: List[Dict[str, Any]] = []
        self.errors: List[Dict[str, Any]] = []
    
    def log_call(self, tool_name: str, duration: float, success: bool, 
                 user: str = None, error: str = None):
        """Log a tool call"""
        entry = {
            'tool': tool_name,
            'timestamp': datetime.now(),
            'duration': duration,
            'success': success,
            'user': user,
            'error': error
        }
        
        self.calls.append(entry)
        if not success:
            self.errors.append(entry)
    
    def get_summary(self, time_window_hours: int = 24) -> Dict[str, Any]:
        """Get analytics summary"""
        cutoff = datetime.now() - timedelta(hours=time_window_hours)
        recent_calls = [c for c in self.calls if c['timestamp'] > cutoff]
        
        if not recent_calls:
            return {"message": "No recent calls"}
        
        # Tool usage counts
        tool_counts = Counter([c['tool'] for c in recent_calls])
        
        # Success rates
        success_rates = {}
        for tool in tool_counts:
            tool_calls = [c for c in recent_calls if c['tool'] == tool]
            successes = sum(1 for c in tool_calls if c['success'])
            success_rates[tool] = (successes / len(tool_calls)) * 100
        
        # Performance metrics
        durations = defaultdict(list)
        for call in recent_calls:
            if call['success']:
                durations[call['tool']].append(call['duration'])
        
        performance = {}
        for tool, times in durations.items():
            performance[tool] = {
                'avg_duration': statistics.mean(times),
                'min_duration': min(times),
                'max_duration': max(times),
                'p95_duration': statistics.quantiles(times, n=20)[18] if len(times) > 20 else max(times)
            }
        
        # Most common errors
        error_counts = Counter([e['error'] for e in self.errors if e['timestamp'] > cutoff])
        
        return {
            'time_window_hours': time_window_hours,
            'total_calls': len(recent_calls),
            'tool_usage': dict(tool_counts),
            'success_rates': success_rates,
            'performance': performance,
            'top_errors': dict(error_counts.most_common(5))
        }
    
    def get_slow_tools(self, threshold_seconds: float = 1.0) -> List[tuple]:
        """Identify slow tools as (tool, avg_duration) pairs, slowest first"""
        durations = defaultdict(list)
        for call in self.calls:
            if call['success']:
                durations[call['tool']].append(call['duration'])
        
        slow_tools = []
        for tool, times in durations.items():
            avg = statistics.mean(times)
            if avg > threshold_seconds:
                slow_tools.append((tool, avg))
        
        return sorted(slow_tools, key=lambda x: x[1], reverse=True)
    
    def get_unreliable_tools(self, min_failure_rate: float = 0.1) -> List[tuple]:
        """Identify unreliable tools as (tool, failure_rate) pairs, worst first"""
        tool_stats = defaultdict(lambda: {'total': 0, 'failures': 0})
        
        for call in self.calls:
            tool_stats[call['tool']]['total'] += 1
            if not call['success']:
                tool_stats[call['tool']]['failures'] += 1
        
        unreliable = []
        for tool, stats in tool_stats.items():
            failure_rate = stats['failures'] / stats['total']
            if failure_rate >= min_failure_rate:
                unreliable.append((tool, failure_rate))
        
        return sorted(unreliable, key=lambda x: x[1], reverse=True)

# Usage
analytics = ToolAnalytics()

# Tools log their calls
analytics.log_call("WebSearch", duration=0.5, success=True, user="agent-1")
analytics.log_call("DatabaseQuery", duration=2.3, success=True, user="agent-1")
analytics.log_call("SendEmail", duration=0.8, success=False, error="SMTP timeout", user="agent-2")

# Get insights
summary = analytics.get_summary(time_window_hours=24)
print(json.dumps(summary, indent=2, default=str))

slow = analytics.get_slow_tools(threshold_seconds=2.0)
print(f"\nSlow tools: {slow}")

unreliable = analytics.get_unreliable_tools(min_failure_rate=0.2)
print(f"Unreliable tools: {unreliable}")

Real-time Tool Monitoring Dashboard

Tool Health Monitor
from enum import Enum
from dataclasses import dataclass
from typing import Optional

class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

@dataclass
class ToolHealth:
    """Health status of a tool"""
    tool_name: str
    status: HealthStatus
    success_rate: float
    avg_duration: float
    recent_errors: int
    last_success: Optional[datetime]
    message: str

class ToolHealthMonitor:
    """Monitor tool health in real-time"""
    
    def __init__(self, analytics: ToolAnalytics):
        self.analytics = analytics
    
    def check_tool_health(self, tool_name: str, lookback_minutes: int = 15) -> ToolHealth:
        """Check health of a specific tool"""
        cutoff = datetime.now() - timedelta(minutes=lookback_minutes)
        
        # Get recent calls for this tool
        recent_calls = [
            c for c in self.analytics.calls 
            if c['tool'] == tool_name and c['timestamp'] > cutoff
        ]
        
        if not recent_calls:
            return ToolHealth(
                tool_name=tool_name,
                status=HealthStatus.DEGRADED,
                success_rate=0,
                avg_duration=0,
                recent_errors=0,
                last_success=None,
                message="No recent activity"
            )
        
        # Calculate metrics
        successes = [c for c in recent_calls if c['success']]
        failures = [c for c in recent_calls if not c['success']]
        
        success_rate = len(successes) / len(recent_calls)
        avg_duration = statistics.mean([c['duration'] for c in successes]) if successes else 0
        recent_errors = len(failures)
        last_success = max([c['timestamp'] for c in successes]) if successes else None
        
        # Determine status
        status = self._determine_status(success_rate, avg_duration, recent_errors)
        message = self._generate_message(status, success_rate, avg_duration, recent_errors)
        
        return ToolHealth(
            tool_name=tool_name,
            status=status,
            success_rate=success_rate,
            avg_duration=avg_duration,
            recent_errors=recent_errors,
            last_success=last_success,
            message=message
        )
    
    def _determine_status(self, success_rate: float, avg_duration: float, errors: int) -> HealthStatus:
        """Determine health status from metrics"""
        # Unhealthy: low success rate or too many errors
        if success_rate < 0.7 or errors > 10:
            return HealthStatus.UNHEALTHY
        
        # Degraded: moderate issues
        if success_rate < 0.9 or avg_duration > 3.0 or errors > 5:
            return HealthStatus.DEGRADED
        
        # Healthy
        return HealthStatus.HEALTHY
    
    def _generate_message(self, status: HealthStatus, success_rate: float, 
                          avg_duration: float, errors: int) -> str:
        """Generate human-readable message"""
        if status == HealthStatus.HEALTHY:
            return f"Operating normally ({success_rate:.1%} success, {avg_duration:.2f}s avg)"
        elif status == HealthStatus.DEGRADED:
            issues = []
            if success_rate < 0.9:
                issues.append(f"success rate {success_rate:.1%}")
            if avg_duration > 3.0:
                issues.append(f"slow ({avg_duration:.2f}s)")
            if errors > 5:
                issues.append(f"{errors} recent errors")
            return f"Degraded: {', '.join(issues)}"
        else:
            return f"Unhealthy: {success_rate:.1%} success, {errors} errors"
    
    def get_dashboard(self) -> Dict[str, ToolHealth]:
        """Get health status for all tools"""
        # Get unique tools
        tools = set([c['tool'] for c in self.analytics.calls])
        
        dashboard = {}
        for tool in tools:
            dashboard[tool] = self.check_tool_health(tool)
        
        return dashboard
    
    def print_dashboard(self):
        """Print formatted dashboard"""
        dashboard = self.get_dashboard()
        
        print("\n" + "="*70)
        print("TOOL HEALTH DASHBOARD")
        print("="*70 + "\n")
        
        # Group by status
        for status in HealthStatus:
            tools = [t for t, h in dashboard.items() if h.status == status]
            if tools:
                print(f"{status.value.upper()}: {len(tools)} tools")
                for tool in tools:
                    health = dashboard[tool]
                    print(f"  • {tool}: {health.message}")
                print()

# Usage
monitor = ToolHealthMonitor(analytics)
health = monitor.check_tool_health("WebSearch")
print(f"{health.tool_name}: {health.status.value} - {health.message}")

# Full dashboard
monitor.print_dashboard()

Tool Development Best Practices

Essential practices for production tool development:

1. Single Responsibility

Each tool should do one thing well. Avoid "Swiss Army knife" tools that try to do everything.

2. Clear Descriptions

Write tool descriptions from the agent's perspective. The LLM must understand when to use the tool.

3. Validate Everything

Validate all inputs before execution. Return clear error messages instead of raising exceptions.
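
As a minimal sketch of this practice, the hypothetical `refund_tool` below checks each argument before doing any work and returns a structured error instead of raising. The function name, field names, and messages are assumptions made for illustration.

```python
from typing import Any, Dict

def refund_tool(order_id: Any, amount: Any) -> Dict[str, Any]:
    """Hypothetical refund tool that validates inputs before acting."""
    # bool is a subclass of int, so reject it explicitly
    if isinstance(order_id, bool) or not isinstance(order_id, int) or order_id <= 0:
        return {"success": False, "error": "order_id must be a positive integer"}
    if isinstance(amount, bool) or not isinstance(amount, (int, float)):
        return {"success": False, "error": "amount must be a number"}
    if amount <= 0:
        return {"success": False, "error": "amount must be greater than 0"}

    # Only reached when every input is valid
    return {
        "success": True,
        "message": f"Refund of ${amount:.2f} queued for order {order_id}",
    }

print(refund_tool("abc", 10))
print(refund_tool(101, -5))
print(refund_tool(101, 89.99))
```

Each error message names the offending parameter, which gives the agent enough information to correct the call and try again.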

4. Fail Gracefully

Handle errors gracefully. Return informative error messages agents can act on.
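
One way to apply this is a small decorator that converts exceptions into structured results. The sketch below is an assumption-laden illustration: `fail_gracefully`, `get_price`, and the `retryable` field are hypothetical names, not part of any library.

```python
import functools
from typing import Any, Callable, Dict

def fail_gracefully(func: Callable[..., Any]) -> Callable[..., Dict[str, Any]]:
    """Wrap a tool so failures come back as data the agent can act on."""
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        try:
            return {"success": True, "result": func(*args, **kwargs)}
        except TimeoutError:
            # Transient failure: the agent may reasonably retry
            return {"success": False, "error": "timeout", "retryable": True}
        except Exception as exc:
            # Anything else: report what went wrong, do not crash the agent
            return {
                "success": False,
                "error": f"{type(exc).__name__}: {exc}",
                "retryable": False,
            }
    return wrapper

@fail_gracefully
def get_price(sku: str) -> float:
    """Hypothetical price lookup; raises KeyError for unknown SKUs."""
    prices = {"A1": 9.99}
    return prices[sku]

print(get_price("A1"))
print(get_price("ZZ"))
```

The `retryable` flag is one possible way to tell the agent whether trying again makes sense or whether it should pick a different approach.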

5. Implement Timeouts

Never let tools run forever. Set reasonable timeouts for all operations.
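
A minimal sketch of a timeout wrapper using only the standard library is shown here. `call_with_timeout` and `slow_tool` are hypothetical names. Note the limitation: Python cannot kill a running thread, so this stops waiting for the tool but the underlying call keeps running in the background until it finishes.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from typing import Any, Callable

def call_with_timeout(func: Callable[..., Any], timeout_seconds: float,
                      *args: Any, **kwargs: Any) -> Any:
    """Run func in a worker thread and stop waiting after timeout_seconds."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(func, *args, **kwargs)
    try:
        return future.result(timeout=timeout_seconds)
    except FutureTimeout:
        return f"Error: tool timed out after {timeout_seconds}s"
    finally:
        # Do not block on the worker; the thread itself is not interrupted
        pool.shutdown(wait=False)

def slow_tool(seconds: float) -> str:
    """Hypothetical tool that simulates a slow external call."""
    time.sleep(seconds)
    return "done"

print(call_with_timeout(slow_tool, 1.0, 0.01))  # finishes in time
print(call_with_timeout(slow_tool, 0.05, 0.5))  # exceeds the timeout
```

Where the underlying client supports it (for example, a timeout argument on an HTTP or database call), prefer that native timeout, since it actually cancels the operation.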

6. Log Comprehensively

Log every tool call with inputs, outputs, duration, and errors. Essential for debugging.
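
The sketch below shows one way to capture all four pieces of information with a decorator built on the standard `logging` module. The `logged` decorator and the sample tools are hypothetical. In a real system you would likely redact sensitive arguments before logging them.

```python
import functools
import logging
import time
from typing import Any, Callable

logger = logging.getLogger("tools")

def logged(func: Callable[..., Any]) -> Callable[..., Any]:
    """Log inputs, output, duration, and errors for every call."""
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception:
            duration = time.perf_counter() - start
            # logger.exception records the traceback along with the message
            logger.exception("tool=%s args=%r kwargs=%r duration=%.3fs FAILED",
                             func.__name__, args, kwargs, duration)
            raise
        duration = time.perf_counter() - start
        logger.info("tool=%s args=%r kwargs=%r result=%r duration=%.3fs",
                    func.__name__, args, kwargs, result, duration)
        return result
    return wrapper

@logged
def add(a: int, b: int) -> int:
    """Hypothetical tool used to demonstrate a successful call."""
    return a + b

@logged
def divide(a: float, b: float) -> float:
    """Hypothetical tool used to demonstrate a failing call."""
    return a / b
```

Because the decorator re-raises after logging, it composes with whatever error handling the caller already has.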

7. Use Type Hints

Type hints let frameworks generate parameter schemas from your function signatures, so agents know what inputs a tool expects and make fewer malformed calls.
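
To see why type hints matter, here is a simplified sketch of how a schema can be derived from a function signature. `build_schema` is a hypothetical helper written for this illustration; real frameworks handle many more types and cases.

```python
import inspect
from typing import Any, Callable, Dict, get_type_hints

# Minimal mapping from Python types to JSON Schema type names
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}

def build_schema(func: Callable[..., Any]) -> Dict[str, Any]:
    """Derive a tool schema from a function's type hints and docstring."""
    hints = get_type_hints(func)
    hints.pop("return", None)  # only parameters belong in the schema
    params = inspect.signature(func).parameters

    properties = {
        name: {"type": _JSON_TYPES.get(tp, "object")}
        for name, tp in hints.items()
    }
    # Parameters without a default value are required
    required = [
        name for name in hints
        if params[name].default is inspect.Parameter.empty
    ]
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func) or "",
        "parameters": {
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }

def get_order_history(customer_id: int, limit: int = 10) -> dict:
    """Get a customer's most recent orders."""
    return {}

print(build_schema(get_order_history))
```

Without the annotations, the helper would have nothing to report for `customer_id` or `limit`, and the agent would have to guess the expected types.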

8. Version Your Tools

When you change a tool's behavior, publish it as a new version (v1, v2) so existing agents can keep using the old one.
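
One possible way to keep several versions side by side is a small registry keyed by name and version. `ToolRegistry` and the `format_price` tools below are hypothetical names used only for this sketch.

```python
from typing import Callable, Dict, Optional, Tuple

class ToolRegistry:
    """Hypothetical registry that stores tools by (name, version)."""

    def __init__(self) -> None:
        self._tools: Dict[Tuple[str, int], Callable] = {}

    def register(self, name: str, version: int) -> Callable:
        """Decorator that registers a function under a name and version."""
        def decorator(func: Callable) -> Callable:
            self._tools[(name, version)] = func
            return func
        return decorator

    def get(self, name: str, version: Optional[int] = None) -> Callable:
        """Return the requested version, or the latest if none is given."""
        versions = [v for (n, v) in self._tools if n == name]
        if not versions:
            raise KeyError(f"Unknown tool: {name}")
        chosen = max(versions) if version is None else version
        if (name, chosen) not in self._tools:
            raise KeyError(f"Unknown version {chosen} for tool: {name}")
        return self._tools[(name, chosen)]

registry = ToolRegistry()

@registry.register("format_price", version=1)
def format_price_v1(amount: float) -> str:
    return f"${amount}"

@registry.register("format_price", version=2)
def format_price_v2(amount: float) -> str:
    # v2 changes the output format, so v1 stays available for old agents
    return f"${amount:.2f} USD"

print(registry.get("format_price", version=1)(5.5))
print(registry.get("format_price")(5.5))
```

Agents pinned to version 1 keep their old behavior, while new agents that omit the version pick up the latest one.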

9. Test Edge Cases

Test tools with invalid inputs, network failures, timeouts, and edge cases.
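
As a sketch of what edge-case tests can look like, the example below uses the standard `unittest` module against a hypothetical `parse_amount` tool. Both the tool and the specific cases are assumptions chosen for illustration.

```python
import math
import unittest
from typing import Any, Dict

def parse_amount(text: Any) -> Dict[str, Any]:
    """Hypothetical tool: parse a currency string such as '$12.50'."""
    if not isinstance(text, str) or not text.strip():
        return {"success": False, "error": "amount is empty"}
    try:
        value = float(text.strip().lstrip("$").replace(",", ""))
    except ValueError:
        return {"success": False, "error": f"not a number: {text!r}"}
    if not math.isfinite(value):  # float() accepts 'nan' and 'inf'
        return {"success": False, "error": "amount must be finite"}
    if value < 0:
        return {"success": False, "error": "amount must not be negative"}
    return {"success": True, "amount": round(value, 2)}

class ParseAmountEdgeCases(unittest.TestCase):
    def test_empty_and_wrong_type(self):
        self.assertFalse(parse_amount("")["success"])
        self.assertFalse(parse_amount(None)["success"])

    def test_garbage_input(self):
        self.assertEqual(parse_amount("abc")["error"], "not a number: 'abc'")

    def test_nan_and_negative(self):
        self.assertFalse(parse_amount("nan")["success"])
        self.assertFalse(parse_amount("-5")["success"])

    def test_valid_with_formatting(self):
        self.assertEqual(parse_amount("$1,234.50"),
                         {"success": True, "amount": 1234.5})

# Run the suite without exiting the interpreter
suite = unittest.defaultTestLoader.loadTestsFromTestCase(ParseAmountEdgeCases)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

The `nan` case is the kind of input that is easy to miss: it parses as a float without error, so only an explicit finiteness check catches it.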

Practical Exercise: Build a Complete Tool Suite

Let's build a production-ready tool suite for a customer support agent:

The Challenge

Create a complete tool suite with:
  • Customer lookup tool (with caching)
  • Order history tool (with retry logic)
  • Refund processing tool (with permissions and audit log)
  • Email notification tool (with rate limiting)
  • Analytics to track tool usage
  • Health monitoring for all tools

Complete Implementation

customer_support_tools.py
"""
Complete customer support tool suite
"""
from typing import Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)

# Tool infrastructure (from previous sections)
# ... (CachedTool, LoggedTool, ProtectedTool, ToolAnalytics, etc.)

# ============== Customer Support Tools ==============

class CustomerLookupTool(CachedTool, LoggedTool):
    """Look up customer info with caching and logging"""
    
    def execute(self, email: str) -> Dict[str, Any]:
        """Find customer by email"""
        # Simulate database lookup
        customers_db = {
            "alice@example.com": {
                "id": 1,
                "name": "Alice Johnson",
                "status": "premium",
                "joined": "2023-01-15"
            },
            "bob@example.com": {
                "id": 2,
                "name": "Bob Smith",
                "status": "standard",
                "joined": "2023-06-20"
            }
        }
        
        customer = customers_db.get(email)
        if not customer:
            return {"error": f"Customer not found: {email}"}
        
        return {"success": True, "customer": customer}

@with_retry(max_retries=3)
def get_order_history(customer_id: int, limit: int = 10) -> Dict[str, Any]:
    """Get customer order history with retries"""
    # Simulate API call that might fail
    orders = [
        {"id": 101, "date": "2024-01-15", "total": 89.99, "status": "delivered"},
        {"id": 102, "date": "2024-02-01", "total": 149.50, "status": "delivered"},
    ]
    return {"success": True, "orders": orders[:limit]}

class RefundTool(ProtectedTool):
    """Process refunds (requires admin permission)"""
    
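    def execute(self, user: User, order_id: int, amount: float, reason: str) -> str:
        # Validate before checking permissions or processing anything
        if amount <= 0:
            return "Error: Refund amount must be positive"
        if amount > 10000:
            return "Error: Refunds over $10,000 require manual approval"
        
        # Delegate to ProtectedTool.execute, which is assumed (from the earlier
        # section) to check the user's permission and call the wrapped function.
        # Accepting `user` here matches how _handle_refund calls this method.
        result = super().execute(user, order_id, amount, reason)
        self._audit("Refund", f"Order {order_id}: ${amount:.2f} - {reason}")
        return result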

class EmailTool:
    """Send emails with rate limiting"""
    
    def __init__(self):
        self.rate_limiter = RateLimiter(max_calls=10, window_seconds=60)
    
    @rate_limit(max_calls=10, time_window=60)
    def send(self, to: str, subject: str, body: str) -> str:
        """Send email (max 10/minute)"""
        # Simulate email send
        print(f"Email sent to {to}: {subject}")
        return f"Email sent to {to}"

# ============== Unified Support Agent System ==============

class CustomerSupportAgent:
    """Complete customer support agent with all tools"""
    
    def __init__(self):
        # Initialize tools
        self.lookup = CustomerLookupTool(cache_ttl_seconds=300)
        self.orders = LoggedTool("GetOrders", get_order_history, "Get order history")
        self.refund = RefundTool("ProcessRefund", self._process_refund, Permission.ADMIN)
        self.email = EmailTool()
        
        # Initialize analytics and monitoring
        self.analytics = ToolAnalytics()
        self.monitor = ToolHealthMonitor(self.analytics)
        
        self.logger = logging.getLogger("CustomerSupportAgent")
    
    def _process_refund(self, order_id: int, amount: float, reason: str) -> str:
        """Refund implementation"""
        return f"Refund ${amount} for order {order_id}: {reason}"
    
    def handle_request(self, request: str, user: User) -> str:
        """Handle customer support request"""
        self.logger.info(f"Handling request: {request}")
        
        # Simple rule-based routing (in production, use LLM)
        if "refund" in request.lower():
            return self._handle_refund(request, user)
        elif "order" in request.lower():
            return self._handle_order_inquiry(request, user)
        else:
            return "I can help with orders and refunds. Please specify your request."
    
    def _handle_refund(self, request: str, user: User) -> str:
        """Handle refund request"""
        # Extract details (simplified)
        order_id = 101  # Would parse from request
        amount = 89.99
        
        start = datetime.now()
        try:
            result = self.refund.execute(user, order_id, amount, "Customer request")
            duration = (datetime.now() - start).total_seconds()
            self.analytics.log_call("ProcessRefund", duration, True, user.username)
            return result
        except Exception as e:
            duration = (datetime.now() - start).total_seconds()
            self.analytics.log_call("ProcessRefund", duration, False, user.username, str(e))
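            return f"Refund failed: {str(e)}"
    
    def _handle_order_inquiry(self, request: str, user: User) -> str:
        """Handle order inquiry (called by handle_request)"""
        # Extract details (simplified)
        customer_id = 1  # Would look up from request or user
        
        start = datetime.now()
        try:
            result = get_order_history(customer_id)
            duration = (datetime.now() - start).total_seconds()
            self.analytics.log_call("GetOrders", duration, True, user.username)
            return f"Found {len(result['orders'])} orders for customer {customer_id}"
        except Exception as e:
            duration = (datetime.now() - start).total_seconds()
            self.analytics.log_call("GetOrders", duration, False, user.username, str(e))
            return f"Order lookup failed: {str(e)}"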
    
    def get_system_health(self) -> Dict[str, Any]:
        """Get complete system health report"""
        return {
            "analytics": self.analytics.get_summary(time_window_hours=24),
            "tool_health": {
                tool: health.__dict__ 
                for tool, health in self.monitor.get_dashboard().items()
            },
            "slow_tools": self.analytics.get_slow_tools(),
            "unreliable_tools": self.analytics.get_unreliable_tools()
        }

# ============== Usage ==============

# Create agent
agent = CustomerSupportAgent()

# Create users
admin_user = User("admin", Role.ADMIN)
viewer_user = User("viewer", Role.VIEWER)

# Handle requests
print(agent.handle_request("I need a refund for order 101", admin_user))
print(agent.handle_request("I need a refund for order 101", viewer_user))

# Get health report
import json
health = agent.get_system_health()
print(json.dumps(health, indent=2, default=str))

Challenge Extensions:
  • Add a sentiment analysis tool to detect angry customers
  • Implement automatic escalation for urgent issues
  • Add a knowledge base search tool for FAQ answers
  • Create a tool to check inventory before refunds
  • Build a workflow engine to chain tools automatically

Key Takeaways

  • Tools define capabilities: Agent power comes from well-designed tools
  • Production tools need: Retries, caching, logging, validation, permissions
  • Multiple tool types: APIs, databases, search, computation, communication, system tools
  • Build custom tools: Use LangChain to wrap any function as a tool
  • Compose tools: Chain tools sequentially, conditionally, or in parallel
  • Security is critical: Validate inputs, check permissions, sandbox execution
  • Monitor everything: Track usage, performance, errors, and health
  • Tool design matters: Clear descriptions and schemas help agents use tools effectively
  • Fail gracefully: Return informative errors, never crash
  • Test thoroughly: Edge cases, timeouts, failures, security breaches

Test Your Knowledge

Q1: What is the primary purpose of agent tools?

To store agent training data
To display user interfaces
To reduce computational costs
To give agents the ability to interact with external systems and perform actions

Q2: Which of the following is a best practice when designing agent tools?

Make tools do as many things as possible in one function
Hide error messages from the agent
Provide clear descriptions and parameter schemas so agents know how to use them
Never implement timeouts or rate limits

Q3: What is function calling in the context of AI agents?

A way to train neural networks
An LLM capability to output structured requests to invoke specific tools with parameters
A method for compiling Python code
A debugging technique

Q4: Why is error handling critical in agent tools?

Tools may fail due to network issues, invalid inputs, or API limits; agents need informative errors to recover gracefully
Error handling is not important for production agents
Errors should cause the agent to restart completely
Errors only matter for debugging during development

Q5: What is the advantage of using tool schemas (like JSON Schema)?

They make tools run faster
They reduce the cost of API calls
They clearly define expected parameters and types, helping agents call tools correctly
They eliminate the need for testing