Equip agents with capabilities: build tools, integrations, and actions to interact with the world
🎓 Complete all tutorials to earn your Free AI Agents Certificate
Shareable on LinkedIn • Verified by AITutorials.site • No signup fee
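An agent that can only think is limited. A truly powerful agent can do things: it can call external APIs, query and update databases, search for information, run code, send messages, and control systems and workflows.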
These capabilities are implemented through tools and actions. This module teaches you how to give agents superpowers.
Different tools enable different capabilities:
Call external APIs: weather services, stock data, translation APIs, search engines. Integrates with the broader internet.
Query and update databases. Retrieve customer records, product info, inventory data. Execute SQL or NoSQL queries.
Web search, semantic search, vector database search. Find relevant information from large knowledge bases.
Execute code, run calculations, analyze data. Python interpreter, mathematical computations, data transformations.
Send emails, messages, notifications. Notify humans, trigger workflows, escalate issues.
Control systems and workflows. Run commands, trigger jobs, manage resources, orchestrate processes.
Most agent applications require custom tools specific to your domain. LangChain makes this straightforward:
from langchain.tools import Tool
from datetime import datetime
# Define a simple tool function
def get_current_time(format: str = "%Y-%m-%d %H:%M:%S") -> str:
    """Return the current local date and time as text.

    Args:
        format: A ``strftime`` pattern; the default renders values such as
            ``2024-01-31 13:45:00``.

    Returns:
        The current moment formatted with ``format``.
    """
    now = datetime.now()
    return now.strftime(format)
# Wrap it as a Tool
# Register get_current_time under the name "GetCurrentTime". The description
# is passed to Tool alongside a schema describing the optional "format"
# argument.
# NOTE(review): args_schema is a JSON-schema dict here; confirm the installed
# LangChain version accepts a dict (some releases expect a Pydantic model).
time_tool = Tool(
    name="GetCurrentTime",
    func=get_current_time,
    description="Returns current date and time. Useful for scheduling and time-based queries.",
    args_schema={
        "type": "object",
        "properties": {
            "format": {
                "type": "string",
                "description": "Time format string (Python strftime format)"
            }
        }
    }
)
# Agent can now use this tool
# NOTE(review): `agent` is not defined in this snippet; presumably an
# already-constructed agent with a mutable `tools` list. Confirm.
agent.tools.append(time_tool)
from langchain.tools import Tool
import sqlite3
def query_customer_database(customer_id: int) -> str:
    """Look up one customer by id in the local ``customers.db`` SQLite file.

    Args:
        customer_id: Value matched against ``customers.id``. It is bound as a
            query parameter, never interpolated into the SQL text.

    Returns:
        ``"Customer <name>: <email> (Status: <status>)"`` for a match, or
        ``"Customer not found"`` when no row has that id.

    Raises:
        sqlite3.Error: If the database cannot be opened or the query fails.
    """
    from contextlib import closing

    # closing() releases the connection even when the query raises; the
    # original leaked it on any error between connect() and close().
    with closing(sqlite3.connect("customers.db")) as conn:
        row = conn.execute(
            "SELECT id, name, email, status FROM customers WHERE id = ?",
            (customer_id,),
        ).fetchone()

    if row is None:
        return "Customer not found"
    _, name, email, status = row
    return f"Customer {name}: {email} (Status: {status})"
# Wrap query_customer_database as the "QueryCustomerDB" tool. The schema
# declares a single required integer argument, customer_id.
# NOTE(review): args_schema is a JSON-schema dict here; confirm the installed
# LangChain version accepts a dict (some releases expect a Pydantic model).
db_tool = Tool(
    name="QueryCustomerDB",
    func=query_customer_database,
    description="Look up customer information by ID",
    args_schema={
        "type": "object",
        "properties": {
            "customer_id": {
                "type": "integer",
                "description": "The customer ID to look up"
            }
        },
        "required": ["customer_id"]
    }
)
from langchain.tools import Tool
import requests
def search_web(query: str, max_results: int = 5) -> str:
    """Search the web and return the hits as a numbered plain-text list.

    Args:
        query: Free-text search terms.
        max_results: Upper bound on the number of hits rendered; values
            below zero are treated as zero.

    Returns:
        ``"Search Results:\\n"`` followed by two lines per hit, or
        ``"Search failed: <reason>"`` if anything goes wrong. Failures are
        returned as text rather than raised.
    """
    # Using a free search API (e.g., DuckDuckGo)
    # NOTE(review): confirm this endpoint honours "max_results" and that its
    # payload really exposes "Results" items with "Title"/"Abstract" keys.
    params = {
        "q": query,
        "format": "json",
        "max_results": max_results,
    }
    try:
        response = requests.get(
            "https://api.duckduckgo.com/",
            params=params,
            timeout=10,
        )
        # Fail on 4xx/5xx instead of trying to parse an error page as JSON
        response.raise_for_status()
        results = response.json()

        # Clamp so a negative limit cannot slice from the end of the list
        hits = results.get("Results", [])[:max(max_results, 0)]
        lines = ["Search Results:\n"]
        for position, hit in enumerate(hits, start=1):
            lines.append(f"{position}. {hit.get('Title')}\n")
            lines.append(f" {hit.get('Abstract')}\n")
        return "".join(lines)
    except Exception as e:
        # Deliberately broad: every failure mode is reported as a string
        return f"Search failed: {e}"
# Wrap search_web as the "WebSearch" tool. "query" is required; "max_results"
# is optional and falls back to the function's default of 5.
# NOTE(review): args_schema is a JSON-schema dict here; confirm the installed
# LangChain version accepts a dict (some releases expect a Pydantic model).
search_tool = Tool(
    name="WebSearch",
    func=search_web,
    description="Search the web for current information",
    args_schema={
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "What to search for"
            },
            "max_results": {
                "type": "integer",
                "description": "Maximum number of results (default 5)"
            }
        },
        "required": ["query"]
    }
)
Often, complex actions require chaining tools together:
from langchain.tools import Tool
from langchain.agents import AgentType, initialize_agent

# NOTE(review): lookup_fn, check_fn, refund_fn, email_fn and llm are not
# defined in this snippet; they must be supplied by the surrounding code.

# Tool 1: Find customer
lookup_customer = Tool(
    name="LookupCustomer",
    func=lookup_fn,
    description="Look up a customer record",
)
# Tool 2: Check order history
check_orders = Tool(
    name="CheckOrders",
    func=check_fn,
    description="Check a customer's order history",
)
# Tool 3: Process refund
process_refund = Tool(
    name="ProcessRefund",
    func=refund_fn,
    description="Process a refund for an order",
)
# Tool 4: Send confirmation email
send_email = Tool(
    name="SendEmail",
    func=email_fn,
    description="Send a confirmation email",
)

# Agent can orchestrate these
tools = [lookup_customer, check_orders, process_refund, send_email]
# The agent style is chosen with the `agent` parameter and an AgentType
# member; `agent_type="react"` is not a parameter of initialize_agent.
agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION)

# Now agent can handle complex requests:
# "A customer wants a refund. Look them up, check their orders,
# process the refund, and send confirmation."
# The agent decides which tools to call and in what order.
The agent decides which tools to use and in what order to accomplish the goal. This is the power of agentic workflows.
As agents become more powerful, safety becomes critical. Tools should have safeguards:
from functools import wraps
from datetime import datetime, timedelta
def rate_limit(max_calls: int, time_window: int):
    """Build a decorator allowing at most ``max_calls`` per sliding window.

    Args:
        max_calls: Number of calls accepted inside one window.
        time_window: Window length in seconds.

    Returns:
        A decorator. The wrapped function raises ``RuntimeError`` (a subclass
        of ``Exception``) when called after the budget for the current window
        is used up; rejected calls do not consume budget.

    Note:
        The call history lives in the decorator's closure, so all callers of
        one decorated function share one budget.
    """
    import time
    from collections import deque

    def decorator(func):
        # Monotonic timestamps of accepted calls, oldest first. A monotonic
        # clock keeps the window correct if the wall clock is adjusted.
        calls = deque()

        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            # Remove old calls outside time window
            while calls and now - calls[0] >= time_window:
                calls.popleft()
            if len(calls) >= max_calls:
                raise RuntimeError(
                    f"Rate limit exceeded: {max_calls} calls per {time_window}s"
                )
            calls.append(now)
            return func(*args, **kwargs)

        return wrapper

    return decorator
@rate_limit(max_calls=10, time_window=60)  # Max 10 calls per minute
def query_expensive_api(query: str) -> str:
    """Placeholder for an expensive API call, limited to 10 calls per minute.

    The body is intentionally empty, so a call currently returns ``None``.
    """
def transfer_funds(from_account: str, to_account: str, amount: float) -> str:
    """Validate a transfer request and, if it passes, execute it.

    Args:
        from_account: Source account; must be a 10-character string.
        to_account: Destination account; must be a 10-character string.
        amount: Amount to move; a real number in ``(0, 10000]``.

    Returns:
        An ``"Error: ..."`` string describing the first failed check, or
        whatever ``do_transfer`` returns once every check passes.
    """
    import math

    # Validate inputs
    # bool is a subclass of int, and NaN passes both range checks below
    # (every comparison with NaN is False), so reject both here.
    if (
        isinstance(amount, bool)
        or not isinstance(amount, (int, float))
        or (isinstance(amount, float) and math.isnan(amount))
    ):
        return "Error: Amount must be numeric"
    if amount <= 0:
        return "Error: Amount must be positive"
    if amount > 10000:
        return "Error: Cannot transfer more than $10,000 without approval"
    # Check the type before len() so a non-string account is reported as an
    # invalid format instead of raising TypeError.
    for account in (from_account, to_account):
        if not isinstance(account, str) or len(account) != 10:
            return "Error: Invalid account format"

    # Proceed safely
    return do_transfer(from_account, to_account, amount)
from enum import Enum
class Permission(Enum):
    """Named permissions that can be granted to a user."""

    READONLY = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"
def delete_database_record(table: str, record_id: int, user: str) -> str:
    """Delete a record on behalf of ``user`` (requires admin permission).

    Args:
        table: Name of the table holding the record.
        record_id: Identifier of the record to remove.
        user: Identity whose permissions are checked and recorded.

    Returns:
        An ``"Error: ..."`` string when ``user`` lacks ``Permission.ADMIN``,
        otherwise the value returned by ``do_delete``.
    """
    # Check if user has permission
    user_permissions = get_user_permissions(user)
    if Permission.ADMIN not in user_permissions:
        return "Error: Insufficient permissions to delete records"

    # Execute deletion first: if do_delete raises, the audit trail must not
    # claim a deletion that never happened.
    result = do_delete(table, record_id)

    # Log the action
    audit_log.write(f"User {user} deleted {table}.{record_id}")
    return result
Production tools need robustness. Here's how to build enterprise-grade tools:
from typing import Optional
import time
import requests
from functools import wraps
def with_retry(max_retries=3, backoff_factor=2):
    """Decorator factory that retries a function with exponential backoff.

    Args:
        max_retries: Total number of attempts, including the first. Must be
            at least 1.
        backoff_factor: Base of the delay; the wait after attempt ``k``
            (0-based) is ``backoff_factor ** k`` seconds.

    Returns:
        A decorator. The wrapped function returns the first successful
        result, or re-raises the exception from the final attempt.

    Raises:
        ValueError: If ``max_retries`` is below 1. Previously such a value
            skipped the call entirely and ended in ``raise None``.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries - 1:
                        # Out of attempts: surface the last failure unchanged
                        raise
                    wait_time = backoff_factor ** attempt
                    print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
                    time.sleep(wait_time)

        return wrapper

    return decorator
@with_retry(max_retries=3)
def get_stock_price(symbol: str) -> str:
    """Fetch the current price for ``symbol``, retrying on failure.

    Args:
        symbol: Ticker symbol; it is percent-encoded before being placed in
            the request path.

    Returns:
        ``"<symbol>: $<price>"`` with the price shown to two decimals.

    Raises:
        Exception: Whatever the final attempt raised (HTTP error status,
            network failure, malformed payload).
    """
    from urllib.parse import quote

    # Encode the symbol so characters such as "/", "?" or "#" cannot change
    # which URL is requested.
    response = requests.get(
        f"https://api.example.com/stocks/{quote(symbol, safe='')}",
        timeout=10,
    )
    response.raise_for_status()
    data = response.json()
    return f"{symbol}: ${data['price']:.2f}"
# Create the tool
from langchain.tools import Tool
# Expose get_stock_price (already wrapped by with_retry) as "GetStockPrice".
stock_tool = Tool(
    name="GetStockPrice",
    func=get_stock_price,
    description="Get current stock price for a symbol. Automatically retries on failure."
)
from functools import lru_cache
import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
class CachedTool:
    """Base class for tools whose results are cached per argument set.

    Subclasses implement ``execute``; calling the instance returns a stored
    result while it is younger than the TTL and otherwise runs ``execute``
    and stores what it returns. Arguments must be JSON-serializable because
    the cache key is derived from their JSON form.
    """

    # Marks "no usable entry" so a cached result of None is still a hit
    _MISS = object()

    def __init__(self, cache_ttl_seconds: int = 300):
        """Create an empty cache whose entries live ``cache_ttl_seconds``."""
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.cache_ttl = cache_ttl_seconds

    def _cache_key(self, *args, **kwargs) -> str:
        """Generate cache key from arguments"""
        key_data = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
        # MD5 is used only to shorten the key, not for security
        return hashlib.md5(key_data.encode()).hexdigest()

    def _lookup(self, key: str) -> Any:
        """Return the live result for ``key``, or ``_MISS``; evict if expired."""
        entry = self.cache.get(key)
        if entry is None:
            return self._MISS
        age = (datetime.now() - entry['timestamp']).total_seconds()
        if age < self.cache_ttl:
            print(f"Cache hit! (age: {age:.1f}s)")
            return entry['result']
        # Expired
        del self.cache[key]
        return self._MISS

    def _get_cached(self, key: str) -> Optional[Any]:
        """Get cached result if still valid, else ``None``.

        ``None`` is also returned for a cached ``None`` result; use
        ``_lookup`` when the two cases must be told apart.
        """
        cached = self._lookup(key)
        return None if cached is self._MISS else cached

    def _set_cached(self, key: str, result: Any):
        """Store result in cache"""
        self.cache[key] = {
            'result': result,
            'timestamp': datetime.now()
        }

    def __call__(self, *args, **kwargs):
        """Return the cached result for these arguments or compute it."""
        key = self._cache_key(*args, **kwargs)

        # Check cache. Comparing against the sentinel rather than None means
        # a tool that legitimately returns None is not re-run on every call.
        cached = self._lookup(key)
        if cached is not self._MISS:
            return cached

        # Execute and cache
        result = self.execute(*args, **kwargs)
        self._set_cached(key, result)
        return result

    def execute(self, *args, **kwargs):
        """Override this with actual tool logic"""
        raise NotImplementedError
# Example: Weather tool with 5-minute cache
class WeatherTool(CachedTool):
    """Cached weather lookup that returns a fixed report after a 1s delay."""

    def execute(self, city: str) -> str:
        """Return the weather report for ``city`` (simulated slow call)."""
        announcement = f"Calling weather API for {city}..."
        print(announcement)
        # Simulate expensive API call
        time.sleep(1)
        report = f"{city}: 72°F, Sunny"
        return report
# Demo: entries stay valid for 300 seconds (5 minutes).
weather_tool = WeatherTool(cache_ttl_seconds=300)
# First call: nothing cached yet, so execute() runs and prints its
# "Calling weather API..." line before the result is printed
print(weather_tool(city="Boston")) # "Calling weather API..."
# Second call within 5 minutes with the same arguments: served from the
# cache, which prints a "Cache hit!" line
print(weather_tool(city="Boston")) # "Cache hit!"
import logging
from datetime import datetime
from typing import Any, Callable
import json
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
class LoggedTool:
    """Callable wrapper that logs every invocation and tracks usage stats.

    Attributes are public: ``name``, ``func``, ``description``, ``logger``
    (named ``Tool.<name>``), ``call_count`` and ``total_duration`` (seconds
    spent in ``func`` across all calls, failed ones included).
    """

    def __init__(self, name: str, func: Callable, description: str):
        """Wrap ``func`` under ``name`` with zeroed counters."""
        self.name = name
        self.func = func
        self.description = description
        self.logger = logging.getLogger(f"Tool.{name}")
        self.call_count = 0
        self.total_duration = 0

    def __call__(self, *args, **kwargs) -> Any:
        """Invoke the wrapped function, logging start, outcome and duration.

        Returns:
            Whatever ``func`` returns.

        Raises:
            Exception: Any exception from ``func`` is logged with its
                traceback and then re-raised unchanged.
        """
        self.call_count += 1
        start_time = datetime.now()

        # Log call (lazy %-formatting: skipped when the level is disabled)
        self.logger.info("Tool called: %s", self.name)
        self.logger.debug("Arguments: args=%s, kwargs=%s", args, kwargs)

        try:
            # Execute
            result = self.func(*args, **kwargs)
        except Exception as e:
            # Log failure
            duration = (datetime.now() - start_time).total_seconds()
            # Failed calls are counted in call_count, so their time must be
            # counted too or avg_duration is understated.
            self.total_duration += duration
            self.logger.error("Tool failed: %s (duration: %.2fs)", self.name, duration)
            self.logger.error("Error: %s", e, exc_info=True)
            raise

        # Log success
        duration = (datetime.now() - start_time).total_seconds()
        self.total_duration += duration
        self.logger.info("Tool succeeded: %s (duration: %.2fs)", self.name, duration)
        # Truncate so large results do not flood the log
        self.logger.debug("Result: %s", str(result)[:200])
        return result

    def get_stats(self) -> Dict[str, Any]:
        """Get tool usage statistics.

        Returns:
            Dict with ``name``, ``call_count``, ``total_duration`` and
            ``avg_duration`` (0 when the tool has never been called).
        """
        avg_duration = self.total_duration / self.call_count if self.call_count > 0 else 0
        return {
            "name": self.name,
            "call_count": self.call_count,
            "total_duration": self.total_duration,
            "avg_duration": avg_duration,
        }
# Example usage
def send_email_impl(to: str, subject: str, body: str) -> str:
    """Mock email sender: print a notice and return a confirmation string.

    ``subject`` and ``body`` are accepted but not used by this mock.
    """
    notice = f"Sending email to {to}..."
    print(notice)
    confirmation = f"Email sent to {to}"
    return confirmation
# Wrap the mock sender so each call goes through LoggedTool.__call__.
email_tool = LoggedTool(
    name="SendEmail",
    func=send_email_impl,
    description="Send email to user"
)
# Now all calls are logged
email_tool(to="user@example.com", subject="Hello", body="Test")
# Check stats: prints name, call_count, total_duration and avg_duration
print(json.dumps(email_tool.get_stats(), indent=2))
Complex actions often require multiple tools working together:
from typing import List, Dict, Any
from dataclasses import dataclass
@dataclass
class ToolResult:
    """Outcome of a tool execution: success flag, payload, optional error."""

    # True when the execution completed without an error
    success: bool
    # Payload produced by the execution
    data: Any
    # Error text; defaults to None
    error: Optional[str] = None
class ToolChain:
    """Execute tools sequentially, passing each result to the next tool.

    Every tool is called with the current data dict expanded as keyword
    arguments. ``results`` holds one trace entry per executed step of the
    most recent ``execute`` call.
    """

    def __init__(self, tools: List[Tool]):
        """Store the ordered list of tools to run."""
        self.tools = tools
        self.results = []

    def execute(self, initial_input: Dict[str, Any]) -> ToolResult:
        """Run the chain, stopping at the first tool that raises.

        Args:
            initial_input: Keyword arguments for the first tool.

        Returns:
            ``ToolResult(success=True, data=<final data dict>)`` when every
            tool succeeds, otherwise ``ToolResult(success=False, data=None,
            error=<message>)`` for the first failure.
        """
        # Start a fresh trace so reusing the chain does not mix steps from
        # earlier runs into this one. Rebinding (not clearing) leaves any
        # list a caller kept from a previous run untouched.
        self.results = []
        current_data = initial_input

        for tool in self.tools:
            try:
                # Execute tool with current data
                result = tool(**current_data)
            except Exception as e:
                self.results.append({
                    'tool': tool.name,
                    'input': current_data,
                    'error': str(e),
                    'success': False,
                })
                return ToolResult(success=False, data=None, error=str(e))

            # Store result
            self.results.append({
                'tool': tool.name,
                'input': current_data,
                'output': result,
                'success': True,
            })
            # Update data for next tool
            current_data = self._extract_data(result)

        return ToolResult(success=True, data=current_data)

    def _extract_data(self, result: Any) -> Dict[str, Any]:
        """Normalise a tool result into a dict usable as keyword arguments.

        Dicts pass through, strings become ``{"result": ...}``, anything
        else becomes ``{"data": ...}``.
        """
        if isinstance(result, dict):
            return result
        if isinstance(result, str):
            return {"result": result}
        return {"data": result}
# Example: Customer onboarding workflow
# NOTE(review): lookup_fn, create_fn, email_fn and billing_fn are not defined
# in this snippet; they must be supplied by the surrounding code.
lookup_customer = LoggedTool("LookupCustomer", lookup_fn, "Find customer")
create_account = LoggedTool("CreateAccount", create_fn, "Create account")
send_welcome = LoggedTool("SendWelcome", email_fn, "Send welcome email")
setup_billing = LoggedTool("SetupBilling", billing_fn, "Setup billing")

# Steps run in list order; each receives the previous step's output.
chain = ToolChain([lookup_customer, create_account, send_welcome, setup_billing])
result = chain.execute({"email": "newuser@example.com"})

if not result.success:
    print(f"Failed at: {result.error}")
else:
    print("Onboarding complete!")
from typing import Callable, Any
from enum import Enum
class Condition(Enum):
    """Comparison operators; each value is the operator's display text."""

    EQUALS = "=="
    NOT_EQUALS = "!="
    GREATER_THAN = ">"
    LESS_THAN = "<"
    CONTAINS = "contains"
class ConditionalTool:
    """Route a call to the first registered tool whose condition matches.

    Rules are added with ``when`` and checked in registration order; an
    optional fallback is set with ``otherwise``. Both return ``self`` so
    calls can be chained.
    """

    def __init__(self, name: str):
        """Create a router with no rules and no default tool."""
        self.name = name
        self.conditions = []
        self.tools = {}
        self.default_tool = None

    def when(self, field: str, condition: Condition, value: Any, tool: Tool):
        """Register ``tool`` to run when ``field <condition> value`` holds."""
        self.conditions.append({
            'field': field,
            'condition': condition,
            'value': value,
            'tool': tool,
        })
        return self

    def otherwise(self, tool: Tool):
        """Set the tool used when no rule matches."""
        self.default_tool = tool
        return self

    def execute(self, **kwargs) -> Any:
        """Run the first matching tool with ``kwargs`` and return its result.

        Raises:
            ValueError: If no rule matches and no default tool is set.
        """
        # Check conditions
        for cond in self.conditions:
            field_value = kwargs.get(cond['field'])
            if self._check_condition(field_value, cond['condition'], cond['value']):
                print(f"Condition matched: {cond['field']} {cond['condition'].value} {cond['value']}")
                return cond['tool'](**kwargs)

        # No condition matched, use default. Compare with None so a tool
        # object that happens to be falsy is still used.
        if self.default_tool is not None:
            print("No conditions matched, using default")
            return self.default_tool(**kwargs)

        raise ValueError("No matching condition and no default tool")

    def _check_condition(self, field_value: Any, condition: Condition, target: Any) -> bool:
        """Check if condition is met.

        Equality checks compare values directly. For ordering and
        containment, a missing field (``None``) or operands that cannot be
        compared count as "not matched" instead of raising ``TypeError``
        or matching against the text ``"None"``.
        """
        if condition == Condition.EQUALS:
            return field_value == target
        if condition == Condition.NOT_EQUALS:
            return field_value != target
        if field_value is None:
            return False
        try:
            if condition == Condition.GREATER_THAN:
                return field_value > target
            if condition == Condition.LESS_THAN:
                return field_value < target
            if condition == Condition.CONTAINS:
                return target in str(field_value)
        except TypeError:
            return False
        return False
# Example: Customer support routing
def handle_urgent(issue: str, priority: str) -> str:
    """Return the escalation message for ``issue``; ``priority`` is unused."""
    message = f"URGENT: Escalating {issue} to supervisor"
    return message
def handle_billing(issue: str, priority: str) -> str:
    """Return the billing-routing message for ``issue``; ``priority`` is unused."""
    message = f"Routing {issue} to billing department"
    return message
def handle_general(issue: str, priority: str) -> str:
    """Return the ticket-creation message for ``issue``; ``priority`` is unused."""
    message = f"Creating ticket for {issue}"
    return message
# Rules are checked in registration order: priority first, then the issue
# text, then the fallback. The CONTAINS check is case-sensitive.
routing_tool = ConditionalTool("RouteIssue")
routing_tool \
    .when("priority", Condition.EQUALS, "urgent", handle_urgent) \
    .when("issue", Condition.CONTAINS, "billing", handle_billing) \
    .otherwise(handle_general)
# Usage
print(routing_tool.execute(issue="Payment failed", priority="urgent"))
# Output: "URGENT: Escalating Payment failed to supervisor"
print(routing_tool.execute(issue="Can't login", priority="normal"))
# Output: "Creating ticket for Can't login"
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any
class ParallelToolExecutor:
    """Execute multiple tools in parallel on a shared thread pool.

    Both entry points return a dict keyed by ``tool.name``; each value is
    ``{"success": True, "data": ...}`` or ``{"success": False, "error": ...}``.
    Tools sharing a name overwrite each other's entry. Call ``shutdown`` when
    the executor is no longer needed.
    """

    def __init__(self, max_workers: int = 5):
        """Create the thread pool used by both execution methods."""
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def execute_parallel(self, tools: "List[Tool]", inputs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute tools in parallel and return all results.

        Args:
            tools: Callables with a ``name`` attribute.
            inputs: Keyword arguments for each tool, in the same order.

        Returns:
            Mapping from tool name to its success or error record.

        Raises:
            ValueError: If ``tools`` and ``inputs`` differ in length.
        """
        if len(tools) != len(inputs):
            raise ValueError("Number of tools must match number of inputs")

        futures = [
            (tool.name, self.executor.submit(tool, **input_data))
            for tool, input_data in zip(tools, inputs)
        ]

        # Collect results; each future is given up to 30 seconds
        results = {}
        for tool_name, future in futures:
            try:
                result = future.result(timeout=30)
                results[tool_name] = {"success": True, "data": result}
            except Exception as e:
                results[tool_name] = {"success": False, "error": str(e)}
        return results

    async def execute_parallel_async(self, tools: "List[Tool]", inputs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute tools in parallel from a coroutine.

        Tools run on this instance's thread pool, so ``max_workers`` applies
        here as well.

        Raises:
            ValueError: If ``tools`` and ``inputs`` differ in length
                (previously the longer list was silently truncated).
        """
        if len(tools) != len(inputs):
            raise ValueError("Number of tools must match number of inputs")

        # Inside a coroutine the running loop is the one to schedule on
        loop = asyncio.get_running_loop()

        async def run_tool(tool, input_data):
            try:
                result = await loop.run_in_executor(
                    self.executor, lambda: tool(**input_data)
                )
                return {"success": True, "data": result}
            except Exception as e:
                return {"success": False, "error": str(e)}

        tasks = [run_tool(tool, input_data) for tool, input_data in zip(tools, inputs)]
        results = await asyncio.gather(*tasks)
        return {tool.name: result for tool, result in zip(tools, results)}

    def shutdown(self, wait: bool = True) -> None:
        """Release the thread pool; ``wait`` is passed to the pool's shutdown."""
        self.executor.shutdown(wait=wait)
# Example: Gather information from multiple sources
# NOTE(review): web_search_tool, database_tool and api_tool are not defined
# in this snippet; they must be supplied by the surrounding code.
parallel_executor = ParallelToolExecutor(max_workers=3)
# Run multiple searches simultaneously; inputs[i] holds the keyword
# arguments for tools[i]
results = parallel_executor.execute_parallel(
    tools=[web_search_tool, database_tool, api_tool],
    inputs=[
        {"query": "Python tutorials"},
        {"table": "users", "id": 123},
        {"endpoint": "/products/latest"}
    ]
)
# results maps each tool's name to its success or error record
print(results)
# All tools ran in parallel, saving time!
Production tools need security layers to prevent misuse:
import ast
import sys
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr
import signal
from typing import Dict, Any
class TimeoutException(Exception):
    """Raised to signal that code execution exceeded its time limit."""
def timeout_handler(signum, frame):
    """Signal handler that aborts by raising ``TimeoutException``.

    Both arguments are accepted to match the handler signature and ignored.
    """
    error = TimeoutException("Code execution timed out")
    raise error
class SafeCodeExecutor:
    """Execute Python code with a reduced builtin set and a time limit.

    WARNING: this is a convenience filter, not a security boundary. It uses
    ``exec`` in-process, and whitelisted modules can expose other modules as
    attributes. Untrusted code should run in a separate, OS-isolated process.

    The time limit relies on ``SIGALRM``, so ``execute`` only works on
    platforms that provide it and only from the main thread; elsewhere it
    returns a failure record.
    """

    def __init__(self, timeout_seconds: int = 5):
        """Set the time limit and the whitelist of importable modules."""
        self.timeout = timeout_seconds
        self.allowed_imports = {
            'math', 'random', 'datetime', 'json',
            'collections', 'itertools', 're'
        }

    def execute(self, code: str) -> Dict[str, Any]:
        """Validate and run ``code``, capturing its output.

        Returns:
            On success: ``{"success": True, "stdout": ..., "stderr": ...,
            "globals": ...}`` where ``globals`` holds the names the code
            defined (those not starting with ``__``). On any failure:
            ``{"success": False, "error": <message>}``.
        """
        # Validate code
        validation_error = self._validate_code(code)
        if validation_error:
            return {"success": False, "error": validation_error}

        # Prepare restricted environment
        safe_globals = {
            '__builtins__': {
                'print': print,
                'len': len,
                'range': range,
                'str': str,
                'int': int,
                'float': float,
                'list': list,
                'dict': dict,
                'sum': sum,
                'min': min,
                'max': max,
                # `import` statements look this name up in __builtins__;
                # without it even whitelisted imports fail at run time.
                '__import__': self._restricted_import,
            }
        }

        # Capture output
        stdout_capture = StringIO()
        stderr_capture = StringIO()

        try:
            previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
        except (AttributeError, ValueError) as e:
            # No SIGALRM on this platform, or not called from the main thread
            return {"success": False, "error": str(e)}

        try:
            # Set timeout
            signal.alarm(self.timeout)
            # Execute with output capture
            with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):
                exec(code, safe_globals)
        except TimeoutException:
            return {"success": False, "error": "Execution timed out"}
        except Exception as e:
            return {"success": False, "error": str(e)}
        finally:
            # Always cancel the alarm and put the previous handler back so
            # a later, unrelated alarm is not routed to timeout_handler.
            signal.alarm(0)
            if previous_handler is not None:
                signal.signal(signal.SIGALRM, previous_handler)

        return {
            "success": True,
            "stdout": stdout_capture.getvalue(),
            "stderr": stderr_capture.getvalue(),
            "globals": {k: v for k, v in safe_globals.items() if not k.startswith('__')}
        }

    def _restricted_import(self, name, globals=None, locals=None, fromlist=(), level=0):
        """Import hook for executed code: only whitelisted absolute imports."""
        if level != 0 or name not in self.allowed_imports:
            raise ImportError(f"Import not allowed: {name}")
        return __import__(name, globals, locals, fromlist, level)

    def _validate_code(self, code: str) -> Optional[str]:
        """Validate code before execution.

        Returns:
            ``None`` when the code passes, otherwise a message naming the
            first problem found: a syntax error, a non-whitelisted import
            (``import x`` or ``from x import y``), access to an attribute
            whose name starts with an underscore, or a direct call to
            ``open``/``eval``/``exec``/``__import__``.
        """
        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            return f"Syntax error: {str(e)}"

        blocked_calls = {'open', 'eval', 'exec', '__import__'}
        for node in ast.walk(tree):
            # Block dangerous imports
            if isinstance(node, ast.Import):
                for alias in node.names:
                    if alias.name not in self.allowed_imports:
                        return f"Import not allowed: {alias.name}"
            elif isinstance(node, ast.ImportFrom):
                # `from x import y` is a separate node type and was
                # previously not checked at all
                if node.level or node.module not in self.allowed_imports:
                    return f"Import not allowed: {node.module}"
            elif isinstance(node, ast.Attribute):
                # Underscore attributes (e.g. __class__, module-private
                # aliases) are a common route out of the restricted scope
                if node.attr.startswith('_'):
                    return f"Attribute not allowed: {node.attr}"
            # Block file operations
            elif isinstance(node, ast.Call):
                if isinstance(node.func, ast.Name) and node.func.id in blocked_calls:
                    return f"Function not allowed: {node.func.id}"
        return None
# Usage
# Demo: run one harmless and one hostile snippet through the executor.
# Each call returns a dict whose "success" key reports the outcome.
executor = SafeCodeExecutor(timeout_seconds=5)
# Safe code
result = executor.execute("""
import math
result = math.sqrt(144)
print(f"Square root of 144 is {result}")
""")
print(result) # Success!
# Dangerous code: rejected during validation, before anything runs
result = executor.execute("""
import os
os.system('rm -rf /')
""")
print(result) # Error: Import not allowed: os
from enum import Enum
from typing import Set, Dict, Any
from dataclasses import dataclass
class Role(Enum):
    """Roles a user can be assigned."""

    VIEWER = "viewer"
    EDITOR = "editor"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"
class Permission(Enum):
    """Kinds of permission a role can grant."""

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    EXECUTE = "execute"
    ADMIN = "admin"
# Role permissions mapping
# Permissions granted to each role. Each role lists its full set explicitly;
# only SUPER_ADMIN includes Permission.ADMIN.
ROLE_PERMISSIONS = {
    Role.VIEWER: {Permission.READ},
    Role.EDITOR: {Permission.READ, Permission.WRITE},
    Role.ADMIN: {Permission.READ, Permission.WRITE, Permission.DELETE, Permission.EXECUTE},
    Role.SUPER_ADMIN: {Permission.READ, Permission.WRITE, Permission.DELETE, Permission.EXECUTE, Permission.ADMIN}
}
@dataclass
class User:
    """A named user holding exactly one role."""

    username: str
    role: Role

    def has_permission(self, permission: Permission) -> bool:
        """Return True when the user's role grants ``permission``."""
        granted = ROLE_PERMISSIONS[self.role]
        return permission in granted
class ProtectedTool:
    """Wrap a callable so it runs only for users holding a given permission.

    Every attempt (denied, failed or successful) is appended to
    ``audit_log`` as a dict.
    """

    def __init__(self, name: str, func: Callable, required_permission: Permission):
        """Store the callable, its required permission, and an empty log."""
        self.audit_log = []
        self.required_permission = required_permission
        self.func = func
        self.name = name

    def execute(self, user: User, *args, **kwargs) -> Any:
        """Run the wrapped callable for ``user`` if they are permitted.

        Returns:
            The wrapped callable's result.

        Raises:
            PermissionError: If ``user`` lacks the required permission.
            Exception: Anything raised while running is audited as FAILED
                and re-raised.
        """
        # Check permission
        allowed = user.has_permission(self.required_permission)
        if not allowed:
            error = f"Permission denied: {user.username} lacks {self.required_permission.value}"
            self._audit(user, "DENIED", error)
            raise PermissionError(error)

        # Execute
        try:
            outcome = self.func(*args, **kwargs)
            self._audit(user, "SUCCESS", f"Args: {args}, Kwargs: {kwargs}")
        except Exception as exc:
            self._audit(user, "FAILED", str(exc))
            raise
        return outcome

    def _audit(self, user: User, status: str, details: str):
        """Append one audit entry describing this attempt."""
        record = {
            'timestamp': datetime.now(),
            'user': user.username,
            'role': user.role.value,
            'tool': self.name,
            'status': status,
            'details': details,
        }
        self.audit_log.append(record)
# Example: Database delete tool (requires the DELETE permission)
def delete_user_impl(user_id: int) -> str:
    """Mock deletion: return a confirmation message for ``user_id``."""
    confirmation = f"Deleted user {user_id}"
    return confirmation
# Guard the mock deletion behind Permission.DELETE.
delete_user_tool = ProtectedTool(
    name="DeleteUser",
    func=delete_user_impl,
    required_permission=Permission.DELETE,
)

# Create users
viewer = User("alice", Role.VIEWER)
admin = User("bob", Role.ADMIN)

# Try to delete: the VIEWER role does not include DELETE, so this raises
try:
    delete_user_tool.execute(viewer, user_id=123)
except PermissionError as e:
    print(f"Alice denied: {e}")

# The ADMIN role includes DELETE, so this call goes through
delete_user_tool.execute(admin, user_id=123)
print("Bob succeeded!")

# Check audit log: one entry per attempt, in order
for entry in delete_user_tool.audit_log:
    print(f"{entry['timestamp']}: {entry['user']} - {entry['status']}")
| Pattern | Use Case | Example |
|---|---|---|
| Sequential | Tools used in order | Lookup → Check → Approve → Execute |
| Conditional | Tools used based on conditions | If urgent → escalate, else → handle |
| Parallel | Multiple tools simultaneously | Search DB + call API + get cache |
| Feedback Loop | Tools that refine results | Search → evaluate → refine search → try again |
In production, you need visibility into how tools are being used:
from collections import defaultdict, Counter
from datetime import datetime, timedelta
from typing import Dict, List, Any
import statistics
class ToolAnalytics:
    """Track and analyze tool usage.

    ``calls`` holds one dict per logged call; ``errors`` holds the subset
    whose ``success`` flag was false. Both lists grow without bound.
    """

    def __init__(self):
        """Start with empty call and error histories."""
        self.calls: List[Dict[str, Any]] = []
        self.errors: List[Dict[str, Any]] = []

    def log_call(self, tool_name: str, duration: float, success: bool,
                 user: Optional[str] = None, error: Optional[str] = None):
        """Log a tool call, timestamped with the current local time."""
        entry = {
            'tool': tool_name,
            'timestamp': datetime.now(),
            'duration': duration,
            'success': success,
            'user': user,
            'error': error,
        }
        self.calls.append(entry)
        if not success:
            self.errors.append(entry)

    def get_summary(self, time_window_hours: int = 24) -> Dict[str, Any]:
        """Summarise the calls logged in the last ``time_window_hours``.

        Returns:
            ``{"message": "No recent calls"}`` when the window is empty,
            otherwise a dict with ``time_window_hours``, ``total_calls``,
            ``tool_usage`` (calls per tool), ``success_rates`` (percent per
            tool), ``performance`` (duration stats over successful calls)
            and ``top_errors`` (up to five most frequent error values).
        """
        cutoff = datetime.now() - timedelta(hours=time_window_hours)
        recent_calls = [c for c in self.calls if c['timestamp'] > cutoff]
        if not recent_calls:
            return {"message": "No recent calls"}

        # Tool usage counts
        tool_counts = Counter(c['tool'] for c in recent_calls)

        # Success rates, from one counting pass instead of a rescan per tool
        success_counts = Counter(c['tool'] for c in recent_calls if c['success'])
        success_rates = {
            tool: (success_counts[tool] / total) * 100
            for tool, total in tool_counts.items()
        }

        # Performance metrics (successful calls only)
        durations = defaultdict(list)
        for call in recent_calls:
            if call['success']:
                durations[call['tool']].append(call['duration'])

        performance = {}
        for tool, times in durations.items():
            performance[tool] = {
                'avg_duration': statistics.mean(times),
                'min_duration': min(times),
                'max_duration': max(times),
                # With 20 or fewer samples fall back to the maximum
                'p95_duration': statistics.quantiles(times, n=20)[18] if len(times) > 20 else max(times),
            }

        # Most common errors
        error_counts = Counter(e['error'] for e in self.errors if e['timestamp'] > cutoff)

        return {
            'time_window_hours': time_window_hours,
            'total_calls': len(recent_calls),
            'tool_usage': dict(tool_counts),
            'success_rates': success_rates,
            'performance': performance,
            'top_errors': dict(error_counts.most_common(5)),
        }

    def get_slow_tools(self, threshold_seconds: float = 1.0) -> "List[Tuple[str, float]]":
        """Identify slow tools.

        Returns:
            ``(tool, average_duration)`` pairs for tools whose successful
            calls average more than ``threshold_seconds``, slowest first.
        """
        durations = defaultdict(list)
        for call in self.calls:
            if call['success']:
                durations[call['tool']].append(call['duration'])

        averages = ((tool, statistics.mean(times)) for tool, times in durations.items())
        slow_tools = [(tool, avg) for tool, avg in averages if avg > threshold_seconds]
        return sorted(slow_tools, key=lambda x: x[1], reverse=True)

    def get_unreliable_tools(self, min_failure_rate: float = 0.1) -> "List[Tuple[str, float]]":
        """Identify unreliable tools.

        Returns:
            ``(tool, failure_rate)`` pairs for tools whose failure rate
            (0.0 to 1.0) is at least ``min_failure_rate``, worst first.
        """
        totals = Counter(c['tool'] for c in self.calls)
        failures = Counter(c['tool'] for c in self.calls if not c['success'])

        unreliable = []
        for tool, total in totals.items():
            failure_rate = failures[tool] / total
            if failure_rate >= min_failure_rate:
                unreliable.append((tool, failure_rate))
        return sorted(unreliable, key=lambda x: x[1], reverse=True)
# Usage
# Demo: record three calls by hand, then query the aggregates.
analytics = ToolAnalytics()
# Tools log their calls
analytics.log_call("WebSearch", duration=0.5, success=True, user="agent-1")
analytics.log_call("DatabaseQuery", duration=2.3, success=True, user="agent-1")
analytics.log_call("SendEmail", duration=0.8, success=False, error="SMTP timeout", user="agent-2")
# Get insights
summary = analytics.get_summary(time_window_hours=24)
# default=str turns any value json cannot encode into its string form
print(json.dumps(summary, indent=2, default=str))
# Tools whose successful calls average more than 2.0 seconds
slow = analytics.get_slow_tools(threshold_seconds=2.0)
print(f"\nSlow tools: {slow}")
# Tools failing at least 20% of the time
unreliable = analytics.get_unreliable_tools(min_failure_rate=0.2)
print(f"Unreliable tools: {unreliable}")
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class HealthStatus(Enum):
    """Health levels a tool can be reported at."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
@dataclass
class ToolHealth:
    """Snapshot of one tool's health and the metrics behind it."""

    # Name of the tool this snapshot describes
    tool_name: str
    # Overall health classification
    status: HealthStatus
    # Success ratio
    success_rate: float
    # Mean call duration
    avg_duration: float
    # Count of recent errors
    recent_errors: int
    # Time of the latest success, or None
    last_success: Optional[datetime]
    # Human-readable summary
    message: str
class ToolHealthMonitor:
    """Derive per-tool health reports from a ``ToolAnalytics`` call history."""

    def __init__(self, analytics: ToolAnalytics):
        """Keep a reference to the analytics store that is queried."""
        self.analytics = analytics

    def check_tool_health(self, tool_name: str, lookback_minutes: int = 15) -> ToolHealth:
        """Assess ``tool_name`` from its calls in the last ``lookback_minutes``.

        A tool with no calls in the window is reported as DEGRADED with
        zeroed metrics and the message "No recent activity".
        """
        window_start = datetime.now() - timedelta(minutes=lookback_minutes)

        # Get recent calls for this tool
        recent_calls = [
            call for call in self.analytics.calls
            if call['tool'] == tool_name and call['timestamp'] > window_start
        ]
        if not recent_calls:
            return ToolHealth(
                tool_name=tool_name,
                status=HealthStatus.DEGRADED,
                success_rate=0,
                avg_duration=0,
                recent_errors=0,
                last_success=None,
                message="No recent activity",
            )

        # Calculate metrics
        successes = [call for call in recent_calls if call['success']]
        failures = [call for call in recent_calls if not call['success']]
        success_rate = len(successes) / len(recent_calls)
        recent_errors = len(failures)
        if successes:
            avg_duration = statistics.mean([call['duration'] for call in successes])
            last_success = max([call['timestamp'] for call in successes])
        else:
            avg_duration = 0
            last_success = None

        # Determine status
        status = self._determine_status(success_rate, avg_duration, recent_errors)
        message = self._generate_message(status, success_rate, avg_duration, recent_errors)
        return ToolHealth(
            tool_name=tool_name,
            status=status,
            success_rate=success_rate,
            avg_duration=avg_duration,
            recent_errors=recent_errors,
            last_success=last_success,
            message=message,
        )

    def _determine_status(self, success_rate: float, avg_duration: float, errors: int) -> HealthStatus:
        """Classify the metrics as UNHEALTHY, DEGRADED or HEALTHY."""
        # Unhealthy: low success rate or too many errors
        unhealthy = success_rate < 0.7 or errors > 10
        if unhealthy:
            return HealthStatus.UNHEALTHY
        # Degraded: moderate issues
        degraded = success_rate < 0.9 or avg_duration > 3.0 or errors > 5
        return HealthStatus.DEGRADED if degraded else HealthStatus.HEALTHY

    def _generate_message(self, status: HealthStatus, success_rate: float,
                          avg_duration: float, errors: int) -> str:
        """Generate the human-readable summary for ``status``."""
        if status == HealthStatus.HEALTHY:
            return f"Operating normally ({success_rate:.1%} success, {avg_duration:.2f}s avg)"
        if status != HealthStatus.DEGRADED:
            return f"Unhealthy: {success_rate:.1%} success, {errors} errors"

        # Degraded: list each threshold that was crossed
        checks = [
            (success_rate < 0.9, f"success rate {success_rate:.1%}"),
            (avg_duration > 3.0, f"slow ({avg_duration:.2f}s)"),
            (errors > 5, f"{errors} recent errors"),
        ]
        issues = [text for crossed, text in checks if crossed]
        return f"Degraded: {', '.join(issues)}"

    def get_dashboard(self) -> Dict[str, ToolHealth]:
        """Get health status for every tool present in the call history."""
        # Get unique tools
        tools = set([c['tool'] for c in self.analytics.calls])
        return {tool: self.check_tool_health(tool) for tool in tools}

    def print_dashboard(self):
        """Print the dashboard grouped by status, skipping empty groups."""
        dashboard = self.get_dashboard()
        rule = "=" * 70
        print("\n" + rule)
        print("TOOL HEALTH DASHBOARD")
        print(rule + "\n")

        # Group by status
        for status in HealthStatus:
            tools = [name for name, report in dashboard.items() if report.status == status]
            if not tools:
                continue
            print(f"{status.value.upper()}: {len(tools)} tools")
            for tool in tools:
                health = dashboard[tool]
                print(f" • {tool}: {health.message}")
            print()
# Usage
# Demo: build the monitor on top of the analytics store populated earlier.
monitor = ToolHealthMonitor(analytics)
# Check a single tool using the default 15-minute lookback
health = monitor.check_tool_health("WebSearch")
print(f"{health.tool_name}: {health.status.value} - {health.message}")
# Full dashboard
monitor.print_dashboard()
Essential practices for production tool development:
Each tool should do one thing well. Avoid "Swiss Army knife" tools that try to do everything.
Write tool descriptions from the agent's perspective. The LLM must understand when to use the tool.
Validate all inputs before execution. Return clear error messages, not exceptions.
Handle errors gracefully. Return informative error messages agents can act on.
Never let tools run forever. Set reasonable timeouts for all operations.
Log every tool call with inputs, outputs, duration, and errors. Essential for debugging.
Type hints help agents understand expected inputs and improve reliability.
When changing tool behavior, version them (v1, v2). Old agents can continue using old versions.
Test tools with invalid inputs, network failures, timeouts, and edge cases.
Let's build a production-ready tool suite for a customer support agent:
"""
Complete customer support tool suite
"""
from typing import Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
# Tool infrastructure (from previous sections)
# ... (CachedTool, LoggedTool, ProtectedTool, ToolAnalytics, etc.)
# ============== Customer Support Tools ==============
class CustomerLookupTool(CachedTool, LoggedTool):
    """Look up customer info with caching and logging.

    Neither base class calls ``super().__init__()``, and ``CachedTool``
    comes first in the MRO, so relying on inheritance alone leaves the
    ``LoggedTool`` state unset and its ``__call__`` unreachable. Both bases
    are therefore initialised explicitly and calls are routed through the
    logger first, then the cache.
    """

    def __init__(self, cache_ttl_seconds: int = 300):
        """Initialise the cache and the logging wrapper."""
        CachedTool.__init__(self, cache_ttl_seconds=cache_ttl_seconds)
        LoggedTool.__init__(
            self,
            name="CustomerLookup",
            func=self._cached_call,
            description="Look up customer info by email",
        )

    def _cached_call(self, *args, **kwargs):
        """Cache layer: serve a stored result or fall through to execute()."""
        return CachedTool.__call__(self, *args, **kwargs)

    def __call__(self, *args, **kwargs):
        """Log the call, then resolve it through the cache."""
        return LoggedTool.__call__(self, *args, **kwargs)

    def execute(self, email: str) -> Dict[str, Any]:
        """Find customer by email.

        Returns:
            ``{"success": True, "customer": {...}}`` for a known email,
            otherwise ``{"error": "Customer not found: <email>"}``.
        """
        # Simulate database lookup
        customers_db = {
            "alice@example.com": {
                "id": 1,
                "name": "Alice Johnson",
                "status": "premium",
                "joined": "2023-01-15",
            },
            "bob@example.com": {
                "id": 2,
                "name": "Bob Smith",
                "status": "standard",
                "joined": "2023-06-20",
            },
        }
        customer = customers_db.get(email)
        if not customer:
            return {"error": f"Customer not found: {email}"}
        return {"success": True, "customer": customer}
@with_retry(max_retries=3)
def get_order_history(customer_id: int, limit: int = 10) -> Dict[str, Any]:
    """Return up to ``limit`` orders from the (simulated) order history.

    Args:
        customer_id: Customer whose orders are requested. The simulated
            data below does not use it yet.
        limit: Maximum number of orders to return; must not be negative.

    Returns:
        ``{"success": True, "orders": [...]}`` with at most ``limit``
        entries, or ``{"error": "..."}`` when ``limit`` is negative.
    """
    # A negative limit would make the slice below drop orders from the END
    # (orders[:-1] is "all but the last") instead of capping the count.
    # The problem is reported as an error dict rather than raised, following
    # this file's guidance to return clear error messages, not exceptions.
    # ``None`` is let through because ``orders[:None]`` is a valid slice.
    if limit is not None and limit < 0:
        return {"error": f"Invalid limit: {limit} (must be >= 0)"}

    # Simulate API call that might fail
    orders = [
        {"id": 101, "date": "2024-01-15", "total": 89.99, "status": "delivered"},
        {"id": 102, "date": "2024-02-01", "total": 149.50, "status": "delivered"},
    ]
    return {"success": True, "orders": orders[:limit]}
class RefundTool(ProtectedTool):
    """Refund tool built on ProtectedTool (intended to require admin permission)."""

    def execute(self, order_id: int, amount: float, reason: str) -> str:
        """Validate a refund request, audit it, and return a status message.

        Args:
            order_id: Order being refunded.
            amount: Refund amount in dollars; must be > 0 and <= 10000.
            reason: Free-text reason recorded in the audit entry.

        Returns:
            A confirmation string on success, or a string starting with
            ``"Error:"`` when validation fails. Validation failures are
            returned, not raised.
        """
        # NOTE(review): no permission check happens in this override;
        # presumably ProtectedTool enforces it elsewhere - verify.
        # NOTE(review): CustomerSupportAgent._handle_refund in this file calls
        # execute(user, order_id, amount, "Customer request"), i.e. one more
        # positional argument than this signature accepts. Confirm against
        # ProtectedTool.execute which side should change.

        # Validate. Written as "not amount > 0" rather than "amount <= 0" so
        # that NaN (for which every comparison is False) is rejected too
        # instead of slipping past both checks.
        if not amount > 0:
            return "Error: Refund amount must be positive"
        if amount > 10000:
            return "Error: Refunds over $10,000 require manual approval"

        # Process refund (simulate)
        self._audit("Refund", f"Order {order_id}: ${amount} - {reason}")
        return f"Refund of ${amount:.2f} processed for order {order_id}"
class EmailTool:
    """Email-sending tool whose ``send`` method is wrapped by ``rate_limit``."""

    def __init__(self):
        # Limiter configured for 10 calls per 60-second window.
        self.rate_limiter = RateLimiter(max_calls=10, window_seconds=60)

    @rate_limit(max_calls=10, time_window=60)
    def send(self, to: str, subject: str, body: str) -> str:
        """Simulate sending an email (max 10/minute) and return a confirmation.

        ``body`` is accepted but not used by the simulated send.
        """
        confirmation = f"Email sent to {to}"
        # Simulated delivery: just echo recipient and subject to stdout.
        print(f"{confirmation}: {subject}")
        return confirmation
# ============== Unified Support Agent System ==============
class CustomerSupportAgent:
    """Customer support agent that wires the support tools together.

    Owns the lookup, order-history, refund and email tools plus the
    analytics and health-monitoring objects, and routes free-text requests
    to the matching handler.
    """

    def __init__(self):
        # Initialize tools
        self.lookup = CustomerLookupTool(cache_ttl_seconds=300)
        self.orders = LoggedTool("GetOrders", get_order_history, "Get order history")
        self.refund = RefundTool("ProcessRefund", self._process_refund, Permission.ADMIN)
        self.email = EmailTool()
        # Initialize analytics and monitoring
        self.analytics = ToolAnalytics()
        self.monitor = ToolHealthMonitor(self.analytics)
        self.logger = logging.getLogger("CustomerSupportAgent")

    def _process_refund(self, order_id: int, amount: float, reason: str) -> str:
        """Refund implementation handed to RefundTool as its callable."""
        return f"Refund ${amount} for order {order_id}: {reason}"

    def handle_request(self, request: str, user: User) -> str:
        """Route a support request to the refund or order handler.

        Args:
            request: Free-text request; matched case-insensitively.
            user: User on whose behalf the request is handled.

        Returns:
            The handler's reply, or a help message when the request
            mentions neither "refund" nor "order".
        """
        self.logger.info("Handling request: %s", request)
        text = request.lower()
        # Simple rule-based routing (in production, use LLM).
        # "refund" is tested first, so "refund for order 101" is a refund.
        if "refund" in text:
            return self._handle_refund(request, user)
        if "order" in text:
            return self._handle_order_inquiry(request, user)
        return "I can help with orders and refunds. Please specify your request."

    def _handle_refund(self, request: str, user: User) -> str:
        """Run the refund tool for ``user`` and record the call in analytics.

        Returns the tool's result, or ``"Refund failed: <error>"`` when the
        tool raises.
        """
        # Extract details (simplified)
        order_id = 101  # Would parse from request
        amount = 89.99
        start = datetime.now()
        try:
            # NOTE(review): RefundTool.execute as defined in this file takes
            # (order_id, amount, reason) without a user argument, so this call
            # passes one argument too many. Confirm against
            # ProtectedTool.execute which side should change.
            result = self.refund.execute(user, order_id, amount, "Customer request")
            duration = (datetime.now() - start).total_seconds()
            self.analytics.log_call("ProcessRefund", duration, True, user.username)
            return result
        except Exception as e:
            # Boundary handler: every failure is recorded and turned into a
            # reply string instead of propagating to the caller.
            duration = (datetime.now() - start).total_seconds()
            self.analytics.log_call("ProcessRefund", duration, False, user.username, str(e))
            return f"Refund failed: {str(e)}"

    def _handle_order_inquiry(self, request: str, user: User) -> str:
        """Run the order-history tool and record the call in analytics.

        handle_request dispatches here for "order" requests; without this
        method that branch raised AttributeError.

        Returns a string containing the tool's result, or
        ``"Order lookup failed: <error>"`` when the tool raises.
        """
        # Extract details (simplified)
        customer_id = 1  # Would resolve from user / request
        start = datetime.now()
        try:
            # NOTE(review): assumes LoggedTool exposes execute(*args) like the
            # other tools in this file - confirm against LoggedTool.
            result = self.orders.execute(customer_id)
            duration = (datetime.now() - start).total_seconds()
            self.analytics.log_call("GetOrders", duration, True, user.username)
            return f"Order history: {result}"
        except Exception as e:
            duration = (datetime.now() - start).total_seconds()
            self.analytics.log_call("GetOrders", duration, False, user.username, str(e))
            return f"Order lookup failed: {str(e)}"

    def get_system_health(self) -> Dict[str, Any]:
        """Return a health report assembled from analytics and the monitor.

        Keys: ``analytics`` (24-hour summary), ``tool_health`` (per-tool
        attribute dict from the monitor dashboard), ``slow_tools`` and
        ``unreliable_tools``.
        """
        return {
            "analytics": self.analytics.get_summary(time_window_hours=24),
            "tool_health": {
                tool: health.__dict__
                for tool, health in self.monitor.get_dashboard().items()
            },
            "slow_tools": self.analytics.get_slow_tools(),
            "unreliable_tools": self.analytics.get_unreliable_tools(),
        }
# ============== Usage ==============
# Build the agent with all of its tools.
agent = CustomerSupportAgent()

# Two users with different roles.
admin_user = User("admin", Role.ADMIN)
viewer_user = User("viewer", Role.VIEWER)

# Send the same refund request as each user, admin first.
for requester in (admin_user, viewer_user):
    print(agent.handle_request("I need a refund for order 101", requester))

# Dump the health report; default=str stringifies values json cannot encode.
import json

health = agent.get_system_health()
print(json.dumps(health, indent=2, default=str))
Q1: What is the primary purpose of agent tools?
Q2: Which of the following is a best practice when designing agent tools?
Q3: What is function calling in the context of AI agents?
Q4: Why is error handling critical in agent tools?
Q5: What is the advantage of using tool schemas (like JSON Schema)?