Саморегулирующиеся агенты: восстановление после ошибок, стратегии повторов, адаптивное перепланирование


title: "Саморегулирующиеся агенты: восстановление после ошибок, стратегии повторов, адаптивное перепланирование" slug: self-healing-agents-error-recovery-2026-ru date: 2026-02-24 lang: ru

Саморегулирующиеся агенты: восстановление после ошибок, стратегии повторов, адаптивное перепланирование

Ключевые факты

  • tenacity 9.1.4 (последняя версия): декларативная библиотека повторов с конфигурацией на основе декораторов
  • backoff 2.2.1 (последняя версия): минималистичный retry с экспоненциальными/постоянными стратегиями и jitter
  • langgraph 1.0.10 (последняя версия): предоставляет NodeInterrupt для остановки выполнения и вмешательства человека, checkpointing для восстановления состояния
  • langchain 1.2.10 (последняя версия): фреймворк агентов с интеграциями LLM и tool calling
  • tenacity retry декоратор принимает: stop (макс попытки/время), wait (стратегия задержки), retry (условие ошибки), before_sleep (callback логирования)
  • Формула экспоненциального backoff: delay = base_delay * (exponential_base ** attempt), обычно base_delay=1.0, exponential_base=2.0
  • Паттерн полного jitter: delay = random.uniform(0, calculated_delay) предотвращает thundering herd на общих ресурсах
  • Таксономия ошибок: три категории — временные (сетевой timeout, rate limit, service unavailable), семантические (неверный query, пустой результат, несоответствие схемы), фатальные (permission denied, resource not found, quota exceeded)
  • Временные ошибки (HTTP 429, 503, timeout): безопасны для повтора с backoff; соблюдайте Retry-After заголовок
  • Семантические ошибки: требуют корректировки параметров или альтернативного подхода; LLM должен рассуждать о сбое
  • Фатальные ошибки (HTTP 403, 404, auth failures): никогда не повторяйте; немедленная эскалация или перепланирование
  • Circuit breaker состояния: closed (нормальная операция) → open (fast-fail после порога) → half-open (тестирование восстановления)
  • Параметры порога circuit breaker: failure_threshold (последовательные сбои для открытия), recovery_timeout (секунды до half-open), success_threshold (успехи для закрытия)
  • LangGraph NodeInterrupt: вызывает исключение в середине графика для паузы выполнения; позволяет человеческой проверке и модификации состояния перед возобновлением
  • LangGraph checkpointer: сохраняет состояние графика на каждом узле; включает возобновление после краха или ручного вмешательства
  • Retry с tenacity: @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=60), retry=retry_if_exception_type(RateLimitError))
  • Порог перепланирования агента: обычно 3 попытки перепланирования перед эскалацией человеку
  • Порядок решения восстановления: классифицировать ошибку → проверить повторяемость → исчерпать повторы → проверить инструмент fallback → запустить перепланирование → эскалация человеку
  • Цепочки инструментов fallback: основной инструмент → fallback1 (на rate limit/unavailable) → fallback2 (на любую ошибку) → cached/degraded response
  • Паттерн downgrade модели: GPT-4o ($5/1M tokens) → GPT-4o-mini ($0.15/1M tokens) при rate limit или оптимизации стоимости
  • Распространённая ошибка retry: повтор фатальных ошибок (403, 404) тратит время и может запустить rate limiting или IP ban

Что такое саморегулирующийся агент

Саморегулирующийся агент — это автономная система, которая обнаруживает, классифицирует и восстанавливает ошибки без вмешательства человека. Процесс следует трём стадиям:

1. Обнаружение ошибки: захватить исключения из вызовов инструментов, ответов API или проверки состояния графика. Ошибки могут быть исключениями Python (TimeoutError, HTTPError) или семантическими сбоями (пустой результат, несоответствие схемы).

2. Классификация: отобразить сырые ошибки на структурированные категории (временные, семантические, фатальные) с метаданными восстановления. Классификация определяет, повторять ли, перепланировать, использовать fallback или эскалировать.

3. Выбор стратегии восстановления: на основе категории ошибки выбрать путь восстановления:

  • Временные ошибки → повтор с экспоненциальным backoff
  • Семантические ошибки → перепланирование с корректировкой параметров или альтернативным инструментом
  • Фатальные ошибки → пропустить шаг и перепланировать его, или эскалировать человеку

Агент сохраняет контекст выполнения через попытки восстановления, позволяя ему избежать повторных сбоев и учиться на неудачных подходах.

Фреймворк решения

Выбор стратегии восстановления следует дереву решений на основе типа ошибки и контекста:

Error Occurs
│
├─ Classify Error
│  │
│  ├─ Transient (429, 503, timeout)
│  │  ├─ Retry count < max_attempts? → Retry with backoff
│  │  └─ Else → Try fallback tool or replan
│  │
│  ├─ Semantic (invalid params, empty result)
│  │  ├─ Can adjust parameters? → LLM generates new params, retry
│  │  └─ Else → Replan with alternative approach
│  │
│  └─ Fatal (403, 404, auth error)
│     ├─ Fallback tool available? → Switch to fallback
│     └─ Else → Replan without this step or escalate to human
│
└─ Recovery Attempt
   ├─ Success → Continue execution
   └─ Failure
      ├─ Replanning count < 3? → Trigger replanning node
      └─ Else → Escalate to human (NodeInterrupt)

Retry vs Fallback: используйте retry для одного инструмента с одинаковыми параметрами (временные ошибки). Используйте fallback, когда инструмент постоянно падает или ошибка указывает на деградацию сервиса.

Fallback vs Human Escalation: используйте fallback, когда альтернативный инструмент обеспечивает приемлемое качество результата. Эскалируйте, когда ни один инструмент не может выполнить требование или критическая бизнес-логика падает.

Порог перепланирования: после 3 попыток перепланирования агент, вероятно, исчерпал разумные альтернативные подходы. Дальнейшие попытки рискуют циклическим рассуждением или бесконечными циклами.

Таблица справки параметров

Параметр Значение Примечания
max_retry_attempts 3 Стандарт для временных ошибок; увеличьте до 5 для критических операций
base_delay 1.0s Начальная ожидание перед первым повтором
max_delay 60s Шапка для предотвращения чрезмерного времени ожидания
exponential_base 2.0 Удвоение задержки: 1s, 2s, 4s, 8s, 16s, 32s, 60s (capped)
jitter random.uniform(0, delay) Полный jitter; предотвращает thundering herd
rate_limit_retry_after Значение Header или 60s Соблюдайте API Retry-After заголовок, если присутствует
circuit_breaker_threshold 5 Последовательные сбои для открытия circuit
circuit_breaker_timeout 60s Ожидание перед тестированием восстановления (half-open)
max_replanning_attempts 3 Попытки перепланирования перед эскалацией человеку
tool_timeout 30s Timeout выполнения для каждого инструмента
graph_recursion_limit 50 LangGraph максимальное число выполнений узла
fallback_model gpt-4o-mini Экономичная альтернатива gpt-4o
cache_ttl 300s Time-to-live для кэшированных fallback ответов

Распространённые ошибки

Ошибка 1: Повтор фатальных ошибок

Воздействие: тратит время, может запустить rate limiting или блокировки безопасности.

Плохо:

@retry(stop=stop_after_attempt(5))
async def fetch_resource(resource_id: str):
    response = await httpx.get(f"/api/resource/{resource_id}")
    response.raise_for_status()  # Raises on 403, 404
    return response.json()

# Will retry 5 times on 404 Not Found (pointless)

Хорошо:

@retry(
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((TimeoutError, httpx.TimeoutException)),
    reraise=True
)
async def fetch_resource(resource_id: str):
    try:
        response = await httpx.get(f"/api/resource/{resource_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        if e.response.status_code in {403, 404, 401}:
            raise  # Don't retry fatal errors
        elif e.response.status_code in {429, 503}:
            raise  # Let @retry handle transient errors
        raise

Ошибка 2: Отсутствие Jitter в Backoff

Воздействие: thundering herd проблема, когда несколько агентов повторяют одновременно.

Плохо:

@retry(wait=wait_exponential(multiplier=1, min=2, max=60))
async def call_api():
    # All agents retry at exactly 2s, 4s, 8s, 16s...
    pass

Хорошо:

@retry(wait=wait_exponential(multiplier=1, min=2, max=60) + wait_random(0, 2))
async def call_api():
    # Adds 0-2s random jitter to each wait
    pass

# Or use full jitter pattern
import random
delay = min(base_delay * (2 ** attempt), max_delay)
jittered_delay = random.uniform(0, delay)

Ошибка 3: Игнорирование Retry-After заголовка

Воздействие: преждевременные повторы запускают дальнейший rate limiting; задерживают восстановление.

Плохо:

@retry(wait=wait_fixed(5))
async def api_call():
    # Retries every 5s, even if Retry-After says 120s
    pass

Хорошо:

async def api_call_with_backoff():
    try:
        return await api_call()
    except RateLimitError as e:
        retry_after = e.response.headers.get("Retry-After", 60)
        await asyncio.sleep(float(retry_after))
        return await api_call()

Ошибка 4: Перепланирование без истории

Воздействие: агент повторяет неудачные подходы; циклическое перепланирование.

Плохо:

def planner_node(state):
    llm = ChatOpenAI(model="gpt-4o")
    plan = llm.invoke([HumanMessage(content=state["task"])])
    # Ignores state["failed_steps"], may regenerate same plan
    return {"current_plan": parse_plan(plan)}

Хорошо:

def planner_node(state):
    llm = ChatOpenAI(model="gpt-4o")

    failed_context = "\n".join([
        f"❌ {step['name']}: {step['error']} (tried {step['attempts']} times)"
        for step in state.get("failed_steps", [])
    ])

    prompt = f"""Task: {state['task']}

Failed approaches:
{failed_context}

Generate alternative plan avoiding failed steps."""

    plan = llm.invoke([HumanMessage(content=prompt)])
    return {"current_plan": parse_plan(plan)}

Ошибка 5: Отсутствие Circuit Breaker для деградированных сервисов

Воздействие: агент продолжает вызывать падающий сервис; тратит время и ресурсы.

Плохо:

async def call_unreliable_service():
    # Retries indefinitely even if service is down for 10 minutes
    return await retry_with_backoff(external_api_call)

Хорошо:

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.state = "closed"  # closed, open, half_open
        self.opened_at = None

    async def call(self, func):
        if self.state == "open":
            if time.time() - self.opened_at > self.timeout:
                self.state = "half_open"
            else:
                raise CircuitOpenError("Service degraded, fast-failing")

        try:
            result = await func()
            if self.state == "half_open":
                self.state = "closed"
                self.failures = 0
            return result
        except Exception as e:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = time.time()
            raise

Паттерны повтора

Паттерн 1: Tenacity декоратор с логикой, специфичной для ошибок

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type((TimeoutError, httpx.TimeoutException)),
    before_sleep=lambda retry_state: logger.warning(
        f"Retry {retry_state.attempt_number}/3 after {retry_state.outcome.exception()}"
    )
)
async def fetch_with_retry(url: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return response.json()

Паттерн 2: Конфигурация повтора для каждого инструмента

from dataclasses import dataclass
from typing import Optional

@dataclass
class ToolRetryConfig:
    max_attempts: int
    base_delay: float
    max_delay: float
    retryable_errors: set[type]

TOOL_RETRY_CONFIGS = {
    "database_query": ToolRetryConfig(
        max_attempts=5,  # High reliability requirement
        base_delay=0.5,
        max_delay=10.0,
        retryable_errors={ConnectionError, TimeoutError}
    ),
    "web_search": ToolRetryConfig(
        max_attempts=3,
        base_delay=2.0,
        max_delay=60.0,
        retryable_errors={RateLimitError, ServiceUnavailableError}
    ),
    "llm_call": ToolRetryConfig(
        max_attempts=2,  # Expensive, retry sparingly
        base_delay=5.0,
        max_delay=30.0,
        retryable_errors={RateLimitError}
    ),
}

async def execute_tool_with_config(tool_name: str, **kwargs):
    config = TOOL_RETRY_CONFIGS.get(tool_name)
    if not config:
        return await execute_tool(tool_name, **kwargs)

    for attempt in range(config.max_attempts):
        try:
            return await execute_tool(tool_name, **kwargs)
        except Exception as e:
            if type(e) not in config.retryable_errors:
                raise
            if attempt == config.max_attempts - 1:
                raise

            delay = min(
                config.base_delay * (2 ** attempt),
                config.max_delay
            )
            await asyncio.sleep(delay)

Паттерн 3: Экспоненциальный backoff с jitter

import random
import asyncio
from typing import TypeVar, Callable, Awaitable

T = TypeVar("T")

async def retry_with_jitter(
    func: Callable[..., Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
) -> T:
    last_exception = None

    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception as e:
            last_exception = e

            if attempt == max_attempts - 1:
                raise

            # Exponential backoff
            delay = min(base_delay * (2 ** attempt), max_delay)

            # Full jitter: random between 0 and delay
            if jitter:
                delay = random.uniform(0, delay)

            await asyncio.sleep(delay)

    raise last_exception

Стратегии fallback

Стратегия 1: Downgrade модели (GPT-4o → GPT-4o-mini)

from langchain_openai import ChatOpenAI

async def llm_call_with_fallback(prompt: str, max_tokens: int = 500):
    models = [
        ("gpt-4o", 0.7),           # Primary: high capability
        ("gpt-4o-mini", 0.7),      # Fallback: cost-effective
        ("gpt-3.5-turbo", 0.5),    # Last resort: fastest/cheapest
    ]

    for model_name, temperature in models:
        try:
            llm = ChatOpenAI(model=model_name, temperature=temperature, max_tokens=max_tokens)
            response = await llm.ainvoke([HumanMessage(content=prompt)])
            return response.content
        except RateLimitError:
            logger.warning(f"{model_name} rate limited, trying next model")
            continue
        except Exception as e:
            logger.error(f"{model_name} failed: {e}")
            continue

    raise Exception("All LLM models exhausted")

Стратегия 2: Подстановка инструмента

TOOL_FALLBACK_CHAINS = {
    "web_search": ["google_search", "bing_search", "duckduckgo_search", "cached_search"],
    "database_read": ["primary_db", "read_replica", "cache"],
    "translation": ["openai_translate", "google_translate", "local_model"],
}

async def execute_with_fallback(primary_tool: str, **kwargs):
    chain = TOOL_FALLBACK_CHAINS.get(primary_tool, [primary_tool])

    for tool_name in chain:
        try:
            result = await execute_tool(tool_name, **kwargs)

            if tool_name != chain[0]:
                logger.info(f"Used fallback {tool_name} for {primary_tool}")

            return result
        except Exception as e:
            logger.warning(f"{tool_name} failed: {e}, trying next in chain")
            continue

    raise Exception(f"All fallbacks exhausted for {primary_tool}")

Стратегия 3: Кэшированный ответ как последняя линия защиты

from functools import lru_cache
import hashlib
import json

class CacheFallback:
    def __init__(self, ttl: int = 300):
        self.cache = {}
        self.ttl = ttl

    def cache_key(self, tool_name: str, kwargs: dict) -> str:
        key_str = f"{tool_name}:{json.dumps(kwargs, sort_keys=True)}"
        return hashlib.md5(key_str.encode()).hexdigest()

    async def execute(self, tool_name: str, **kwargs):
        cache_key = self.cache_key(tool_name, kwargs)

        try:
            result = await execute_tool(tool_name, **kwargs)
            # Cache successful result
            self.cache[cache_key] = {
                "result": result,
                "timestamp": time.time()
            }
            return result
        except Exception as e:
            # Try cache as fallback
            cached = self.cache.get(cache_key)
            if cached and (time.time() - cached["timestamp"] < self.ttl):
                logger.warning(f"{tool_name} failed, using cached result (age: {time.time() - cached['timestamp']:.0f}s)")
                return cached["result"]
            raise

Паттерны восстановления LangGraph

Паттерн 1: NodeInterrupt для человеческого вмешательства

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt

def critical_decision_node(state):
    """Node that requires human approval for high-risk actions."""
    action = state["proposed_action"]

    if action["risk_level"] == "high":
        # Pause execution and request human review
        raise NodeInterrupt(
            f"Human approval required for: {action['description']}\n"
            f"Risk: {action['risk_details']}\n"
            f"Approve by resuming with state['approved'] = True"
        )

    return {"execution_approved": True}

# Usage
checkpointer = MemorySaver()
graph = build_graph_with_critical_node()
compiled = graph.compile(checkpointer=checkpointer)

# Initial execution (will pause at NodeInterrupt)
thread_id = "user-123-task-456"
try:
    result = compiled.invoke(
        {"task": "delete production database"},
        config={"configurable": {"thread_id": thread_id}}
    )
except NodeInterrupt as e:
    print(f"Execution paused: {e}")
    # Notify human, wait for approval

# Resume after human approval
state = checkpointer.get(thread_id)
state["approved"] = True
checkpointer.put(thread_id, state)

result = compiled.invoke(
    None,  # Resume from checkpoint
    config={"configurable": {"thread_id": thread_id}}
)

Паттерн 2: Checkpointer возобновление после исправления ошибки

from langgraph.checkpoint.sqlite import SqliteSaver

def tool_execution_node(state):
    """Executes tool; errors are saved to state for recovery."""
    tool_call = state["pending_tool_call"]

    try:
        result = execute_tool(tool_call["name"], tool_call["args"])
        return {
            "tool_results": state.get("tool_results", []) + [result],
            "pending_tool_call": None,
            "last_error": None
        }
    except Exception as e:
        return {
            "last_error": {
                "tool": tool_call["name"],
                "error": str(e),
                "args": tool_call["args"],
                "timestamp": time.time()
            }
        }

def recovery_router(state):
    """Routes to recovery or continues execution."""
    if state.get("last_error"):
        return "recovery"
    return "continue"

# Build graph with recovery path
graph = StateGraph(AgentState)
graph.add_node("execute_tool", tool_execution_node)
graph.add_node("recovery", recovery_node)
graph.add_conditional_edges("execute_tool", recovery_router, {
    "recovery": "recovery",
    "continue": "next_step"
})

# Persistent checkpointer survives crashes
checkpointer = SqliteSaver.from_conn_string("agent_state.db")
app = graph.compile(checkpointer=checkpointer)

# Execute with auto-recovery
thread_id = "session-789"
result = app.invoke(
    {"task": "process financial data"},
    config={"configurable": {"thread_id": thread_id}}
)

# If process crashes, resume from last checkpoint
state = checkpointer.get(thread_id)
if state.get("last_error"):
    print(f"Resuming after error: {state['last_error']}")
    result = app.invoke(None, config={"configurable": {"thread_id": thread_id}})

Паттерн 3: Условное восстановление с контекстом ошибки

from typing import TypedDict, Annotated, Optional
import operator

class RecoveryState(TypedDict):
    messages: Annotated[list, operator.add]
    current_step: str
    failed_steps: list[dict]
    recovery_attempts: int
    execution_mode: str  # "normal", "recovery", "escalated"

def should_attempt_recovery(state: RecoveryState) -> str:
    """Decision node: recover, replan, or escalate."""
    MAX_RECOVERY_ATTEMPTS = 3

    if not state.get("failed_steps"):
        return "continue"

    last_failure = state["failed_steps"][-1]
    error_category = last_failure.get("category")

    # Fatal errors → replan immediately
    if error_category in {"permission_denied", "not_found", "quota_exceeded"}:
        if state["recovery_attempts"] < MAX_RECOVERY_ATTEMPTS:
            return "replan"
        return "escalate"

    # Transient errors → retry already handled at tool level
    # If we're here, retries failed → try fallback
    if error_category in {"rate_limit", "timeout", "unavailable"}:
        return "fallback"

    # Semantic errors → replan with adjusted approach
    return "replan"

graph = StateGraph(RecoveryState)
graph.add_node("execute", execute_node)
graph.add_node("fallback", fallback_node)
graph.add_node("replan", replan_node)
graph.add_node("escalate", human_escalation_node)

graph.add_conditional_edges("execute", should_attempt_recovery, {
    "continue": "execute",
    "fallback": "fallback",
    "replan": "replan",
    "escalate": "escalate"
})

graph.add_edge("fallback", "execute")
graph.add_edge("replan", "execute")
graph.add_edge("escalate", END)

Производительность и бенчмарки

Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.

Overhead повтора: экспоненциальный backoff с 3 попытками обычно добавляет задержку в диапазоне нескольких секунд до минуты для временных ошибок, в зависимости от конфигурации base delay и backoff multiplier. Полный jitter снижает пиковую нагрузку на восстановленные сервисы по сравнению с синхронизированными повторами.

Стоимость перепланирования: перепланирование на основе LLM (GPT-4o) включает дополнительные вызовы inference, добавляя задержку и затраты tokens. Каждый цикл перепланирования может потребить сотни до тысяч tokens в зависимости от размера контекста. Ограничение попыток перепланирования до 3 предотвращает неконтролируемые затраты.

Экономия fallback модели: fallback от GPT-4o к GPT-4o-mini может существенно снизить затраты на token, сохраняя приемлемое качество вывода для многих задач. Разница в стоимости между моделями может быть на порядки больше, делая это эффективной стратегией контроля затрат при rate limits.

Circuit breaker fast-fail: когда сервис находится в состоянии open circuit, запросы падают немедленно, а не ожидают timeout, снижая среднее время ответа во время сбоев с десятков секунд до миллисекунд.

Коэффициент cache hit: кэшированные fallback ответы могут предоставить немедленные результаты при падении основных инструментов. Эффективность зависит от паттернов повторения workload и конфигурации cache TTL. Больший TTL улучшает hit rate, но рискует устаревшими данными.

Частота эскалации человеку: хорошо настроенные саморегулирующиеся системы обычно эскалируют небольшую часть задач на проверку человеком. Более высокие частоты эскалации могут указывать на недостаточное покрытие fallback или чрезмерно консервативную классификацию ошибок. Более низкие частоты предполагают эффективное автономное восстановление, но требуют мониторинга на молчаливые сбои.

Хранилище checkpointer: постоянное checkpointing (SQLite, Postgres) добавляет overhead хранилища, пропорциональный размеру состояния и частоте checkpoint. In-memory checkpointer устраняет overhead сохранения, но теряет возможность восстановления через перезапуск процесса.

Коэффициент успеха восстановления: доля ошибок, успешно восстановленных автономно (через retry, fallback или перепланирование) в сравнении с требующими вмешательства человека. Системы с комплексными цепочками fallback и классификацией ошибок обычно достигают более высоких коэффициентов автономного восстановления.