title: "Саморегулирующиеся агенты: восстановление после ошибок, стратегии повторов, адаптивное перепланирование" slug: self-healing-agents-error-recovery-2026-ru date: 2026-02-24 lang: ru
Саморегулирующиеся агенты: восстановление после ошибок, стратегии повторов, адаптивное перепланирование
Ключевые факты
- tenacity 9.1.4 (последняя версия): декларативная библиотека повторов с конфигурацией на основе декораторов
- backoff 2.2.1 (последняя версия): минималистичный retry с экспоненциальными/постоянными стратегиями и jitter
- langgraph 1.0.10 (последняя версия): предоставляет
NodeInterruptдля остановки выполнения и вмешательства человека, checkpointing для восстановления состояния - langchain 1.2.10 (последняя версия): фреймворк агентов с интеграциями LLM и tool calling
- tenacity retry декоратор принимает:
stop(макс попытки/время),wait(стратегия задержки),retry(условие ошибки),before_sleep(callback логирования) - Формула экспоненциального backoff:
delay = base_delay * (exponential_base ** attempt), обычноbase_delay=1.0,exponential_base=2.0 - Паттерн полного jitter:
delay = random.uniform(0, calculated_delay)предотвращает thundering herd на общих ресурсах - Таксономия ошибок: три категории — временные (сетевой timeout, rate limit, service unavailable), семантические (неверный query, пустой результат, несоответствие схемы), фатальные (permission denied, resource not found, quota exceeded)
- Временные ошибки (HTTP 429, 503, timeout): безопасны для повтора с backoff; соблюдайте
Retry-Afterзаголовок - Семантические ошибки: требуют корректировки параметров или альтернативного подхода; LLM должен рассуждать о сбое
- Фатальные ошибки (HTTP 403, 404, auth failures): никогда не повторяйте; немедленная эскалация или перепланирование
- Circuit breaker состояния: closed (нормальная операция) → open (fast-fail после порога) → half-open (тестирование восстановления)
- Параметры порога circuit breaker:
failure_threshold(последовательные сбои для открытия),recovery_timeout(секунды до half-open),success_threshold(успехи для закрытия) - LangGraph NodeInterrupt: вызывает исключение в середине графика для паузы выполнения; позволяет человеческой проверке и модификации состояния перед возобновлением
- LangGraph checkpointer: сохраняет состояние графика на каждом узле; включает возобновление после краха или ручного вмешательства
- Retry с tenacity:
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=60), retry=retry_if_exception_type(RateLimitError)) - Порог перепланирования агента: обычно 3 попытки перепланирования перед эскалацией человеку
- Порядок решения восстановления: классифицировать ошибку → проверить повторяемость → исчерпать повторы → проверить инструмент fallback → запустить перепланирование → эскалация человеку
- Цепочки инструментов fallback: основной инструмент → fallback1 (на rate limit/unavailable) → fallback2 (на любую ошибку) → cached/degraded response
- Паттерн downgrade модели: GPT-4o ($5/1M tokens) → GPT-4o-mini ($0.15/1M tokens) при rate limit или оптимизации стоимости
- Распространённая ошибка retry: повтор фатальных ошибок (403, 404) тратит время и может запустить rate limiting или IP ban
Что такое саморегулирующийся агент
Саморегулирующийся агент — это автономная система, которая обнаруживает, классифицирует и восстанавливает ошибки без вмешательства человека. Процесс следует трём стадиям:
1. Обнаружение ошибки: захватить исключения из вызовов инструментов, ответов API или проверки состояния графика. Ошибки могут быть исключениями Python (TimeoutError, HTTPError) или семантическими сбоями (пустой результат, несоответствие схемы).
2. Классификация: отобразить сырые ошибки на структурированные категории (временные, семантические, фатальные) с метаданными восстановления. Классификация определяет, повторять ли, перепланировать, использовать fallback или эскалировать.
3. Выбор стратегии восстановления: на основе категории ошибки выбрать путь восстановления:
- Временные ошибки → повтор с экспоненциальным backoff
- Семантические ошибки → перепланирование с корректировкой параметров или альтернативным инструментом
- Фатальные ошибки → пропустить шаг и перепланировать его, или эскалировать человеку
Агент сохраняет контекст выполнения через попытки восстановления, позволяя ему избежать повторных сбоев и учиться на неудачных подходах.
Фреймворк решения
Выбор стратегии восстановления следует дереву решений на основе типа ошибки и контекста:
Error Occurs
│
├─ Classify Error
│ │
│ ├─ Transient (429, 503, timeout)
│ │ ├─ Retry count < max_attempts? → Retry with backoff
│ │ └─ Else → Try fallback tool or replan
│ │
│ ├─ Semantic (invalid params, empty result)
│ │ ├─ Can adjust parameters? → LLM generates new params, retry
│ │ └─ Else → Replan with alternative approach
│ │
│ └─ Fatal (403, 404, auth error)
│ ├─ Fallback tool available? → Switch to fallback
│ └─ Else → Replan without this step or escalate to human
│
└─ Recovery Attempt
├─ Success → Continue execution
└─ Failure
├─ Replanning count < 3? → Trigger replanning node
└─ Else → Escalate to human (NodeInterrupt)
Retry vs Fallback: используйте retry для одного инструмента с одинаковыми параметрами (временные ошибки). Используйте fallback, когда инструмент постоянно падает или ошибка указывает на деградацию сервиса.
Fallback vs Human Escalation: используйте fallback, когда альтернативный инструмент обеспечивает приемлемое качество результата. Эскалируйте, когда ни один инструмент не может выполнить требование или критическая бизнес-логика падает.
Порог перепланирования: после 3 попыток перепланирования агент, вероятно, исчерпал разумные альтернативные подходы. Дальнейшие попытки рискуют циклическим рассуждением или бесконечными циклами.
Таблица справки параметров
| Параметр | Значение | Примечания |
|---|---|---|
max_retry_attempts |
3 | Стандарт для временных ошибок; увеличьте до 5 для критических операций |
base_delay |
1.0s | Начальная ожидание перед первым повтором |
max_delay |
60s | Шапка для предотвращения чрезмерного времени ожидания |
exponential_base |
2.0 | Удвоение задержки: 1s, 2s, 4s, 8s, 16s, 32s, 60s (capped) |
jitter |
random.uniform(0, delay) |
Полный jitter; предотвращает thundering herd |
rate_limit_retry_after |
Значение Header или 60s | Соблюдайте API Retry-After заголовок, если присутствует |
circuit_breaker_threshold |
5 | Последовательные сбои для открытия circuit |
circuit_breaker_timeout |
60s | Ожидание перед тестированием восстановления (half-open) |
max_replanning_attempts |
3 | Попытки перепланирования перед эскалацией человеку |
tool_timeout |
30s | Timeout выполнения для каждого инструмента |
graph_recursion_limit |
50 | LangGraph максимальное число выполнений узла |
fallback_model |
gpt-4o-mini | Экономичная альтернатива gpt-4o |
cache_ttl |
300s | Time-to-live для кэшированных fallback ответов |
Распространённые ошибки
Ошибка 1: Повтор фатальных ошибок
Воздействие: тратит время, может запустить rate limiting или блокировки безопасности.
❌ Плохо:
@retry(stop=stop_after_attempt(5))
async def fetch_resource(resource_id: str):
response = await httpx.get(f"/api/resource/{resource_id}")
response.raise_for_status() # Raises on 403, 404
return response.json()
# Will retry 5 times on 404 Not Found (pointless)
✅ Хорошо:
@retry(
stop=stop_after_attempt(3),
retry=retry_if_exception_type((TimeoutError, httpx.TimeoutException)),
reraise=True
)
async def fetch_resource(resource_id: str):
try:
response = await httpx.get(f"/api/resource/{resource_id}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code in {403, 404, 401}:
raise # Don't retry fatal errors
elif e.response.status_code in {429, 503}:
raise # Let @retry handle transient errors
raise
Ошибка 2: Отсутствие Jitter в Backoff
Воздействие: thundering herd проблема, когда несколько агентов повторяют одновременно.
❌ Плохо:
@retry(wait=wait_exponential(multiplier=1, min=2, max=60))
async def call_api():
# All agents retry at exactly 2s, 4s, 8s, 16s...
pass
✅ Хорошо:
@retry(wait=wait_exponential(multiplier=1, min=2, max=60) + wait_random(0, 2))
async def call_api():
# Adds 0-2s random jitter to each wait
pass
# Or use full jitter pattern
import random
delay = min(base_delay * (2 ** attempt), max_delay)
jittered_delay = random.uniform(0, delay)
Ошибка 3: Игнорирование Retry-After заголовка
Воздействие: преждевременные повторы запускают дальнейший rate limiting; задерживают восстановление.
❌ Плохо:
@retry(wait=wait_fixed(5))
async def api_call():
# Retries every 5s, even if Retry-After says 120s
pass
✅ Хорошо:
async def api_call_with_backoff():
try:
return await api_call()
except RateLimitError as e:
retry_after = e.response.headers.get("Retry-After", 60)
await asyncio.sleep(float(retry_after))
return await api_call()
Ошибка 4: Перепланирование без истории
Воздействие: агент повторяет неудачные подходы; циклическое перепланирование.
❌ Плохо:
def planner_node(state):
llm = ChatOpenAI(model="gpt-4o")
plan = llm.invoke([HumanMessage(content=state["task"])])
# Ignores state["failed_steps"], may regenerate same plan
return {"current_plan": parse_plan(plan)}
✅ Хорошо:
def planner_node(state):
llm = ChatOpenAI(model="gpt-4o")
failed_context = "\n".join([
f"❌ {step['name']}: {step['error']} (tried {step['attempts']} times)"
for step in state.get("failed_steps", [])
])
prompt = f"""Task: {state['task']}
Failed approaches:
{failed_context}
Generate alternative plan avoiding failed steps."""
plan = llm.invoke([HumanMessage(content=prompt)])
return {"current_plan": parse_plan(plan)}
Ошибка 5: Отсутствие Circuit Breaker для деградированных сервисов
Воздействие: агент продолжает вызывать падающий сервис; тратит время и ресурсы.
❌ Плохо:
async def call_unreliable_service():
# Retries indefinitely even if service is down for 10 minutes
return await retry_with_backoff(external_api_call)
✅ Хорошо:
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failures = 0
self.state = "closed" # closed, open, half_open
self.opened_at = None
async def call(self, func):
if self.state == "open":
if time.time() - self.opened_at > self.timeout:
self.state = "half_open"
else:
raise CircuitOpenError("Service degraded, fast-failing")
try:
result = await func()
if self.state == "half_open":
self.state = "closed"
self.failures = 0
return result
except Exception as e:
self.failures += 1
if self.failures >= self.failure_threshold:
self.state = "open"
self.opened_at = time.time()
raise
Паттерны повтора
Паттерн 1: Tenacity декоратор с логикой, специфичной для ошибок
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=60),
retry=retry_if_exception_type((TimeoutError, httpx.TimeoutException)),
before_sleep=lambda retry_state: logger.warning(
f"Retry {retry_state.attempt_number}/3 after {retry_state.outcome.exception()}"
)
)
async def fetch_with_retry(url: str):
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10.0)
response.raise_for_status()
return response.json()
Паттерн 2: Конфигурация повтора для каждого инструмента
from dataclasses import dataclass
from typing import Optional
@dataclass
class ToolRetryConfig:
max_attempts: int
base_delay: float
max_delay: float
retryable_errors: set[type]
TOOL_RETRY_CONFIGS = {
"database_query": ToolRetryConfig(
max_attempts=5, # High reliability requirement
base_delay=0.5,
max_delay=10.0,
retryable_errors={ConnectionError, TimeoutError}
),
"web_search": ToolRetryConfig(
max_attempts=3,
base_delay=2.0,
max_delay=60.0,
retryable_errors={RateLimitError, ServiceUnavailableError}
),
"llm_call": ToolRetryConfig(
max_attempts=2, # Expensive, retry sparingly
base_delay=5.0,
max_delay=30.0,
retryable_errors={RateLimitError}
),
}
async def execute_tool_with_config(tool_name: str, **kwargs):
config = TOOL_RETRY_CONFIGS.get(tool_name)
if not config:
return await execute_tool(tool_name, **kwargs)
for attempt in range(config.max_attempts):
try:
return await execute_tool(tool_name, **kwargs)
except Exception as e:
if type(e) not in config.retryable_errors:
raise
if attempt == config.max_attempts - 1:
raise
delay = min(
config.base_delay * (2 ** attempt),
config.max_delay
)
await asyncio.sleep(delay)
Паттерн 3: Экспоненциальный backoff с jitter
import random
import asyncio
from typing import TypeVar, Callable, Awaitable
T = TypeVar("T")
async def retry_with_jitter(
func: Callable[..., Awaitable[T]],
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: bool = True,
) -> T:
last_exception = None
for attempt in range(max_attempts):
try:
return await func()
except Exception as e:
last_exception = e
if attempt == max_attempts - 1:
raise
# Exponential backoff
delay = min(base_delay * (2 ** attempt), max_delay)
# Full jitter: random between 0 and delay
if jitter:
delay = random.uniform(0, delay)
await asyncio.sleep(delay)
raise last_exception
Стратегии fallback
Стратегия 1: Downgrade модели (GPT-4o → GPT-4o-mini)
from langchain_openai import ChatOpenAI
async def llm_call_with_fallback(prompt: str, max_tokens: int = 500):
models = [
("gpt-4o", 0.7), # Primary: high capability
("gpt-4o-mini", 0.7), # Fallback: cost-effective
("gpt-3.5-turbo", 0.5), # Last resort: fastest/cheapest
]
for model_name, temperature in models:
try:
llm = ChatOpenAI(model=model_name, temperature=temperature, max_tokens=max_tokens)
response = await llm.ainvoke([HumanMessage(content=prompt)])
return response.content
except RateLimitError:
logger.warning(f"{model_name} rate limited, trying next model")
continue
except Exception as e:
logger.error(f"{model_name} failed: {e}")
continue
raise Exception("All LLM models exhausted")
Стратегия 2: Подстановка инструмента
TOOL_FALLBACK_CHAINS = {
"web_search": ["google_search", "bing_search", "duckduckgo_search", "cached_search"],
"database_read": ["primary_db", "read_replica", "cache"],
"translation": ["openai_translate", "google_translate", "local_model"],
}
async def execute_with_fallback(primary_tool: str, **kwargs):
chain = TOOL_FALLBACK_CHAINS.get(primary_tool, [primary_tool])
for tool_name in chain:
try:
result = await execute_tool(tool_name, **kwargs)
if tool_name != chain[0]:
logger.info(f"Used fallback {tool_name} for {primary_tool}")
return result
except Exception as e:
logger.warning(f"{tool_name} failed: {e}, trying next in chain")
continue
raise Exception(f"All fallbacks exhausted for {primary_tool}")
Стратегия 3: Кэшированный ответ как последняя линия защиты
from functools import lru_cache
import hashlib
import json
class CacheFallback:
def __init__(self, ttl: int = 300):
self.cache = {}
self.ttl = ttl
def cache_key(self, tool_name: str, kwargs: dict) -> str:
key_str = f"{tool_name}:{json.dumps(kwargs, sort_keys=True)}"
return hashlib.md5(key_str.encode()).hexdigest()
async def execute(self, tool_name: str, **kwargs):
cache_key = self.cache_key(tool_name, kwargs)
try:
result = await execute_tool(tool_name, **kwargs)
# Cache successful result
self.cache[cache_key] = {
"result": result,
"timestamp": time.time()
}
return result
except Exception as e:
# Try cache as fallback
cached = self.cache.get(cache_key)
if cached and (time.time() - cached["timestamp"] < self.ttl):
logger.warning(f"{tool_name} failed, using cached result (age: {time.time() - cached['timestamp']:.0f}s)")
return cached["result"]
raise
Паттерны восстановления LangGraph
Паттерн 1: NodeInterrupt для человеческого вмешательства
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
def critical_decision_node(state):
"""Node that requires human approval for high-risk actions."""
action = state["proposed_action"]
if action["risk_level"] == "high":
# Pause execution and request human review
raise NodeInterrupt(
f"Human approval required for: {action['description']}\n"
f"Risk: {action['risk_details']}\n"
f"Approve by resuming with state['approved'] = True"
)
return {"execution_approved": True}
# Usage
checkpointer = MemorySaver()
graph = build_graph_with_critical_node()
compiled = graph.compile(checkpointer=checkpointer)
# Initial execution (will pause at NodeInterrupt)
thread_id = "user-123-task-456"
try:
result = compiled.invoke(
{"task": "delete production database"},
config={"configurable": {"thread_id": thread_id}}
)
except NodeInterrupt as e:
print(f"Execution paused: {e}")
# Notify human, wait for approval
# Resume after human approval
state = checkpointer.get(thread_id)
state["approved"] = True
checkpointer.put(thread_id, state)
result = compiled.invoke(
None, # Resume from checkpoint
config={"configurable": {"thread_id": thread_id}}
)
Паттерн 2: Checkpointer возобновление после исправления ошибки
from langgraph.checkpoint.sqlite import SqliteSaver
def tool_execution_node(state):
"""Executes tool; errors are saved to state for recovery."""
tool_call = state["pending_tool_call"]
try:
result = execute_tool(tool_call["name"], tool_call["args"])
return {
"tool_results": state.get("tool_results", []) + [result],
"pending_tool_call": None,
"last_error": None
}
except Exception as e:
return {
"last_error": {
"tool": tool_call["name"],
"error": str(e),
"args": tool_call["args"],
"timestamp": time.time()
}
}
def recovery_router(state):
"""Routes to recovery or continues execution."""
if state.get("last_error"):
return "recovery"
return "continue"
# Build graph with recovery path
graph = StateGraph(AgentState)
graph.add_node("execute_tool", tool_execution_node)
graph.add_node("recovery", recovery_node)
graph.add_conditional_edges("execute_tool", recovery_router, {
"recovery": "recovery",
"continue": "next_step"
})
# Persistent checkpointer survives crashes
checkpointer = SqliteSaver.from_conn_string("agent_state.db")
app = graph.compile(checkpointer=checkpointer)
# Execute with auto-recovery
thread_id = "session-789"
result = app.invoke(
{"task": "process financial data"},
config={"configurable": {"thread_id": thread_id}}
)
# If process crashes, resume from last checkpoint
state = checkpointer.get(thread_id)
if state.get("last_error"):
print(f"Resuming after error: {state['last_error']}")
result = app.invoke(None, config={"configurable": {"thread_id": thread_id}})
Паттерн 3: Условное восстановление с контекстом ошибки
from typing import TypedDict, Annotated, Optional
import operator
class RecoveryState(TypedDict):
messages: Annotated[list, operator.add]
current_step: str
failed_steps: list[dict]
recovery_attempts: int
execution_mode: str # "normal", "recovery", "escalated"
def should_attempt_recovery(state: RecoveryState) -> str:
"""Decision node: recover, replan, or escalate."""
MAX_RECOVERY_ATTEMPTS = 3
if not state.get("failed_steps"):
return "continue"
last_failure = state["failed_steps"][-1]
error_category = last_failure.get("category")
# Fatal errors → replan immediately
if error_category in {"permission_denied", "not_found", "quota_exceeded"}:
if state["recovery_attempts"] < MAX_RECOVERY_ATTEMPTS:
return "replan"
return "escalate"
# Transient errors → retry already handled at tool level
# If we're here, retries failed → try fallback
if error_category in {"rate_limit", "timeout", "unavailable"}:
return "fallback"
# Semantic errors → replan with adjusted approach
return "replan"
graph = StateGraph(RecoveryState)
graph.add_node("execute", execute_node)
graph.add_node("fallback", fallback_node)
graph.add_node("replan", replan_node)
graph.add_node("escalate", human_escalation_node)
graph.add_conditional_edges("execute", should_attempt_recovery, {
"continue": "execute",
"fallback": "fallback",
"replan": "replan",
"escalate": "escalate"
})
graph.add_edge("fallback", "execute")
graph.add_edge("replan", "execute")
graph.add_edge("escalate", END)
Производительность и бенчмарки
Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.
Overhead повтора: экспоненциальный backoff с 3 попытками обычно добавляет задержку в диапазоне нескольких секунд до минуты для временных ошибок, в зависимости от конфигурации base delay и backoff multiplier. Полный jitter снижает пиковую нагрузку на восстановленные сервисы по сравнению с синхронизированными повторами.
Стоимость перепланирования: перепланирование на основе LLM (GPT-4o) включает дополнительные вызовы inference, добавляя задержку и затраты tokens. Каждый цикл перепланирования может потребить сотни до тысяч tokens в зависимости от размера контекста. Ограничение попыток перепланирования до 3 предотвращает неконтролируемые затраты.
Экономия fallback модели: fallback от GPT-4o к GPT-4o-mini может существенно снизить затраты на token, сохраняя приемлемое качество вывода для многих задач. Разница в стоимости между моделями может быть на порядки больше, делая это эффективной стратегией контроля затрат при rate limits.
Circuit breaker fast-fail: когда сервис находится в состоянии open circuit, запросы падают немедленно, а не ожидают timeout, снижая среднее время ответа во время сбоев с десятков секунд до миллисекунд.
Коэффициент cache hit: кэшированные fallback ответы могут предоставить немедленные результаты при падении основных инструментов. Эффективность зависит от паттернов повторения workload и конфигурации cache TTL. Больший TTL улучшает hit rate, но рискует устаревшими данными.
Частота эскалации человеку: хорошо настроенные саморегулирующиеся системы обычно эскалируют небольшую часть задач на проверку человеком. Более высокие частоты эскалации могут указывать на недостаточное покрытие fallback или чрезмерно консервативную классификацию ошибок. Более низкие частоты предполагают эффективное автономное восстановление, но требуют мониторинга на молчаливые сбои.
Хранилище checkpointer: постоянное checkpointing (SQLite, Postgres) добавляет overhead хранилища, пропорциональный размеру состояния и частоте checkpoint. In-memory checkpointer устраняет overhead сохранения, но теряет возможность восстановления через перезапуск процесса.
Коэффициент успеха восстановления: доля ошибок, успешно восстановленных автономно (через retry, fallback или перепланирование) в сравнении с требующими вмешательства человека. Системы с комплексными цепочками fallback и классификацией ошибок обычно достигают более высоких коэффициентов автономного восстановления.