Multi-Agent Orchestration

10. 11. 2024 4 min read intermediate

Multi-agent orchestration is a method of coordinating several AI agents built on large language models (LLMs). It makes it possible to build systems in which specialized agents collaborate on tasks that are too broad for any single agent.

What is Multi-Agent Orchestration

In practice, each agent has a narrow, specialized capability, and an orchestration layer decides which agent runs when and how results pass between them. Together the agents form a system that can handle tasks a single agent couldn't handle efficiently.

The key advantages of this approach are modularity, scalability, and the ability to process work in parallel. Instead of one monolithic agent, there is an ecosystem of specialized components that can be developed and optimized independently.
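
The parallel-processing advantage can be illustrated with a minimal sketch. The `run_agent` function is a hypothetical stand-in for a specialized agent (a real one would call an LLM), and the agent names are made up for the example; only the `asyncio.gather` fan-out is the point here.

```python
import asyncio


async def run_agent(name: str, text: str) -> tuple[str, str]:
    """Hypothetical stand-in for a specialized agent; a real one would call an LLM."""
    await asyncio.sleep(0.01)  # simulate I/O-bound work such as an API call
    return name, f"{name} result for {text!r}"


async def run_in_parallel(text: str, agent_names: list[str]) -> dict[str, str]:
    # Start every agent concurrently and wait until all of them have finished.
    results = await asyncio.gather(*(run_agent(n, text) for n in agent_names))
    return dict(results)


if __name__ == "__main__":
    print(asyncio.run(run_in_parallel("quarterly report", ["summarizer", "classifier"])))
```

Because the agents here are independent of each other, the total wall-clock time is roughly that of the slowest agent rather than the sum of all of them.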

Orchestration Architectures

Centralized Orchestration

In the centralized model, a single main orchestrator coordinates the work of all agents. This approach is simpler to implement and gives one place to control and observe the entire workflow.

class CentralOrchestrator:
    """Runs a fixed three-step pipeline and owns every hand-off between agents."""

    def __init__(self):
        # DataAnalyzerAgent, DataProcessorAgent and OutputFormatterAgent are
        # application-specific classes, assumed to be defined elsewhere.
        self.agents = {
            'analyzer': DataAnalyzerAgent(),
            'processor': DataProcessorAgent(),
            'formatter': OutputFormatterAgent(),
        }

    async def execute_workflow(self, task):
        # 1. Data analysis
        analysis = await self.agents['analyzer'].analyze(task.data)

        # 2. Processing based on analysis
        processed = await self.agents['processor'].process(
            task.data, analysis.parameters
        )

        # 3. Output formatting
        return await self.agents['formatter'].format(processed)
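
To try the three-step flow without real agents, the orchestrator can be given stub agents. The sketch below is an assumption-laden illustration: `Task`, `Analysis`, the three `Stub*` classes and the `Orchestrator` variant that receives its agents as constructor arguments are all hypothetical names introduced for this example.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Analysis:
    parameters: dict = field(default_factory=dict)


@dataclass
class Task:
    data: list


class StubAnalyzer:
    async def analyze(self, data):
        # Pretend analysis: derive a scaling factor from the input size.
        return Analysis(parameters={"scale": len(data)})


class StubProcessor:
    async def process(self, data, parameters):
        return [x * parameters["scale"] for x in data]


class StubFormatter:
    async def format(self, processed):
        return ", ".join(str(x) for x in processed)


class Orchestrator:
    """Same three-step flow, with the agents passed in rather than hard-coded."""

    def __init__(self, analyzer, processor, formatter):
        self.analyzer = analyzer
        self.processor = processor
        self.formatter = formatter

    async def execute_workflow(self, task):
        analysis = await self.analyzer.analyze(task.data)
        processed = await self.processor.process(task.data, analysis.parameters)
        return await self.formatter.format(processed)


if __name__ == "__main__":
    orchestrator = Orchestrator(StubAnalyzer(), StubProcessor(), StubFormatter())
    print(asyncio.run(orchestrator.execute_workflow(Task([1, 2, 3]))))
```

Passing the agents in makes it easy to swap a stub for a real LLM-backed agent one step at a time.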

Decentralized Orchestration

The decentralized model allows agents to communicate directly with each other using message passing or event-driven architecture. Each agent has its own decision logic about when and with whom to communicate.

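class DecentralizedAgent:
    def __init__(self, agent_id, message_bus):
        self.id = agent_id
        self.message_bus = message_bus
        self.capabilities = set()  # task types this agent can process

    def can_handle(self, task_type):
        return task_type in self.capabilities

    async def handle_message(self, message):
        # process(), find_next_agent() and find_suitable_agent() are
        # application-specific and assumed to be implemented by subclasses.
        if self.can_handle(message.task_type):
            result = await self.process(message.payload)

            # Find next suitable agent; None means the workflow is finished
            next_agent = self.find_next_agent(message.workflow)
            if next_agent:
                await self.message_bus.send(next_agent, result)
        else:
            # Forward to another agent that can handle this task type
            suitable_agent = self.find_suitable_agent(message.task_type)
            if suitable_agent:
                await self.message_bus.forward(suitable_agent, message)
            else:
                raise LookupError(f"No agent can handle {message.task_type!r}")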
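
The message bus itself is not shown above. One possible shape, sketched here under the assumption that everything runs in a single process, is one `asyncio.Queue` inbox per agent. `InMemoryMessageBus`, `PipelineAgent` and the `"sink"` inbox are hypothetical names for this example; a production system would more likely use a broker.

```python
import asyncio
from collections import defaultdict


class InMemoryMessageBus:
    """Hypothetical bus: one asyncio.Queue inbox per agent id."""

    def __init__(self):
        self.inboxes = defaultdict(asyncio.Queue)

    async def send(self, agent_id, payload):
        await self.inboxes[agent_id].put(payload)

    async def receive(self, agent_id):
        return await self.inboxes[agent_id].get()


class PipelineAgent:
    def __init__(self, agent_id, bus, transform, next_agent=None):
        self.id = agent_id
        self.bus = bus
        self.transform = transform
        self.next_agent = next_agent

    async def run_once(self):
        payload = await self.bus.receive(self.id)
        result = self.transform(payload)
        # Each agent decides locally where its result goes; there is no
        # central coordinator. "sink" collects finished results.
        await self.bus.send(self.next_agent or "sink", result)


async def demo():
    bus = InMemoryMessageBus()
    upper = PipelineAgent("upper", bus, str.upper, next_agent="exclaim")
    exclaim = PipelineAgent("exclaim", bus, lambda s: s + "!")
    await bus.send("upper", "hello")
    await asyncio.gather(upper.run_once(), exclaim.run_once())
    return await bus.receive("sink")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```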

Implementation Using LangChain

LangChain provides building blocks for creating multi-agent systems. Here's an example of coordinating agents for document analysis and processing:

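from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI

class DocumentAnalysisOrchestrator:
    def __init__(self):
        # ChatOpenAI reads the API key from the OPENAI_API_KEY environment variable
        self.llm = ChatOpenAI(temperature=0)
        self.setup_agents()

    def setup_agents(self):
        # Specialized document reader agent
        self.reader_agent = self.create_reader_agent()

        # Content analyzer agent (built the same way as the reader; not shown)
        self.analyzer_agent = self.create_analyzer_agent()

        # Report generator agent (built the same way as the reader; not shown)
        self.generator_agent = self.create_generator_agent()

    def create_reader_agent(self):
        def read_document(file_path: str) -> str:
            """Extract text content from document"""
            # Plain-text files only; other file types need a dedicated parser
            with open(file_path, encoding="utf-8") as f:
                return f.read()

        reader_tool = Tool(
            name="document_reader",
            description="Reads and extracts text from documents",
            func=read_document
        )

        # The prompt must be a ChatPromptTemplate with an agent_scratchpad slot
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are specialized in reading and extracting document content."),
            ("human", "{input}"),
            MessagesPlaceholder("agent_scratchpad"),
        ])
        agent = create_openai_functions_agent(self.llm, [reader_tool], prompt)

        # AgentExecutor runs the tool-calling loop and returns {"output": ...}
        return AgentExecutor(agent=agent, tools=[reader_tool])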

    async def orchestrate_analysis(self, document_path: str):
        # Step 1: Document reading
        content = await self.reader_agent.ainvoke({
            "input": f"Read document: {document_path}"
        })

        # Step 2: Content analysis
        analysis = await self.analyzer_agent.ainvoke({
            "input": f"Analyze this content: {content['output']}"
        })

        # Step 3: Report generation
        report = await self.generator_agent.ainvoke({
            "input": f"Generate report from: {analysis['output']}"
        })

        return report['output']

Communication Patterns Between Agents

Request-Response Pattern

This is the simplest pattern: one agent sends a request and waits for the response. It suits synchronous operations with clearly defined input and output.

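import asyncio
import time
import uuid

class RequestResponseCommunication:
    def __init__(self, agent_id):
        self.agent_id = agent_id

    async def send_and_wait(self, message):
        # Transport-specific: deliver the message and await the matching reply
        raise NotImplementedError

    async def request(self, target_agent, request_data, timeout=30):
        message = {
            'id': uuid.uuid4().hex,  # unique id used to correlate the reply
            'sender': self.agent_id,
            'target': target_agent,
            'payload': request_data,
            'timestamp': time.time()
        }

        # Raises asyncio.TimeoutError if no reply arrives within `timeout` seconds
        response = await asyncio.wait_for(
            self.send_and_wait(message),
            timeout=timeout
        )

        return response.payload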
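
How a reply finds its way back to the waiting requester is left open above. A common approach is to keep a table of pending futures keyed by message id. The sketch below assumes a single-process setup; `InProcessTransport`, `register` and `_deliver` are hypothetical names, and a networked transport would replace `_deliver` with real send and receive code.

```python
import asyncio
import uuid


class InProcessTransport:
    """Hypothetical transport that matches replies to requests by message id."""

    def __init__(self):
        self.handlers = {}  # agent id -> async handler(payload)
        self.pending = {}   # message id -> Future awaiting the reply

    def register(self, agent_id, handler):
        self.handlers[agent_id] = handler

    async def _deliver(self, message):
        try:
            handler = self.handlers[message["target"]]
            outcome, failed = await handler(message["payload"]), False
        except Exception as exc:
            outcome, failed = exc, True
        future = self.pending.get(message["id"])  # correlate reply with request
        if future is None or future.done():
            return  # the requester already gave up
        if failed:
            future.set_exception(outcome)
        else:
            future.set_result(outcome)

    async def request(self, target, payload, timeout=30):
        message = {"id": uuid.uuid4().hex, "target": target, "payload": payload}
        future = asyncio.get_running_loop().create_future()
        self.pending[message["id"]] = future
        task = asyncio.create_task(self._deliver(message))
        try:
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            # Always clean up so a late reply is never matched to a dead request.
            self.pending.pop(message["id"], None)
            task.cancel()  # no-op if delivery has already finished
```

Cleaning up in `finally` keeps the pending table from growing when requests time out or are cancelled.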

Event-driven Communication

Agents publish events, and other agents subscribe to the event types relevant to them. This enables asynchronous, loosely coupled communication.

import time

class EventDrivenAgent:
    def __init__(self, agent_id, event_bus):
        self.agent_id = agent_id
        self.event_bus = event_bus
        self.subscriptions = {}  # event type -> list of async handlers

    def subscribe(self, event_type, handler):
        self.subscriptions.setdefault(event_type, []).append(handler)

    async def publish_event(self, event_type, data):
        event = {
            'type': event_type,
            'data': data,
            'timestamp': time.time(),
            'sender': self.agent_id
        }

        await self.event_bus.publish(event)

    async def handle_event(self, event):
        # Events are plain dicts, so use key access rather than attributes
        for handler in self.subscriptions.get(event['type'], []):
            await handler(event['data'])
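
The event bus that agents publish to is assumed rather than shown. A minimal in-process version might look like the sketch below; `InMemoryEventBus`, the `document.parsed` event type and the two handlers are hypothetical and exist only to show the fan-out.

```python
import asyncio
import time
from collections import defaultdict


class InMemoryEventBus:
    """Hypothetical bus that fans each event out to every subscribed handler."""

    def __init__(self):
        self.handlers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self.handlers[event_type].append(handler)

    async def publish(self, event):
        # Run all handlers for this event type concurrently. The publisher
        # does not know who is listening, or whether anyone is.
        handlers = self.handlers.get(event["type"], [])
        await asyncio.gather(*(handler(event) for handler in handlers))


async def demo():
    bus = InMemoryEventBus()
    seen = []

    async def index_document(event):
        seen.append(("indexer", event["data"]))

    async def notify_user(event):
        seen.append(("notifier", event["data"]))

    bus.subscribe("document.parsed", index_document)
    bus.subscribe("document.parsed", notify_user)

    await bus.publish({"type": "document.parsed", "data": "report.pdf",
                       "timestamp": time.time(), "sender": "reader"})
    # No subscribers for this type, so nothing happens.
    await bus.publish({"type": "unrelated", "data": None,
                       "timestamp": time.time(), "sender": "reader"})
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```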

State Management and Coordination

In multi-agent systems, it’s critical to maintain consistent state and coordinate access to shared resources. Redis or similar in-memory databases are often used for sharing state between agents.

import redis.asyncio as redis
import json

class SharedState:
    def __init__(self, agent_id, redis_url="redis://localhost:6379"):
        self.agent_id = agent_id  # stored as the lock value to identify the holder
        self.redis = redis.from_url(redis_url)

    async def set_workflow_state(self, workflow_id, state):
        await self.redis.hset(
            f"workflow:{workflow_id}", 
            "state", 
            json.dumps(state)
        )

    async def get_workflow_state(self, workflow_id):
        state_data = await self.redis.hget(
            f"workflow:{workflow_id}", 
            "state"
        )
        return json.loads(state_data) if state_data else None

    async def acquire_lock(self, resource_id, timeout=10):
        # SET with NX succeeds only if the key is absent; EX makes the lock
        # expire after `timeout` seconds so a crashed agent cannot hold it forever
        lock_key = f"lock:{resource_id}"
        acquired = await self.redis.set(
            lock_key, 
            self.agent_id, 
            nx=True, 
            ex=timeout
        )
        return bool(acquired)  # redis-py returns True on success, None otherwise
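
The lock above is acquired but never released. The sketch below illustrates the acquire and release protocol with an in-memory stand-in so it can run without a Redis server; `InMemoryLockStore` is a hypothetical class that only mimics the `SET ... NX EX` semantics. With real Redis, the ownership check and the delete in `release` would have to happen atomically, typically in a server-side script.

```python
import time


class InMemoryLockStore:
    """Stand-in for Redis `SET key value NX EX ttl` semantics, for illustration only."""

    def __init__(self, clock=time.monotonic):
        self._locks = {}  # key -> (owner, expires_at)
        self._clock = clock

    def acquire(self, key, owner, ttl):
        now = self._clock()
        current = self._locks.get(key)
        if current is not None and current[1] > now:
            return False  # held by someone and not yet expired
        self._locks[key] = (owner, now + ttl)
        return True

    def release(self, key, owner):
        current = self._locks.get(key)
        # Only the current holder may release. Otherwise an agent whose lock
        # has expired could delete a lock that now belongs to another agent.
        if current is not None and current[0] == owner and current[1] > self._clock():
            del self._locks[key]
            return True
        return False


if __name__ == "__main__":
    store = InMemoryLockStore()
    print(store.acquire("report-42", "agent-a", ttl=10))
    print(store.acquire("report-42", "agent-b", ttl=10))
    print(store.release("report-42", "agent-a"))
```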

Monitoring and Debugging

Multi-agent systems can be complex to debug. It’s important to implement comprehensive logging and metrics collection:

import structlog
from opentelemetry import trace

class AgentMonitoring:
    def __init__(self, agent_id):
        self.logger = structlog.get_logger()
        self.tracer = trace.get_tracer(__name__)
        self.agent_id = agent_id

    async def log_agent_interaction(self, interaction_type, target_agent, payload):
        with self.tracer.start_as_current_span("agent_interaction") as span:
            span.set_attribute("agent.source", self.agent_id)
            span.set_attribute("agent.target", target_agent)
            span.set_attribute("interaction.type", interaction_type)

            self.logger.info(
                "Agent interaction",
                source_agent=self.agent_id,
                target_agent=target_agent,
                interaction_type=interaction_type,
                payload_size=len(str(payload))
            )
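
The class above covers logging and tracing; the metrics side could be sketched with the standard library alone. `AgentMetrics` and its `track` and `summary` methods are hypothetical names, and a real deployment would more likely export these numbers to a metrics backend than keep them in memory.

```python
import time
from collections import defaultdict
from contextlib import contextmanager


class AgentMetrics:
    """Hypothetical in-memory counters for calls, errors and latency per agent."""

    def __init__(self):
        self.calls = defaultdict(int)
        self.errors = defaultdict(int)
        self.total_seconds = defaultdict(float)

    @contextmanager
    def track(self, agent_id):
        start = time.perf_counter()
        try:
            yield
        except Exception:
            self.errors[agent_id] += 1
            raise
        finally:
            # Count the call and its duration whether it succeeded or failed.
            self.calls[agent_id] += 1
            self.total_seconds[agent_id] += time.perf_counter() - start

    def summary(self, agent_id):
        calls = self.calls[agent_id]
        average = self.total_seconds[agent_id] / calls if calls else 0.0
        return {"calls": calls, "errors": self.errors[agent_id], "avg_seconds": average}


if __name__ == "__main__":
    metrics = AgentMetrics()
    with metrics.track("analyzer"):
        time.sleep(0.01)  # stand-in for the agent's actual work
    print(metrics.summary("analyzer"))
```

The `with metrics.track(...)` block can also wrap `await` calls inside an async function, so it fits the agent code shown earlier.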

Practical Use Cases

Multi-agent orchestration finds application in many areas:

  • Document processing pipelines - one agent extracts text, another analyzes sentiment, and a third generates a summary
  • Data ETL workflows - specialized agents for extraction, transformation, and loading
  • Customer service automation - a routing agent determines the category, and a specialized agent solves the specific problem
  • Code review systems - different agents check style, security, performance, and logic
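
The customer service case can be sketched as a router in front of a few specialists. Everything here is hypothetical: the categories, the keyword lists and the specialist functions are made up, and the keyword match stands in for a routing agent that would more likely ask an LLM to classify the message.

```python
import asyncio


async def billing_agent(message: str) -> str:
    return f"[billing] handling: {message}"


async def technical_agent(message: str) -> str:
    return f"[technical] handling: {message}"


async def general_agent(message: str) -> str:
    return f"[general] handling: {message}"


SPECIALISTS = {
    "billing": billing_agent,
    "technical": technical_agent,
    "general": general_agent,
}

KEYWORDS = {
    "billing": ("invoice", "refund", "charge"),
    "technical": ("error", "crash", "login"),
}


def classify(message: str) -> str:
    """Keyword stand-in for the routing agent."""
    text = message.lower()
    for category, words in KEYWORDS.items():
        if any(word in text for word in words):
            return category
    return "general"  # fallback when no category matches


async def route(message: str) -> str:
    # The router only picks a specialist; the specialist does the actual work.
    return await SPECIALISTS[classify(message)](message)


if __name__ == "__main__":
    print(asyncio.run(route("I was charged twice, please refund me")))
```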

Summary

Multi-agent orchestration is a powerful tool for building scalable AI systems. The keys to success are well-designed communication patterns, efficient state management, and robust monitoring. When implementing it, weigh the trade-offs between centralized and decentralized architecture against the specific project requirements. As AI applications grow more complex, multi-agent approaches become increasingly important for keeping systems modular and letting individual components be developed independently.

CORE SYSTEMS team

We build core systems and AI agents that keep operations running. 15 years of experience in enterprise IT.