📜 Free Certificate Upon Completion - Earn a verified certificate when you complete all 7 modules in the AI Agents course.

Multi-Agent Systems

📚 Tutorial 5 🔴 Advanced

Build teams of agents that communicate, collaborate, and coordinate to solve complex problems


Why Multi-Agent Systems?

One agent is powerful. A well-coordinated team of agents can handle problems that are too broad or too complex for a single agent. Multi-agent systems enable:

  • Specialization: Each agent focuses on its own area of expertise
  • Parallelization: Multiple agents work simultaneously
  • Resilience: If one agent fails, the others can continue
  • Knowledge synthesis: Combining perspectives can lead to better decisions
  • Scalability: Complex problems are decomposed across agents

Key Insight: Multi-agent systems work best when agents have clear roles, can communicate effectively, and have shared goals or complementary objectives.

Multi-Agent Architectures

Different architectures serve different purposes:

1. Hierarchical Architecture

Coordinator Agent (Boss)
├─ Researcher Agent (Specialist)
├─ Analyst Agent (Specialist)
└─ Writer Agent (Specialist)

The coordinator delegates tasks, aggregates results, and makes final decisions. CrewAI's hierarchical process follows this pattern.
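
As a minimal sketch of the delegate-and-aggregate flow described above (this is not CrewAI's API), the coordinator can be modeled as a function that calls each specialist in turn and collects the results. The specialist functions and `coordinate` are hypothetical stand-ins for LLM-backed agents.

```python
from typing import Callable, Dict, List

# Hypothetical specialists: plain functions standing in for LLM-backed agents.
def researcher(task: str) -> str:
    return f"notes on {task}"

def analyst(task: str) -> str:
    return f"analysis of {task}"

def writer(task: str) -> str:
    return f"draft about {task}"

def coordinate(task: str,
               specialists: Dict[str, Callable[[str], str]],
               plan: List[str]) -> Dict[str, str]:
    """Delegate the task to each specialist named in the plan, in order,
    and aggregate the results keyed by specialist name."""
    results: Dict[str, str] = {}
    for name in plan:
        results[name] = specialists[name](task)
    return results

team = {"researcher": researcher, "analyst": analyst, "writer": writer}
report = coordinate("AI agents", team, plan=["researcher", "analyst", "writer"])
print(report)
```

In a real system the plan itself would typically come from the coordinator's LLM rather than being passed in by hand.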

2. Peer-to-Peer Architecture

Agent A ↔ Agent B ↔ Agent C

Agents communicate directly with each other. Good for collaborative problem-solving, debate, negotiation.
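
A rough sketch of direct peer-to-peer messaging, assuming each agent keeps its own inbox and a table of the peers it is linked to. The `Peer` class and its methods are hypothetical and only illustrate that no coordinator sits between sender and receiver.

```python
from collections import deque
from typing import Deque, Dict, Optional, Tuple

class Peer:
    """Hypothetical peer agent with direct links to other peers."""

    def __init__(self, name: str):
        self.name = name
        self.peers: Dict[str, "Peer"] = {}
        self.inbox: Deque[Tuple[str, str]] = deque()

    def connect(self, other: "Peer") -> None:
        # Links are symmetric: each side can reach the other directly.
        self.peers[other.name] = other
        other.peers[self.name] = self

    def send(self, to: str, text: str) -> None:
        # The message goes straight into the recipient's inbox.
        self.peers[to].inbox.append((self.name, text))

    def receive(self) -> Optional[Tuple[str, str]]:
        return self.inbox.popleft() if self.inbox else None

a, b, c = Peer("A"), Peer("B"), Peer("C")
a.connect(b)
b.connect(c)

a.send("B", "proposal: split the dataset")
b.send("C", "A proposes splitting the dataset")

msg_b = b.receive()
msg_c = c.receive()
print(msg_b, msg_c)
```

Here A and C are not linked, so anything A wants C to know has to be relayed by B, which mirrors the chain in the diagram.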

3. Blackboard Architecture

Shared Workspace (Blackboard)

Agent1 ↔ Agent2 ↔ Agent3

Agents read from and write to shared state. Useful for collaborative knowledge building and complex reasoning.

Agent Communication Patterns

Message Passing System

Complete Message Passing Implementation
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import queue
import threading

class MessagePriority(Enum):
    LOW = 1
    NORMAL = 2
    HIGH = 3
    URGENT = 4

@dataclass
class Message:
    """Message between agents"""
    from_agent: str
    to_agent: str
    content: Any
    priority: MessagePriority
    timestamp: datetime
    reply_to: Optional[str] = None
    requires_response: bool = False
    
class MultiAgentCommunicationSystem:
    """Advanced multi-agent message passing"""
    
    def __init__(self):
        self.agents = {}
        self.message_queues = {}  # Per-agent message queues
        self.message_history = []
        self.lock = threading.Lock()
        self._sequence = 0  # Tie-breaker so equal-priority messages stay FIFO
    
    def register_agent(self, agent_id: str, agent):
        """Register agent in the system"""
        with self.lock:
            self.agents[agent_id] = agent
            self.message_queues[agent_id] = queue.PriorityQueue()
    
    def send_message(self, from_agent: str, to_agent: str, 
                    content: Any, priority: MessagePriority = MessagePriority.NORMAL,
                    requires_response: bool = False):
        """Send message to another agent"""
        if to_agent not in self.agents:
            raise ValueError(f"Agent {to_agent} not found")
        
        message = Message(
            from_agent=from_agent,
            to_agent=to_agent,
            content=content,
            priority=priority,
            timestamp=datetime.now(),
            requires_response=requires_response
        )
        
        # PriorityQueue serves the smallest tuple first, so invert the enum
        # value (URGENT=4 becomes 1). The sequence number breaks ties: without
        # it, two messages with equal priority would be compared directly and
        # raise TypeError, because Message defines no ordering.
        priority_value = 5 - priority.value
        with self.lock:
            self._sequence += 1
            self.message_queues[to_agent].put((priority_value, self._sequence, message))
            
            # Store in history
            self.message_history.append(message)
        
        return message
    
    def broadcast(self, from_agent: str, content: Any, 
                  priority: MessagePriority = MessagePriority.NORMAL):
        """Broadcast message to all agents"""
        for agent_id in self.agents:
            if agent_id != from_agent:
                self.send_message(from_agent, agent_id, content, priority)
    
    def get_message(self, agent_id: str, timeout: Optional[float] = None) -> Optional[Message]:
        """Get next message for agent"""
        try:
            if timeout:
                _, _, message = self.message_queues[agent_id].get(timeout=timeout)
            else:
                _, _, message = self.message_queues[agent_id].get_nowait()
            return message
        except queue.Empty:
            return None
    
    def reply(self, original_message: Message, response: Any):
        """Reply to a message"""
        return self.send_message(
            from_agent=original_message.to_agent,
            to_agent=original_message.from_agent,
            content=response,
            priority=original_message.priority
        )
    
    def get_conversation(self, agent1: str, agent2: str) -> List[Message]:
        """Get conversation history between two agents"""
        return [
            msg for msg in self.message_history
            if (msg.from_agent == agent1 and msg.to_agent == agent2) or
               (msg.from_agent == agent2 and msg.to_agent == agent1)
        ]

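# Usage Example
# Placeholder agent objects; in practice these would be your agent instances
researcher_agent = object()
writer_agent = object()

comm_system = MultiAgentCommunicationSystem()
comm_system.register_agent("researcher", researcher_agent)
comm_system.register_agent("writer", writer_agent)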

# Send messages
comm_system.send_message(
    "researcher", 
    "writer",
    {"findings": "5 key papers on transformers", "confidence": 0.9},
    priority=MessagePriority.HIGH,
    requires_response=True
)

# Writer receives and processes
message = comm_system.get_message("writer")
if message:
    print(f"From {message.from_agent}: {message.content}")
    
    # Reply
    comm_system.reply(message, {"status": "received", "eta": "2 hours"})
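
`queue.PriorityQueue` compares whole tuples, so when two entries share a priority it falls back to comparing the payloads, and objects without a defined ordering (such as a plain dataclass) raise `TypeError` at that point. A common remedy, sketched here with hypothetical names (`Note`, `put`), is to add a strictly increasing sequence number between the priority and the payload.

```python
import itertools
import queue
from dataclasses import dataclass
from typing import Any, List

@dataclass
class Note:
    """Hypothetical payload; like most dataclasses it defines no ordering."""
    sender: str
    body: Any

inbox: queue.PriorityQueue = queue.PriorityQueue()
sequence = itertools.count()  # strictly increasing tie-breaker

def put(priority: int, note: Note) -> None:
    # Lower numbers are served first. The sequence number guarantees that
    # two entries never tie on the first two fields, so Note objects are
    # never compared with each other.
    inbox.put((priority, next(sequence), note))

put(2, Note("researcher", "routine update"))
put(1, Note("critic", "urgent correction"))
put(2, Note("analyst", "second routine update"))

order: List[str] = []
while not inbox.empty():
    _, _, note = inbox.get_nowait()
    order.append(note.sender)

print(order)  # ['critic', 'researcher', 'analyst']
```

The two priority-2 notes come out in the order they were sent, which is usually the behavior agents expect from an inbox.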

Shared Workspace (Blackboard)

Production Blackboard System
from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import threading
from datetime import datetime

class BlackboardEvent(Enum):
    CREATED = "created"
    UPDATED = "updated"
    DELETED = "deleted"

@dataclass
class BlackboardEntry:
    """Entry on blackboard"""
    topic: str
    data: Any
    author: str
    timestamp: datetime
    version: int = 1

class Blackboard:
    """Production-ready shared knowledge workspace"""
    
    def __init__(self):
        self.data: Dict[str, BlackboardEntry] = {}
        self.subscribers: Dict[str, List[Callable]] = {}
        self.history: List[tuple] = []  # (event, entry)
        self.lock = threading.RLock()
    
    def put(self, topic: str, data: Any, author: str):
        """Write to blackboard"""
        with self.lock:
            if topic in self.data:
                # Update existing
                old_entry = self.data[topic]
                entry = BlackboardEntry(
                    topic=topic,
                    data=data,
                    author=author,
                    timestamp=datetime.now(),
                    version=old_entry.version + 1
                )
                event = BlackboardEvent.UPDATED
            else:
                # Create new
                entry = BlackboardEntry(
                    topic=topic,
                    data=data,
                    author=author,
                    timestamp=datetime.now()
                )
                event = BlackboardEvent.CREATED
            
            self.data[topic] = entry
            self.history.append((event, entry))
            
            # Notify subscribers
            self._notify_subscribers(topic, event, entry)
    
    def get(self, topic: str) -> Optional[BlackboardEntry]:
        """Read from blackboard"""
        with self.lock:
            return self.data.get(topic)
    
    def get_all(self) -> Dict[str, BlackboardEntry]:
        """Get all entries"""
        with self.lock:
            return self.data.copy()
    
    def delete(self, topic: str, author: str):
        """Delete entry"""
        with self.lock:
            if topic in self.data:
                entry = self.data[topic]
                del self.data[topic]
                self.history.append((BlackboardEvent.DELETED, entry))
                self._notify_subscribers(topic, BlackboardEvent.DELETED, entry)
    
    def subscribe(self, topic: str, callback: Callable):
        """Subscribe to topic changes"""
        with self.lock:
            if topic not in self.subscribers:
                self.subscribers[topic] = []
            self.subscribers[topic].append(callback)
    
    def query(self, predicate: Callable[[BlackboardEntry], bool]) -> List[BlackboardEntry]:
        """Query entries matching predicate"""
        with self.lock:
            return [entry for entry in self.data.values() if predicate(entry)]
    
    def get_history(self, topic: str = None) -> List[tuple]:
        """Get history for topic or all"""
        with self.lock:
            if topic:
                return [(event, entry) for event, entry in self.history 
                       if entry.topic == topic]
            return self.history.copy()
    
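    def _notify_subscribers(self, topic: str, event: BlackboardEvent, entry: BlackboardEntry):
        """Notify subscribers of changes.

        Called while self.lock is held, so callbacks should return quickly
        and should not wait on other threads that use the blackboard.
        """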
        for callback in self.subscribers.get(topic, []):
            try:
                callback(event, entry)
            except Exception as e:
                print(f"Subscriber notification failed: {e}")

# Usage Example
blackboard = Blackboard()

# Agent subscribes to updates (before the write, so the callback fires)
def on_findings_update(event, entry):
    print(f"Findings {event.value}: {entry.data}")

blackboard.subscribe("research_findings", on_findings_update)

# Agent writes to blackboard
blackboard.put(
    "research_findings",
    {"papers": ["paper1", "paper2"], "key_concepts": ["attention", "transformers"]},
    author="researcher_agent"
)

# Another agent reads
findings = blackboard.get("research_findings")
print(f"Found {len(findings.data['papers'])} papers")

# Query all entries by specific agent
agent_entries = blackboard.query(lambda e: e.author == "researcher_agent")
print(f"Researcher has {len(agent_entries)} entries")

Pub-Sub Communication

Publish-Subscribe System
from typing import Any, Callable, Dict, List, Set
import threading

class PubSubSystem:
    """Publish-subscribe for agent communication"""
    
    def __init__(self):
        self.topics: Dict[str, Set[Callable]] = {}
        self.lock = threading.Lock()
    
    def subscribe(self, topic: str, callback: Callable):
        """Subscribe to topic"""
        with self.lock:
            if topic not in self.topics:
                self.topics[topic] = set()
            self.topics[topic].add(callback)
    
    def unsubscribe(self, topic: str, callback: Callable):
        """Unsubscribe from topic"""
        with self.lock:
            if topic in self.topics:
                self.topics[topic].discard(callback)
    
    def publish(self, topic: str, data: Any):
        """Publish to topic"""
        with self.lock:
            subscribers = self.topics.get(topic, set()).copy()
        
        # Notify outside lock to prevent deadlocks
        for callback in subscribers:
            try:
                callback(data)
            except Exception as e:
                print(f"Subscriber error: {e}")
    
    def get_topics(self) -> List[str]:
        """Get all topics"""
        with self.lock:
            return list(self.topics.keys())

# Usage
pubsub = PubSubSystem()

# Agents subscribe
def researcher_handler(data):
    print(f"Researcher received: {data}")

def writer_handler(data):
    print(f"Writer received: {data}")

pubsub.subscribe("research_updates", researcher_handler)
pubsub.subscribe("research_updates", writer_handler)

# Publish event
pubsub.publish("research_updates", {
    "event": "new_paper_found",
    "paper": "Attention Is All You Need"
})

Coordination Strategies

1. Hierarchical Coordination (Manager-Worker)

Complete Hierarchical System
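from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from enum import Enum

# Reuses MultiAgentCommunicationSystem and MessagePriority from the
# message passing example earlier in this tutorial.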

class TaskStatus(Enum):
    PENDING = "pending"
    ASSIGNED = "assigned"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    id: str
    description: str
    required_skills: List[str]
    status: TaskStatus
    assigned_to: Optional[str] = None
    result: Any = None

@dataclass
class AgentCapabilities:
    agent_id: str
    skills: List[str]
    current_load: int = 0
    max_load: int = 3

class HierarchicalCoordinator:
    """Manager agent that coordinates worker agents"""
    
    def __init__(self, llm):
        self.llm = llm
        self.workers: Dict[str, AgentCapabilities] = {}
        self.tasks: Dict[str, Task] = {}
        self.communication = MultiAgentCommunicationSystem()
    
    def register_worker(self, agent_id: str, skills: List[str], max_load: int = 3):
        """Register worker agent"""
        self.workers[agent_id] = AgentCapabilities(agent_id, skills, 0, max_load)
        self.communication.register_agent(agent_id, None)
    
    def submit_task(self, task: Task) -> str:
        """Submit task for execution"""
        self.tasks[task.id] = task
        
        # Find suitable worker
        worker = self._find_best_worker(task.required_skills)
        
        if worker:
            return self._assign_task(task, worker)
        else:
            task.status = TaskStatus.PENDING
            return f"Task {task.id} queued (no available workers)"
    
    def _find_best_worker(self, required_skills: List[str]) -> Optional[str]:
        """Find best worker for task"""
        candidates = []
        
        for agent_id, caps in self.workers.items():
            # Check if agent has required skills
            if all(skill in caps.skills for skill in required_skills):
                # Check if agent has capacity
                if caps.current_load < caps.max_load:
                    # Score by workload (prefer less loaded)
                    score = caps.max_load - caps.current_load
                    candidates.append((score, agent_id))
        
        if candidates:
            # Return the agent with the most spare capacity
            candidates.sort(reverse=True)
            return candidates[0][1]
        
        return None
    
    def _assign_task(self, task: Task, worker_id: str) -> str:
        """Assign task to worker"""
        task.assigned_to = worker_id
        task.status = TaskStatus.ASSIGNED
        
        # Update worker load
        self.workers[worker_id].current_load += 1
        
        # Send task to worker
        self.communication.send_message(
            "coordinator",
            worker_id,
            {
                "task_id": task.id,
                "description": task.description,
                "action": "execute"
            },
            priority=MessagePriority.HIGH,
            requires_response=True
        )
        
        return f"Task {task.id} assigned to {worker_id}"
    
    def receive_result(self, worker_id: str, task_id: str, result: Any):
        """Receive result from worker"""
        task = self.tasks[task_id]
        task.result = result
        task.status = TaskStatus.COMPLETED
        
        # Update worker load
        self.workers[worker_id].current_load -= 1
        
        # Check if pending tasks can now be assigned
        self._process_pending_tasks()
    
    def _process_pending_tasks(self):
        """Assign pending tasks if workers available"""
        pending = [t for t in self.tasks.values() if t.status == TaskStatus.PENDING]
        
        for task in pending:
            worker = self._find_best_worker(task.required_skills)
            if worker:
                self._assign_task(task, worker)
    
    def get_status(self) -> Dict[str, Any]:
        """Get system status"""
        return {
            "workers": {
                agent_id: {
                    "skills": caps.skills,
                    "load": f"{caps.current_load}/{caps.max_load}"
                }
                for agent_id, caps in self.workers.items()
            },
            "tasks": {
                task_id: {
                    "status": task.status.value,
                    "assigned_to": task.assigned_to
                }
                for task_id, task in self.tasks.items()
            }
        }

# Usage
from langchain_openai import ChatOpenAI  # needs the langchain-openai package and an OpenAI API key

coordinator = HierarchicalCoordinator(llm=ChatOpenAI())

# Register workers
coordinator.register_worker("researcher", skills=["research", "analysis"], max_load=2)
coordinator.register_worker("writer", skills=["writing", "editing"], max_load=3)
coordinator.register_worker("coder", skills=["coding", "testing"], max_load=1)

# Submit tasks
task1 = Task("t1", "Research AI agents", ["research"], TaskStatus.PENDING)
task2 = Task("t2", "Write blog post", ["writing"], TaskStatus.PENDING)
task3 = Task("t3", "Code example", ["coding"], TaskStatus.PENDING)

print(coordinator.submit_task(task1))  # Assigned to researcher
print(coordinator.submit_task(task2))  # Assigned to writer
print(coordinator.submit_task(task3))  # Assigned to coder

# Check status
import json
print(json.dumps(coordinator.get_status(), indent=2))

2. Consensus and Voting Systems

Multi-Agent Voting
from collections import Counter
from typing import Any, Dict, List

class VotingSystem:
    """Consensus building through voting"""
    
    def __init__(self, agents: List[Any]):
        self.agents = agents
    
    def majority_vote(self, question: str) -> Dict[str, Any]:
        """Simple majority voting"""
        votes = []
        
        for agent in self.agents:
            vote = agent.decide(question)
            votes.append(vote)
        
        # Count votes
        vote_counts = Counter(votes)
        winner = vote_counts.most_common(1)[0]
        
        return {
            "decision": winner[0],
            "votes": dict(vote_counts),
            "confidence": winner[1] / len(votes)
        }
    
    def weighted_vote(self, question: str, weights: Dict[str, float]) -> Dict[str, Any]:
        """Weighted voting (e.g., by expertise)"""
        weighted_votes = {}
        
        for agent in self.agents:
            vote = agent.decide(question)
            weight = weights.get(agent.id, 1.0)
            
            if vote not in weighted_votes:
                weighted_votes[vote] = 0
            weighted_votes[vote] += weight
        
        winner = max(weighted_votes.items(), key=lambda x: x[1])
        
        return {
            "decision": winner[0],
            "weighted_votes": weighted_votes,
            # Weight accumulated by the winning option (not the sum of all weights)
            "total_weight": winner[1]
        }
    
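    def consensus_with_discussion(self, question: str, max_rounds: int = 3) -> Dict[str, Any]:
        """Iterative consensus building"""
        # At least one round is needed; otherwise the fallback below has no votes to count
        if max_rounds < 1:
            raise ValueError("max_rounds must be at least 1")
        
        for round_num in range(max_rounds):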
            # Each agent proposes
            proposals = []
            for agent in self.agents:
                proposal = agent.propose(question)
                proposals.append((agent.id, proposal))
            
            # Share proposals for discussion
            for agent in self.agents:
                agent.see_proposals(proposals)
            
            # Vote
            votes = [agent.vote(proposals) for agent in self.agents]
            vote_counts = Counter(votes)
            
            # Check for consensus (e.g., 80% agreement)
            if vote_counts.most_common(1)[0][1] / len(votes) >= 0.8:
                return {
                    "consensus": True,
                    "decision": vote_counts.most_common(1)[0][0],
                    "rounds": round_num + 1
                }
        
        # No consensus reached
        return {
            "consensus": False,
            "decision": vote_counts.most_common(1)[0][0],
            "rounds": max_rounds,
            "note": "Fallback to plurality"
        }

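# Usage (agent1, agent2, and agent3 are placeholders for objects that
# expose id, decide(), propose(), see_proposals(), and vote())
voting = VotingSystem(agents=[agent1, agent2, agent3])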

# Simple vote
result = voting.majority_vote("Should we use LangChain or custom framework?")
print(f"Decision: {result['decision']} (confidence: {result['confidence']:.0%})")

# Weighted vote (expert opinions count more; keys must match agent.id,
# and agents missing from the dict default to a weight of 1.0)
weights = {"expert_agent": 2.0, "junior_agent": 0.5}
result = voting.weighted_vote("Technical architecture decision", weights)
print(f"Decision: {result['decision']}")
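
`Counter.most_common` returns options with equal counts in the order they were first encountered, so a tied vote silently resolves in favor of whichever option happened to be cast first. A small sketch of a tally that reports ties explicitly, using a hypothetical helper name `tally_with_tie_check`:

```python
from collections import Counter
from typing import Any, Dict, List

def tally_with_tie_check(votes: List[Any]) -> Dict[str, Any]:
    """Count votes and report a tie instead of silently picking a winner."""
    if not votes:
        return {"decision": None, "tie": False, "leaders": [], "votes": {}}
    counts = Counter(votes)
    top_count = max(counts.values())
    # Every option that reached the highest count is a leader.
    leaders = [option for option, n in counts.items() if n == top_count]
    return {
        "decision": leaders[0] if len(leaders) == 1 else None,
        "tie": len(leaders) > 1,
        "leaders": leaders,
        "votes": dict(counts),
    }

clear = tally_with_tie_check(["LangChain", "custom", "LangChain"])
tied = tally_with_tie_check(["LangChain", "custom"])
print(clear["decision"], tied["tie"], tied["leaders"])
```

How to break a tie (another discussion round, a designated tie-breaker agent, or escalation to a human) is a design decision the caller still has to make.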

3. Negotiation Protocol

Agent Negotiation
from typing import Any, Dict, List

class NegotiationProtocol:
    """Agents negotiate to reach agreements"""
    
    def __init__(self, agents: List[Any]):
        self.agents = agents
        self.negotiation_history = []
    
    def negotiate(self, issue: str, max_rounds: int = 5) -> Dict[str, Any]:
        """Multi-agent negotiation"""
        
        # Initial proposals
        proposals = {}
        for agent in self.agents:
            proposals[agent.id] = agent.initial_proposal(issue)
        
        for round_num in range(max_rounds):
            # Each agent responds to others' proposals
            new_proposals = {}
            
            for agent in self.agents:
                # Agent sees all proposals
                response = agent.respond_to_proposals(proposals, issue)
                new_proposals[agent.id] = response
                
                # Track history
                self.negotiation_history.append({
                    "round": round_num,
                    "agent": agent.id,
                    "proposal": response
                })
            
            # Check if proposals converged
            if self._check_convergence(new_proposals):
                return {
                    "success": True,
                    "agreement": self._extract_agreement(new_proposals),
                    "rounds": round_num + 1
                }
            
            proposals = new_proposals
        
        # No agreement
        return {
            "success": False,
            "final_proposals": proposals,
            "rounds": max_rounds
        }
    
    def _check_convergence(self, proposals: Dict[str, Any]) -> bool:
        """Check if proposals have converged"""
        # Simple check: all proposals similar
        values = list(proposals.values())
        if not values:
            return False
        
        # Compare all to first
        first = values[0]
        return all(self._similar(first, v) for v in values[1:])
    
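    def _similar(self, prop1: Any, prop2: Any, threshold: float = 0.9) -> bool:
        """Check if two proposals are similar"""
        # Implementation depends on proposal format. This version compares
        # numeric proposals by relative closeness and falls back to equality;
        # text proposals would need a text-similarity measure instead.
        numeric = (int, float)
        if isinstance(prop1, numeric) and isinstance(prop2, numeric):
            largest = max(abs(prop1), abs(prop2))
            if largest == 0:
                return True
            return 1 - abs(prop1 - prop2) / largest >= threshold
        return prop1 == prop2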
    
    def _extract_agreement(self, proposals: Dict[str, Any]) -> Any:
        """Extract agreed-upon solution"""
        # Average, median, or consensus of converged proposals
        return list(proposals.values())[0]

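# Usage (buyer_agent and seller_agent are placeholders for objects that
# expose id, initial_proposal(), and respond_to_proposals())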
negotiation = NegotiationProtocol([buyer_agent, seller_agent])
result = negotiation.negotiate("price_for_service")

if result["success"]:
    print(f"Agreement reached: {result['agreement']}")
else:
    print("No agreement")
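
To make the round-by-round convergence concrete, here is a small numeric sketch of a two-party price negotiation. It assumes a simple fixed-fraction concession strategy; the function `negotiate_price` and all of its parameters are hypothetical and are not part of the protocol class above.

```python
from typing import Optional, Tuple

def negotiate_price(buyer_offer: float, seller_ask: float,
                    buyer_limit: float, seller_floor: float,
                    concession: float = 0.25, tolerance: float = 1.0,
                    max_rounds: int = 10) -> Tuple[Optional[float], int]:
    """Each round both sides concede a fraction of the remaining gap,
    never moving past their own limit. Keep concession at or below 0.5
    so the two offers cannot cross. Returns (agreed_price, rounds_used),
    with agreed_price set to None when no deal is reached."""
    for round_num in range(1, max_rounds + 1):
        gap = seller_ask - buyer_offer
        buyer_offer = min(buyer_limit, buyer_offer + concession * gap)
        seller_ask = max(seller_floor, seller_ask - concession * gap)
        # Converged: the offers are close enough to settle at the midpoint.
        if seller_ask - buyer_offer <= tolerance:
            return (buyer_offer + seller_ask) / 2, round_num
    return None, max_rounds

# Overlapping limits: the gap halves every round until it is within tolerance.
deal = negotiate_price(80, 120, buyer_limit=110, seller_floor=90)

# Non-overlapping limits: both sides hit their limit and the gap stays at 10.
no_deal = negotiate_price(80, 120, buyer_limit=90, seller_floor=100)

print(deal, no_deal)  # (100.0, 6) (None, 10)
```

The second call shows why a round limit matters: when the limits do not overlap, no amount of further discussion closes the gap.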

4. Auction-Based Task Allocation

Task Auction System
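from dataclasses import dataclass
from typing import Any, Dict, List, Optional

@dataclass
class Bid:
    agent_id: str
    price: float
    estimated_time: float
    quality_score: float

class TaskAuctionSystem:
    """Allocate tasks through bidding"""
    
    def __init__(self, agents: Optional[List[Any]] = None):
        # Bidders must expose bid_on_task(task_description) and return a Bid or None
        self.agents = list(agents) if agents else []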
    
    def auction_task(self, task_description: str) -> Dict[str, Any]:
        """Auction task to agents"""
        
        # Request bids
        bids: List[Bid] = []
        for agent in self.agents:
            bid = agent.bid_on_task(task_description)
            if bid:
                bids.append(bid)
        
        if not bids:
            return {"success": False, "reason": "No bids received"}
        
        # Evaluate bids (multi-criteria)
        best_bid = self._select_best_bid(bids)
        
        # Assign to winner
        return {
            "success": True,
            "winner": best_bid.agent_id,
            "bid": {
                "price": best_bid.price,
                "time": best_bid.estimated_time,
                "quality": best_bid.quality_score
            },
            "total_bids": len(bids)
        }
    
    def _select_best_bid(self, bids: List[Bid]) -> Bid:
        """Select best bid using scoring function"""
        
        def score_bid(bid: Bid) -> float:
            # Multi-criteria: price (lower better), time (lower better), 
            # quality (higher better)
            price_score = 1.0 / (bid.price + 1)
            time_score = 1.0 / (bid.estimated_time + 1)
            quality_score = bid.quality_score
            
            # Weighted combination
            return 0.3 * price_score + 0.3 * time_score + 0.4 * quality_score
        
        return max(bids, key=score_bid)

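# Usage (bidder_agents is a placeholder list of objects exposing bid_on_task())
auction = TaskAuctionSystem()
auction.agents.extend(bidder_agents)
result = auction.auction_task("Analyze customer sentiment in 10K reviews")
if result["success"]:
    print(f"Winner: {result['winner']} at ${result['bid']['price']}")
else:
    print(f"Auction failed: {result['reason']}")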
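
A worked example of the scoring function may help show how the three criteria trade off. The two bids below are made-up numbers chosen for easy arithmetic; the weights and the 1/(x+1) transform are the ones used in the auction code.

```python
from dataclasses import dataclass

@dataclass
class Bid:
    agent_id: str
    price: float
    estimated_time: float
    quality_score: float

def score_bid(bid: Bid) -> float:
    # Lower price and time give higher scores; quality is used as-is.
    price_score = 1.0 / (bid.price + 1)
    time_score = 1.0 / (bid.estimated_time + 1)
    return 0.3 * price_score + 0.3 * time_score + 0.4 * bid.quality_score

# Hypothetical bids for illustration only.
bids = [
    Bid("fast_agent", price=9.0, estimated_time=1.0, quality_score=0.6),
    Bid("careful_agent", price=19.0, estimated_time=4.0, quality_score=0.9),
]

# fast_agent:    0.3 * 0.10 + 0.3 * 0.50 + 0.4 * 0.6 = 0.420
# careful_agent: 0.3 * 0.05 + 0.3 * 0.20 + 0.4 * 0.9 = 0.435
winner = max(bids, key=score_bid)
print(winner.agent_id, round(score_bid(winner), 3))  # careful_agent 0.435
```

Because 1/(x+1) shrinks quickly, the price and time terms contribute very little once prices or durations reach the tens or hundreds, so quality tends to dominate. Normalizing each criterion against the range of bids received is one possible alternative if that is not the intended behavior.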

Emergent Behaviors

Multi-agent systems can exhibit behaviors not programmed into individual agents. These emerge from interactions:

  • Collaboration: Agents cooperate toward shared goals
  • Competition: Agents compete, driving improvement
  • Specialization: Agents naturally develop expertise areas
  • Error correction: Agents catch and fix each other's mistakes
  • Knowledge synthesis: Combining diverse perspectives yields insights
Example: Debate systems in which agents argue different positions can find better solutions than a single agent, because the disagreement forces a more thorough analysis.

Real-World Multi-Agent Applications

📰 News Analysis

Multiple agents analyze stories from different angles (bias, factuality, impact)

🎯 Project Management

Agents for planning, tracking, risk management, communication

💡 Ideation

Agents propose ideas from different perspectives, refine together

🧪 Software Development

Agents for requirements, design, coding, testing, documentation

Complete Multi-Agent System: Research Team

Let's build a complete research team with multiple agents:

research_team_system.py
"""
Complete multi-agent research team system
"""
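from typing import List, Dict, Any
from dataclasses import dataclass

# Reuses Blackboard, MultiAgentCommunicationSystem, and MessagePriority
# from the examples earlier in this tutorial.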

@dataclass
class ResearchAgent:
    """Individual research agent"""
    id: str
    role: str
    skills: List[str]
    llm: Any
    
    def research(self, topic: str, depth: str = "basic") -> Dict[str, Any]:
        """Conduct research on topic"""
        prompt = f"""
Role: {self.role}
Task: Research {topic} at {depth} depth
Skills: {', '.join(self.skills)}

Provide structured findings.
"""
        # invoke() returns a message object; .content holds the generated text
        result = self.llm.invoke(prompt).content
        return {
            "agent": self.id,
            "topic": topic,
            "findings": result,
            "confidence": 0.85
        }
    
    def critique(self, content: str) -> str:
        """Critique another agent's work"""
        prompt = f"""
Role: {self.role}
Task: Critically review this content:
{content}

Provide constructive feedback.
"""
        # invoke() returns a message object; .content holds the generated text
        return self.llm.invoke(prompt).content

class ResearchTeamSystem:
    """Multi-agent research team"""
    
    def __init__(self):
        self.blackboard = Blackboard()
        self.communication = MultiAgentCommunicationSystem()
        self.agents: Dict[str, ResearchAgent] = {}
        self.coordinator = None
    
    def setup_team(self):
        """Initialize research team"""
        from langchain_openai import ChatOpenAI
        llm = ChatOpenAI(model="gpt-4")
        
        # Create specialized agents
        self.agents = {
            "researcher": ResearchAgent(
                "researcher",
                "Research Specialist",
                ["literature_review", "data_gathering", "fact_checking"],
                llm
            ),
            "analyst": ResearchAgent(
                "analyst",
                "Data Analyst",
                ["data_analysis", "statistics", "pattern_recognition"],
                llm
            ),
            "critic": ResearchAgent(
                "critic",
                "Critical Reviewer",
                ["critical_thinking", "peer_review", "quality_assurance"],
                llm
            ),
            "synthesizer": ResearchAgent(
                "synthesizer",
                "Knowledge Synthesizer",
                ["synthesis", "summarization", "integration"],
                llm
            )
        }
        
        # Register with communication system
        for agent_id in self.agents:
            self.communication.register_agent(agent_id, self.agents[agent_id])
    
    def conduct_research(self, topic: str) -> Dict[str, Any]:
        """Conduct collaborative research"""
        
        # Phase 1: Research (run sequentially here; the two calls are
        # independent of each other, so they could also run in parallel)
        print(f"Phase 1: Researching {topic}...")
        research_results = []
        
        # Researcher gathers initial data
        findings = self.agents["researcher"].research(topic, depth="comprehensive")
        self.blackboard.put("research_findings", findings, author="researcher")
        research_results.append(findings)
        
        # Analyst analyzes patterns
        analysis = self.agents["analyst"].research(topic, depth="analytical")
        self.blackboard.put("analysis", analysis, author="analyst")
        research_results.append(analysis)
        
        # Phase 2: Critical review
        print("Phase 2: Critical review...")
        all_findings = self.blackboard.get_all()
        critiques = []
        
        for entry in all_findings.values():
            critique = self.agents["critic"].critique(str(entry.data))
            critiques.append(critique)
            
            # Send feedback to original author
            self.communication.send_message(
                "critic",
                entry.author,
                {"feedback": critique},
                priority=MessagePriority.NORMAL
            )
        
        self.blackboard.put("critiques", critiques, author="critic")
        
        # Phase 3: Synthesis
        print("Phase 3: Synthesizing findings...")
        synthesis_prompt = f"""
Synthesize these research findings:

Findings: {research_results}
Critiques: {critiques}

Create a comprehensive, well-structured research summary.
"""
        # invoke() returns a message object; .content holds the generated text
        final_report = self.agents["synthesizer"].llm.invoke(synthesis_prompt).content
        self.blackboard.put("final_report", final_report, author="synthesizer")
        
        # Compile results
        return {
            "topic": topic,
            "research_count": len(research_results),
            "critiques_count": len(critiques),
            "final_report": final_report,
            "collaboration_metrics": self._get_metrics()
        }
    
    def _get_metrics(self) -> Dict[str, Any]:
        """Get collaboration metrics"""
        return {
            "messages_sent": len(self.communication.message_history),
            "blackboard_entries": len(self.blackboard.get_all()),
            "agents_participated": len(self.agents)
        }

# Usage
team = ResearchTeamSystem()
team.setup_team()

result = team.conduct_research("Impact of Large Language Models on Software Development")

print(f"\n{'='*70}")
print("RESEARCH REPORT")
print(f"{'='*70}\n")
print(result["final_report"])
print(f"\n{'='*70}")
print("Collaboration Stats:")
print(f"  - Agents: {result['collaboration_metrics']['agents_participated']}")
print(f"  - Messages: {result['collaboration_metrics']['messages_sent']}")
print(f"  - Research contributions: {result['research_count']}")
print(f"{'='*70}")
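
The two research calls in Phase 1 do not depend on each other, so they are natural candidates for running concurrently. The following is a sketch under the assumption that each agent's `research` method is safe to call from a worker thread; `StubAgent` is a hypothetical stand-in for `ResearchAgent` so that the example runs without an LLM.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

class StubAgent:
    """Hypothetical stand-in for ResearchAgent that needs no LLM."""

    def __init__(self, agent_id: str):
        self.id = agent_id

    def research(self, topic: str, depth: str = "basic") -> Dict[str, str]:
        return {"agent": self.id, "topic": topic, "depth": depth}

jobs = [
    (StubAgent("researcher"), "comprehensive"),
    (StubAgent("analyst"), "analytical"),
]

def run_parallel(topic: str) -> List[Dict[str, str]]:
    # LLM calls are I/O-bound, so threads are usually sufficient here.
    with ThreadPoolExecutor(max_workers=len(jobs)) as pool:
        futures = [pool.submit(agent.research, topic, depth)
                   for agent, depth in jobs]
        # Collect in submission order so the result list is deterministic.
        return [future.result() for future in futures]

results = run_parallel("LLMs in software development")
print([r["agent"] for r in results])  # ['researcher', 'analyst']
```

Writes to shared state such as the blackboard can then happen after the results are collected, which keeps the concurrent part free of shared mutable data.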

Multi-Agent Best Practices

1. Clear Roles

Define distinct roles and responsibilities for each agent. Avoid overlap.

2. Structured Communication

Use formal protocols with defined message types and turn-taking rules rather than letting agents "chat freely".

3. Prevent Conflicts

Handle race conditions, deadlocks, and conflicting updates to shared state.
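
One way to handle conflicting updates is optimistic concurrency: a write succeeds only if the entry has not changed since the writer last read it. The sketch below assumes a simple in-memory store; `VersionedStore` and `write_if_unchanged` are hypothetical names, not part of the blackboard class shown earlier.

```python
import threading
from typing import Any, Dict, Tuple

class VersionedStore:
    """Hypothetical shared state with compare-and-set semantics."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._data: Dict[str, Tuple[int, Any]] = {}  # key -> (version, value)

    def read(self, key: str) -> Tuple[int, Any]:
        with self._lock:
            return self._data.get(key, (0, None))

    def write_if_unchanged(self, key: str, expected_version: int, value: Any) -> bool:
        """Write only if nobody else has written since expected_version was read."""
        with self._lock:
            current_version, _ = self._data.get(key, (0, None))
            if current_version != expected_version:
                return False  # stale read: the caller should re-read and retry
            self._data[key] = (current_version + 1, value)
            return True

store = VersionedStore()
version, _ = store.read("plan")  # two agents both read version 0
first = store.write_if_unchanged("plan", version, "outline A")
second = store.write_if_unchanged("plan", version, "outline B")
print(first, second, store.read("plan"))  # True False (1, 'outline A')
```

The second writer learns that its view was stale instead of silently overwriting the first agent's work.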

4. Monitor Interactions

Log all messages and state changes. Essential for debugging multi-agent issues.
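
A minimal sketch of structured message logging with the standard `logging` module, writing one JSON object per message. The logger name, the field names, and the in-memory `ListHandler` are assumptions for illustration; in practice a `logging.FileHandler` would usually take the handler's place.

```python
import json
import logging
from datetime import datetime, timezone
from typing import Any, List

class ListHandler(logging.Handler):
    """Collects formatted records in memory so the example is self-contained."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(self.format(record))

logger = logging.getLogger("agent_audit")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False  # keep audit lines out of the root logger
handler = ListHandler()
logger.addHandler(handler)

def log_message(sender: str, recipient: str, content: Any) -> None:
    """Record one inter-agent message as a single JSON line."""
    logger.info(json.dumps({
        "ts": datetime.now(timezone.utc).isoformat(),
        "from": sender,
        "to": recipient,
        "content": content,
    }))

log_message("researcher", "writer", {"findings": "5 key papers"})
log_message("writer", "researcher", {"status": "received"})

entries = [json.loads(line) for line in handler.lines]
print([(e["from"], e["to"]) for e in entries])
```

One JSON object per line keeps the log easy to filter by sender, recipient, or time range when reconstructing what happened between agents.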

5. Limit Team Size

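Teams of 3-5 agents are usually the easiest to manage. Every added agent increases the number of possible interactions, so when more capacity is needed, prefer several small teams over one large team.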

6. Test Interactions

Test agent interactions, not just individual agents. Emergent behaviors appear here.
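
An interaction test checks what one agent hands to another, not just what each returns on its own. The sketch below uses the standard unittest module with stub agents in place of LLM calls; run_pipeline and the stub functions are hypothetical stand-ins for your own orchestration code.

```python
import unittest
from typing import Callable, List


def run_pipeline(
    research: Callable[[str], str],
    critique: Callable[[str], str],
    topic: str,
) -> List[str]:
    """Hypothetical two-step interaction: the critic reviews the finding."""
    finding = research(topic)
    return [finding, critique(finding)]


class InteractionTest(unittest.TestCase):
    def test_critic_receives_researcher_output(self) -> None:
        seen: List[str] = []

        def stub_research(topic: str) -> str:
            return f"finding about {topic}"

        def stub_critique(text: str) -> str:
            seen.append(text)  # capture what the critic was actually given
            return "needs sources"

        result = run_pipeline(stub_research, stub_critique, "LLMs")

        self.assertEqual(seen, ["finding about LLMs"])
        self.assertEqual(result, ["finding about LLMs", "needs sources"])
```

Save the code in a test module and run it with python -m unittest. Deterministic stubs keep the test repeatable.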

Practical Exercise: Build a Debate System

Challenge: Build a multi-agent debate system where:
  • Two agents argue opposite sides of an issue
  • A moderator agent manages the debate
  • A judge agent evaluates arguments
  • Audience agents ask questions
  • System determines winner based on argument quality

Starter Code

debate_system.py
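from typing import Any, Dict

# DebaterAgent, ModeratorAgent, and JudgeAgent are not defined in this file;
# implementing them is part of the exercise.
class DebateSystem: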
    """Multi-agent debate system"""
    
    def __init__(self, topic: str, position_a: str, position_b: str):
        self.topic = topic
        self.debaters = {
            "pro": DebaterAgent("pro", position_a),
            "con": DebaterAgent("con", position_b)
        }
        self.moderator = ModeratorAgent()
        self.judge = JudgeAgent()
        self.transcript = []
    
    def run_debate(self, rounds: int = 3) -> Dict[str, Any]:
        """Run structured debate"""
        
        # Opening statements
        for side, debater in self.debaters.items():
            statement = debater.opening_statement(self.topic)
            self.transcript.append({
                "round": 0,
                "speaker": side,
                "type": "opening",
                "content": statement
            })
        
        # Debate rounds
        for round_num in range(1, rounds + 1):
            # Moderator poses question
            question = self.moderator.pose_question(self.topic, round_num)
            
            # Each debater responds
            for side, debater in self.debaters.items():
                # Debater sees opponent's previous arguments
                opponent_args = [
                    t["content"] for t in self.transcript 
                    if t["speaker"] != side
                ]
                
                response = debater.respond(question, opponent_args)
                self.transcript.append({
                    "round": round_num,
                    "speaker": side,
                    "type": "argument",
                    "content": response
                })
        
        # Closing statements
        for side, debater in self.debaters.items():
            closing = debater.closing_statement(self.transcript)
            self.transcript.append({
                "round": rounds + 1,
                "speaker": side,
                "type": "closing",
                "content": closing
            })
        
        # Judge evaluates
        verdict = self.judge.evaluate(self.transcript)
        
        return {
            "topic": self.topic,
            "winner": verdict["winner"],
            "reasoning": verdict["reasoning"],
            "scores": verdict["scores"],
            "transcript": self.transcript
        }

# Usage
debate = DebateSystem(
    topic="Should AI development be regulated by governments?",
    position_a="Yes, strict regulation is necessary",
    position_b="No, innovation should remain free"
)

result = debate.run_debate(rounds=3)
print(f"Winner: {result['winner']}")
print(f"Reasoning: {result['reasoning']}")
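
The starter code calls DebaterAgent, ModeratorAgent, and JudgeAgent. To get it running before any LLM is wired in, you could begin with rule-based stand-ins like the ones below. They are placeholders, not a reference solution: the method names and arguments mirror the calls made in DebateSystem, but the canned text and the word-count scoring are assumptions for illustration only. Define them above DebateSystem.

```python
from typing import Any, Dict, List


class DebaterAgent:
    """Stand-in debater that restates its position (no LLM involved)."""

    def __init__(self, side: str, position: str):
        self.side = side
        self.position = position

    def opening_statement(self, topic: str) -> str:
        return f"[{self.side}] On '{topic}': {self.position}."

    def respond(self, question: str, opponent_args: List[str]) -> str:
        return (f"[{self.side}] To '{question}': {self.position} "
                f"(rebutting {len(opponent_args)} opposing points).")

    def closing_statement(self, transcript: List[Dict[str, Any]]) -> str:
        own_turns = sum(1 for t in transcript if t["speaker"] == self.side)
        return f"[{self.side}] In closing, after {own_turns} turns: {self.position}."


class ModeratorAgent:
    """Stand-in moderator that asks the same templated question each round."""

    def pose_question(self, topic: str, round_num: int) -> str:
        return f"Round {round_num}: what is your strongest evidence on '{topic}'?"


class JudgeAgent:
    """Stand-in judge. Placeholder scoring: total words spoken per side."""

    def evaluate(self, transcript: List[Dict[str, Any]]) -> Dict[str, Any]:
        scores: Dict[str, int] = {}
        for turn in transcript:
            words = len(turn["content"].split())
            scores[turn["speaker"]] = scores.get(turn["speaker"], 0) + words
        winner = max(scores, key=scores.get) if scores else None
        return {
            "winner": winner,
            "reasoning": "Placeholder: side with the most words wins.",
            "scores": scores,
        }
```

Replace each method body with an LLM call once the flow works from start to finish. Word count says nothing about argument quality, so the judge needs a real rubric.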

Key Takeaways

  • Multi-agent > single agent: Teams solve more complex problems through specialization
  • Architecture matters: Hierarchical, peer-to-peer, or blackboard - choose based on problem
  • Communication is critical: Message passing, shared state, or pub-sub - all need structure
  • Coordination strategies: Delegation, voting, negotiation, auctions - match to use case
  • Emergent behaviors: Systems exhibit capabilities beyond individual agents
  • Use frameworks: CrewAI, LangGraph, or custom systems for multi-agent orchestration
  • Monitor everything: Log messages, state changes, and agent interactions
  • Test interactions: Multi-agent bugs appear in interactions, not individual agents
  • Keep teams small: 3-5 agents optimal. More agents = more complexity
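
Of the coordination strategies listed above, auctions are easy to prototype. The sketch below shows a single-round sealed-bid auction in which each agent bids its estimated cost and the lowest bid wins. run_auction and the example bidders are hypothetical; a real system would derive bids from agent load or capability.

```python
from typing import Callable, Dict, Optional, Tuple

# A bidder returns its estimated cost for a task, or None to decline.
Bidder = Callable[[str], Optional[float]]


def run_auction(task: str, bidders: Dict[str, Bidder]) -> Optional[Tuple[str, float]]:
    """Return (agent_name, bid) for the lowest bid, or None if nobody bids."""
    bids: Dict[str, float] = {}
    for name, bid_fn in bidders.items():
        bid = bid_fn(task)
        if bid is not None:
            bids[name] = bid
    if not bids:
        return None
    winner = min(bids, key=bids.get)  # ties go to the first registered bidder
    return winner, bids[winner]


bidders: Dict[str, Bidder] = {
    "researcher": lambda task: 2.0 if "research" in task else None,
    "writer": lambda task: 1.0 if "write" in task else 5.0,
}
print(run_auction("research LLM benchmarks", bidders))  # ('researcher', 2.0)
```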

Test Your Knowledge

Q1: What is the key characteristic of multi-agent systems?

  • They use multiple LLMs simultaneously
  • They are always faster than single agents
  • Multiple autonomous agents collaborate, coordinate, or compete to achieve goals
  • They require less computational resources

Q2: In which architecture does a leader agent assign tasks and coordinate worker agents?

  • Peer-to-peer
  • Hierarchical
  • Blackboard
  • Distributed

Q3: What is the blackboard architecture pattern in multi-agent systems?

  • Agents directly message each other
  • One agent controls all others
  • Agents work in isolation
  • Agents read from and write to a shared knowledge space

Q4: Which coordination strategy involves agents bidding on tasks?

  • Auction-based coordination
  • Delegation
  • Voting
  • Hierarchical assignment

Q5: What is an "emergent behavior" in multi-agent systems?

  • A bug that needs to be fixed
  • Behavior that was explicitly programmed
  • Complex system capabilities that arise from agent interactions beyond individual agent abilities
  • Random behavior that should be avoided