title: "Страховки для агентов: Лимиты итераций, Отслеживание затрат, Паттерны таймаутов" slug: agent-failsafe-iteration-cost-timeout-2026-ru date: 2026-02-24 lang: ru
Страховки для агентов: Лимиты итераций, Отслеживание затрат, Паттерны таймаутов
Ключевые факты
- LangGraph recursion_limit по умолчанию: 25 итераций (задаётся в словаре
RunnableConfig, передаваемом в.invoke()или.ainvoke()) - Месторасположение LangGraph recursion_limit: Параметр конфига
{"recursion_limit": N}— не свойство графа, а конфиг вызова - Механика LangGraph interrupt(): Вызовите
interrupt(value)из узла, чтобы паузировать выполнение и вернуться к вызывающей стороне для human-in-the-loop - tiktoken текущая версия: 0.12.0 (по данным PyPI, март 2026)
- langchain текущая версия: 1.2.10 (стабильный API для callbacks и сообщений)
- langgraph текущая версия: 1.0.10 (стабильный API графа с breaking changes от 0.x)
- Подсчёт токенов на сообщение: Каждое сообщение требует 4 служебных токена + токены контента + 2 токена для подготовки ответа (формат OpenAI chat)
- Пример затрат разошедшегося агента: GPT-4o по $2.50/1M входящих токенов + $10.00/1M выходящих токенов — 20 итераций × 8K токенов/итерация = 160K токенов ≈ $0.40–$1.60 в зависимости от ratio input/output
- Паттерн таймаута для async-кода:
asyncio.wait_for(coro, timeout=seconds)генерируетasyncio.TimeoutErrorпри истечении - Паттерн таймаута для sync-кода:
concurrent.futures.ThreadPoolExecutorсfuture.result(timeout=seconds)или signal.alarm (только Unix, не thread-safe) - Ограничение signal.alarm: Работает только в main thread на Unix-системах — ненадёжно для production multi-threaded агентов
- Threshold consecutive failure в circuit breaker: Типично 5 последовательных сбоев перед открытием цепи
- Circuit breaker half-open state: После истечения timeout circuit переходит в HALF_OPEN, позволяя test requests (типично требует 2 последовательных успехов для закрытия)
- Circuit breaker timeout window: 60 секунд — обычное значение перед попыткой retry из OPEN state
- Формула подсчёта токенов для сообщений:
sum(4 + tokens(msg.content) for msg in messages) + 2 - LangChain callback integration point:
BaseCallbackHandler.on_llm_end(response: LLMResult)получает dict token_usage из ответа LLM - Budget enforcement timing: Pre-call checks предотвращают запуск вызова, который превысит бюджет; post-call accounting проверяет фактические затраты
- Tool timeout baseline: 10 секунд — разумное значение по умолчанию; настройте per tool (web_search: 8s, code_execution: 30s, database_query: 10s)
- ThreadPoolExecutor для sync tools:
executor.submit(func, *args).result(timeout=seconds)сfuture.cancel()при таймауте - Graceful degradation requirement: Всегда возвращайте partial results с объяснением вместо голых исключений, когда срабатывают страховки
Почему агенты разбегаются
AI-агенты отказывают иначе, чем стандартные приложения. Они зацикливаются — вызывают инструменты, рассуждают о результатах, вызывают больше инструментов. Этот feedback loop создаёт режимы отказа, которых не существует в single-shot LLM вызовах.
Коренные причины
Reward hacking. Агент обнаруживает паттерн взаимодействия с инструментом, который, по его сигналу reward, выглядит как прогресс, но на самом деле не приближает к цели. Пример: агент с задачей "исследовать цены конкурентов" повторно вызывает tool web search с небольшими вариациями, каждый раз получая большие HTML-ответы, которые интерпретирует как прогресс.
Tool result loops. Инструмент возвращает двусмысленный output. Агент вызывает другой инструмент для уточнения. Тот инструмент тоже возвращает двусмысленный output. Цикл продолжается. Без вмешательства агент бесконечно потребляет бюджет.
Плохие условия завершения. Prompting агента не включает чёткие критерии успеха. Он продолжает "уточнять" результаты даже когда задача завершена. Частое в open-ended задачах типа "улучшить этот код" или "тщательно исследовать эту тему".
Context window bloat. Каждая итерация добавляется в историю разговора. Результаты инструментов с verbose output (50KB API responses, дампы БД, полные веб-страницы) вызывают экспоненциальный рост контекста. 5-итерационный loop может раздуться с 2K токенов до 80K токенов.
Tool call timeouts. Внешние сервисы зависают. Database queries бегут бесконечно под нагрузкой. HTTP requests ждут вечность на неправильно настроенных серверах. Без таймаутов потоки агента блокируются навсегда и thread pools истощаются.
Decision Framework
| Режим отказа | Первичная страховка | Вторичная страховка | Fallback стратегия |
|---|---|---|---|
| Бесконечный loop reasoning | recursion_limit в config |
Кастомный iteration counter в state | Emergency stop node возвращающий partial results |
| Token cost explosion | Budget middleware (pre-call checks) | Cost tracking callback (post-call) | Switch на дешёвый model для summary |
| Hanging tool call | asyncio.wait_for / ThreadPoolExecutor timeout |
Circuit breaker после N timeouts | Return cached/fallback data |
| Repeated tool failures | Circuit breaker (open после 5 failures) | Tool-level retry с exponential backoff | Structured error для reasoning агента |
| Context window overflow | Token counting + max_tokens_per_call limit | Summarization old messages | Truncate oldest tool results |
| Budget exhausted mid-run | Budget check перед каждым LLM call | Graceful degradation handler | Use gpt-4o-mini для synthesis partial answer |
Parameters Reference Table
| Параметр | Рекомендуемое значение | Примечания |
|---|---|---|
recursion_limit |
15–25 для production tasks | LangGraph default 25; задайте явно в config |
| Soft iteration limit | 80% от recursion_limit (e.g. 20 если recursion_limit=25) | Trigger warning logs и graceful stop перед hard limit |
| Per-run budget | $0.10–$0.50 для GPT-4o tasks | Adjust в зависимости от complexity task и acceptable cost |
| Budget warning threshold | 80% от total budget | Log warnings когда approaching limit |
max_tokens_per_call |
6000–8000 tokens | Prevents single call от consuming entire context window |
| Tool timeout (default) | 10 seconds | Override per tool based на expected latency |
| Tool timeout (database) | 10 seconds с statement_timeout=9s | Database-side timeout должен быть немного shorter |
| Tool timeout (web requests) | 8 seconds | Account для network latency и slow servers |
| Tool timeout (code execution) | 30 seconds | Longer для compute-intensive operations |
| Circuit breaker failure threshold | 5 consecutive failures | Open circuit чтобы prevent cascade failures |
| Circuit breaker success threshold | 2 consecutive successes в HALF_OPEN | Close circuit після service recovery |
| Circuit breaker timeout | 60 seconds | Wait перед testing если service recovered |
| Thread pool size для sync tools | 4–8 workers | Limit concurrent blocking operations |
| Early stopping confidence threshold | 0.90–0.95 | Stop когда agent signals high confidence |
| Token usage monitoring interval | Every LLM call via callback | Real-time cost tracking |
Частые ошибки
Ошибка 1: Нет recursion_limit
Влияние: Агент бежит до API timeout (часто 600 секунд) или exhaustion бюджета, сжигая $5–$50 затрат.
❌ Неправильно:
result = agent.invoke({"messages": [...]}) # Uses default 25, but implicit
✅ Правильно:
result = agent.invoke(
{"messages": [...]},
config={"recursion_limit": 20} # Explicit hard limit
)
Ошибка 2: Budget Tracking без Enforcement
Влияние: Вы логируете затраты но не стопите выполнение. Агент завершается но превышает бюджет на 300–500%.
❌ Неправильно:
class CostTracker:
def check_budget(self):
if self.usage.total_cost_usd >= self.budget_usd:
logger.warning("Budget exceeded!") # Only logs, doesn't stop
✅ Правильно:
class CostTracker:
def check_budget(self):
if self.usage.total_cost_usd >= self.budget_usd:
raise BudgetExceededError(
f"${self.usage.total_cost_usd:.4f} spent, "
f"budget was ${self.budget_usd:.4f}"
)
Ошибка 3: Tool Timeouts без Fallback Values
Влияние: Timeout exception propagates к агенту, crashes выполнение вместо graceful degradation.
❌ Неправильно:
@with_timeout(seconds=5.0, raise_on_timeout=True)
def query_database(sql: str) -> dict:
return execute_query(sql) # Raises ToolTimeoutError on timeout
✅ Правильно:
@with_timeout(seconds=5.0, fallback={"error": "query_timeout", "rows": []})
def query_database(sql: str) -> dict:
return execute_query(sql) # Returns structured fallback on timeout
Ошибка 4: Ignoring Cached Token Pricing
Влияние: Cost estimates на 50–100% выше. Repeated context использует cached tokens по скидке 50%.
❌ Неправильно:
cost = (prompt_tokens / 1_000_000) * pricing["input"]
# Ignores that some prompt tokens are cached
✅ Правильно:
regular_input = prompt_tokens - cached_tokens
cost = (
(regular_input / 1_000_000) * pricing["input"]
+ (cached_tokens / 1_000_000) * pricing["cached_input"]
)
Ошибка 5: Circuit Breaker без HALF_OPEN State
Влияние: Service восстанавливается но circuit остаётся permanently open. Требуется manual intervention.
❌ Неправильно:
if self.state == CircuitState.OPEN:
raise CircuitOpenError("Service unavailable")
# Never tests if service recovered
✅ Правильно:
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time >= self.timeout_seconds:
self.state = CircuitState.HALF_OPEN # Test recovery
else:
raise CircuitOpenError(f"Retry in {self._seconds_until_retry()}s")
Iteration Limit Patterns
Pattern 1: Hard Limit via recursion_limit
LangGraph's встроенный механизм. Задайте в invocation config, генерирует GraphRecursionError при превышении.
from langgraph.graph import StateGraph, END
from langgraph.errors import GraphRecursionError
agent = build_agent_graph()
try:
result = agent.invoke(
{"messages": [{"role": "user", "content": "analyze dataset"}]},
config={"recursion_limit": 25}
)
except GraphRecursionError as e:
logger.error(f"Agent exceeded recursion limit: {e}")
result = {"error": "max_iterations", "partial": None}
Когда использовать: Всегда задавайте как ultimate safety net. Даже если вы реализуете soft limits, hard limit предотвращает runaway execution.
Pattern 2: Soft Limit с State Tracking
Отслеживайте iteration count в state и trigger graceful degradation перед hitting hard limit.
from typing import TypedDict, Annotated
import operator
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
iteration_count: int
should_stop: bool
stop_reason: str | None
MAX_ITERATIONS = 20
WARN_AT = 15
def increment_iteration(state: AgentState) -> dict:
count = state["iteration_count"] + 1
if count >= MAX_ITERATIONS:
return {
"iteration_count": count,
"should_stop": True,
"stop_reason": "soft_max_iterations"
}
if count >= WARN_AT:
logger.warning(f"Iteration {count}/{MAX_ITERATIONS} — approaching limit")
return {"iteration_count": count}
def routing_node(state: AgentState) -> str:
if state.get("should_stop"):
return "graceful_stop"
if has_final_answer(state):
return "end"
return "continue_reasoning"
def graceful_stop_node(state: AgentState) -> dict:
summary = summarize_progress(state["messages"])
return {
"messages": [{
"role": "assistant",
"content": f"Reached iteration limit. Progress so far: {summary}"
}]
}
Когда использовать: Production системы где вы хотите контроль над stopping behavior. Позволяет кастомную логику для partial results.
Pattern 3: Per-Tool Call Budget
Лимитируйте how many times каждый инструмент может быть вызван per run. Prevents tool-specific loops.
from collections import defaultdict
class ToolCallBudget:
def __init__(self, max_calls_per_tool: int = 5):
self.max_calls_per_tool = max_calls_per_tool
self.call_counts: dict[str, int] = defaultdict(int)
def can_call(self, tool_name: str) -> bool:
return self.call_counts[tool_name] < self.max_calls_per_tool
def record_call(self, tool_name: str) -> None:
self.call_counts[tool_name] += 1
def check_and_record(self, tool_name: str) -> None:
if not self.can_call(tool_name):
raise ToolBudgetExceededError(
f"Tool '{tool_name}' called {self.call_counts[tool_name]} times "
f"(limit: {self.max_calls_per_tool})"
)
self.record_call(tool_name)
# Integrate into tool execution node
def tool_execution_node(state: AgentState, budget: ToolCallBudget) -> dict:
tool_call = state["messages"][-1]["tool_calls"][0]
tool_name = tool_call["name"]
try:
budget.check_and_record(tool_name)
result = execute_tool(tool_name, tool_call["args"])
return {"messages": [{"role": "tool", "content": result}]}
except ToolBudgetExceededError as e:
return {
"messages": [{
"role": "tool",
"content": f"Tool budget exceeded: {e}. Try a different approach."
}]
}
Когда использовать: Когда агенты застревают в single-tool loops (e.g. вызывают web_search 20 раз с minor variations).
Cost Circuit Breaker
Real-time token counting и budget enforcement предотвращают runaway costs.
Token Counting с tiktoken
import tiktoken
from dataclasses import dataclass, field
import time
@dataclass
class TokenUsage:
prompt_tokens: int = 0
completion_tokens: int = 0
cached_tokens: int = 0
total_cost_usd: float = 0.0
call_count: int = 0
start_time: float = field(default_factory=time.time)
@property
def total_tokens(self) -> int:
return self.prompt_tokens + self.completion_tokens
@property
def elapsed_seconds(self) -> float:
return time.time() - self.start_time
class CostTracker:
MODEL_PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00, "cached_input": 1.25},
"gpt-4o-mini": {"input": 0.15, "output": 0.60, "cached_input": 0.075},
"o3-mini": {"input": 1.10, "output": 4.40, "cached_input": 0.55},
}
def __init__(self, model: str, budget_usd: float | None = None):
self.model = model
self.budget_usd = budget_usd
self.usage = TokenUsage()
try:
self._encoder = tiktoken.encoding_for_model(model)
except KeyError:
self._encoder = tiktoken.get_encoding("cl100k_base")
def count_tokens(self, text: str) -> int:
return len(self._encoder.encode(text))
def count_messages_tokens(self, messages: list[dict]) -> int:
"""OpenAI chat format: 4 tokens overhead per message + 2 for reply."""
total = 0
for msg in messages:
total += 4 # Message overhead
for key, value in msg.items():
if isinstance(value, str):
total += self.count_tokens(value)
total += 2 # Reply priming
return total
def record_call(
self,
prompt_tokens: int,
completion_tokens: int,
cached_tokens: int = 0
) -> None:
pricing = self.MODEL_PRICING.get(
self.model,
{"input": 5.0, "output": 15.0, "cached_input": 2.5}
)
regular_input = prompt_tokens - cached_tokens
cost = (
(regular_input / 1_000_000) * pricing["input"]
+ (cached_tokens / 1_000_000) * pricing["cached_input"]
+ (completion_tokens / 1_000_000) * pricing["output"]
)
self.usage.prompt_tokens += prompt_tokens
self.usage.completion_tokens += completion_tokens
self.usage.cached_tokens += cached_tokens
self.usage.total_cost_usd += cost
self.usage.call_count += 1
def check_budget(self) -> None:
if self.budget_usd and self.usage.total_cost_usd >= self.budget_usd:
raise BudgetExceededError(
f"Budget ${self.budget_usd:.4f} exceeded: "
f"${self.usage.total_cost_usd:.4f} spent"
)
class BudgetExceededError(Exception):
pass
Budget Enforcement Middleware
Pre-call checks предотвращают exceeding budget перед incurred cost.
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import BaseMessage
from langchain_core.outputs import ChatResult
from typing import Any
import logging
logger = logging.getLogger(__name__)
class BudgetEnforcingLLM(BaseChatModel):
inner_llm: BaseChatModel
tracker: CostTracker
max_tokens_per_call: int = 8000
warn_at_budget_fraction: float = 0.8
class Config:
arbitrary_types_allowed = True
def _generate(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
**kwargs: Any
) -> ChatResult:
self._pre_call_checks(messages)
result = self.inner_llm._generate(messages, stop=stop, **kwargs)
self._post_call_accounting(result)
return result
def _pre_call_checks(self, messages: list[BaseMessage]) -> None:
# Token limit check
estimated_tokens = self.tracker.count_messages_tokens(
[{"role": m.type, "content": str(m.content)} for m in messages]
)
if estimated_tokens > self.max_tokens_per_call:
raise TokenLimitError(
f"Input {estimated_tokens} tokens exceeds limit "
f"{self.max_tokens_per_call}"
)
# Budget check
if self.tracker.budget_usd:
remaining = self.tracker.budget_usd - self.tracker.usage.total_cost_usd
if remaining <= 0:
raise BudgetExceededError("Budget exhausted")
fraction_used = (
self.tracker.usage.total_cost_usd / self.tracker.budget_usd
)
if fraction_used >= self.warn_at_budget_fraction:
logger.warning(
f"Budget {fraction_used:.0%} consumed "
f"(${self.tracker.usage.total_cost_usd:.4f}/"
f"${self.tracker.budget_usd:.4f})"
)
def _post_call_accounting(self, result: ChatResult) -> None:
if result.llm_output:
usage = result.llm_output.get("token_usage", {})
if usage:
self.tracker.record_call(
prompt_tokens=usage.get("prompt_tokens", 0),
completion_tokens=usage.get("completion_tokens", 0),
cached_tokens=usage.get("prompt_tokens_details", {})
.get("cached_tokens", 0)
)
self.tracker.check_budget()
@property
def _llm_type(self) -> str:
return f"budget_enforcing_{self.inner_llm._llm_type}"
class TokenLimitError(Exception):
pass
LangChain Callback Integration
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from typing import Any
class CostTrackingCallback(BaseCallbackHandler):
def __init__(self, tracker: CostTracker):
self.tracker = tracker
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
if not response.llm_output:
return
usage = response.llm_output.get("token_usage") or \
response.llm_output.get("usage", {})
if usage:
self.tracker.record_call(
prompt_tokens=usage.get("prompt_tokens", 0),
completion_tokens=usage.get("completion_tokens", 0),
cached_tokens=usage.get("prompt_tokens_details", {})
.get("cached_tokens", 0)
)
try:
self.tracker.check_budget()
except BudgetExceededError:
raise
# Usage
tracker = CostTracker(model="gpt-4o", budget_usd=0.50)
callback = CostTrackingCallback(tracker)
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o", callbacks=[callback])
result = agent.invoke(
{"messages": [...]},
config={"callbacks": [callback], "recursion_limit": 25}
)
print(f"Cost: ${tracker.usage.total_cost_usd:.4f}")
Timeout Implementation
Pattern 1: Async Tool Timeout с asyncio.wait_for
import asyncio
import functools
from typing import Callable, Any, TypeVar
T = TypeVar("T")
def with_async_timeout(
seconds: float,
fallback: Any = None,
raise_on_timeout: bool = False
):
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs) -> Any:
try:
return await asyncio.wait_for(
func(*args, **kwargs),
timeout=seconds
)
except asyncio.TimeoutError:
if raise_on_timeout:
raise ToolTimeoutError(
f"{func.__name__} timed out after {seconds}s"
)
return fallback
return wrapper
return decorator
class ToolTimeoutError(Exception):
pass
# Usage
@with_async_timeout(seconds=8.0, fallback={"error": "search_timeout"})
async def web_search(query: str) -> dict:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(
f"https://api.search.com?q={query}",
timeout=aiohttp.ClientTimeout(total=7.5)
) as resp:
return await resp.json()
Pattern 2: Sync Tool Timeout с ThreadPoolExecutor
Важно: Не используйте signal.alarm в production. Работает только в main thread на Unix-системах и interferes с asyncio event loops.
import functools
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout
from typing import Callable, Any
def with_sync_timeout(
seconds: float,
fallback: Any = None,
raise_on_timeout: bool = False
):
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(func, *args, **kwargs)
try:
return future.result(timeout=seconds)
except FuturesTimeout:
future.cancel()
if raise_on_timeout:
raise ToolTimeoutError(
f"{func.__name__} timed out after {seconds}s"
)
return fallback
return wrapper
return decorator
@with_sync_timeout(seconds=10.0, fallback={"error": "db_timeout", "rows": []})
def query_database(sql: str, params: dict) -> dict:
import psycopg2
conn = psycopg2.connect(
dsn="postgresql://...",
connect_timeout=5,
options="-c statement_timeout=9000" # 9s server-side timeout
)
cursor = conn.cursor()
cursor.execute(sql, params)
return {"rows": cursor.fetchall()}
Pattern 3: Aggregate Timeout для Tool Batch
Когда агент вызывает multiple tools в parallel, enforce total deadline.
import asyncio
from typing import Callable
async def execute_tools_with_deadline(
tool_calls: list[dict],
tools_registry: dict[str, Callable],
total_timeout: float = 30.0,
per_tool_timeout: float = 10.0
) -> list[dict]:
async def execute_single(call: dict) -> dict:
tool_name = call["name"]
tool_args = call["args"]
tool_id = call["id"]
tool_func = tools_registry.get(tool_name)
if not tool_func:
return {"tool_call_id": tool_id, "error": f"Unknown tool: {tool_name}"}
try:
if asyncio.iscoroutinefunction(tool_func):
result = await asyncio.wait_for(
tool_func(**tool_args),
timeout=per_tool_timeout
)
else:
result = await asyncio.wait_for(
asyncio.get_event_loop().run_in_executor(
None, lambda: tool_func(**tool_args)
),
timeout=per_tool_timeout
)
return {"tool_call_id": tool_id, "result": result}
except asyncio.TimeoutError:
return {
"tool_call_id": tool_id,
"error": f"Tool {tool_name} timeout after {per_tool_timeout}s"
}
except Exception as e:
return {
"tool_call_id": tool_id,
"error": f"{type(e).__name__}: {str(e)}"
}
try:
results = await asyncio.wait_for(
asyncio.gather(*[execute_single(call) for call in tool_calls]),
timeout=total_timeout
)
return list(results)
except asyncio.TimeoutError:
return [{
"error": f"Tool batch exceeded {total_timeout}s total deadline"
}]
Per-Tool Timeout Configuration
TOOL_TIMEOUTS = {
"web_search": 8.0,
"web_scrape": 12.0,
"code_execution": 30.0,
"database_query": 10.0,
"file_read": 5.0,
"api_call": 15.0,
"llm_summarize": 20.0,
}
def get_tool_timeout(tool_name: str, default: float = 10.0) -> float:
return TOOL_TIMEOUTS.get(tool_name, default)
# Apply when decorating tools
@with_async_timeout(seconds=get_tool_timeout("web_search"))
async def web_search(query: str) -> dict:
...
Graceful Degradation
Когда страховки срабатывают, возвращайте useful partial results вместо bare errors.
Budget Exceeded Handler
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
class GracefulDegradationHandler:
def __init__(self, cheap_model: str = "gpt-4o-mini"):
self.cheap_llm = ChatOpenAI(model=cheap_model, max_tokens=500)
async def handle_budget_exceeded(
self,
state: dict,
budget_info: dict
) -> str:
completed_work = self._extract_tool_results(state["messages"])
prompt = f"""
Task: {state.get('original_task', 'Unknown task')}
Budget exhausted: ${budget_info['used']:.4f} of ${budget_info['total']:.4f}
Completed work:
{completed_work}
Provide:
1. Summary of what was accomplished
2. What still needs to be done
3. Most important findings
Be concise (budget-constrained summary).
"""
response = await self.cheap_llm.ainvoke([HumanMessage(content=prompt)])
return response.content
def _extract_tool_results(self, messages: list) -> str:
findings = []
for msg in messages:
if isinstance(msg, dict) and msg.get("role") == "tool":
content = msg.get("content", "")
if len(content) > 50:
findings.append(content[:300])
return "\n".join(findings[-10:]) if findings else "No results yet"
Iteration Limit Handler
async def handle_iteration_limit(
self,
state: dict,
iteration_count: int
) -> str:
messages = state.get("messages", [])
tool_results = [
m.get("content", "")[:200]
for m in messages
if isinstance(m, dict) and m.get("role") == "tool"
]
if not tool_results:
return (
f"Could not complete task in {iteration_count} iterations. "
"Try breaking into smaller sub-tasks."
)
gathered = "\n".join(tool_results[-5:])
return (
f"Reached iteration limit ({iteration_count}). "
f"Information gathered:\n\n{gathered}\n\n"
"Based on this, partial answer: [agent synthesizes here]"
)
Timeout Handler
async def handle_timeout(
self,
state: dict,
elapsed_seconds: float,
timeout_seconds: float
) -> str:
last_action = self._get_last_action(state["messages"])
return (
f"Task exceeded {timeout_seconds}s timeout (ran for {elapsed_seconds:.1f}s). "
f"Last action: {last_action}. "
"This task may require optimization or should be split into smaller steps."
)
def _get_last_action(self, messages: list) -> str:
for msg in reversed(messages):
if isinstance(msg, dict):
if msg.get("role") == "assistant" and msg.get("tool_calls"):
return f"calling {msg['tool_calls'][0]['name']}"
elif msg.get("role") == "tool":
return "waiting for tool result"
return "unknown"
Performance & Benchmarks
Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.
Token Counting Overhead
- tiktoken encoding time: Примерно 50–200 микросекунд на сообщение (1,000 токенов)
- Impact on latency: Negligible (< 1% от total LLM call latency)
- Memory footprint: ~10 MB для tokenizer model загруженной один раз per process
Cost Savings из Safeguards
Типичный runaway agent без safeguards:
- Бежит на full API timeout (600 секунд)
- Делает 40–80 LLM calls с escalating context
- Total cost: $3–$15 для GPT-4o
С safeguards (recursion_limit=20, budget=$0.50):
- Stops в iteration 20 или budget exhaustion
- Total cost capped на $0.50
- Cost reduction: 85–95% на runaway scenarios
Circuit Breaker Impact
Когда tool fails repeatedly (e.g. database connection lost):
Без circuit breaker:
- Agent attempts 20 tool calls, каждый timing out после 10s
- Total wasted time: 200 секунд
- No useful work done
С circuit breaker (threshold=5, timeout=60s):
- First 5 calls fail (50 секунд wasted)
- Circuit opens, subsequent calls fail immediately
- Agent receives structured error и tries alternative approach
- Total wasted time: 50 секунд
- Time saved: 75%
Timeout Configuration Impact
| Tool Type | Default Timeout | 95th Percentile Latency | Timeout Setting | False Positive Rate |
|---|---|---|---|---|
| Database query | 10s | 2–4s | 10s | Very low |
| Web search API | 8s | 1–3s | 8s | Low |
| Web scraping | 12s | 4–8s | 12s | Low-medium |
| Code execution | 30s | 5–15s | 30s | Medium (depends on task) |
| File I/O | 5s | 0.1–1s | 5s | Very low |
False positives occur когда legitimate operations exceed timeout. Tune based на P95 latency + safety margin.
Memory Usage по Component
Estimated per-agent memory consumption:
- Base LangGraph state: 5–20 KB (depends на state schema)
- Message history (50 messages): 100–500 KB (depends на tool results)
- tiktoken encoder: 10 MB (shared across agents в process)
- CostTracker per run: < 1 KB
- CircuitBreaker registry (20 tools): < 5 KB
- Total per agent: 10–15 MB baseline + message history
Для 100 concurrent agents: 1–2 GB memory footprint (dominated by message history).
Callback Overhead
LangChain callback handlers добавляют minimal overhead:
- on_llm_end callback execution: 100–500 микросекунд per call
- Token counting в callback: 50–200 микросекунд (если re-counting messages)
- Total overhead: < 1ms per LLM call (< 0.1% от typical LLM latency)
Callbacks синхронны и блокируют briefly, но impact negligible в сравнении с network I/O для LLM API calls.