Multi-Agent orchestrace
Multi-agent orchestrace představuje pokročilou metodu koordinace několika AI agentů založených na velkých jazykových modelech (LLM). Tato technologie umožňuje vytvářet komplexní systémy, kde různí specializovaní agenti spolupracují na řešení složitých úloh.
Co je Multi-Agent orchestrace
Multi-Agent orchestrace představuje koordinaci několika AI agentů, kteří spolupracují na řešení komplexních úloh. Každý agent má specializované schopnosti a společně tvoří inteligentní systém schopný zpracovat úkoly, které by jednotlivý agent nezvládl efektivně.
Klíčové výhody tohoto přístupu zahrnují modularitu, škálovatelnost a možnost paralelního zpracování. Místo jednoho monolitického agenta máme ekosystém specializovaných komponent, které mohou být nezávisle vyvíjeny a optimalizovány.
Architektury orchestrace
Centralizovaná orchestrace
V centralizovaném modelu existuje hlavní orchestrátor, který koordinuje práci všech agentů. Tento přístup je jednodušší na implementaci a poskytuje lepší kontrolu nad celým workflow.
class CentralOrchestrator:
def __init__(self):
self.agents = {
'analyzer': DataAnalyzerAgent(),
'processor': DataProcessorAgent(),
'formatter': OutputFormatterAgent()
}
async def execute_workflow(self, task):
# 1. Analýza dat
analysis = await self.agents['analyzer'].analyze(task.data)
# 2. Zpracování na základě analýzy
processed = await self.agents['processor'].process(
task.data, analysis.parameters
)
# 3. Formátování výstupu
return await self.agents['formatter'].format(processed)
Decentralizovaná orchestrace
Decentralizovaný model umožňuje agentům komunikovat přímo mezi sebou pomocí message passing nebo event-driven architektury. Každý agent má vlastní rozhodovací logiku o tom, kdy a s kým komunikovat.
class DecentralizedAgent:
def __init__(self, agent_id, message_bus):
self.id = agent_id
self.message_bus = message_bus
self.capabilities = set()
async def handle_message(self, message):
if self.can_handle(message.task_type):
result = await self.process(message.payload)
# Najdi dalšího vhodného agenta
next_agent = self.find_next_agent(message.workflow)
if next_agent:
await self.message_bus.send(next_agent, result)
else:
# Přeposlat jinému agentovi
suitable_agent = self.find_suitable_agent(message.task_type)
await self.message_bus.forward(suitable_agent, message)
Implementace s využitím LangChain
LangChain poskytuje robustní framework pro tvorbu multi-agent systémů. Zde je praktický příklad koordinace agentů pro analýzu a zpracování dokumentů:
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
class DocumentAnalysisOrchestrator:
def __init__(self):
self.llm = ChatOpenAI(temperature=0)
self.setup_agents()
def setup_agents(self):
# Specialized document reader agent
self.reader_agent = self.create_reader_agent()
# Content analyzer agent
self.analyzer_agent = self.create_analyzer_agent()
# Report generator agent
self.generator_agent = self.create_generator_agent()
def create_reader_agent(self):
def read_document(file_path: str) -> str:
"""Extract text content from document"""
# Implementation for various file types
return extracted_text
reader_tool = Tool(
name="document_reader",
description="Reads and extracts text from documents",
func=read_document
)
return create_openai_functions_agent(
self.llm, [reader_tool],
"You are specialized in reading and extracting document content."
)
async def orchestrate_analysis(self, document_path: str):
# Step 1: Document reading
content = await self.reader_agent.ainvoke({
"input": f"Read document: {document_path}"
})
# Step 2: Content analysis
analysis = await self.analyzer_agent.ainvoke({
"input": f"Analyze this content: {content['output']}"
})
# Step 3: Report generation
report = await self.generator_agent.ainvoke({
"input": f"Generate report from: {analysis['output']}"
})
return report['output']
Komunikační vzory mezi agenty
Request-Response pattern
Nejjednodušší vzor, kde jeden agent vyšle požadavek a čeká na odpověď. Vhodný pro synchronní operace s jasně definovaným vstupem a výstupem.
class RequestResponseCommunication:
async def request(self, target_agent, request_data, timeout=30):
message = {
'id': self.generate_message_id(),
'sender': self.agent_id,
'target': target_agent,
'payload': request_data,
'timestamp': time.time()
}
response = await asyncio.wait_for(
self.send_and_wait(message),
timeout=timeout
)
return response.payload
Event-driven communication
Agenti publikují události a ostatní se přihlašují k odběru relevantních událostí. Umožňuje asynchronní a loosely-coupled komunikaci.
class EventDrivenAgent:
def __init__(self, event_bus):
self.event_bus = event_bus
self.subscriptions = {}
def subscribe(self, event_type, handler):
if event_type not in self.subscriptions:
self.subscriptions[event_type] = []
self.subscriptions[event_type].append(handler)
async def publish_event(self, event_type, data):
event = {
'type': event_type,
'data': data,
'timestamp': time.time(),
'sender': self.agent_id
}
await self.event_bus.publish(event)
async def handle_event(self, event):
if event.type in self.subscriptions:
for handler in self.subscriptions[event.type]:
await handler(event.data)
Správa stavu a koordinace
V multi-agent systémech je kritické udržovat konzistentní stav a koordinovat přístup ke sdíleným zdrojům. Redis nebo podobné in-memory databáze jsou často využívány pro sdílení stavu mezi agenty.
import redis.asyncio as redis
import json
class SharedState:
def __init__(self, redis_url="redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
async def set_workflow_state(self, workflow_id, state):
await self.redis.hset(
f"workflow:{workflow_id}",
"state",
json.dumps(state)
)
async def get_workflow_state(self, workflow_id):
state_data = await self.redis.hget(
f"workflow:{workflow_id}",
"state"
)
return json.loads(state_data) if state_data else None
async def acquire_lock(self, resource_id, timeout=10):
lock_key = f"lock:{resource_id}"
acquired = await self.redis.set(
lock_key,
self.agent_id,
nx=True,
ex=timeout
)
return acquired
Monitoring a debugování
Multi-agent systémy mohou být složité na debugging. Je důležité implementovat comprehensive logging a metrics collection:
import structlog
from opentelemetry import trace
class AgentMonitoring:
def __init__(self, agent_id):
self.logger = structlog.get_logger()
self.tracer = trace.get_tracer(__name__)
self.agent_id = agent_id
async def log_agent_interaction(self, interaction_type, target_agent, payload):
with self.tracer.start_as_current_span("agent_interaction") as span:
span.set_attribute("agent.source", self.agent_id)
span.set_attribute("agent.target", target_agent)
span.set_attribute("interaction.type", interaction_type)
self.logger.info(
"Agent interaction",
source_agent=self.agent_id,
target_agent=target_agent,
interaction_type=interaction_type,
payload_size=len(str(payload))
)
Praktické případy použití
Multi-agent orchestrace najde uplatnění v mnoha oblastech:
- Document processing pipelines - jeden agent extrahuje text, druhý analyzuje sentiment, třetí generuje summary
- Data ETL workflows - specializované agenty pro extraction, transformation a loading
- Customer service automation - routing agent určí kategorii, specialized agent řeší konkrétní problém
- Code review systems - různí agenti kontrolují style, security, performance a logic
Shrnutí
Multi-Agent orchestrace představuje mocný nástroj pro tvorbu škálovatelných AI systémů. Klíčem k úspěchu je správný návrh komunikačních vzorů, efektivní správa stavu a robustní monitoring. Při implementaci je důležité zvážit trade-offy mezi centralizovanou a decentralizovanou architekturou podle konkrétních požadavků projektu. S rostoucí složitostí AI aplikací se multi-agent přístupy stávají nezbytností pro udržení modularity a možnosti nezávislého vývoje jednotlivých komponent.