Анти-паттерны мульти-агентных систем: каскадные отказы, проблемы координации и безопасные архитектуры (2026)


title: "Анти-паттерны мульти-агентных систем: каскадные отказы, проблемы координации и безопасные архитектуры (2026)" slug: multi-agent-anti-patterns-cascades-2026-ru date: 2026-02-28 lang: ru

Анти-паттерны мульти-агентных систем: каскадные отказы, проблемы координации и безопасные архитектуры (2026)

Ключевые факты

  • Версии пакетов (актуальны на март 2026):

    • langchain==1.2.10 (стабильный слой оркестрации)
    • langgraph==1.0.10 (stateful workflows для мульти-агентных систем)
    • anthropic==0.84.0 (SDK для Claude API)
    • openai==2.24.0 (SDK для GPT API)
    • jsonschema==4.21.1 (валидация состояния)
    • httpx==0.27.0 (асинхронный HTTP с поддержкой timeout)
  • Определение каскадного отказа: Когда отказ одного агента распространяется на зависимые нижестоящие агенты, которые обрабатывают повреждённый вывод как валидный ввод, создавая цепь отказов, в которой первопричина скрывается каждым последующим уровнем ошибки.

  • Реализация timeout для подграфов LangGraph: Используйте asyncio.timeout(seconds) внутри функций узлов; LangGraph не имеет встроенного per-node timeout — должен быть реализован на уровне узла. Глобальный timeout через await graph.ainvoke(state, config={"timeout": 90.0}) deprecated в пользу async with asyncio.timeout(90.0) обёртки вокруг вызова invoke.

  • Таксономия анти-паттернов (пять основных режимов отказа):

    • Гиперорхестрация: Использование 10+ агентов для задач решаемых 1-2 агентами с надлежащим prompting
    • Циклы агентов: Циклическая делегирование где Агент A → Агент B → Агент A образует неограниченную рекурсию
    • Эскалация доверия: Нижестоящие агенты слепо доверяют выводам вышестоящих без валидации
    • Утечка контекста: Агенты совместно используют mutable состояние, приводящее к race condition в параллельном исполнении
    • Каскад стоимости: Экспоненциальный рост token/стоимости из nested agent вызовов (цепь глубины 5 = 3^5 = 243× базовая стоимость если каждый агент разветвляется 3-way)
  • Порог circuit breaker: Индустриальный стандарт — 3 последовательных отказа перед открытием цепи; время восстановления типично 30-60 секунд перед переходом в half-open состояние для retry.

  • Корневая причина anti-pattern общего состояния: Python async с await создаёт windows context-switch где Агент A читает state["counter"], Агент B инкрементирует его, затем Агент A пишет stale значение — классический lost-update race condition несмотря на GIL защищающий memory corruption.

  • Максимальная рекомендуемая глубина цепи агентов: Держите последовательные цепи агентов под 5 hops; глубже, coordination overhead превышает пользу, отладка становится экспоненциально сложнее, и latency плохо компилируется (5 агентов × 3 сек каждый = минимум 15 сек).

  • LangGraph recursion_limit по умолчанию: 25 итераций (configurable через graph.compile(recursion_limit=N)); production системы должны устанавливать явный лимит на основе ожидаемой максимальной глубины × узлы на итерацию.

  • Стоимость валидационного overhead: Валидация схемы через jsonschema.validate() добавляет ~5-15ms per agent transition; negligible по сравнению с LLM call latency (500-3000ms) но значимо в high-throughput pipelines.

  • Паттерн rate limiting для tool calls: Используйте asyncio.Semaphore(N) для cap concurrent вызовов, token bucket algorithm для per-minute rate, и absolute ceiling для per-run total; типичные production значения: max_concurrent=5, max_per_minute=30, max_total_per_run=100.

  • Human-in-the-loop interrupt механизм: LangGraph interrupt() функция pauses граф execution, возвращает control caller с текущим состоянием; требует MemorySaver checkpointer; resume через graph.ainvoke(Command(resume=user_decision), config=thread_config).

  • Требование сохранения контекста: Оригинальные user inputs должны быть сохранены в immutable TypedDict fields (никогда перезаписываемые); нижестоящие агенты читают оригиналы напрямую rather than полагаться на upstream summaries для предотвращения потери информации.

  • Модель безопасности доступа к tools: Реализуйте per-agent allowlists где research агенты получают read-only tools (["web_search", "database_read"]), write агенты получают write tools (["database_write"]), admin агенты получают destructive tools (["delete_record"]) — никогда не давайте все tools всем агентам.

  • Transitions circuit breaker состояния: CLOSED (нормально) → OPEN (блокирующее все запросы после threshold) → HALF_OPEN (разрешающее single test request после timeout) → CLOSED (на успех) или обратно в OPEN (на отказ).

  • Эвристика решения количества агентов: Один агент для <2 минут задач в одной доменах; 2-4 агента для параллельного I/O или domain crossing; 5-10 агентов только для multi-day autonomous задач требующих checkpointing и изоляции.

  • State reducer паттерн для параллельной безопасности: LangGraph Annotated[list[dict], operator.add] обеспечивает concurrent writes в одинаковый state key merged через reducer function instead of last-write-wins; критично для fan-out архитектур используя Send().

  • Timeout иерархия в production: HTTP call timeout (5-10s) < Agent node timeout (15-60s) < Global pipeline timeout (60-120s); каждый уровень должен превышать сумму своих children плюс buffer.

  • Формула cost explosion: В цепи где каждый агент вызывает N LLM операций, цепь глубины D стоит O(N^D) — e.g., 3 вызова per agent × 5 глубина = 243 total LLM calls; предотвращайте через depth guards и aggregation узлы.

  • Containment prompt injection: Tool allowlists предотвращают compromised агентов от escalating привилегий; даже если research агент prompt-injected для попытки database_write, tool не будет в его registry.

Что такое анти-паттерны мульти-агентных систем

Анти-паттерны мульти-агентных систем — это режимы отказа уникальные для систем где несколько LLM агентов координируют для решения задач. В отличие от single-agent систем где отказы изолированы и traceable, мульти-агентные системы выставляют emergent bugs:

Почему они уникально сложны для отладки:

  • Амбигуозность атрибуции отказа: Когда Агент D возвращает garbage, была ли ошибка в D's логике, C's output который D потребил, B's state mutation, или A's initial классификации? Традиционные stack traces показывают D failed но скрывают root cause.

  • Недетерминированное воспроизведение: LLM вызов Агента B возвращает слегка отличающийся output на retry, маскируя ошибку которая срабатывает только когда B включает specific wording который confuses Агента C.

  • Cascade amplification: Minor data quality issue в Агенте A (e.g., возвращающий empty list instead of null) становится TypeError в B, hallucination в C, и confidently неправильный финальный output из D — каждый слой obscures происхождение.

  • Temporal coupling bugs: Агент A завершается успешно, но Агент C times out 30 секунд спустя ожидая Агента B, который заблокирован на rate limit который срабатывает только under production load — невозможно воспроизвести в unit tests.

  • State space explosion: С N агентами каждый делающий binary решения, отладка требует exploring 2^N execution paths; production логи показывают один path но не почему тот path был выбран над другими.

Традиционная отладка предполагает linear causality (функция A вызывает B вызывает C) и synchronous execution. Мульти-агентные системы нарушают оба допущения: causality течёт через shared state и LLM context, execution concurrent и asynchronous, и "работает по дизайну" на component уровне производит system-level отказ.

Decision Framework

Когда мульти-агентная архитектура оправдана:

Сценарий Количество агентов Обоснование
3+ параллельных независимых подзадач (поиск в несколько источников, обработка нескольких документов) 2-5 (supervisor + workers) Wall-clock time savings от true параллелизма оправдывает coordination overhead
Разделение expertise по доменам (legal анализ + financial расчёт + code generation) 2-4 специалистов Разные system prompts и tool sets per домен; single агент смешивал бы concerns
Multi-day autonomous задача с человеческими checkpoints 5-10 + supervisor Long-running требует state persistence, failure изоляции, и intermediate approval gates
Real-time streaming с processing stages 3-6 pipeline агентов Каждый stage независимо scalable; output stage N feeds stage N+1 асинхронно

Когда single агент лучше (признаки over-engineering):

Анти-паттерн Симптом Исправление
Агент делает single LLM вызов затем передаёт Агент B просто переформатирует Агента A's output Merge в single агент с structured output schema
Sequential цепь без branching A→B→C без conditional логики Single агент с chain-of-thought prompting
"Orchestrator" агент который просто маршрутизирует Supervisor только решает какого specialist вызвать Используйте conditional edges в graph, не LLM routing
Агенты делящие 90% tools и context Research и Analysis агенты имеют identical tool доступ Merge агентов, используйте single LLM вызов с multi-step инструкциями
Больше агентов чем LLM вызовов 7 агентов но только 4 actual LLM invocations Каждый агент должен обосновать собственный LLM вызов

Decision tree:

Is task параллелизируемо в 3+ independent подзадачи?
├─ YES → Используйте fan-out архитектуру (supervisor + N workers, N ≤ 5)
└─ NO → Does task пересекает domain boundaries (legal + financial)?
    ├─ YES → Используйте 2-3 specialist агента с clear handoffs
    └─ NO → Is task expected взять >30 минут?
        ├─ YES → Используйте checkpointed мульти-агент с human approval gates
        └─ NO → Используйте single агент с chain-of-thought или structured output

Parameters Reference Table

Параметр Рекомендуемое значение Контекст Примечания
recursion_limit 10-15 graph.compile(recursion_limit=N) По умолчанию 25 слишком высоко для production; устанавливайте на основе макс ожидаемая глубина × 2
circuit_breaker_threshold 3 Последовательные отказы перед opening Индустриальный стандарт; ниже (2) для critical paths, выше (5) для experimental
circuit_breaker_recovery_timeout 30.0 секунд Время в OPEN состоянии перед HALF_OPEN Balance между fast recovery и избегании retry storms
per_agent_timeout 30-60 секунд asyncio.timeout(N) per узел LLM вызовы ~3-10s; allow 3-5× buffer для retries и queue time
global_pipeline_timeout 90-120 секунд Wraps весь graph.ainvoke() Сумма всех agent timeouts + 20% buffer
max_concurrent_tools 5 asyncio.Semaphore(N) Match API rate limits; OpenAI Tier 2 ~50 req/min → 5 concurrent safe
max_tools_per_minute 20-30 Token bucket rate limit Stay under API tier limit с buffer
max_tools_per_run 50-100 Absolute ceiling per pipeline run Предотвращайте runaway cost; типичная задача использует 10-20
max_agent_chain_depth 5 Depth counter в state Qualitative гайдлайн; более глубокие цепи экспоненциально сложнее для отладки
max_parallel_fanout 5 Cap на len(Send()) list Limit concurrent LLM вызовы; больше не улучшает quality
confidence_threshold 0.75 Classifier output confidence Ниже этого, treat как uncertain; используйте "other" intent fallback
schema_validation Required на всех agent outputs jsonschema.validate() 10ms overhead negligible vs. предотвращение cascade отказов
state_reducer Все list/counter поля Annotated[T, reducer_fn] Mandatory для любого состояния modified параллельными агентами
human_approval_risk_levels ["high", "critical"] interrupt() trigger condition Low/medium auto-approve; high+ требуют human
error_log_structured_fields agent_name, input_hash, timestamp, error_type JSON logging Enable tracing отказов обратно через agent цепь

Общие ошибки

Ошибка 1: Каскадный отказ — Отсутствие upstream валидации

Влияние: Агент A возвращает empty результаты с status: "ok", Агент B делит на count, выбрасывает ZeroDivisionError, Агент C treats exception traceback как валидные данные и hallucinates, Агент D одобряет hallucination — pipeline выходит "успешно" с garbage output.

Неправильный подход:

async def analyzer_agent(state):
    search_result = state["search_result"]  # Blind trust
    count = search_result["count"]  # Kaboom если "count" отсутствует или zero
    analysis = {"quality": "high" if count > 1 else "low"}
    return {"analysis": analysis}

Правильный подход:

from jsonschema import validate, ValidationError

SEARCH_RESULT_SCHEMA = {
    "type": "object",
    "required": ["status", "results", "count"],
    "properties": {
        "status": {"type": "string", "enum": ["ok", "error"]},
        "count": {"type": "integer", "minimum": 0},
    }
}

async def analyzer_agent(state):
    search_result = state.get("search_result")

    # Explicit validation перед use
    try:
        validate(instance=search_result, schema=SEARCH_RESULT_SCHEMA)
    except ValidationError as e:
        return {
            "error": f"Upstream validation failed: {e.message}",
            "aborted": True
        }

    if search_result["status"] == "error":
        return {"error": "Upstream reported error", "aborted": True}

    count = search_result["count"]
    if count == 0:
        return {"analysis": {"quality": "no_data"}, "warning": "Zero results"}

    analysis = {"quality": "high" if count > 1 else "low"}
    return {"analysis": analysis}

Ошибка 2: Бесконечная делегирование — Цикл агентов

Влияние: Supervisor агент делегирует задачу Research агенту, Research определяет что нужна анализ и делегирует обратно Supervisor, Supervisor re-анализирует и делегирует Research снова — цикл потребляет 25× recursion_limit перед crash, сжигая через quota.

Неправильный подход:

async def supervisor(state):
    task = state["task"]
    # Decision: делегировать к research
    return Command(goto="research_agent")

async def research_agent(state):
    # Decision: мне нужна больше анализ, отправить обратно к supervisor
    return Command(goto="supervisor")  # Infinite loop

graph.add_edge("supervisor", "research_agent")
graph.add_edge("research_agent", "supervisor")  # Цикл без exit

Правильный подход:

MAX_DEPTH = 5
MAX_VISITS_PER_NODE = 2

class State(TypedDict):
    task: str
    depth: int
    visited_nodes: Annotated[list[str], operator.add]
    aborted: bool

async def supervisor(state):
    if state["depth"] >= MAX_DEPTH:
        return {"aborted": True, "result": "Max depth exceeded"}

    visits = state["visited_nodes"].count("supervisor")
    if visits >= MAX_VISITS_PER_NODE:
        return {"aborted": True, "result": "Cycle detected: supervisor"}

    return {
        "depth": state["depth"] + 1,
        "visited_nodes": ["supervisor"],
    }

graph.compile(recursion_limit=10)  # Safety net

Ошибка 3: Утечка контекста — Агенты делящие mutable состояние

Влияние: Пять агентов работают параллельно записывая в state["results"], Агент 2 читает while Агент 3 пишет, видит partial данные, перезаписывает Агента 4's contribution — финальный output silently отсутствует результаты от multiple параллельных workers.

Неправильный подход:

class SharedState(TypedDict):
    results: dict  # DANGER: mutable dict общий по parallel агентам

async def worker_agent(state):
    # Race condition: read-modify-write not atomic
    state["results"][worker_id] = my_output
    return state

Правильный подход:

def merge_results(a: list, b: list) -> list:
    return a + b

class IsolatedState(TypedDict):
    results: Annotated[list[dict], merge_results]  # Reducer гарантирует safe merge

async def worker_agent(worker_input: dict) -> dict:
    result = {"worker_id": worker_input["worker_id"], "output": "..."}
    # Возвращайте new list element; LangGraph reducer безопасно append
    return {"results": [result]}

Ошибка 4: Каскад стоимости — Неограниченные nested вызовы

Влияние: Research агент получает broad query, spawns 50 parallel sub-agents, каждый делает 3 LLM вызова, total стоимость = 50 × 3 × $0.01 = $1.50 per query — 30× ожидаемая стоимость, hitting rate limits и quota exhaustion.

Неправильный подход:

async def research_agent(state):
    topics = extract_topics(state["query"])  # Returns 50 topics
    # Unbounded fanout
    results = await asyncio.gather(*[
        sub_research(topic) for topic in topics  # 50 concurrent LLM вызовов
    ])

Правильный подход:

MAX_PARALLEL = 5
tool_limiter = asyncio.Semaphore(5)

async def research_agent(state):
    topics = extract_topics(state["query"])
    topics = topics[:MAX_PARALLEL]  # Hard cap

    async def limited_research(topic):
        async with tool_limiter:
            return await sub_research(topic)

    results = await asyncio.gather(
        *[limited_research(t) for t in topics],
        return_exceptions=True  # Partial success
    )

Ошибка 5: Эскалация доверия — Low-confidence предположение распространяемо

Влияние: Classifier агент детектирует intent как "billing" с 0.42 confidence (ниже 0.75 threshold), пишет в state anyway, все downstream агенты фильтруют действия через "billing" lens, пользователь с product вопросом попадает в billing support.

Неправильный подход:

async def classifier_agent(state):
    raw_output = llm_classify(state["query"])
    # Не confidence check — плохая классификация распространяется
    return {"intent": raw_output["intent"]}

async def router_agent(state):
    intent = state["intent"]  # Blindly доверяет upstream
    if intent == "billing":
        return Command(goto="billing_agent")

Правильный подход:

CONFIDENCE_THRESHOLD = 0.75

async def classifier_agent(state):
    raw_output = llm_classify(state["query"])

    if raw_output["confidence"] < CONFIDENCE_THRESHOLD:
        # Low confidence: mark как uncertain, preserve original
        return {
            "intent": "other",  # Safe fallback
            "intent_confidence": raw_output["confidence"],
            "original_query": state["query"],  # Never lose original
        }

    return {"intent": raw_output["intent"]}

async def router_agent(state):
    if state["intent"] == "other" or state.get("intent_confidence", 1.0) < CONFIDENCE_THRESHOLD:
        # Fall back к broad поиску используя original query
        query = state["original_query"]
    else:
        query = f"{state['original_query']} (intent: {state['intent']})"

Паттерны каскадного отказа

Каскадные отказы происходят когда ошибка Агента A invisible для Агента B, который обрабатывает A's повреждённый output как валидный ввод, производя собственную ошибку что Агент C затем обрабатывает. Каждый слой obscures root cause.

Типичная cascade последовательность:

Агент A (search)     → Returns {"status": "ok", "results": [], "count": 0}
                       (Should have returned {"status": "error"} но didn't)

Агент B (analyzer)   → Reads count=0, делит на zero → ZeroDivisionError
                       Exception traceback written в state["analysis"]

Агент C (writer)     → Receives traceback string как "analysis"
                       LLM treats error message как data → hallucinates content

Агент D (reviewer)   → Receives hallucinated content
                       LLM видит well-formatted text → approves

Pipeline             → Exits с status "success", логи show no errors
                       User получает confidently неправильный answer

Root cause: Нет валидационных barriers между агентами.

Isolation Strategy: Circuit Breakers + Validation Gates

Circuit breaker паттерн предотвращает retry storms открывая circuit после N отказов, блокируя subsequent запросы до recovery timeout.

from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Blocking all requests
    HALF_OPEN = "half_open"  # Testing recovery

@dataclass
class CircuitBreaker:
    name: str
    failure_threshold: int = 3
    recovery_timeout: float = 30.0

    _failure_count: int = field(default=0, init=False)
    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _last_failure_time: datetime | None = field(default=None, init=False)

    def can_proceed(self) -> bool:
        if self._state == CircuitState.CLOSED:
            return True
        if self._state == CircuitState.OPEN:
            if (self._last_failure_time and
                datetime.now() - self._last_failure_time > timedelta(seconds=self.recovery_timeout)):
                self._state = CircuitState.HALF_OPEN
                return True
            return False
        return True  # HALF_OPEN: allow test request

    def record_success(self):
        self._failure_count = 0
        self._state = CircuitState.CLOSED

    def record_failure(self):
        self._failure_count += 1
        self._last_failure_time = datetime.now()
        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN

Validation gate паттерн проверяет schema перед accepting upstream output:

from jsonschema import validate, ValidationError

def validate_agent_output(output: dict, schema: dict, agent_name: str) -> tuple[bool, str]:
    if output is None:
        return False, f"{agent_name} returned None"
    try:
        validate(instance=output, schema=schema)
        return True, ""
    except ValidationError as e:
        return False, f"{agent_name} schema validation failed: {e.message}"

Bulkhead Pattern: Изолировать failure домены

Bulkhead (из ship дизайна) изолирует отказы для предотвращения sinking всей системы. В мульти-агентных системах: отдельный high-risk агентов в isolated execution контекстах.

from typing import TypedDict

class BulkheadState(TypedDict):
    # Core pipeline состояние — всегда валидно
    user_query: str
    final_answer: str | None

    # Experimental agent outputs — isolated
    experimental_enrichment: dict | None
    experimental_error: str | None

    # Production agent outputs — validated
    search_results: list[dict]
    analysis: dict

async def experimental_agent(state: BulkheadState) -> dict:
    """High-risk experimental агент — isolated bulkhead."""
    try:
        result = await risky_experimental_llm_call(state["user_query"])
        return {"experimental_enrichment": result}
    except Exception as e:
        # Failure isolated — не abort main pipeline
        return {"experimental_error": str(e)}

async def production_agent(state: BulkheadState) -> dict:
    """Core production агент — runs даже если experimental fails."""
    search = await production_search(state["user_query"])

    # Optional: используйте experimental enrichment если available
    enrichment = state.get("experimental_enrichment")
    if enrichment and state.get("experimental_error") is None:
        search["enrichment"] = enrichment

    return {"search_results": [search]}

def route_after_experimental(state: BulkheadState) -> str:
    # Always продолжайте к production agent regardless of experimental outcome
    return "production_agent"

# Graph setup
graph.add_node("experimental", experimental_agent)
graph.add_node("production", production_agent)
graph.add_edge("experimental", "production")  # No conditional abort

Ключевой принцип: Critical path агенты не должны зависеть от experimental агентов' успеха.

Анти-паттерн гиперорхестрации

Определение: Использование multiple агентов с complex handoffs для задач solvable single well-prompted агентом.

Обычное проявление: Pipeline с 10+ агентами где 7 perform trivial трансформаций (extract JSON, reformat text, validate schema, log metrics) — задачи better handled structured output parsing и middleware.

Когда collapse в single агент

Collapse в single агент если:

  • Sequential цепь без parallelism (A→B→C где каждый waits для previous)
  • Агенты share >80% контекста и tools
  • Average агент делает <5 секунд работы
  • No conditional branching основанный на agent outputs
  • Task заканчивается в <2 минут end-to-end

Держите мульти-агент если:

  • True parallelism (fan-out к 3+ independent подзадачи)
  • Domain expertise разделение (legal + medical + financial домены)
  • Human approval gates в specific stages
  • Long-running (>30 min) требующий checkpoints

Code Example: Over-Orchestration vs. Streamlined

Over-orchestrated (7 агентов для простой задачи):

# Агент 1: Classify intent
async def classifier(state):
    return {"intent": llm_call("classify: " + state["query"])}

# Агент 2: Rewrite query
async def rewriter(state):
    return {"rewritten": llm_call("rewrite: " + state["query"])}

# Агент 3: Search
async def searcher(state):
    return {"results": search(state["rewritten"])}

# Агент 4: Deduplicate results
async def deduplicator(state):
    return {"unique_results": dedupe(state["results"])}

# Агент 5: Rank results
async def ranker(state):
    return {"ranked": llm_call("rank: " + str(state["unique_results"]))}

# Агент 6: Summarize
async def summarizer(state):
    return {"summary": llm_call("summarize: " + state["ranked"])}

# Агент 7: Format output
async def formatter(state):
    return {"final": format_markdown(state["summary"])}

# 7 agent handoffs, 4 LLM вызовов, 15-20 секунд latency

Streamlined (single агент с tools):

from langchain_core.tools import tool
from langchain_anthropic import ChatAnthropic

@tool
def search_tool(query: str) -> list[dict]:
    """Search knowledge base и return deduplicated, ranked results."""
    results = search(query)
    unique = dedupe(results)
    return sorted(unique, key=lambda x: x["score"], reverse=True)

async def unified_agent(state):
    """Single агент с tools и structured output."""
    llm = ChatAnthropic(model="claude-sonnet-4", temperature=0)
    llm_with_tools = llm.bind_tools([search_tool])

    prompt = f"""Answer this query: {state['query']}

Steps:
1. Use search_tool для find relevant информацию
2. Synthesize results в clear answer
3. Format как markdown

Return structured JSON с keys: answer, sources, confidence"""

    response = await llm_with_tools.ainvoke(prompt)

    # Single LLM вызов с tool use, 3-5 секунд latency
    return {"final_answer": response.content}

Performance сравнение:

Подход Агенты LLM вызовы Latency Complexity
Over-orchestrated 7 4 15-20s High (7 handoffs, 7 error paths)
Streamlined 1 1 3-5s Low (single call, tool abstraction)

Context Bleed & Shared State

Context bleed: Агенты unintentionally делящие state modifications, приводящие к race conditions в parallel execution.

Root cause: Python async с await создаёт context switches. Агент A читает state["counter"]=5, yields на await llm_call(), Агент B инкрементирует counter к 6, Агент A resumes и writes stale значение 5 обратно — lost update.

Race Condition Example

class UnsafeState(TypedDict):
    counter: int  # Shared mutable integer
    results: dict  # Shared mutable dict

async def worker_a(state: UnsafeState):
    current = state["counter"]  # Read: 5
    await asyncio.sleep(0.1)    # Yield control
    state["counter"] = current + 1  # Write: 6 (но B уже сделал это 6)
    return state

async def worker_b(state: UnsafeState):
    current = state["counter"]  # Read: 5 (перед A пишет)
    await asyncio.sleep(0.05)   # Yield control (shorter чем A)
    state["counter"] = current + 1  # Write: 6
    return state

# Оба workers работают параллельно
# Expected: counter = 7 (5 + 1 + 1)
# Actual: counter = 6 (lost update)

Isolation Pattern: Reducer Functions

LangGraph Annotated[T, reducer_fn] обеспечивает concurrent updates safely merged:

from typing import Annotated
import operator

def merge_results(a: list, b: list) -> list:
    """Reducer: concatenate lists (order-independent)."""
    return a + b

class SafeState(TypedDict):
    # Reducer гарантирует параллельные writes merged, не overwritten
    results: Annotated[list[dict], merge_results]
    error_count: Annotated[int, operator.add]  # Safe increment
    max_score: Annotated[float, max]  # Safe max tracking

async def worker_a(input: dict) -> dict:
    result = {"worker": "a", "data": "..."}
    # Return list с single element; reducer будет merge
    return {
        "results": [result],
        "error_count": 0,
        "max_score": 0.85,
    }

async def worker_b(input: dict) -> dict:
    result = {"worker": "b", "data": "..."}
    return {
        "results": [result],
        "error_count": 1,
        "max_score": 0.92,
    }

# LangGraph вызовы: merge_results([result_a], [result_b]) → [result_a, result_b]
#                  operator.add(0, 1) → 1
#                  max(0.85, 0.92) → 0.92

Per-Worker Isolation: Send Pattern

Best practice: Give каждому worker isolated input, не shared state:

class WorkerInput(TypedDict):
    task_chunk: str
    worker_id: str
    # No shared mutable state — read-only inputs только

async def dispatcher(state: dict) -> list[Send]:
    chunks = state["task"].split(". ")
    return [
        Send("worker", WorkerInput(task_chunk=chunk, worker_id=f"w{i}"))
        for i, chunk in enumerate(chunks)
    ]

async def worker(worker_input: WorkerInput) -> dict:
    # Operates на isolated input, returns isolated output
    result = process(worker_input["task_chunk"])
    return {"results": [{"worker_id": worker_input["worker_id"], "output": result}]}

Ключевой принцип: Workers получают value inputs (строки, immutable types), возвращают new state chunks merged reducers — никогда не modify shared mutable объектов.

Предотвращение cost cascade

Cost cascade: Экспоненциальный cost growth из nested agent вызовов. Если каждый агент fans out к N sub-agents, цепь глубины D стоит O(N^D).

Example расчёт:

  • Research агент spawns 3 sub-researchers
  • Каждый sub-researcher spawns 3 analyzers
  • Каждый analyzer делает 2 LLM вызовов
  • Total: 3 × 3 × 2 = 18 LLM вызовов (vs. expected 1-2)

На глубине 5 с fanout 3: 3^5 = 243 LLM вызовов.

Budget Propagation Pattern

Concept: Allocate fixed token/cost budget на pipeline start, subdivide среди agents, enforce hard лимиты.

from typing import TypedDict

class BudgetState(TypedDict):
    query: str
    remaining_budget_tokens: int
    total_tokens_used: int
    budget_exceeded: bool
    result: str | None

TOTAL_BUDGET = 100_000  # tokens
AGENT_BUDGETS = {
    "research": 40_000,
    "analysis": 30_000,
    "synthesis": 25_000,
    "review": 5_000,
}

async def research_agent(state: BudgetState) -> dict:
    agent_budget = AGENT_BUDGETS["research"]

    if state["remaining_budget_tokens"] < agent_budget:
        return {
            "budget_exceeded": True,
            "result": "Research skipped: insufficient budget"
        }

    # Make LLM вызов с max_tokens limit
    response = await llm.ainvoke(
        state["query"],
        max_tokens=min(agent_budget, state["remaining_budget_tokens"])
    )

    tokens_used = response.usage_metadata["total_tokens"]

    return {
        "remaining_budget_tokens": state["remaining_budget_tokens"] - tokens_used,
        "total_tokens_used": state["total_tokens_used"] + tokens_used,
        "research_output": response.content,
    }

def route_after_research(state: BudgetState) -> str:
    if state.get("budget_exceeded"):
        return "abort"
    if state["remaining_budget_tokens"] < AGENT_BUDGETS["analysis"]:
        return "synthesis"  # Skip analysis, go straight к synthesis
    return "analysis"

Depth Guard Pattern

Prevent неограниченную рекурсию отслеживая call глубину:

MAX_DEPTH = 5

class DepthState(TypedDict):
    depth: int
    max_depth_exceeded: bool

async def recursive_agent(state: DepthState) -> dict:
    if state["depth"] >= MAX_DEPTH:
        return {
            "max_depth_exceeded": True,
            "result": f"Aborted: max depth {MAX_DEPTH} reached"
        }

    # Decide если recursion нужна
    if needs_more_research(state):
        return {
            "depth": state["depth"] + 1,  # Increment перед recursing
        }
    else:
        return {"result": "Research complete"}

graph.compile(recursion_limit=MAX_DEPTH * 2)  # LangGraph safety net

Aggregation Node Pattern

Reduce fanout cost aggregating параллельные результаты перед next LLM вызовом:

async def fanout_node(state) -> list[Send]:
    topics = state["topics"][:5]  # Cap на 5
    return [Send("worker", {"topic": t}) for t in topics]

async def worker(input: dict) -> dict:
    # Каждый worker: 1 LLM вызов
    result = await llm.ainvoke(f"Research: {input['topic']}")
    return {"worker_results": [result.content]}

async def aggregator(state) -> dict:
    """Aggregate N worker результаты в single LLM вызов."""
    all_results = state["worker_results"]  # List из N результаты

    # Single LLM вызов к synthesize N результаты
    synthesis_prompt = f"Synthesize these {len(all_results)} research findings:\n\n"
    synthesis_prompt += "\n\n".join(all_results)

    final = await llm.ainvoke(synthesis_prompt)

    # Total cost: N worker вызовы + 1 aggregator вызов (не N × M)
    return {"final_answer": final.content}

Cost сравнение:

Паттерн Fanout Вызовы per worker Aggregator Total LLM вызовы
No aggregation 5 3 15
With aggregation 5 1 1 6

Performance & Benchmarks

Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.

Latency по количеству агентов (sequential цепь, no parallelism):

Агенты Avg latency per агент Total latency Overhead
1 3s 3s
3 3s 11s +2s coordination
5 3s 19s +4s coordination
10 3s 40s +10s coordination

Coordination overhead: ~0.5-1s per агент handoff (state сериализация, graph traversal, logging).

Cost по глубине (каждый агент fans out к 3 sub-agents):

Глубина Agents invoked LLM вызовы (2 per агент) Cost при $0.01/call
1 1 2 $0.02
2 1 + 3 = 4 8 $0.08
3 1 + 3 + 9 = 13 26 $0.26
4 1 + 3 + 9 + 27 = 40 80 $0.80
5 1 + 3 + 9 + 27 + 81 = 121 242 $2.42

Throughput impact валидации:

Валидация Overhead per агент Impact на 5-agent цепь
None 0ms 0ms (baseline)
JSON schema (jsonschema) 8-12ms 40-60ms
LLM-based валидация 500-1500ms 2500-7500ms

Рекомендация: Используйте schema валидацию (negligible overhead vs. LLM call latency).

Circuit breaker эффективность (симулированный 30% upstream отказ rate):

Паттерн Failed запросы Wasted LLM вызовы Cost savings
No circuit breaker 30% 30% proceed к next агенту 0%
Circuit breaker (threshold=3) 30% 3-6% proceed перед circuit opens ~80%

Parallel fanout масштабирование (bounded vs unbounded):

Workers Unbounded latency Bounded (semaphore=5) latency API rate limit нарушения
5 3s 3s 0%
10 3s 6s (2 batches) 0%
20 3s 12s (4 batches) 0%
50 (unbounded) 3s 85% (rate limit hit)