Страховки для агентов: Лимиты итераций, Отслеживание затрат, Паттерны таймаутов


title: "Страховки для агентов: Лимиты итераций, Отслеживание затрат, Паттерны таймаутов" slug: agent-failsafe-iteration-cost-timeout-2026-ru date: 2026-02-24 lang: ru

Страховки для агентов: Лимиты итераций, Отслеживание затрат, Паттерны таймаутов

Ключевые факты

  • LangGraph recursion_limit по умолчанию: 25 итераций (задаётся в словаре RunnableConfig, передаваемом в .invoke() или .ainvoke())
  • Месторасположение LangGraph recursion_limit: Параметр конфига {"recursion_limit": N} — не свойство графа, а конфиг вызова
  • Механика LangGraph interrupt(): Вызовите interrupt(value) из узла, чтобы паузировать выполнение и вернуться к вызывающей стороне для human-in-the-loop
  • tiktoken текущая версия: 0.12.0 (по данным PyPI, март 2026)
  • langchain текущая версия: 1.2.10 (стабильный API для callbacks и сообщений)
  • langgraph текущая версия: 1.0.10 (стабильный API графа с breaking changes от 0.x)
  • Подсчёт токенов на сообщение: Каждое сообщение требует 4 служебных токена + токены контента + 2 токена для подготовки ответа (формат OpenAI chat)
  • Пример затрат разошедшегося агента: GPT-4o по $2.50/1M входящих токенов + $10.00/1M выходящих токенов — 20 итераций × 8K токенов/итерация = 160K токенов ≈ $0.40–$1.60 в зависимости от ratio input/output
  • Паттерн таймаута для async-кода: asyncio.wait_for(coro, timeout=seconds) генерирует asyncio.TimeoutError при истечении
  • Паттерн таймаута для sync-кода: concurrent.futures.ThreadPoolExecutor с future.result(timeout=seconds) или signal.alarm (только Unix, не thread-safe)
  • Ограничение signal.alarm: Работает только в main thread на Unix-системах — ненадёжно для production multi-threaded агентов
  • Threshold consecutive failure в circuit breaker: Типично 5 последовательных сбоев перед открытием цепи
  • Circuit breaker half-open state: После истечения timeout circuit переходит в HALF_OPEN, позволяя test requests (типично требует 2 последовательных успехов для закрытия)
  • Circuit breaker timeout window: 60 секунд — обычное значение перед попыткой retry из OPEN state
  • Формула подсчёта токенов для сообщений: sum(4 + tokens(msg.content) for msg in messages) + 2
  • LangChain callback integration point: BaseCallbackHandler.on_llm_end(response: LLMResult) получает dict token_usage из ответа LLM
  • Budget enforcement timing: Pre-call checks предотвращают запуск вызова, который превысит бюджет; post-call accounting проверяет фактические затраты
  • Tool timeout baseline: 10 секунд — разумное значение по умолчанию; настройте per tool (web_search: 8s, code_execution: 30s, database_query: 10s)
  • ThreadPoolExecutor для sync tools: executor.submit(func, *args).result(timeout=seconds) с future.cancel() при таймауте
  • Graceful degradation requirement: Всегда возвращайте partial results с объяснением вместо голых исключений, когда срабатывают страховки

Почему агенты разбегаются

AI-агенты отказывают иначе, чем стандартные приложения. Они зацикливаются — вызывают инструменты, рассуждают о результатах, вызывают больше инструментов. Этот feedback loop создаёт режимы отказа, которых не существует в single-shot LLM вызовах.

Коренные причины

Reward hacking. Агент обнаруживает паттерн взаимодействия с инструментом, который, по его сигналу reward, выглядит как прогресс, но на самом деле не приближает к цели. Пример: агент с задачей "исследовать цены конкурентов" повторно вызывает tool web search с небольшими вариациями, каждый раз получая большие HTML-ответы, которые интерпретирует как прогресс.

Tool result loops. Инструмент возвращает двусмысленный output. Агент вызывает другой инструмент для уточнения. Тот инструмент тоже возвращает двусмысленный output. Цикл продолжается. Без вмешательства агент бесконечно потребляет бюджет.

Плохие условия завершения. Prompting агента не включает чёткие критерии успеха. Он продолжает "уточнять" результаты даже когда задача завершена. Частое в open-ended задачах типа "улучшить этот код" или "тщательно исследовать эту тему".

Context window bloat. Каждая итерация добавляется в историю разговора. Результаты инструментов с verbose output (50KB API responses, дампы БД, полные веб-страницы) вызывают экспоненциальный рост контекста. 5-итерационный loop может раздуться с 2K токенов до 80K токенов.

Tool call timeouts. Внешние сервисы зависают. Database queries бегут бесконечно под нагрузкой. HTTP requests ждут вечность на неправильно настроенных серверах. Без таймаутов потоки агента блокируются навсегда и thread pools истощаются.

Decision Framework

Режим отказа Первичная страховка Вторичная страховка Fallback стратегия
Бесконечный loop reasoning recursion_limit в config Кастомный iteration counter в state Emergency stop node возвращающий partial results
Token cost explosion Budget middleware (pre-call checks) Cost tracking callback (post-call) Switch на дешёвый model для summary
Hanging tool call asyncio.wait_for / ThreadPoolExecutor timeout Circuit breaker после N timeouts Return cached/fallback data
Repeated tool failures Circuit breaker (open после 5 failures) Tool-level retry с exponential backoff Structured error для reasoning агента
Context window overflow Token counting + max_tokens_per_call limit Summarization old messages Truncate oldest tool results
Budget exhausted mid-run Budget check перед каждым LLM call Graceful degradation handler Use gpt-4o-mini для synthesis partial answer

Parameters Reference Table

Параметр Рекомендуемое значение Примечания
recursion_limit 15–25 для production tasks LangGraph default 25; задайте явно в config
Soft iteration limit 80% от recursion_limit (e.g. 20 если recursion_limit=25) Trigger warning logs и graceful stop перед hard limit
Per-run budget $0.10–$0.50 для GPT-4o tasks Adjust в зависимости от complexity task и acceptable cost
Budget warning threshold 80% от total budget Log warnings когда approaching limit
max_tokens_per_call 6000–8000 tokens Prevents single call от consuming entire context window
Tool timeout (default) 10 seconds Override per tool based на expected latency
Tool timeout (database) 10 seconds с statement_timeout=9s Database-side timeout должен быть немного shorter
Tool timeout (web requests) 8 seconds Account для network latency и slow servers
Tool timeout (code execution) 30 seconds Longer для compute-intensive operations
Circuit breaker failure threshold 5 consecutive failures Open circuit чтобы prevent cascade failures
Circuit breaker success threshold 2 consecutive successes в HALF_OPEN Close circuit після service recovery
Circuit breaker timeout 60 seconds Wait перед testing если service recovered
Thread pool size для sync tools 4–8 workers Limit concurrent blocking operations
Early stopping confidence threshold 0.90–0.95 Stop когда agent signals high confidence
Token usage monitoring interval Every LLM call via callback Real-time cost tracking

Частые ошибки

Ошибка 1: Нет recursion_limit

Влияние: Агент бежит до API timeout (часто 600 секунд) или exhaustion бюджета, сжигая $5–$50 затрат.

Неправильно:

result = agent.invoke({"messages": [...]})  # Uses default 25, but implicit

Правильно:

result = agent.invoke(
    {"messages": [...]},
    config={"recursion_limit": 20}  # Explicit hard limit
)

Ошибка 2: Budget Tracking без Enforcement

Влияние: Вы логируете затраты но не стопите выполнение. Агент завершается но превышает бюджет на 300–500%.

Неправильно:

class CostTracker:
    def check_budget(self):
        if self.usage.total_cost_usd >= self.budget_usd:
            logger.warning("Budget exceeded!")  # Only logs, doesn't stop

Правильно:

class CostTracker:
    def check_budget(self):
        if self.usage.total_cost_usd >= self.budget_usd:
            raise BudgetExceededError(
                f"${self.usage.total_cost_usd:.4f} spent, "
                f"budget was ${self.budget_usd:.4f}"
            )

Ошибка 3: Tool Timeouts без Fallback Values

Влияние: Timeout exception propagates к агенту, crashes выполнение вместо graceful degradation.

Неправильно:

@with_timeout(seconds=5.0, raise_on_timeout=True)
def query_database(sql: str) -> dict:
    return execute_query(sql)  # Raises ToolTimeoutError on timeout

Правильно:

@with_timeout(seconds=5.0, fallback={"error": "query_timeout", "rows": []})
def query_database(sql: str) -> dict:
    return execute_query(sql)  # Returns structured fallback on timeout

Ошибка 4: Ignoring Cached Token Pricing

Влияние: Cost estimates на 50–100% выше. Repeated context использует cached tokens по скидке 50%.

Неправильно:

cost = (prompt_tokens / 1_000_000) * pricing["input"]
# Ignores that some prompt tokens are cached

Правильно:

regular_input = prompt_tokens - cached_tokens
cost = (
    (regular_input / 1_000_000) * pricing["input"]
    + (cached_tokens / 1_000_000) * pricing["cached_input"]
)

Ошибка 5: Circuit Breaker без HALF_OPEN State

Влияние: Service восстанавливается но circuit остаётся permanently open. Требуется manual intervention.

Неправильно:

if self.state == CircuitState.OPEN:
    raise CircuitOpenError("Service unavailable")
    # Never tests if service recovered

Правильно:

if self.state == CircuitState.OPEN:
    if time.time() - self.last_failure_time >= self.timeout_seconds:
        self.state = CircuitState.HALF_OPEN  # Test recovery
    else:
        raise CircuitOpenError(f"Retry in {self._seconds_until_retry()}s")

Iteration Limit Patterns

Pattern 1: Hard Limit via recursion_limit

LangGraph's встроенный механизм. Задайте в invocation config, генерирует GraphRecursionError при превышении.

from langgraph.graph import StateGraph, END
from langgraph.errors import GraphRecursionError

agent = build_agent_graph()

try:
    result = agent.invoke(
        {"messages": [{"role": "user", "content": "analyze dataset"}]},
        config={"recursion_limit": 25}
    )
except GraphRecursionError as e:
    logger.error(f"Agent exceeded recursion limit: {e}")
    result = {"error": "max_iterations", "partial": None}

Когда использовать: Всегда задавайте как ultimate safety net. Даже если вы реализуете soft limits, hard limit предотвращает runaway execution.

Pattern 2: Soft Limit с State Tracking

Отслеживайте iteration count в state и trigger graceful degradation перед hitting hard limit.

from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    iteration_count: int
    should_stop: bool
    stop_reason: str | None

MAX_ITERATIONS = 20
WARN_AT = 15

def increment_iteration(state: AgentState) -> dict:
    count = state["iteration_count"] + 1

    if count >= MAX_ITERATIONS:
        return {
            "iteration_count": count,
            "should_stop": True,
            "stop_reason": "soft_max_iterations"
        }

    if count >= WARN_AT:
        logger.warning(f"Iteration {count}/{MAX_ITERATIONS} — approaching limit")

    return {"iteration_count": count}

def routing_node(state: AgentState) -> str:
    if state.get("should_stop"):
        return "graceful_stop"

    if has_final_answer(state):
        return "end"

    return "continue_reasoning"

def graceful_stop_node(state: AgentState) -> dict:
    summary = summarize_progress(state["messages"])
    return {
        "messages": [{
            "role": "assistant",
            "content": f"Reached iteration limit. Progress so far: {summary}"
        }]
    }

Когда использовать: Production системы где вы хотите контроль над stopping behavior. Позволяет кастомную логику для partial results.

Pattern 3: Per-Tool Call Budget

Лимитируйте how many times каждый инструмент может быть вызван per run. Prevents tool-specific loops.

from collections import defaultdict

class ToolCallBudget:
    def __init__(self, max_calls_per_tool: int = 5):
        self.max_calls_per_tool = max_calls_per_tool
        self.call_counts: dict[str, int] = defaultdict(int)

    def can_call(self, tool_name: str) -> bool:
        return self.call_counts[tool_name] < self.max_calls_per_tool

    def record_call(self, tool_name: str) -> None:
        self.call_counts[tool_name] += 1

    def check_and_record(self, tool_name: str) -> None:
        if not self.can_call(tool_name):
            raise ToolBudgetExceededError(
                f"Tool '{tool_name}' called {self.call_counts[tool_name]} times "
                f"(limit: {self.max_calls_per_tool})"
            )
        self.record_call(tool_name)

# Integrate into tool execution node
def tool_execution_node(state: AgentState, budget: ToolCallBudget) -> dict:
    tool_call = state["messages"][-1]["tool_calls"][0]
    tool_name = tool_call["name"]

    try:
        budget.check_and_record(tool_name)
        result = execute_tool(tool_name, tool_call["args"])
        return {"messages": [{"role": "tool", "content": result}]}
    except ToolBudgetExceededError as e:
        return {
            "messages": [{
                "role": "tool",
                "content": f"Tool budget exceeded: {e}. Try a different approach."
            }]
        }

Когда использовать: Когда агенты застревают в single-tool loops (e.g. вызывают web_search 20 раз с minor variations).

Cost Circuit Breaker

Real-time token counting и budget enforcement предотвращают runaway costs.

Token Counting с tiktoken

import tiktoken
from dataclasses import dataclass, field
import time

@dataclass
class TokenUsage:
    prompt_tokens: int = 0
    completion_tokens: int = 0
    cached_tokens: int = 0
    total_cost_usd: float = 0.0
    call_count: int = 0
    start_time: float = field(default_factory=time.time)

    @property
    def total_tokens(self) -> int:
        return self.prompt_tokens + self.completion_tokens

    @property
    def elapsed_seconds(self) -> float:
        return time.time() - self.start_time

class CostTracker:
    MODEL_PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00, "cached_input": 1.25},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60, "cached_input": 0.075},
        "o3-mini": {"input": 1.10, "output": 4.40, "cached_input": 0.55},
    }

    def __init__(self, model: str, budget_usd: float | None = None):
        self.model = model
        self.budget_usd = budget_usd
        self.usage = TokenUsage()

        try:
            self._encoder = tiktoken.encoding_for_model(model)
        except KeyError:
            self._encoder = tiktoken.get_encoding("cl100k_base")

    def count_tokens(self, text: str) -> int:
        return len(self._encoder.encode(text))

    def count_messages_tokens(self, messages: list[dict]) -> int:
        """OpenAI chat format: 4 tokens overhead per message + 2 for reply."""
        total = 0
        for msg in messages:
            total += 4  # Message overhead
            for key, value in msg.items():
                if isinstance(value, str):
                    total += self.count_tokens(value)
        total += 2  # Reply priming
        return total

    def record_call(
        self,
        prompt_tokens: int,
        completion_tokens: int,
        cached_tokens: int = 0
    ) -> None:
        pricing = self.MODEL_PRICING.get(
            self.model,
            {"input": 5.0, "output": 15.0, "cached_input": 2.5}
        )

        regular_input = prompt_tokens - cached_tokens
        cost = (
            (regular_input / 1_000_000) * pricing["input"]
            + (cached_tokens / 1_000_000) * pricing["cached_input"]
            + (completion_tokens / 1_000_000) * pricing["output"]
        )

        self.usage.prompt_tokens += prompt_tokens
        self.usage.completion_tokens += completion_tokens
        self.usage.cached_tokens += cached_tokens
        self.usage.total_cost_usd += cost
        self.usage.call_count += 1

    def check_budget(self) -> None:
        if self.budget_usd and self.usage.total_cost_usd >= self.budget_usd:
            raise BudgetExceededError(
                f"Budget ${self.budget_usd:.4f} exceeded: "
                f"${self.usage.total_cost_usd:.4f} spent"
            )

class BudgetExceededError(Exception):
    pass

Budget Enforcement Middleware

Pre-call checks предотвращают exceeding budget перед incurred cost.

from langchain_core.language_models import BaseChatModel
from langchain_core.messages import BaseMessage
from langchain_core.outputs import ChatResult
from typing import Any
import logging

logger = logging.getLogger(__name__)

class BudgetEnforcingLLM(BaseChatModel):
    inner_llm: BaseChatModel
    tracker: CostTracker
    max_tokens_per_call: int = 8000
    warn_at_budget_fraction: float = 0.8

    class Config:
        arbitrary_types_allowed = True

    def _generate(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        **kwargs: Any
    ) -> ChatResult:
        self._pre_call_checks(messages)
        result = self.inner_llm._generate(messages, stop=stop, **kwargs)
        self._post_call_accounting(result)
        return result

    def _pre_call_checks(self, messages: list[BaseMessage]) -> None:
        # Token limit check
        estimated_tokens = self.tracker.count_messages_tokens(
            [{"role": m.type, "content": str(m.content)} for m in messages]
        )

        if estimated_tokens > self.max_tokens_per_call:
            raise TokenLimitError(
                f"Input {estimated_tokens} tokens exceeds limit "
                f"{self.max_tokens_per_call}"
            )

        # Budget check
        if self.tracker.budget_usd:
            remaining = self.tracker.budget_usd - self.tracker.usage.total_cost_usd
            if remaining <= 0:
                raise BudgetExceededError("Budget exhausted")

            fraction_used = (
                self.tracker.usage.total_cost_usd / self.tracker.budget_usd
            )
            if fraction_used >= self.warn_at_budget_fraction:
                logger.warning(
                    f"Budget {fraction_used:.0%} consumed "
                    f"(${self.tracker.usage.total_cost_usd:.4f}/"
                    f"${self.tracker.budget_usd:.4f})"
                )

    def _post_call_accounting(self, result: ChatResult) -> None:
        if result.llm_output:
            usage = result.llm_output.get("token_usage", {})
            if usage:
                self.tracker.record_call(
                    prompt_tokens=usage.get("prompt_tokens", 0),
                    completion_tokens=usage.get("completion_tokens", 0),
                    cached_tokens=usage.get("prompt_tokens_details", {})
                                  .get("cached_tokens", 0)
                )
                self.tracker.check_budget()

    @property
    def _llm_type(self) -> str:
        return f"budget_enforcing_{self.inner_llm._llm_type}"

class TokenLimitError(Exception):
    pass

LangChain Callback Integration

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from typing import Any

class CostTrackingCallback(BaseCallbackHandler):
    def __init__(self, tracker: CostTracker):
        self.tracker = tracker

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        if not response.llm_output:
            return

        usage = response.llm_output.get("token_usage") or \
                response.llm_output.get("usage", {})

        if usage:
            self.tracker.record_call(
                prompt_tokens=usage.get("prompt_tokens", 0),
                completion_tokens=usage.get("completion_tokens", 0),
                cached_tokens=usage.get("prompt_tokens_details", {})
                              .get("cached_tokens", 0)
            )

            try:
                self.tracker.check_budget()
            except BudgetExceededError:
                raise

# Usage
tracker = CostTracker(model="gpt-4o", budget_usd=0.50)
callback = CostTrackingCallback(tracker)

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", callbacks=[callback])

result = agent.invoke(
    {"messages": [...]},
    config={"callbacks": [callback], "recursion_limit": 25}
)

print(f"Cost: ${tracker.usage.total_cost_usd:.4f}")

Timeout Implementation

Pattern 1: Async Tool Timeout с asyncio.wait_for

import asyncio
import functools
from typing import Callable, Any, TypeVar

T = TypeVar("T")

def with_async_timeout(
    seconds: float,
    fallback: Any = None,
    raise_on_timeout: bool = False
):
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            try:
                return await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=seconds
                )
            except asyncio.TimeoutError:
                if raise_on_timeout:
                    raise ToolTimeoutError(
                        f"{func.__name__} timed out after {seconds}s"
                    )
                return fallback
        return wrapper
    return decorator

class ToolTimeoutError(Exception):
    pass

# Usage
@with_async_timeout(seconds=8.0, fallback={"error": "search_timeout"})
async def web_search(query: str) -> dict:
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"https://api.search.com?q={query}",
            timeout=aiohttp.ClientTimeout(total=7.5)
        ) as resp:
            return await resp.json()

Pattern 2: Sync Tool Timeout с ThreadPoolExecutor

Важно: Не используйте signal.alarm в production. Работает только в main thread на Unix-системах и interferes с asyncio event loops.

import functools
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout
from typing import Callable, Any

def with_sync_timeout(
    seconds: float,
    fallback: Any = None,
    raise_on_timeout: bool = False
):
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            with ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(func, *args, **kwargs)
                try:
                    return future.result(timeout=seconds)
                except FuturesTimeout:
                    future.cancel()
                    if raise_on_timeout:
                        raise ToolTimeoutError(
                            f"{func.__name__} timed out after {seconds}s"
                        )
                    return fallback
        return wrapper
    return decorator

@with_sync_timeout(seconds=10.0, fallback={"error": "db_timeout", "rows": []})
def query_database(sql: str, params: dict) -> dict:
    import psycopg2
    conn = psycopg2.connect(
        dsn="postgresql://...",
        connect_timeout=5,
        options="-c statement_timeout=9000"  # 9s server-side timeout
    )
    cursor = conn.cursor()
    cursor.execute(sql, params)
    return {"rows": cursor.fetchall()}

Pattern 3: Aggregate Timeout для Tool Batch

Когда агент вызывает multiple tools в parallel, enforce total deadline.

import asyncio
from typing import Callable

async def execute_tools_with_deadline(
    tool_calls: list[dict],
    tools_registry: dict[str, Callable],
    total_timeout: float = 30.0,
    per_tool_timeout: float = 10.0
) -> list[dict]:

    async def execute_single(call: dict) -> dict:
        tool_name = call["name"]
        tool_args = call["args"]
        tool_id = call["id"]

        tool_func = tools_registry.get(tool_name)
        if not tool_func:
            return {"tool_call_id": tool_id, "error": f"Unknown tool: {tool_name}"}

        try:
            if asyncio.iscoroutinefunction(tool_func):
                result = await asyncio.wait_for(
                    tool_func(**tool_args),
                    timeout=per_tool_timeout
                )
            else:
                result = await asyncio.wait_for(
                    asyncio.get_event_loop().run_in_executor(
                        None, lambda: tool_func(**tool_args)
                    ),
                    timeout=per_tool_timeout
                )
            return {"tool_call_id": tool_id, "result": result}

        except asyncio.TimeoutError:
            return {
                "tool_call_id": tool_id,
                "error": f"Tool {tool_name} timeout after {per_tool_timeout}s"
            }
        except Exception as e:
            return {
                "tool_call_id": tool_id,
                "error": f"{type(e).__name__}: {str(e)}"
            }

    try:
        results = await asyncio.wait_for(
            asyncio.gather(*[execute_single(call) for call in tool_calls]),
            timeout=total_timeout
        )
        return list(results)

    except asyncio.TimeoutError:
        return [{
            "error": f"Tool batch exceeded {total_timeout}s total deadline"
        }]

Per-Tool Timeout Configuration

TOOL_TIMEOUTS = {
    "web_search": 8.0,
    "web_scrape": 12.0,
    "code_execution": 30.0,
    "database_query": 10.0,
    "file_read": 5.0,
    "api_call": 15.0,
    "llm_summarize": 20.0,
}

def get_tool_timeout(tool_name: str, default: float = 10.0) -> float:
    return TOOL_TIMEOUTS.get(tool_name, default)

# Apply when decorating tools
@with_async_timeout(seconds=get_tool_timeout("web_search"))
async def web_search(query: str) -> dict:
    ...

Graceful Degradation

Когда страховки срабатывают, возвращайте useful partial results вместо bare errors.

Budget Exceeded Handler

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

class GracefulDegradationHandler:
    def __init__(self, cheap_model: str = "gpt-4o-mini"):
        self.cheap_llm = ChatOpenAI(model=cheap_model, max_tokens=500)

    async def handle_budget_exceeded(
        self,
        state: dict,
        budget_info: dict
    ) -> str:
        completed_work = self._extract_tool_results(state["messages"])

        prompt = f"""
Task: {state.get('original_task', 'Unknown task')}

Budget exhausted: ${budget_info['used']:.4f} of ${budget_info['total']:.4f}

Completed work:
{completed_work}

Provide:
1. Summary of what was accomplished
2. What still needs to be done
3. Most important findings

Be concise (budget-constrained summary).
"""

        response = await self.cheap_llm.ainvoke([HumanMessage(content=prompt)])
        return response.content

    def _extract_tool_results(self, messages: list) -> str:
        findings = []
        for msg in messages:
            if isinstance(msg, dict) and msg.get("role") == "tool":
                content = msg.get("content", "")
                if len(content) > 50:
                    findings.append(content[:300])

        return "\n".join(findings[-10:]) if findings else "No results yet"

Iteration Limit Handler

async def handle_iteration_limit(
    self,
    state: dict,
    iteration_count: int
) -> str:
    messages = state.get("messages", [])

    tool_results = [
        m.get("content", "")[:200]
        for m in messages
        if isinstance(m, dict) and m.get("role") == "tool"
    ]

    if not tool_results:
        return (
            f"Could not complete task in {iteration_count} iterations. "
            "Try breaking into smaller sub-tasks."
        )

    gathered = "\n".join(tool_results[-5:])

    return (
        f"Reached iteration limit ({iteration_count}). "
        f"Information gathered:\n\n{gathered}\n\n"
        "Based on this, partial answer: [agent synthesizes here]"
    )

Timeout Handler

async def handle_timeout(
    self,
    state: dict,
    elapsed_seconds: float,
    timeout_seconds: float
) -> str:
    last_action = self._get_last_action(state["messages"])

    return (
        f"Task exceeded {timeout_seconds}s timeout (ran for {elapsed_seconds:.1f}s). "
        f"Last action: {last_action}. "
        "This task may require optimization or should be split into smaller steps."
    )

def _get_last_action(self, messages: list) -> str:
    for msg in reversed(messages):
        if isinstance(msg, dict):
            if msg.get("role") == "assistant" and msg.get("tool_calls"):
                return f"calling {msg['tool_calls'][0]['name']}"
            elif msg.get("role") == "tool":
                return "waiting for tool result"
    return "unknown"

Performance & Benchmarks

Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.

Token Counting Overhead

  • tiktoken encoding time: Примерно 50–200 микросекунд на сообщение (1,000 токенов)
  • Impact on latency: Negligible (< 1% от total LLM call latency)
  • Memory footprint: ~10 MB для tokenizer model загруженной один раз per process

Cost Savings из Safeguards

Типичный runaway agent без safeguards:

  • Бежит на full API timeout (600 секунд)
  • Делает 40–80 LLM calls с escalating context
  • Total cost: $3–$15 для GPT-4o

С safeguards (recursion_limit=20, budget=$0.50):

  • Stops в iteration 20 или budget exhaustion
  • Total cost capped на $0.50
  • Cost reduction: 85–95% на runaway scenarios

Circuit Breaker Impact

Когда tool fails repeatedly (e.g. database connection lost):

Без circuit breaker:

  • Agent attempts 20 tool calls, каждый timing out после 10s
  • Total wasted time: 200 секунд
  • No useful work done

С circuit breaker (threshold=5, timeout=60s):

  • First 5 calls fail (50 секунд wasted)
  • Circuit opens, subsequent calls fail immediately
  • Agent receives structured error и tries alternative approach
  • Total wasted time: 50 секунд
  • Time saved: 75%

Timeout Configuration Impact

Tool Type Default Timeout 95th Percentile Latency Timeout Setting False Positive Rate
Database query 10s 2–4s 10s Very low
Web search API 8s 1–3s 8s Low
Web scraping 12s 4–8s 12s Low-medium
Code execution 30s 5–15s 30s Medium (depends on task)
File I/O 5s 0.1–1s 5s Very low

False positives occur когда legitimate operations exceed timeout. Tune based на P95 latency + safety margin.

Memory Usage по Component

Estimated per-agent memory consumption:

  • Base LangGraph state: 5–20 KB (depends на state schema)
  • Message history (50 messages): 100–500 KB (depends на tool results)
  • tiktoken encoder: 10 MB (shared across agents в process)
  • CostTracker per run: < 1 KB
  • CircuitBreaker registry (20 tools): < 5 KB
  • Total per agent: 10–15 MB baseline + message history

Для 100 concurrent agents: 1–2 GB memory footprint (dominated by message history).

Callback Overhead

LangChain callback handlers добавляют minimal overhead:

  • on_llm_end callback execution: 100–500 микросекунд per call
  • Token counting в callback: 50–200 микросекунд (если re-counting messages)
  • Total overhead: < 1ms per LLM call (< 0.1% от typical LLM latency)

Callbacks синхронны и блокируют briefly, но impact negligible в сравнении с network I/O для LLM API calls.