title: "Анти-паттерны мульти-агентных систем: каскадные отказы, проблемы координации и безопасные архитектуры (2026)" slug: multi-agent-anti-patterns-cascades-2026-ru date: 2026-02-28 lang: ru
Анти-паттерны мульти-агентных систем: каскадные отказы, проблемы координации и безопасные архитектуры (2026)
Ключевые факты
-
Версии пакетов (актуальны на март 2026):
langchain==1.2.10(стабильный слой оркестрации)langgraph==1.0.10(stateful workflows для мульти-агентных систем)anthropic==0.84.0(SDK для Claude API)openai==2.24.0(SDK для GPT API)jsonschema==4.21.1(валидация состояния)httpx==0.27.0(асинхронный HTTP с поддержкой timeout)
-
Определение каскадного отказа: Когда отказ одного агента распространяется на зависимые нижестоящие агенты, которые обрабатывают повреждённый вывод как валидный ввод, создавая цепь отказов, в которой первопричина скрывается каждым последующим уровнем ошибки.
-
Реализация timeout для подграфов LangGraph: Используйте
asyncio.timeout(seconds)внутри функций узлов; LangGraph не имеет встроенного per-node timeout — должен быть реализован на уровне узла. Глобальный timeout черезawait graph.ainvoke(state, config={"timeout": 90.0})deprecated в пользуasync with asyncio.timeout(90.0)обёртки вокруг вызова invoke. -
Таксономия анти-паттернов (пять основных режимов отказа):
- Гиперорхестрация: Использование 10+ агентов для задач решаемых 1-2 агентами с надлежащим prompting
- Циклы агентов: Циклическая делегирование где Агент A → Агент B → Агент A образует неограниченную рекурсию
- Эскалация доверия: Нижестоящие агенты слепо доверяют выводам вышестоящих без валидации
- Утечка контекста: Агенты совместно используют mutable состояние, приводящее к race condition в параллельном исполнении
- Каскад стоимости: Экспоненциальный рост token/стоимости из nested agent вызовов (цепь глубины 5 = 3^5 = 243× базовая стоимость если каждый агент разветвляется 3-way)
-
Порог circuit breaker: Индустриальный стандарт — 3 последовательных отказа перед открытием цепи; время восстановления типично 30-60 секунд перед переходом в half-open состояние для retry.
-
Корневая причина anti-pattern общего состояния: Python async с
awaitсоздаёт windows context-switch где Агент A читаетstate["counter"], Агент B инкрементирует его, затем Агент A пишет stale значение — классический lost-update race condition несмотря на GIL защищающий memory corruption. -
Максимальная рекомендуемая глубина цепи агентов: Держите последовательные цепи агентов под 5 hops; глубже, coordination overhead превышает пользу, отладка становится экспоненциально сложнее, и latency плохо компилируется (5 агентов × 3 сек каждый = минимум 15 сек).
-
LangGraph recursion_limit по умолчанию: 25 итераций (configurable через
graph.compile(recursion_limit=N)); production системы должны устанавливать явный лимит на основе ожидаемой максимальной глубины × узлы на итерацию. -
Стоимость валидационного overhead: Валидация схемы через
jsonschema.validate()добавляет ~5-15ms per agent transition; negligible по сравнению с LLM call latency (500-3000ms) но значимо в high-throughput pipelines. -
Паттерн rate limiting для tool calls: Используйте
asyncio.Semaphore(N)для cap concurrent вызовов, token bucket algorithm для per-minute rate, и absolute ceiling для per-run total; типичные production значения:max_concurrent=5,max_per_minute=30,max_total_per_run=100. -
Human-in-the-loop interrupt механизм: LangGraph
interrupt()функция pauses граф execution, возвращает control caller с текущим состоянием; требуетMemorySavercheckpointer; resume черезgraph.ainvoke(Command(resume=user_decision), config=thread_config). -
Требование сохранения контекста: Оригинальные user inputs должны быть сохранены в immutable TypedDict fields (никогда перезаписываемые); нижестоящие агенты читают оригиналы напрямую rather than полагаться на upstream summaries для предотвращения потери информации.
-
Модель безопасности доступа к tools: Реализуйте per-agent allowlists где research агенты получают read-only tools (
["web_search", "database_read"]), write агенты получают write tools (["database_write"]), admin агенты получают destructive tools (["delete_record"]) — никогда не давайте все tools всем агентам. -
Transitions circuit breaker состояния:
CLOSED(нормально) →OPEN(блокирующее все запросы после threshold) →HALF_OPEN(разрешающее single test request после timeout) →CLOSED(на успех) или обратно вOPEN(на отказ). -
Эвристика решения количества агентов: Один агент для <2 минут задач в одной доменах; 2-4 агента для параллельного I/O или domain crossing; 5-10 агентов только для multi-day autonomous задач требующих checkpointing и изоляции.
-
State reducer паттерн для параллельной безопасности: LangGraph
Annotated[list[dict], operator.add]обеспечивает concurrent writes в одинаковый state key merged через reducer function instead of last-write-wins; критично для fan-out архитектур используяSend(). -
Timeout иерархия в production: HTTP call timeout (5-10s) < Agent node timeout (15-60s) < Global pipeline timeout (60-120s); каждый уровень должен превышать сумму своих children плюс buffer.
-
Формула cost explosion: В цепи где каждый агент вызывает N LLM операций, цепь глубины D стоит O(N^D) — e.g., 3 вызова per agent × 5 глубина = 243 total LLM calls; предотвращайте через depth guards и aggregation узлы.
-
Containment prompt injection: Tool allowlists предотвращают compromised агентов от escalating привилегий; даже если research агент prompt-injected для попытки
database_write, tool не будет в его registry.
Что такое анти-паттерны мульти-агентных систем
Анти-паттерны мульти-агентных систем — это режимы отказа уникальные для систем где несколько LLM агентов координируют для решения задач. В отличие от single-agent систем где отказы изолированы и traceable, мульти-агентные системы выставляют emergent bugs:
Почему они уникально сложны для отладки:
-
Амбигуозность атрибуции отказа: Когда Агент D возвращает garbage, была ли ошибка в D's логике, C's output который D потребил, B's state mutation, или A's initial классификации? Традиционные stack traces показывают D failed но скрывают root cause.
-
Недетерминированное воспроизведение: LLM вызов Агента B возвращает слегка отличающийся output на retry, маскируя ошибку которая срабатывает только когда B включает specific wording который confuses Агента C.
-
Cascade amplification: Minor data quality issue в Агенте A (e.g., возвращающий empty list instead of null) становится TypeError в B, hallucination в C, и confidently неправильный финальный output из D — каждый слой obscures происхождение.
-
Temporal coupling bugs: Агент A завершается успешно, но Агент C times out 30 секунд спустя ожидая Агента B, который заблокирован на rate limit который срабатывает только under production load — невозможно воспроизвести в unit tests.
-
State space explosion: С N агентами каждый делающий binary решения, отладка требует exploring 2^N execution paths; production логи показывают один path но не почему тот path был выбран над другими.
Традиционная отладка предполагает linear causality (функция A вызывает B вызывает C) и synchronous execution. Мульти-агентные системы нарушают оба допущения: causality течёт через shared state и LLM context, execution concurrent и asynchronous, и "работает по дизайну" на component уровне производит system-level отказ.
Decision Framework
Когда мульти-агентная архитектура оправдана:
| Сценарий | Количество агентов | Обоснование |
|---|---|---|
| 3+ параллельных независимых подзадач (поиск в несколько источников, обработка нескольких документов) | 2-5 (supervisor + workers) | Wall-clock time savings от true параллелизма оправдывает coordination overhead |
| Разделение expertise по доменам (legal анализ + financial расчёт + code generation) | 2-4 специалистов | Разные system prompts и tool sets per домен; single агент смешивал бы concerns |
| Multi-day autonomous задача с человеческими checkpoints | 5-10 + supervisor | Long-running требует state persistence, failure изоляции, и intermediate approval gates |
| Real-time streaming с processing stages | 3-6 pipeline агентов | Каждый stage независимо scalable; output stage N feeds stage N+1 асинхронно |
Когда single агент лучше (признаки over-engineering):
| Анти-паттерн | Симптом | Исправление |
|---|---|---|
| Агент делает single LLM вызов затем передаёт | Агент B просто переформатирует Агента A's output | Merge в single агент с structured output schema |
| Sequential цепь без branching | A→B→C без conditional логики | Single агент с chain-of-thought prompting |
| "Orchestrator" агент который просто маршрутизирует | Supervisor только решает какого specialist вызвать | Используйте conditional edges в graph, не LLM routing |
| Агенты делящие 90% tools и context | Research и Analysis агенты имеют identical tool доступ | Merge агентов, используйте single LLM вызов с multi-step инструкциями |
| Больше агентов чем LLM вызовов | 7 агентов но только 4 actual LLM invocations | Каждый агент должен обосновать собственный LLM вызов |
Decision tree:
Is task параллелизируемо в 3+ independent подзадачи?
├─ YES → Используйте fan-out архитектуру (supervisor + N workers, N ≤ 5)
└─ NO → Does task пересекает domain boundaries (legal + financial)?
├─ YES → Используйте 2-3 specialist агента с clear handoffs
└─ NO → Is task expected взять >30 минут?
├─ YES → Используйте checkpointed мульти-агент с human approval gates
└─ NO → Используйте single агент с chain-of-thought или structured output
Parameters Reference Table
| Параметр | Рекомендуемое значение | Контекст | Примечания |
|---|---|---|---|
recursion_limit |
10-15 | graph.compile(recursion_limit=N) |
По умолчанию 25 слишком высоко для production; устанавливайте на основе макс ожидаемая глубина × 2 |
circuit_breaker_threshold |
3 | Последовательные отказы перед opening | Индустриальный стандарт; ниже (2) для critical paths, выше (5) для experimental |
circuit_breaker_recovery_timeout |
30.0 секунд | Время в OPEN состоянии перед HALF_OPEN | Balance между fast recovery и избегании retry storms |
per_agent_timeout |
30-60 секунд | asyncio.timeout(N) per узел |
LLM вызовы ~3-10s; allow 3-5× buffer для retries и queue time |
global_pipeline_timeout |
90-120 секунд | Wraps весь graph.ainvoke() |
Сумма всех agent timeouts + 20% buffer |
max_concurrent_tools |
5 | asyncio.Semaphore(N) |
Match API rate limits; OpenAI Tier 2 ~50 req/min → 5 concurrent safe |
max_tools_per_minute |
20-30 | Token bucket rate limit | Stay under API tier limit с buffer |
max_tools_per_run |
50-100 | Absolute ceiling per pipeline run | Предотвращайте runaway cost; типичная задача использует 10-20 |
max_agent_chain_depth |
5 | Depth counter в state | Qualitative гайдлайн; более глубокие цепи экспоненциально сложнее для отладки |
max_parallel_fanout |
5 | Cap на len(Send()) list |
Limit concurrent LLM вызовы; больше не улучшает quality |
confidence_threshold |
0.75 | Classifier output confidence | Ниже этого, treat как uncertain; используйте "other" intent fallback |
schema_validation |
Required на всех agent outputs | jsonschema.validate() |
10ms overhead negligible vs. предотвращение cascade отказов |
state_reducer |
Все list/counter поля | Annotated[T, reducer_fn] |
Mandatory для любого состояния modified параллельными агентами |
human_approval_risk_levels |
["high", "critical"] |
interrupt() trigger condition |
Low/medium auto-approve; high+ требуют human |
error_log_structured_fields |
agent_name, input_hash, timestamp, error_type |
JSON logging | Enable tracing отказов обратно через agent цепь |
Общие ошибки
Ошибка 1: Каскадный отказ — Отсутствие upstream валидации
Влияние: Агент A возвращает empty результаты с status: "ok", Агент B делит на count, выбрасывает ZeroDivisionError, Агент C treats exception traceback как валидные данные и hallucinates, Агент D одобряет hallucination — pipeline выходит "успешно" с garbage output.
❌ Неправильный подход:
async def analyzer_agent(state):
search_result = state["search_result"] # Blind trust
count = search_result["count"] # Kaboom если "count" отсутствует или zero
analysis = {"quality": "high" if count > 1 else "low"}
return {"analysis": analysis}
✅ Правильный подход:
from jsonschema import validate, ValidationError
SEARCH_RESULT_SCHEMA = {
"type": "object",
"required": ["status", "results", "count"],
"properties": {
"status": {"type": "string", "enum": ["ok", "error"]},
"count": {"type": "integer", "minimum": 0},
}
}
async def analyzer_agent(state):
search_result = state.get("search_result")
# Explicit validation перед use
try:
validate(instance=search_result, schema=SEARCH_RESULT_SCHEMA)
except ValidationError as e:
return {
"error": f"Upstream validation failed: {e.message}",
"aborted": True
}
if search_result["status"] == "error":
return {"error": "Upstream reported error", "aborted": True}
count = search_result["count"]
if count == 0:
return {"analysis": {"quality": "no_data"}, "warning": "Zero results"}
analysis = {"quality": "high" if count > 1 else "low"}
return {"analysis": analysis}
Ошибка 2: Бесконечная делегирование — Цикл агентов
Влияние: Supervisor агент делегирует задачу Research агенту, Research определяет что нужна анализ и делегирует обратно Supervisor, Supervisor re-анализирует и делегирует Research снова — цикл потребляет 25× recursion_limit перед crash, сжигая через quota.
❌ Неправильный подход:
async def supervisor(state):
task = state["task"]
# Decision: делегировать к research
return Command(goto="research_agent")
async def research_agent(state):
# Decision: мне нужна больше анализ, отправить обратно к supervisor
return Command(goto="supervisor") # Infinite loop
graph.add_edge("supervisor", "research_agent")
graph.add_edge("research_agent", "supervisor") # Цикл без exit
✅ Правильный подход:
MAX_DEPTH = 5
MAX_VISITS_PER_NODE = 2
class State(TypedDict):
task: str
depth: int
visited_nodes: Annotated[list[str], operator.add]
aborted: bool
async def supervisor(state):
if state["depth"] >= MAX_DEPTH:
return {"aborted": True, "result": "Max depth exceeded"}
visits = state["visited_nodes"].count("supervisor")
if visits >= MAX_VISITS_PER_NODE:
return {"aborted": True, "result": "Cycle detected: supervisor"}
return {
"depth": state["depth"] + 1,
"visited_nodes": ["supervisor"],
}
graph.compile(recursion_limit=10) # Safety net
Ошибка 3: Утечка контекста — Агенты делящие mutable состояние
Влияние: Пять агентов работают параллельно записывая в state["results"], Агент 2 читает while Агент 3 пишет, видит partial данные, перезаписывает Агента 4's contribution — финальный output silently отсутствует результаты от multiple параллельных workers.
❌ Неправильный подход:
class SharedState(TypedDict):
results: dict # DANGER: mutable dict общий по parallel агентам
async def worker_agent(state):
# Race condition: read-modify-write not atomic
state["results"][worker_id] = my_output
return state
✅ Правильный подход:
def merge_results(a: list, b: list) -> list:
return a + b
class IsolatedState(TypedDict):
results: Annotated[list[dict], merge_results] # Reducer гарантирует safe merge
async def worker_agent(worker_input: dict) -> dict:
result = {"worker_id": worker_input["worker_id"], "output": "..."}
# Возвращайте new list element; LangGraph reducer безопасно append
return {"results": [result]}
Ошибка 4: Каскад стоимости — Неограниченные nested вызовы
Влияние: Research агент получает broad query, spawns 50 parallel sub-agents, каждый делает 3 LLM вызова, total стоимость = 50 × 3 × $0.01 = $1.50 per query — 30× ожидаемая стоимость, hitting rate limits и quota exhaustion.
❌ Неправильный подход:
async def research_agent(state):
topics = extract_topics(state["query"]) # Returns 50 topics
# Unbounded fanout
results = await asyncio.gather(*[
sub_research(topic) for topic in topics # 50 concurrent LLM вызовов
])
✅ Правильный подход:
MAX_PARALLEL = 5
tool_limiter = asyncio.Semaphore(5)
async def research_agent(state):
topics = extract_topics(state["query"])
topics = topics[:MAX_PARALLEL] # Hard cap
async def limited_research(topic):
async with tool_limiter:
return await sub_research(topic)
results = await asyncio.gather(
*[limited_research(t) for t in topics],
return_exceptions=True # Partial success
)
Ошибка 5: Эскалация доверия — Low-confidence предположение распространяемо
Влияние: Classifier агент детектирует intent как "billing" с 0.42 confidence (ниже 0.75 threshold), пишет в state anyway, все downstream агенты фильтруют действия через "billing" lens, пользователь с product вопросом попадает в billing support.
❌ Неправильный подход:
async def classifier_agent(state):
raw_output = llm_classify(state["query"])
# Не confidence check — плохая классификация распространяется
return {"intent": raw_output["intent"]}
async def router_agent(state):
intent = state["intent"] # Blindly доверяет upstream
if intent == "billing":
return Command(goto="billing_agent")
✅ Правильный подход:
CONFIDENCE_THRESHOLD = 0.75
async def classifier_agent(state):
raw_output = llm_classify(state["query"])
if raw_output["confidence"] < CONFIDENCE_THRESHOLD:
# Low confidence: mark как uncertain, preserve original
return {
"intent": "other", # Safe fallback
"intent_confidence": raw_output["confidence"],
"original_query": state["query"], # Never lose original
}
return {"intent": raw_output["intent"]}
async def router_agent(state):
if state["intent"] == "other" or state.get("intent_confidence", 1.0) < CONFIDENCE_THRESHOLD:
# Fall back к broad поиску используя original query
query = state["original_query"]
else:
query = f"{state['original_query']} (intent: {state['intent']})"
Паттерны каскадного отказа
Каскадные отказы происходят когда ошибка Агента A invisible для Агента B, который обрабатывает A's повреждённый output как валидный ввод, производя собственную ошибку что Агент C затем обрабатывает. Каждый слой obscures root cause.
Типичная cascade последовательность:
Агент A (search) → Returns {"status": "ok", "results": [], "count": 0}
(Should have returned {"status": "error"} но didn't)
Агент B (analyzer) → Reads count=0, делит на zero → ZeroDivisionError
Exception traceback written в state["analysis"]
Агент C (writer) → Receives traceback string как "analysis"
LLM treats error message как data → hallucinates content
Агент D (reviewer) → Receives hallucinated content
LLM видит well-formatted text → approves
Pipeline → Exits с status "success", логи show no errors
User получает confidently неправильный answer
Root cause: Нет валидационных barriers между агентами.
Isolation Strategy: Circuit Breakers + Validation Gates
Circuit breaker паттерн предотвращает retry storms открывая circuit после N отказов, блокируя subsequent запросы до recovery timeout.
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Blocking all requests
HALF_OPEN = "half_open" # Testing recovery
@dataclass
class CircuitBreaker:
name: str
failure_threshold: int = 3
recovery_timeout: float = 30.0
_failure_count: int = field(default=0, init=False)
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_last_failure_time: datetime | None = field(default=None, init=False)
def can_proceed(self) -> bool:
if self._state == CircuitState.CLOSED:
return True
if self._state == CircuitState.OPEN:
if (self._last_failure_time and
datetime.now() - self._last_failure_time > timedelta(seconds=self.recovery_timeout)):
self._state = CircuitState.HALF_OPEN
return True
return False
return True # HALF_OPEN: allow test request
def record_success(self):
self._failure_count = 0
self._state = CircuitState.CLOSED
def record_failure(self):
self._failure_count += 1
self._last_failure_time = datetime.now()
if self._failure_count >= self.failure_threshold:
self._state = CircuitState.OPEN
Validation gate паттерн проверяет schema перед accepting upstream output:
from jsonschema import validate, ValidationError
def validate_agent_output(output: dict, schema: dict, agent_name: str) -> tuple[bool, str]:
if output is None:
return False, f"{agent_name} returned None"
try:
validate(instance=output, schema=schema)
return True, ""
except ValidationError as e:
return False, f"{agent_name} schema validation failed: {e.message}"
Bulkhead Pattern: Изолировать failure домены
Bulkhead (из ship дизайна) изолирует отказы для предотвращения sinking всей системы. В мульти-агентных системах: отдельный high-risk агентов в isolated execution контекстах.
from typing import TypedDict
class BulkheadState(TypedDict):
# Core pipeline состояние — всегда валидно
user_query: str
final_answer: str | None
# Experimental agent outputs — isolated
experimental_enrichment: dict | None
experimental_error: str | None
# Production agent outputs — validated
search_results: list[dict]
analysis: dict
async def experimental_agent(state: BulkheadState) -> dict:
"""High-risk experimental агент — isolated bulkhead."""
try:
result = await risky_experimental_llm_call(state["user_query"])
return {"experimental_enrichment": result}
except Exception as e:
# Failure isolated — не abort main pipeline
return {"experimental_error": str(e)}
async def production_agent(state: BulkheadState) -> dict:
"""Core production агент — runs даже если experimental fails."""
search = await production_search(state["user_query"])
# Optional: используйте experimental enrichment если available
enrichment = state.get("experimental_enrichment")
if enrichment and state.get("experimental_error") is None:
search["enrichment"] = enrichment
return {"search_results": [search]}
def route_after_experimental(state: BulkheadState) -> str:
# Always продолжайте к production agent regardless of experimental outcome
return "production_agent"
# Graph setup
graph.add_node("experimental", experimental_agent)
graph.add_node("production", production_agent)
graph.add_edge("experimental", "production") # No conditional abort
Ключевой принцип: Critical path агенты не должны зависеть от experimental агентов' успеха.
Анти-паттерн гиперорхестрации
Определение: Использование multiple агентов с complex handoffs для задач solvable single well-prompted агентом.
Обычное проявление: Pipeline с 10+ агентами где 7 perform trivial трансформаций (extract JSON, reformat text, validate schema, log metrics) — задачи better handled structured output parsing и middleware.
Когда collapse в single агент
Collapse в single агент если:
- Sequential цепь без parallelism (A→B→C где каждый waits для previous)
- Агенты share >80% контекста и tools
- Average агент делает <5 секунд работы
- No conditional branching основанный на agent outputs
- Task заканчивается в <2 минут end-to-end
Держите мульти-агент если:
- True parallelism (fan-out к 3+ independent подзадачи)
- Domain expertise разделение (legal + medical + financial домены)
- Human approval gates в specific stages
- Long-running (>30 min) требующий checkpoints
Code Example: Over-Orchestration vs. Streamlined
❌ Over-orchestrated (7 агентов для простой задачи):
# Агент 1: Classify intent
async def classifier(state):
return {"intent": llm_call("classify: " + state["query"])}
# Агент 2: Rewrite query
async def rewriter(state):
return {"rewritten": llm_call("rewrite: " + state["query"])}
# Агент 3: Search
async def searcher(state):
return {"results": search(state["rewritten"])}
# Агент 4: Deduplicate results
async def deduplicator(state):
return {"unique_results": dedupe(state["results"])}
# Агент 5: Rank results
async def ranker(state):
return {"ranked": llm_call("rank: " + str(state["unique_results"]))}
# Агент 6: Summarize
async def summarizer(state):
return {"summary": llm_call("summarize: " + state["ranked"])}
# Агент 7: Format output
async def formatter(state):
return {"final": format_markdown(state["summary"])}
# 7 agent handoffs, 4 LLM вызовов, 15-20 секунд latency
✅ Streamlined (single агент с tools):
from langchain_core.tools import tool
from langchain_anthropic import ChatAnthropic
@tool
def search_tool(query: str) -> list[dict]:
"""Search knowledge base и return deduplicated, ranked results."""
results = search(query)
unique = dedupe(results)
return sorted(unique, key=lambda x: x["score"], reverse=True)
async def unified_agent(state):
"""Single агент с tools и structured output."""
llm = ChatAnthropic(model="claude-sonnet-4", temperature=0)
llm_with_tools = llm.bind_tools([search_tool])
prompt = f"""Answer this query: {state['query']}
Steps:
1. Use search_tool для find relevant информацию
2. Synthesize results в clear answer
3. Format как markdown
Return structured JSON с keys: answer, sources, confidence"""
response = await llm_with_tools.ainvoke(prompt)
# Single LLM вызов с tool use, 3-5 секунд latency
return {"final_answer": response.content}
Performance сравнение:
| Подход | Агенты | LLM вызовы | Latency | Complexity |
|---|---|---|---|---|
| Over-orchestrated | 7 | 4 | 15-20s | High (7 handoffs, 7 error paths) |
| Streamlined | 1 | 1 | 3-5s | Low (single call, tool abstraction) |
Context Bleed & Shared State
Context bleed: Агенты unintentionally делящие state modifications, приводящие к race conditions в parallel execution.
Root cause: Python async с await создаёт context switches. Агент A читает state["counter"]=5, yields на await llm_call(), Агент B инкрементирует counter к 6, Агент A resumes и writes stale значение 5 обратно — lost update.
Race Condition Example
class UnsafeState(TypedDict):
counter: int # Shared mutable integer
results: dict # Shared mutable dict
async def worker_a(state: UnsafeState):
current = state["counter"] # Read: 5
await asyncio.sleep(0.1) # Yield control
state["counter"] = current + 1 # Write: 6 (но B уже сделал это 6)
return state
async def worker_b(state: UnsafeState):
current = state["counter"] # Read: 5 (перед A пишет)
await asyncio.sleep(0.05) # Yield control (shorter чем A)
state["counter"] = current + 1 # Write: 6
return state
# Оба workers работают параллельно
# Expected: counter = 7 (5 + 1 + 1)
# Actual: counter = 6 (lost update)
Isolation Pattern: Reducer Functions
LangGraph Annotated[T, reducer_fn] обеспечивает concurrent updates safely merged:
from typing import Annotated
import operator
def merge_results(a: list, b: list) -> list:
"""Reducer: concatenate lists (order-independent)."""
return a + b
class SafeState(TypedDict):
# Reducer гарантирует параллельные writes merged, не overwritten
results: Annotated[list[dict], merge_results]
error_count: Annotated[int, operator.add] # Safe increment
max_score: Annotated[float, max] # Safe max tracking
async def worker_a(input: dict) -> dict:
result = {"worker": "a", "data": "..."}
# Return list с single element; reducer будет merge
return {
"results": [result],
"error_count": 0,
"max_score": 0.85,
}
async def worker_b(input: dict) -> dict:
result = {"worker": "b", "data": "..."}
return {
"results": [result],
"error_count": 1,
"max_score": 0.92,
}
# LangGraph вызовы: merge_results([result_a], [result_b]) → [result_a, result_b]
# operator.add(0, 1) → 1
# max(0.85, 0.92) → 0.92
Per-Worker Isolation: Send Pattern
Best practice: Give каждому worker isolated input, не shared state:
class WorkerInput(TypedDict):
task_chunk: str
worker_id: str
# No shared mutable state — read-only inputs только
async def dispatcher(state: dict) -> list[Send]:
chunks = state["task"].split(". ")
return [
Send("worker", WorkerInput(task_chunk=chunk, worker_id=f"w{i}"))
for i, chunk in enumerate(chunks)
]
async def worker(worker_input: WorkerInput) -> dict:
# Operates на isolated input, returns isolated output
result = process(worker_input["task_chunk"])
return {"results": [{"worker_id": worker_input["worker_id"], "output": result}]}
Ключевой принцип: Workers получают value inputs (строки, immutable types), возвращают new state chunks merged reducers — никогда не modify shared mutable объектов.
Предотвращение cost cascade
Cost cascade: Экспоненциальный cost growth из nested agent вызовов. Если каждый агент fans out к N sub-agents, цепь глубины D стоит O(N^D).
Example расчёт:
- Research агент spawns 3 sub-researchers
- Каждый sub-researcher spawns 3 analyzers
- Каждый analyzer делает 2 LLM вызовов
- Total: 3 × 3 × 2 = 18 LLM вызовов (vs. expected 1-2)
На глубине 5 с fanout 3: 3^5 = 243 LLM вызовов.
Budget Propagation Pattern
Concept: Allocate fixed token/cost budget на pipeline start, subdivide среди agents, enforce hard лимиты.
from typing import TypedDict
class BudgetState(TypedDict):
query: str
remaining_budget_tokens: int
total_tokens_used: int
budget_exceeded: bool
result: str | None
TOTAL_BUDGET = 100_000 # tokens
AGENT_BUDGETS = {
"research": 40_000,
"analysis": 30_000,
"synthesis": 25_000,
"review": 5_000,
}
async def research_agent(state: BudgetState) -> dict:
agent_budget = AGENT_BUDGETS["research"]
if state["remaining_budget_tokens"] < agent_budget:
return {
"budget_exceeded": True,
"result": "Research skipped: insufficient budget"
}
# Make LLM вызов с max_tokens limit
response = await llm.ainvoke(
state["query"],
max_tokens=min(agent_budget, state["remaining_budget_tokens"])
)
tokens_used = response.usage_metadata["total_tokens"]
return {
"remaining_budget_tokens": state["remaining_budget_tokens"] - tokens_used,
"total_tokens_used": state["total_tokens_used"] + tokens_used,
"research_output": response.content,
}
def route_after_research(state: BudgetState) -> str:
if state.get("budget_exceeded"):
return "abort"
if state["remaining_budget_tokens"] < AGENT_BUDGETS["analysis"]:
return "synthesis" # Skip analysis, go straight к synthesis
return "analysis"
Depth Guard Pattern
Prevent неограниченную рекурсию отслеживая call глубину:
MAX_DEPTH = 5
class DepthState(TypedDict):
depth: int
max_depth_exceeded: bool
async def recursive_agent(state: DepthState) -> dict:
if state["depth"] >= MAX_DEPTH:
return {
"max_depth_exceeded": True,
"result": f"Aborted: max depth {MAX_DEPTH} reached"
}
# Decide если recursion нужна
if needs_more_research(state):
return {
"depth": state["depth"] + 1, # Increment перед recursing
}
else:
return {"result": "Research complete"}
graph.compile(recursion_limit=MAX_DEPTH * 2) # LangGraph safety net
Aggregation Node Pattern
Reduce fanout cost aggregating параллельные результаты перед next LLM вызовом:
async def fanout_node(state) -> list[Send]:
topics = state["topics"][:5] # Cap на 5
return [Send("worker", {"topic": t}) for t in topics]
async def worker(input: dict) -> dict:
# Каждый worker: 1 LLM вызов
result = await llm.ainvoke(f"Research: {input['topic']}")
return {"worker_results": [result.content]}
async def aggregator(state) -> dict:
"""Aggregate N worker результаты в single LLM вызов."""
all_results = state["worker_results"] # List из N результаты
# Single LLM вызов к synthesize N результаты
synthesis_prompt = f"Synthesize these {len(all_results)} research findings:\n\n"
synthesis_prompt += "\n\n".join(all_results)
final = await llm.ainvoke(synthesis_prompt)
# Total cost: N worker вызовы + 1 aggregator вызов (не N × M)
return {"final_answer": final.content}
Cost сравнение:
| Паттерн | Fanout | Вызовы per worker | Aggregator | Total LLM вызовы |
|---|---|---|---|---|
| No aggregation | 5 | 3 | — | 15 |
| With aggregation | 5 | 1 | 1 | 6 |
Performance & Benchmarks
Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.
Latency по количеству агентов (sequential цепь, no parallelism):
| Агенты | Avg latency per агент | Total latency | Overhead |
|---|---|---|---|
| 1 | 3s | 3s | — |
| 3 | 3s | 11s | +2s coordination |
| 5 | 3s | 19s | +4s coordination |
| 10 | 3s | 40s | +10s coordination |
Coordination overhead: ~0.5-1s per агент handoff (state сериализация, graph traversal, logging).
Cost по глубине (каждый агент fans out к 3 sub-agents):
| Глубина | Agents invoked | LLM вызовы (2 per агент) | Cost при $0.01/call |
|---|---|---|---|
| 1 | 1 | 2 | $0.02 |
| 2 | 1 + 3 = 4 | 8 | $0.08 |
| 3 | 1 + 3 + 9 = 13 | 26 | $0.26 |
| 4 | 1 + 3 + 9 + 27 = 40 | 80 | $0.80 |
| 5 | 1 + 3 + 9 + 27 + 81 = 121 | 242 | $2.42 |
Throughput impact валидации:
| Валидация | Overhead per агент | Impact на 5-agent цепь |
|---|---|---|
| None | 0ms | 0ms (baseline) |
| JSON schema (jsonschema) | 8-12ms | 40-60ms |
| LLM-based валидация | 500-1500ms | 2500-7500ms |
Рекомендация: Используйте schema валидацию (negligible overhead vs. LLM call latency).
Circuit breaker эффективность (симулированный 30% upstream отказ rate):
| Паттерн | Failed запросы | Wasted LLM вызовы | Cost savings |
|---|---|---|---|
| No circuit breaker | 30% | 30% proceed к next агенту | 0% |
| Circuit breaker (threshold=3) | 30% | 3-6% proceed перед circuit opens | ~80% |
Parallel fanout масштабирование (bounded vs unbounded):
| Workers | Unbounded latency | Bounded (semaphore=5) latency | API rate limit нарушения |
|---|---|---|---|
| 5 | 3s | 3s | 0% |
| 10 | 3s | 6s (2 batches) | 0% |
| 20 | 3s | 12s (4 batches) | 0% |
| 50 (unbounded) | 3s | — | 85% (rate limit hit) |