Финансовые AI-агенты: оценка рисков, проверка соответствия, генерация отчётов


title: "Финансовые AI-агенты: оценка рисков, проверка соответствия, генерация отчётов" slug: financial-agents-risk-compliance-2026-ru date: 2026-02-24 lang: ru

Финансовые AI-агенты: оценка рисков, проверка соответствия, генерация отчётов

Ключевые факты

  • Основные версии Python SDK: anthropic==0.84.0, openai==2.24.0, langchain==1.2.10, langgraph==1.0.10
  • API финансовых данных: Alpha Vantage (бесплатный тариф: 5 вызовов/мин, премиум: 75 вызовов/мин), yfinance==1.2.0 (Yahoo Finance), Polygon.io (REST + WebSocket), Bloomberg BLPAPI (требует лицензию Terminal), Refinitiv Eikon Data API
  • Регуляторные фреймворки — MiFID II: требует оценку пригодности для розничных клиентов при торговле сложными продуктами, документацию наилучшего исполнения (хранение 5 лет), отчётность по сделкам в NCA (T+1), раскрытие затрат при >1% от суммарных расходов
  • Регуляторные фреймворки — SOX: Section 302 (сертификация финансовых отчётов CEO/CFO), Section 404 (оценка системы внутреннего контроля), разделение обязанностей (один человек не может одновременно инициировать И утверждать транзакции)
  • Регуляторные фреймворки — Basel III: 10-дневный VaR при доверительном интервале 99% для требований к рыночному капиталу, стресс-тестирование при неблагоприятных сценариях, расчёт кредитного риска контрагента (CVA)
  • Требования к аудиторскому следу: логировать каждый вызов LLM API (модель, токены, временная метка), каждый вызов инструмента (имя, параметры, результат), каждую точку принятия решения (проверенное условие, выбранная ветка), каждую обработанную финансовую сумму
  • Хранение аудиторского следа: MiFID II требует 5 лет для записей о сделках, SOX требует бессрочное хранение для существенных позиций, использовать неизменяемые хранилища (S3 WORM, append-only базы данных)
  • Контрольные точки с участием человека — пороги транзакций: <$10K (автоматически), $10K–$100K (требует одобрение старшего аналитика), $100K–$1M (требует одобрение риск-менеджера), >$1M (требует одобрение C-level)
  • Контрольные точки с участием человека — пороги риска: VaR >5% от стоимости портфеля, убыток при стресс-тесте >20%, концентрация позиции >25% в одном активе, валовое плечо >5x
  • Расчёт риска — методы VaR: историческое моделирование (непараметрическое, использует реальное распределение доходностей), параметрический VaR (предполагает нормальное распределение, быстрее, но менее точен для тяжёлых хвостов), метод Монте-Карло (наиболее гибкий, вычислительно затратный)
  • Расчёт риска — ключевые метрики: VaR (Value at Risk — максимальный ожидаемый убыток при заданном уровне доверия), CVaR/ES (Conditional VaR/Expected Shortfall — средний убыток сверх порога VaR), стресс-тестирование (применение исторических кризисных сценариев), маржинальный VaR (вклад каждого актива в риск)
  • PII в финансовом контексте — идентификаторы: SSN (regex: \b\d{3}-\d{2}-\d{4}\b), номера банковских счетов (8–17 цифр), номера кредитных карт (16 цифр с возможными дефисами/пробелами), IBAN (2-буквенный код страны + контрольные цифры + номер счёта)
  • Требования к маскированию PII: никогда не отправлять сырые PII в LLM API, маскировать номера счетов до последних 4 цифр (••••••7890), полностью редактировать SSN ([REDACTED_SSN]), использовать токенизацию для обратимого маскирования при необходимости авторизованного доступа
  • Время проверки соответствия: проводить проверки ПЕРЕД выполнением любой сделки, ПОСЛЕ изменений в портфеле, по расписанию (ежедневно для лимитов риска, ежемесячно для обзоров пригодности), по запросу человека-ревьюера
  • Ограничения вызовов инструментов: реализовать максимальное число итераций (обычно 10–15) для предотвращения бесконечных циклов, таймаут на вызов инструмента (30s для запросов данных, 120s для сложных вычислений), паттерн circuit breaker при сбоях внешних API
  • Производственные соображения: кэшировать статические справочные данные в Redis (метаданные инструментов, регуляторные правила), предварительно вычислять метрики риска по расписанию для больших портфелей, использовать streaming для длительных отчётов, параллелизировать независимые вызовы инструментов
  • Пример стоимости (Claude Opus 4.6): ~7K токенов на отчёт о рисках портфеля при $15/MTok на вход + $75/MTok на выход ≈ $0.10 за отчёт, 100 отчётов/день ≈ $10/день операционных расходов
  • Восстановление после ошибок: повторять при временных сбоях (3 попытки с экспоненциальным backoff 2s→4s→8s), создавать контрольные точки для долгих агентов (сохранять состояние каждые 5 итераций), хранить историю диалога для отладки упавших запусков
  • Средства защиты: API-ключи в переменных окружения (никогда в коде), ротировать учётные данные Bloomberg/Reuters каждые 90 дней, IP-белый список для production API, rate limiting на пользователя/клиента (предотвращение злоупотреблений)
  • Выбор модели: Claude Opus 4.6 для сложного комплаенс-рассуждения, Claude Sonnet 4 для рутинных расчётов рисков (в 3 раза дешевле), GPT-4 Turbo для комментариев к рыночной ситуации в реальном времени, избегать старых моделей для финансового анализа

Что такое финансовый агент

Финансовый AI-агент — это автономная система, в которой большая языковая модель оркестрирует выполнение специализированных инструментов для решения многошаговых задач финансового анализа в рамках регуляторных ограничений. В отличие от простых чат-ботов, отвечающих на вопросы, финансовые агенты:

  1. Сохраняют персистентное состояние между несколькими API-вызовами, отслеживая контекст портфеля, промежуточные расчёты и историю принятых решений
  2. Выполняют многошаговые рабочие процессы: получение рыночных данных → расчёт риска → проверка соответствия → генерация отчёта
  3. Вызывают предметно-специфичные инструменты для количественных финансов (расчёт VaR, стресс-тестирование, оптимизация портфеля) и регуляторных проверок (пригодность по MiFID II, разделение обязанностей по SOX)
  4. Приостанавливаются для проверки человеком при определённых порогах риска перед выполнением высокорисковых действий
  5. Генерируют неизменяемые аудиторские следы, документирующие каждое решение, вызов инструмента и преобразование данных для регуляторной проверки

Типовые сценарии применения финансовых агентов:

  • Мониторинг риска портфеля: непрерывный расчёт VaR, отслеживание лимитов позиций, оповещения при превышении порогов
  • Проверка соответствия: валидация наилучшего исполнения по MiFID II, разделения обязанностей по SOX, требований к капиталу по Basel III до подачи заявки на сделку
  • Регуляторная отчётность: генерация отчётов по транзакциям MiFID II, оценок системы внутреннего контроля по SOX Section 404, отчётов о достаточности капитала по Basel III
  • Обнаружение мошенничества: мониторинг транзакций в реальном времени на аномалии (проверки частоты, сопоставление паттернов, ML-скоринг)
  • Инвестиционное исследование: агрегация аналитических отчётов, данных о прибыли, рыночных настроений; синтез в практические выводы с указанием источников

Финансовые агенты НЕ делают:

  • Не совершают сделки автономно без одобрения человека (регуляторный запрет в большинстве юрисдикций)
  • Не принимают дискреционных инвестиционных решений (требует зарегистрированного инвестиционного советника)
  • Не предоставляют персонализированных инвестиционных советов розничным клиентам без оценки пригодности
  • Не переопределяют жёстко заданные лимиты риска или регуляторные правила (агенты предлагают, люди одобряют исключения)

Фреймворк принятия решений

Финансовые агенты работают в трёх режимах управления в зависимости от уровня риска задачи и регуляторных ограничений:

1. Полностью автономный (без одобрения человека)

Когда применять: низкорисковые, повторяющиеся задачи с детерминированными результатами, не требующие регуляторного одобрения.

Примеры:

  • Получение рыночных данных из утверждённых источников
  • Вычисление стандартных метрик риска (VaR, волатильность) для существующих портфелей
  • Генерация отчётов только для чтения для внутреннего использования
  • Рутинные проверки соответствия, которые только фиксируют проблемы (не изменяют данные)

Реализация:

def run_autonomous_agent(task: str) -> dict:
    result = agent.run(task, max_iterations=10)
    audit_trail.record(event="autonomous_task", outcome=result)
    return result

Средства управления рисками: жёстко заданные лимиты итераций, доступ к базе данных только на чтение, отсутствие записей во внешние API, исчерпывающее логирование.

2. Человек в контуре (одобрение на ключевых контрольных точках)

Когда применять: задачи среднего-высокого риска, где агент готовит анализ, но человек проверяет и одобряет перед выполнением.

Примеры:

  • Подготовка торгового ордера >$10K (агент составляет черновик, человек проверяет соответствие/риск, человек подаёт заявку)
  • Рекомендации по ребалансировке портфеля (агент предлагает изменения аллокации, человек одобряет)
  • Финализация регуляторного отчёта (агент генерирует черновик, комплаенс-офицер проверяет, CCO подписывает)
  • Оценка пригодности клиента для сложных продуктов (агент выставляет оценку, советник проверяет, советник одобряет)

Реализация:

async def run_human_in_loop_agent(task: str, risk_level: str) -> dict:
    # Agent performs analysis
    analysis = agent.run(task, pause_on=["needs_approval"])

    # Create review request
    review = approval_system.create_review(
        data=analysis,
        risk_level=risk_level,
        timeout_hours=24
    )

    # Wait for human decision
    decision = await approval_system.wait_for_review(review.id)

    if decision.status == "approved":
        # Agent proceeds with approved action
        result = agent.continue_from_checkpoint(analysis.checkpoint)
        return result
    else:
        audit_trail.record(event="human_rejection", reason=decision.notes)
        return {"status": "rejected", "reason": decision.notes}

Средства управления рисками: разделение обязанностей (агент готовит ≠ человек одобряет), истечение таймаута (автоотклонение через 24ч), пути эскалации (высокий риск → старший ревьюер), аудиторский след решений об одобрении.

3. Правило-базированный (детерминированный, без дискреции LLM)

Когда применять: критически важные контроли, которые должны выполняться идентично каждый раз без какой-либо вариативности — как правило, для жёстких регуляторных ограничений.

Примеры:

  • Остановка торговли при превышении лимита позиции (если позиция > лимит → отклонить ордер, без рассуждений LLM)
  • Редактирование PII (если поле соответствует regex SSN → маскировать до [REDACTED], без суждения LLM)
  • Срабатывание circuit breaker (если VaR > 10% → остановить всю торговлю, уведомить CRO)
  • Проверки разделения обязанностей (если пользователь имеет роли инициатора + утверждающего → заблокировать транзакцию)

Реализация:

def rule_based_compliance_check(transaction: dict) -> dict:
    violations = []

    # Hard rule: no transaction >$1M without executive approval
    if transaction["amount"] > 1_000_000:
        if "executive_approver" not in transaction["approvals"]:
            violations.append("HARD_LIMIT: $1M+ requires executive approval")

    # Hard rule: VaR limit
    if transaction["portfolio_var"] > 0.10:  # 10% portfolio value
        violations.append("HARD_LIMIT: VaR exceeds 10% threshold")

    return {
        "compliant": len(violations) == 0,
        "violations": violations,
        "rules_checked": ["amount_limit", "var_limit"],
        "timestamp": datetime.utcnow().isoformat()
    }

Средства управления рисками: отсутствие LLM (нулевой риск галлюцинаций), логика правил под версионным контролем, автоматическое тестирование изменений правил, неизменяемый журнал оценок правил.

Матрица решений

Тип задачи Уровень риска Режим управления Роль человека Пример
Получение рыночных данных Низкий Автономный Нет (только мониторинг) Ежедневное обновление цен
Расчёт метрик риска Низкий–средний Автономный Просматривает оповещения Вычисление VaR портфеля
Торговый ордер >$10K Средний Человек в контуре Одобряет после проверки Ордер на покупку акций
Торговый ордер >$1M Высокий Человек в контуре Одобрение старшего/C-level Крупная институциональная сделка
Превышение лимита позиции Критический Правило-базированный Получает оповещение, расследует Автоостановка торговли
Пригодность по MiFID II (розница + сложный продукт) Высокий Человек в контуре Советник проверяет + одобряет Продажа структурированного продукта
Редактирование PII Критический Правило-базированный Нет (автоматически) Маскирование SSN перед логированием

Справочная таблица параметров

Параметр Значение Примечания
Модель claude-opus-4-6 Основная модель для сложного финансового рассуждения
Модель (оптимизированная по стоимости) claude-sonnet-4 Для рутинных расчётов (в 3 раза дешевле Opus)
Макс. токены 4096 Достаточно для большинства финансовых отчётов; увеличить до 8K для детального анализа
Макс. итерации 10 Предотвращение бесконечных циклов при выполнении агента
Температура 0 Детерминированный вывод для расчётов соответствия/риска
Температура (исследование) 0.3 Небольшое творчество для инвестиционных комментариев, всё же консервативно
Таймаут вызова инструмента 30000ms 30s для запросов рыночных данных
Таймаут вызова инструмента (расчёт риска) 120000ms 2 мин для сложных расчётов VaR/стресс-тестов
Таймаут одобрения человеком 86400s 24 часа; автоотклонение при отсутствии ответа
Хранение аудита indefinite SOX: существенные позиции; MiFID II: не менее 5 лет
Доверительный интервал VaR 0.95 95% для внутреннего управления рисками
Доверительный интервал VaR (Basel III) 0.99 99% для расчёта регуляторного капитала
Период хранения VaR 1 day Внутренняя отчётность по рискам
Период хранения VaR (Basel III) 10 days Требование к регуляторному капиталу
Порог убытка при стресс-тесте 0.20 Убыток 20% инициирует проверку человеком
Лимит концентрации позиции 0.25 Не более 25% портфеля в одном активе
Лимит валового плеча 5.0 Сумма длинных + коротких позиций / NAV
Сумма транзакции (автоодобрение) $10,000 Ниже порога: одобрение не требуется
Сумма транзакции (одобрение старшего) $100,000 $10K–$100K: требует старшего аналитика
Сумма транзакции (одобрение руководства) $1,000,000 Свыше $1M: требует C-level
Rate limit API (Alpha Vantage бесплатный) 5 calls/min Перейти на премиум для 75 вызовов/мин
Rate limit API (production) 100 calls/min Реализовать circuit breaker при 80% порога
Cache TTL (рыночные данные) 60s Котировки в реальном времени; увеличить до 300s для задержанных данных
Cache TTL (справочные данные) 86400s Метаданные инструментов, статические регуляторные правила

Типичные ошибки

Ошибка 1: Отправка PII в LLM API без редактирования

Последствия: нарушения GDPR (штрафы до 4% глобального оборота), регуляторные взыскания, потеря доверия клиентов, потенциальные обязательства по раскрытию утечки данных.

Неверно:

client_profile = db.query(
    "SELECT name, ssn, account_number, net_worth FROM clients WHERE id = ?",
    client_id
)
# DANGER: Sending raw SSN to LLM
analysis = anthropic_client.messages.create(
    model="claude-opus-4-6",
    messages=[{
        "role": "user",
        "content": f"Analyze suitability for client: {client_profile}"
    }]
)

Верно:

client_profile = db.query(
    "SELECT name, ssn, account_number, net_worth FROM clients WHERE id = ?",
    client_id
)

# Redact PII before LLM call
redactor = PIIRedactor()
safe_profile = redactor.redact_dict(client_profile)
# Result: {"name": "[REDACTED]", "ssn": "[REDACTED_SSN]",
#          "account_number": "••••••7890", "net_worth": 500000}

analysis = anthropic_client.messages.create(
    model="claude-opus-4-6",
    messages=[{
        "role": "user",
        "content": f"Analyze suitability for client profile: {safe_profile}"
    }]
)

# Log redacted version to audit trail
audit_trail.record(
    event="suitability_analysis",
    data=safe_profile,  # Never log original PII
    llm_response=analysis
)

Ошибка 2: Отсутствие разделения обязанностей в рабочем процессе одобрения

Последствия: нарушение SOX Section 404 (существенный недостаток внутреннего контроля), провал аудита, повышенный риск мошенничества, потенциальные меры принуждения со стороны SEC.

Неверно:

def process_payment(user_id: str, amount: float):
    # DANGER: Same user can initiate AND approve
    payment = create_payment(user_id, amount)
    approve_payment(payment.id, approved_by=user_id)  # Same user!
    execute_payment(payment.id)

Верно:

def process_payment(initiator_id: str, amount: float):
    # Create payment (initiator role)
    payment = create_payment(initiator_id, amount)
    audit_trail.record(event="payment_created", actor=initiator_id, amount=amount)

    # Route to approval queue
    if amount > 10_000:
        approval = approval_system.create_review(
            payment_id=payment.id,
            initiator=initiator_id,
            amount=amount,
            requires_role="senior_approver"
        )
        # Workflow pauses here; different user must approve
        return {"status": "pending_approval", "approval_id": approval.id}
    else:
        # Small amounts: dual control not required
        execute_payment(payment.id)
        return {"status": "executed"}

async def approve_payment(approval_id: str, approver_id: str):
    approval = db.get_approval(approval_id)

    # SOX control: initiator != approver
    if approval.initiator_id == approver_id:
        raise ComplianceViolation(
            "SOX_SOD_001: Initiator and approver cannot be the same person"
        )

    # Check approver has required role
    if "senior_approver" not in get_user_roles(approver_id):
        raise InsufficientPermissions("Requires senior_approver role")

    # Execute payment
    execute_payment(approval.payment_id)
    audit_trail.record(
        event="payment_approved",
        initiator=approval.initiator_id,
        approver=approver_id,
        amount=approval.amount
    )

Ошибка 3: Проверка соответствия ПОСЛЕ выполнения сделки

Последствия: регуляторное нарушение (требование пригодности MiFID II), потенциальные расходы на откат сделки, вред клиенту (продан неподходящий продукт), штрафы и репутационный ущерб.

Неверно:

async def execute_trade(client_id: str, instrument: str, quantity: int):
    # DANGER: Execute first, check compliance later
    trade_result = trading_api.submit_order(
        client=client_id,
        symbol=instrument,
        qty=quantity
    )

    # Too late! Trade already executed
    compliance_result = mifid_checker.check_suitability(
        client_id=client_id,
        product=instrument
    )

    if not compliance_result.is_compliant:
        # Now we have a problem - trade is live but non-compliant
        logger.error(f"Post-trade compliance failure: {compliance_result.violations}")

Верно:

async def execute_trade(client_id: str, instrument: str, quantity: int):
    # Step 1: Fetch client profile and product details
    client = db.get_client(client_id)
    product = db.get_instrument(instrument)

    # Step 2: COMPLIANCE CHECK BEFORE EXECUTION
    compliance_result = mifid_checker.check_suitability(
        client=client,
        product=product,
        investment_objective=client.objective
    )

    if not compliance_result.is_compliant:
        audit_trail.record(
            event="trade_blocked",
            reason="compliance_violation",
            violations=compliance_result.violations
        )
        return {
            "status": "rejected",
            "reason": "MiFID II compliance check failed",
            "violations": compliance_result.violations
        }

    # Step 3: Risk check
    risk_result = risk_engine.check_limits(
        client_id=client_id,
        new_position={instrument: quantity}
    )

    if not risk_result.within_limits:
        return {
            "status": "rejected",
            "reason": "Risk limit breach",
            "details": risk_result.breaches
        }

    # Step 4: Execute trade (only if compliance + risk checks pass)
    trade_result = trading_api.submit_order(
        client=client_id,
        symbol=instrument,
        qty=quantity
    )

    # Step 5: Log successful compliant trade
    audit_trail.record(
        event="trade_executed",
        client=client_id,
        instrument=instrument,
        qty=quantity,
        compliance_checks_passed=True,
        trade_id=trade_result.trade_id
    )

    return {"status": "executed", "trade_id": trade_result.trade_id}

Ошибка 4: Использование параметрического VaR для ненормально распределённых доходностей

Последствия: существенное занижение хвостового риска (игнорируются тяжёлые хвосты и асимметрия), недостаточные резервы капитала, потенциальные маржин-коллы в условиях рыночного стресса, недостаточность капитала по Basel III.

Неверно:

def calculate_var(returns: pd.Series) -> float:
    # DANGER: Assumes normal distribution (invalid for crypto, options, emerging markets)
    mean = returns.mean()
    std = returns.std()
    z_score = stats.norm.ppf(0.95)  # 95% confidence
    var = -(mean + z_score * std)
    return var

# Applied to Bitcoin returns (highly non-normal: kurtosis ~10, skew ~-0.5)
btc_var = calculate_var(btc_returns)  # Severely underestimates risk!

Верно:

def calculate_var_robust(returns: pd.Series, method: str = "historical") -> dict:
    if method == "historical":
        # Non-parametric: uses actual distribution (handles fat tails, skewness)
        sorted_returns = np.sort(returns.dropna())
        var_index = int(0.05 * len(sorted_returns))  # 95% VaR = 5th percentile
        var = -sorted_returns[var_index]

        # CVaR (Expected Shortfall): average loss beyond VaR
        tail_losses = sorted_returns[:var_index]
        cvar = -np.mean(tail_losses)

    elif method == "parametric":
        # Use ONLY if returns are approximately normal (test first!)
        from scipy.stats import jarque_bera
        jb_stat, jb_pvalue = jarque_bera(returns)

        if jb_pvalue < 0.05:
            logger.warning(
                f"Returns are non-normal (JB test p={jb_pvalue:.4f}). "
                "Parametric VaR may underestimate risk."
            )

        mean = returns.mean()
        std = returns.std()
        var = -(mean + stats.norm.ppf(0.95) * std)
        cvar = -(mean - std * stats.norm.pdf(stats.norm.ppf(0.95)) / 0.05)

    elif method == "cornish_fisher":
        # Adjusts for skewness and kurtosis (better for moderate non-normality)
        from scipy.stats import skew, kurtosis
        mean = returns.mean()
        std = returns.std()
        s = skew(returns)
        k = kurtosis(returns)

        # Cornish-Fisher expansion
        z = stats.norm.ppf(0.95)
        z_cf = z + (z**2 - 1)*s/6 + (z**3 - 3*z)*(k-3)/24 - (2*z**3 - 5*z)*s**2/36
        var = -(mean + z_cf * std)
        cvar = None  # CF doesn't provide closed-form CVaR

    return {
        "var": round(float(var), 6),
        "cvar": round(float(cvar), 6) if cvar else None,
        "method": method,
        "num_observations": len(returns),
        "returns_skewness": round(float(skew(returns)), 3),
        "returns_kurtosis": round(float(kurtosis(returns)), 3)
    }

# Usage: detect non-normality and choose appropriate method
var_result = calculate_var_robust(btc_returns, method="historical")
# Or: var_result = calculate_var_robust(btc_returns, method="cornish_fisher")

Ошибка 5: Отсутствие circuit breaker при сбоях внешнего API

Последствия: каскадные сбои (повторные попытки агента перегружают падающий API), взрывной рост расходов (цикл повторов потребляет токены), запоздалое обнаружение сбоя, ухудшение пользовательского опыта.

Неверно:

def fetch_market_data(symbol: str) -> dict:
    # DANGER: Infinite retries on API failure
    while True:
        try:
            response = requests.get(
                f"https://api.example.com/quote/{symbol}",
                timeout=5
            )
            return response.json()
        except requests.exceptions.RequestException:
            time.sleep(1)  # Retry forever!

Верно:

import tenacity
from circuitbreaker import circuit

class MarketDataAPI:
    def __init__(self):
        self.failure_count = 0
        self.circuit_open = False

    @circuit(failure_threshold=5, recovery_timeout=60, expected_exception=APIError)
    @tenacity.retry(
        stop=tenacity.stop_after_attempt(3),
        wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
        retry=tenacity.retry_if_exception_type(requests.exceptions.RequestException),
        reraise=True
    )
    def fetch_market_data(self, symbol: str) -> dict:
        """
        Fetches market data with:
        - 3 retry attempts with exponential backoff (2s, 4s, 8s)
        - Circuit breaker: opens after 5 failures, auto-recovers after 60s
        """
        try:
            response = requests.get(
                f"https://api.example.com/quote/{symbol}",
                timeout=5
            )
            response.raise_for_status()
            return response.json()

        except requests.exceptions.Timeout:
            logger.warning(f"Timeout fetching {symbol}")
            raise APIError(f"Timeout for {symbol}")

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                # Rate limit hit
                logger.error(f"Rate limit exceeded for {symbol}")
                raise RateLimitError("API rate limit")
            elif e.response.status_code >= 500:
                # Server error - retry
                logger.error(f"Server error for {symbol}: {e}")
                raise APIError(f"Server error for {symbol}")
            else:
                # Client error (4xx) - don't retry
                logger.error(f"Client error for {symbol}: {e}")
                raise ValueError(f"Invalid symbol or request: {symbol}")

    def fetch_with_fallback(self, symbol: str) -> dict:
        """
        Fetch from primary API; fallback to secondary on failure.
        """
        try:
            return self.fetch_market_data(symbol)
        except CircuitBreakerError:
            logger.warning(f"Circuit open for primary API, using fallback")
            return self._fetch_from_fallback_api(symbol)
        except APIError:
            logger.warning(f"Primary API failed for {symbol}, using fallback")
            return self._fetch_from_fallback_api(symbol)

    def _fetch_from_fallback_api(self, symbol: str) -> dict:
        # Fallback to cached data or secondary API
        cached = redis_client.get(f"quote:{symbol}")
        if cached:
            logger.info(f"Returning cached data for {symbol}")
            return json.loads(cached)
        else:
            raise DataUnavailableError(f"No data available for {symbol}")

Агент мониторинга рисков

import numpy as np
import pandas as pd
from scipy import stats
from typing import Literal

class RiskMonitoringAgent:
    """
    Production-grade portfolio risk monitoring agent.
    Computes VaR, CVaR, stress tests, position limits.
    """

    def __init__(self, audit_trail, alert_system):
        self.audit = audit_trail
        self.alerts = alert_system

    def compute_portfolio_var(
        self,
        positions: dict[str, float],  # {symbol: market_value}
        returns_history: pd.DataFrame,  # columns = symbols, index = dates
        confidence_level: float = 0.95,
        holding_period: int = 1,
        method: Literal["historical", "parametric", "monte_carlo"] = "historical"
    ) -> dict:
        """
        Compute portfolio VaR using specified method.

        Args:
            positions: Current portfolio positions {symbol: market_value_usd}
            returns_history: Historical returns DataFrame (daily)
            confidence_level: 0.95 for 95% VaR, 0.99 for Basel III
            holding_period: Days to hold (1 for daily, 10 for Basel III)
            method: VaR calculation method

        Returns:
            {
                "var_pct": float,
                "var_dollar": float,
                "cvar_dollar": float,
                "method": str,
                "breaches_limit": bool
            }
        """
        portfolio_value = sum(positions.values())

        # Calculate portfolio weights
        weights = np.array([
            positions.get(symbol, 0) / portfolio_value
            for symbol in returns_history.columns
        ])

        if method == "historical":
            # Historical simulation VaR
            portfolio_returns = (returns_history @ weights).dropna()
            sorted_returns = np.sort(portfolio_returns)

            var_index = int((1 - confidence_level) * len(sorted_returns))
            var_1day = -sorted_returns[var_index]
            var_scaled = var_1day * np.sqrt(holding_period)

            # CVaR (Expected Shortfall)
            tail_losses = sorted_returns[:var_index]
            cvar_scaled = -np.mean(tail_losses) * np.sqrt(holding_period)

        elif method == "parametric":
            # Variance-covariance VaR
            cov_matrix = returns_history.cov().values
            portfolio_variance = weights @ cov_matrix @ weights
            portfolio_std = np.sqrt(portfolio_variance)

            z_score = stats.norm.ppf(confidence_level)
            var_scaled = z_score * portfolio_std * np.sqrt(holding_period)

            # CVaR for normal distribution
            cvar_scaled = (portfolio_std * stats.norm.pdf(z_score) /
                          (1 - confidence_level)) * np.sqrt(holding_period)

        elif method == "monte_carlo":
            # Monte Carlo simulation (10,000 scenarios)
            cov_matrix = returns_history.cov().values
            mean_returns = returns_history.mean().values

            n_simulations = 10_000
            simulated_returns = np.random.multivariate_normal(
                mean_returns * holding_period,
                cov_matrix * holding_period,
                n_simulations
            )
            portfolio_sim_returns = simulated_returns @ weights

            var_scaled = -np.percentile(portfolio_sim_returns, (1 - confidence_level) * 100)
            cvar_scaled = -np.mean(portfolio_sim_returns[
                portfolio_sim_returns <= -var_scaled
            ])

        var_dollar = var_scaled * portfolio_value
        cvar_dollar = cvar_scaled * portfolio_value

        # Check if VaR breaches threshold (5% of portfolio value)
        var_limit = 0.05 * portfolio_value
        breaches_limit = var_dollar > var_limit

        if breaches_limit:
            self.alerts.send_alert(
                severity="high",
                message=f"VaR ${var_dollar:,.0f} exceeds limit ${var_limit:,.0f}",
                portfolio_id=self._get_portfolio_id(positions)
            )

        # Log to audit trail
        self.audit.record(
            event="var_calculation",
            data={
                "portfolio_value": portfolio_value,
                "var_dollar": round(var_dollar, 2),
                "cvar_dollar": round(cvar_dollar, 2),
                "method": method,
                "confidence_level": confidence_level,
                "holding_period": holding_period
            },
            outcome="completed"
        )

        return {
            "var_pct": round(var_scaled, 6),
            "var_dollar": round(var_dollar, 2),
            "cvar_dollar": round(cvar_dollar, 2),
            "portfolio_value": portfolio_value,
            "method": method,
            "confidence_level": confidence_level,
            "holding_period_days": holding_period,
            "breaches_limit": breaches_limit,
            "limit_threshold": var_limit
        }

    def run_stress_tests(
        self,
        positions: dict[str, float],
        scenario: Literal["2008_crisis", "covid_2020", "rate_hike_2022", "custom"] = "2008_crisis",
        custom_shocks: dict[str, float] = None
    ) -> dict:
        """
        Apply historical stress scenarios to portfolio.

        Args:
            positions: {symbol: market_value_usd}
            scenario: Pre-defined scenario or "custom"
            custom_shocks: If scenario="custom", provide {symbol: shock_pct}

        Returns:
            {
                "scenario": str,
                "total_pnl": float,
                "total_pnl_pct": float,
                "breaches_threshold": bool,
                "asset_impacts": dict
            }
        """
        scenarios = {
            "2008_crisis": {
                "description": "Lehman Brothers collapse (Sep 2008)",
                "shocks": {
                    "SPY": -0.40, "QQQ": -0.45, "IWM": -0.50,  # Equities
                    "TLT": +0.10, "IEF": +0.08,  # Bonds rallied
                    "GLD": -0.05, "USO": -0.60,  # Commodities
                    "UUP": +0.15  # USD strengthened
                }
            },
            "covid_2020": {
                "description": "COVID market crash (Mar 2020)",
                "shocks": {
                    "SPY": -0.34, "QQQ": -0.30, "IWM": -0.40,
                    "TLT": +0.05, "GLD": +0.10, "USO": -0.50
                }
            },
            "rate_hike_2022": {
                "description": "Fed rate hike cycle (2022)",
                "shocks": {
                    "SPY": -0.18, "QQQ": -0.33, "TLT": -0.20,
                    "GLD": -0.08, "UUP": +0.12
                }
            }
        }

        if scenario == "custom":
            if not custom_shocks:
                raise ValueError("Must provide custom_shocks when scenario='custom'")
            shock_dict = custom_shocks
            description = "Custom scenario"
        else:
            shock_dict = scenarios[scenario]["shocks"]
            description = scenarios[scenario]["description"]

        portfolio_value = sum(positions.values())
        total_pnl = 0
        asset_impacts = {}

        for symbol, position_value in positions.items():
            shock = shock_dict.get(symbol, -0.15)  # Default -15% if not specified
            pnl = position_value * shock
            total_pnl += pnl

            asset_impacts[symbol] = {
                "current_value": position_value,
                "shock_applied_pct": f"{shock * 100:.1f}%",
                "pnl": round(pnl, 2),
                "post_shock_value": round(position_value * (1 + shock), 2)
            }

        total_pnl_pct = total_pnl / portfolio_value

        # Check threshold: 20% loss triggers human review
        loss_threshold = -0.20
        breaches_threshold = total_pnl_pct < loss_threshold

        if breaches_threshold:
            self.alerts.send_alert(
                severity="critical",
                message=(
                    f"Stress test {scenario}: Portfolio loss "
                    f"{total_pnl_pct*100:.1f}% exceeds {loss_threshold*100}% threshold"
                ),
                portfolio_id=self._get_portfolio_id(positions)
            )

        self.audit.record(
            event="stress_test",
            data={
                "scenario": scenario,
                "description": description,
                "total_pnl": round(total_pnl, 2),
                "total_pnl_pct": round(total_pnl_pct, 4),
                "breaches_threshold": breaches_threshold
            },
            outcome="completed"
        )

        return {
            "scenario": scenario,
            "description": description,
            "total_pnl": round(total_pnl, 2),
            "total_pnl_pct": f"{total_pnl_pct * 100:.2f}%",
            "portfolio_value": portfolio_value,
            "breaches_threshold": breaches_threshold,
            "threshold": f"{loss_threshold * 100}%",
            "asset_impacts": asset_impacts
        }

    def check_position_limits(
        self,
        positions: dict[str, float],
        concentration_limit: float = 0.25,
        leverage_limit: float = 5.0
    ) -> dict:
        """
        Check portfolio concentration and leverage limits.

        Args:
            positions: {symbol: market_value_usd} (negative for shorts)
            concentration_limit: Max % of portfolio in single position
            leverage_limit: Max gross leverage (long + short) / NAV

        Returns:
            {
                "concentration_violations": list,
                "leverage_ratio": float,
                "leverage_violation": bool,
                "compliant": bool
            }
        """
        portfolio_value = sum(abs(v) for v in positions.values())  # Gross value
        nav = sum(positions.values())  # Net asset value

        concentration_violations = []
        for symbol, position_value in positions.items():
            concentration = abs(position_value) / nav
            if concentration > concentration_limit:
                concentration_violations.append({
                    "symbol": symbol,
                    "concentration_pct": f"{concentration * 100:.1f}%",
                    "limit_pct": f"{concentration_limit * 100}%",
                    "position_value": position_value
                })

        gross_leverage = portfolio_value / nav if nav > 0 else 0
        leverage_violation = gross_leverage > leverage_limit

        compliant = len(concentration_violations) == 0 and not leverage_violation

        if not compliant:
            self.alerts.send_alert(
                severity="high",
                message=f"Position limit breach: {len(concentration_violations)} concentration violations, leverage {gross_leverage:.2f}x",
                portfolio_id=self._get_portfolio_id(positions)
            )

        self.audit.record(
            event="position_limit_check",
            data={
                "concentration_violations": len(concentration_violations),
                "gross_leverage": round(gross_leverage, 2),
                "compliant": compliant
            },
            outcome="completed"
        )

        return {
            "concentration_violations": concentration_violations,
            "gross_leverage": round(gross_leverage, 2),
            "leverage_limit": leverage_limit,
            "leverage_violation": leverage_violation,
            "compliant": compliant
        }

    def _get_portfolio_id(self, positions: dict) -> str:
        # In production: lookup portfolio ID from positions
        return "portfolio_" + hashlib.md5(
            json.dumps(positions, sort_keys=True).encode()
        ).hexdigest()[:8]

Пример использования:

# Initialize agent
audit_trail = ImmutableAuditTrail(storage=S3WORMStorage())
alert_system = AlertSystem(slack_webhook=SLACK_URL)
risk_agent = RiskMonitoringAgent(audit_trail, alert_system)

# Current portfolio
portfolio = {
    "SPY": 500_000,   # S&P 500 ETF
    "QQQ": 300_000,   # Nasdaq ETF
    "TLT": 150_000,   # 20Y Treasury ETF
    "GLD": 50_000     # Gold ETF
}

# Fetch historical returns (252 trading days)
returns_df = fetch_historical_returns(
    symbols=list(portfolio.keys()),
    days=252
)

# Compute VaR
var_result = risk_agent.compute_portfolio_var(
    positions=portfolio,
    returns_history=returns_df,
    confidence_level=0.95,
    holding_period=1,
    method="historical"
)
print(f"95% 1-day VaR: ${var_result['var_dollar']:,.0f}")
print(f"Breaches limit: {var_result['breaches_limit']}")

# Run stress tests
stress_result = risk_agent.run_stress_tests(
    positions=portfolio,
    scenario="2008_crisis"
)
print(f"2008 crisis scenario P&L: ${stress_result['total_pnl']:,.0f}")

# Check position limits
limits_result = risk_agent.check_position_limits(
    positions=portfolio,
    concentration_limit=0.25,
    leverage_limit=5.0
)
print(f"Position limits compliant: {limits_result['compliant']}")

Соответствие требованиям и аудиторский след

import hashlib
import json
import uuid
from datetime import datetime
from enum import Enum

class ComplianceFramework(Enum):
    MIFID_II = "mifid_ii"
    SOX = "sox"
    BASEL_III = "basel_iii"
    GDPR = "gdpr"

class ImmutableAuditTrail:
    """
    Hash-chained audit trail for tamper detection.
    Each entry's hash includes the previous entry's hash (blockchain-style).
    """

    def __init__(self, storage_backend):
        self.storage = storage_backend
        self._last_hash = self._load_last_hash()

    def record(
        self,
        event_type: str,
        actor: str,  # "financial_agent_v2" or user ID
        action: str,
        data: dict,
        outcome: str,
        metadata: dict = None
    ) -> str:
        """
        Record immutable audit event.

        Args:
            event_type: "llm_call", "tool_call", "compliance_check", "human_approval", etc.
            actor: Agent ID or user ID
            action: Specific action taken
            data: Relevant data (PII must be redacted before calling this)
            outcome: "success", "failure", "pending", etc.
            metadata: Additional context

        Returns:
            event_id: UUID of recorded event
        """
        event_id = str(uuid.uuid4())
        timestamp = datetime.utcnow().isoformat() + "Z"

        event = {
            "event_id": event_id,
            "timestamp": timestamp,
            "event_type": event_type,
            "actor": actor,
            "action": action,
            "outcome": outcome,
            "data": data,
            "metadata": metadata or {}
        }

        # Hash chaining for tamper detection
        event_json = json.dumps(event, sort_keys=True, default=str)
        current_hash = hashlib.sha256(
            f"{self._last_hash}{event_json}".encode()
        ).hexdigest()

        event["hash"] = current_hash
        event["previous_hash"] = self._last_hash

        # Persist to immutable storage
        self.storage.append(event)
        self._last_hash = current_hash

        return event_id

    def verify_chain_integrity(self, from_event_id: str = None) -> dict:
        """
        Verify audit trail has not been tampered with.
        Recomputes hashes and checks chain linkage.
        """
        events = self.storage.read_all(from_event_id=from_event_id)
        violations = []

        prev_hash = events[0]["previous_hash"] if events else "GENESIS"

        for event in events:
            # Reconstruct hash
            event_copy = {k: v for k, v in event.items()
                         if k not in ("hash", "previous_hash")}
            event_json = json.dumps(event_copy, sort_keys=True, default=str)
            expected_hash = hashlib.sha256(
                f"{event['previous_hash']}{event_json}".encode()
            ).hexdigest()

            if event["hash"] != expected_hash:
                violations.append({
                    "event_id": event["event_id"],
                    "timestamp": event["timestamp"],
                    "issue": "Hash mismatch - potential tampering"
                })

            if event["previous_hash"] != prev_hash:
                violations.append({
                    "event_id": event["event_id"],
                    "timestamp": event["timestamp"],
                    "issue": "Chain break - event sequence tampered"
                })

            prev_hash = event["hash"]

        return {
            "integrity_verified": len(violations) == 0,
            "events_checked": len(events),
            "violations": violations,
            "verification_timestamp": datetime.utcnow().isoformat() + "Z"
        }

    def _load_last_hash(self) -> str:
        last_event = self.storage.get_latest()
        return last_event["hash"] if last_event else "GENESIS"

class MiFIDIIComplianceAgent:
    """
    MiFID II compliance checker for financial agents.
    Enforces suitability, best execution, transaction reporting.
    """

    def __init__(self, audit_trail):
        self.audit = audit_trail
        self.COST_DISCLOSURE_THRESHOLD = 0.01  # 1%

    def check_suitability(
        self,
        client: dict,
        product: dict,
        investment_objective: str
    ) -> dict:
        """
        MiFID II Article 25: Suitability assessment.
        Required for investment advice and portfolio management.

        Args:
            client: {
                "client_id": str,
                "category": "retail" | "professional" | "eligible_counterparty",
                "risk_tolerance_score": int (1-10),
                "net_worth_eur": float,
                "has_relevant_experience": bool,
                "objective": "growth" | "income" | "capital_preservation"
            }
            product: {
                "isin": str,
                "complexity": "simple" | "non_complex" | "complex",
                "risk_score": int (1-10),
                "min_recommended_net_worth": float,
                "max_loss_scenario_pct": float,
                "total_costs_pct": float
            }
            investment_objective: Client's stated objective for this trade

        Returns:
            {
                "is_compliant": bool,
                "violations": list[str],
                "required_disclosures": list[str],
                "suitability_score": float
            }
        """
        violations = []
        disclosures = []

        # Rule 1: Retail + Complex = Full suitability required
        if (client["category"] == "retail" and
            product["complexity"] == "complex"):

            # Check knowledge & experience
            if not client.get("has_relevant_experience", False):
                violations.append(
                    "MIFID2_SUITABILITY_001: Retail client lacks required "
                    "knowledge/experience for complex product"
                )

            # Check financial situation
            if client["net_worth_eur"] < product.get("min_recommended_net_worth", 0):
                violations.append(
                    f"MIFID2_SUITABILITY_002: Client net worth "
                    f"€{client['net_worth_eur']:,.0f} below product minimum "
                    f"€{product['min_recommended_net_worth']:,.0f}"
                )

            # Check risk tolerance
            client_risk = client.get("risk_tolerance_score", 0)
            product_risk = product.get("risk_score", 10)
            if client_risk < product_risk - 2:
                violations.append(
                    f"MIFID2_SUITABILITY_003: Risk mismatch - client {client_risk}/10 "
                    f"vs product {product_risk}/10"
                )

        # Rule 2: Investment objective alignment
        if investment_objective == "capital_preservation":
            if product.get("max_loss_scenario_pct", 0) > 0.20:
                violations.append(
                    f"MIFID2_SUITABILITY_004: Product max loss "
                    f"{product['max_loss_scenario_pct']*100:.0f}% incompatible "
                    "with capital preservation objective"
                )

        # Rule 3: Cost disclosure (Article 50)
        if product.get("total_costs_pct", 0) > self.COST_DISCLOSURE_THRESHOLD:
            disclosures.append(
                f"COST_DISCLOSURE: Total costs "
                f"{product['total_costs_pct']*100:.2f}% must be disclosed "
                "to client before execution (MiFID II Art. 50)"
            )

        # Rule 4: Best interest obligation for retail
        if client["category"] == "retail":
            disclosures.append(
                "BEST_INTEREST: Firm must act in client's best interest (MiFID II Art. 24.1)"
            )

        # Compute suitability score (0-1)
        suitability_score = self._compute_suitability_score(client, product)

        # Audit log
        self.audit.record(
            event_type="compliance_check",
            actor="mifid_compliance_agent",
            action="suitability_assessment",
            data={
                "client_id": client["client_id"],
                "product_isin": product["isin"],
                "is_compliant": len(violations) == 0,
                "violations_count": len(violations),
                "suitability_score": suitability_score
            },
            outcome="completed"
        )

        return {
            "is_compliant": len(violations) == 0,
            "violations": violations,
            "required_disclosures": disclosures,
            "suitability_score": round(suitability_score, 2),
            "framework": "MiFID II Article 25"
        }

    def check_best_execution(
        self,
        order: dict,
        execution_venues: list[dict]
    ) -> dict:
        """
        MiFID II Article 27: Best execution obligation.

        Args:
            order: {"side": "buy" | "sell", "quantity": int, "client_category": str}
            execution_venues: [
                {
                    "venue_name": str,
                    "bid_price": float,
                    "ask_price": float,
                    "fill_probability": float,
                    "avg_latency_ms": float,
                    "reliability_score": float (0-1)
                }
            ]

        Returns:
            {
                "best_execution_met": bool,
                "selected_venue": str,
                "selection_rationale": str
            }
        """
        if not execution_venues:
            return {
                "best_execution_met": False,
                "reason": "No execution venues provided",
                "selected_venue": None
            }

        # Score venues (price = primary factor for retail)
        scored_venues = []
        for venue in execution_venues:
            # Price score
            if order["side"] == "buy":
                price_score = -venue["ask_price"]  # Lower ask = better
            else:
                price_score = venue["bid_price"]  # Higher bid = better

            # Composite score (MiFID II factor weights)
            score = (
                0.60 * price_score +
                0.20 * venue["fill_probability"] +
                0.10 * (-venue["avg_latency_ms"] / 1000) +
                0.10 * venue["reliability_score"]
            )
            scored_venues.append({**venue, "best_exec_score": score})

        best_venue = max(scored_venues, key=lambda x: x["best_exec_score"])

        # Audit log
        self.audit.record(
            event_type="compliance_check",
            actor="mifid_compliance_agent",
            action="best_execution_check",
            data={
                "order_side": order["side"],
                "selected_venue": best_venue["venue_name"],
                "venues_considered": len(execution_venues)
            },
            outcome="completed"
        )

        return {
            "best_execution_met": True,
            "selected_venue": best_venue["venue_name"],
            "selection_rationale": "Best price with adequate fill probability and low latency",
            "all_venues_considered": [v["venue_name"] for v in execution_venues],
            "documentation_required": True,  # 5-year retention
            "framework": "MiFID II Article 27"
        }

    def _compute_suitability_score(self, client: dict, product: dict) -> float:
        """Compute 0-1 suitability score."""
        score = 1.0

        # Risk tolerance alignment
        risk_diff = abs(client.get("risk_tolerance_score", 5) -
                       product.get("risk_score", 5))
        score -= risk_diff * 0.10  # -0.10 per point of mismatch

        # Net worth adequacy
        if product.get("min_recommended_net_worth", 0) > 0:
            adequacy_ratio = (client["net_worth_eur"] /
                            product["min_recommended_net_worth"])
            if adequacy_ratio < 1.0:
                score -= (1.0 - adequacy_ratio) * 0.30

        # Experience
        if not client.get("has_relevant_experience") and product["complexity"] == "complex":
            score -= 0.25

        return max(0.0, score)

class HumanApprovalWorkflow:
    """
    Human-in-the-loop approval system for high-risk decisions.
    Integrates with ticketing systems (Jira, ServiceNow) and Slack.
    """

    def __init__(self, db_client, notification_client, audit_trail):
        self.db = db_client
        self.notifier = notification_client
        self.audit = audit_trail

    def create_approval_request(
        self,
        agent_task_id: str,
        decision_type: str,
        data_to_review: dict,
        risk_level: Literal["low", "medium", "high", "critical"],
        context: str,
        timeout_hours: int = 24
    ) -> dict:
        """
        Create human approval request and pause agent execution.

        Args:
            agent_task_id: ID of the paused agent task
            decision_type: "trade_execution", "compliance_override", etc.
            data_to_review: Data requiring human judgment (PII redacted)
            risk_level: Determines reviewer assignment
            context: Human-readable explanation
            timeout_hours: Auto-reject if no response

        Returns:
            {"approval_id": str, "status": "pending"}
        """
        approval_id = str(uuid.uuid4())

        # Assign reviewers based on risk level
        reviewer_pool = self._assign_reviewers(risk_level)

        approval_request = {
            "approval_id": approval_id,
            "agent_task_id": agent_task_id,
            "decision_type": decision_type,
            "data": data_to_review,
            "context": context,
            "risk_level": risk_level,
            "status": "pending",
            "assigned_to": reviewer_pool,
            "created_at": datetime.utcnow().isoformat(),
            "expires_at": (
                datetime.utcnow() + timedelta(hours=timeout_hours)
            ).isoformat()
        }

        # Persist to database
        self.db.insert("approval_requests", approval_request)

        # Notify reviewers (Slack, email, SMS for critical)
        self.notifier.send_review_notification(
            reviewer_pool=reviewer_pool,
            approval_id=approval_id,
            risk_level=risk_level,
            summary=f"{decision_type}: {context[:200]}",
            urgency="high" if risk_level == "critical" else "normal"
        )

        # Audit log
        self.audit.record(
            event_type="human_approval",
            actor="approval_workflow",
            action="approval_request_created",
            data={
                "approval_id": approval_id,
                "risk_level": risk_level,
                "assigned_to": reviewer_pool
            },
            outcome="pending"
        )

        return {"approval_id": approval_id, "status": "pending"}

    async def wait_for_approval(
        self,
        approval_id: str,
        poll_interval_seconds: int = 30
    ) -> dict:
        """
        Poll for approval decision (agent suspends here).
        In production: use webhooks or message queues instead of polling.

        Returns:
            {
                "status": "approved" | "rejected" | "expired",
                "reviewer_notes": str,
                "reviewed_by": str,
                "reviewed_at": str
            }
        """
        import asyncio

        start_time = datetime.utcnow()
        approval = self.db.find_one("approval_requests", {"approval_id": approval_id})
        timeout = datetime.fromisoformat(approval["expires_at"])

        while True:
            if datetime.utcnow() > timeout:
                # Timeout expired
                self.db.update(
                    "approval_requests",
                    {"approval_id": approval_id},
                    {"status": "expired"}
                )
                self.audit.record(
                    event_type="human_approval",
                    actor="approval_workflow",
                    action="approval_expired",
                    data={"approval_id": approval_id},
                    outcome="expired"
                )
                return {"status": "expired", "approval_id": approval_id}

            # Check status
            approval = self.db.find_one("approval_requests", {"approval_id": approval_id})

            if approval["status"] in ["approved", "rejected"]:
                self.audit.record(
                    event_type="human_approval",
                    actor=approval.get("reviewed_by", "unknown"),
                    action=f"approval_{approval['status']}",
                    data={"approval_id": approval_id},
                    outcome=approval["status"]
                )
                return {
                    "status": approval["status"],
                    "approval_id": approval_id,
                    "reviewer_notes": approval.get("reviewer_notes"),
                    "reviewed_by": approval.get("reviewed_by"),
                    "reviewed_at": approval.get("reviewed_at")
                }

            await asyncio.sleep(poll_interval_seconds)

    def _assign_reviewers(self, risk_level: str) -> list[str]:
        """Map risk level to reviewer team."""
        pools = {
            "low": ["junior_analyst_team"],
            "medium": ["senior_analyst_team"],
            "high": ["risk_manager_team"],
            "critical": ["cro_team", "cfo_team"]  # C-level for critical
        }
        return pools.get(risk_level, ["senior_analyst_team"])

Сквозной пример:

async def execute_compliant_trade(
    client_id: str,
    isin: str,
    quantity: int,
    side: Literal["buy", "sell"]
):
    """
    Full compliance workflow: suitability → risk → approval → execution.
    """
    # Initialize systems
    audit = ImmutableAuditTrail(storage=S3WORMStorage())
    compliance_agent = MiFIDIIComplianceAgent(audit)
    approval_workflow = HumanApprovalWorkflow(
        db_client=PostgresDB(),
        notification_client=SlackNotifier(),
        audit_trail=audit
    )

    # Step 1: Fetch client and product data
    client = db.get_client(client_id)
    product = db.get_instrument(isin)

    # Step 2: MiFID II suitability check
    suitability = compliance_agent.check_suitability(
        client=client,
        product=product,
        investment_objective=client["objective"]
    )

    if not suitability["is_compliant"]:
        return {
            "status": "rejected",
            "reason": "MiFID II suitability check failed",
            "violations": suitability["violations"]
        }

    # Step 3: Best execution check
    venues = fetch_execution_venues(isin)
    best_exec = compliance_agent.check_best_execution(
        order={"side": side, "quantity": quantity, "client_category": client["category"]},
        execution_venues=venues
    )

    # Step 4: Calculate trade value and check approval threshold
    estimated_price = venues[0]["ask_price"] if side == "buy" else venues[0]["bid_price"]
    trade_value = estimated_price * quantity

    if trade_value > 10_000:
        # Step 5: Human approval required
        approval = approval_workflow.create_approval_request(
            agent_task_id=f"trade_{uuid.uuid4()}",
            decision_type="trade_execution",
            data_to_review={
                "client_id": client_id,
                "isin": isin,
                "quantity": quantity,
                "side": side,
                "estimated_value": trade_value,
                "suitability_score": suitability["suitability_score"]
            },
            risk_level="high" if trade_value > 100_000 else "medium",
            context=f"{side.upper()} {quantity} units of {isin} for €{trade_value:,.0f}",
            timeout_hours=24
        )

        decision = await approval_workflow.wait_for_approval(approval["approval_id"])

        if decision["status"] != "approved":
            return {
                "status": "rejected",
                "reason": f"Human approval {decision['status']}",
                "notes": decision.get("reviewer_notes")
            }

    # Step 6: Execute trade
    trade_result = trading_api.submit_order(
        client=client_id,
        isin=isin,
        quantity=quantity,
        side=side,
        venue=best_exec["selected_venue"]
    )

    # Step 7: Log to audit trail
    audit.record(
        event_type="trade_execution",
        actor="financial_agent",
        action=f"{side}_order_executed",
        data={
            "client_id": client_id,
            "isin": isin,
            "quantity": quantity,
            "trade_id": trade_result["trade_id"],
            "compliance_checks_passed": True,
            "human_approved": trade_value > 10_000
        },
        outcome="success"
    )

    return {
        "status": "executed",
        "trade_id": trade_result["trade_id"],
        "venue": best_exec["selected_venue"]
    }

Агент обнаружения мошенничества

import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Literal

class FraudDetectionAgent:
    """
    Real-time transaction fraud detection using rule-based and ML approaches.
    """

    def __init__(self, audit_trail, alert_system):
        self.audit = audit_trail
        self.alerts = alert_system
        self.transaction_history = defaultdict(list)  # {account_id: [transactions]}

    def analyze_transaction(
        self,
        transaction: dict
    ) -> dict:
        """
        Analyze transaction for fraud indicators.

        Args:
            transaction: {
                "transaction_id": str,
                "account_id": str,
                "amount": float,
                "currency": str,
                "merchant": str,
                "merchant_category": str,
                "timestamp": str (ISO format),
                "ip_address": str,
                "device_id": str,
                "geo_location": {"country": str, "city": str}
            }

        Returns:
            {
                "fraud_score": float (0-1),
                "risk_level": "low" | "medium" | "high" | "critical",
                "flags": list[str],
                "action": "approve" | "review" | "block"
            }
        """
        flags = []
        fraud_score = 0.0

        # Rule 1: Velocity check (transaction frequency)
        velocity_score, velocity_flags = self._check_velocity(
            transaction["account_id"],
            transaction["timestamp"]
        )
        fraud_score += velocity_score
        flags.extend(velocity_flags)

        # Rule 2: Amount anomaly
        amount_score, amount_flags = self._check_amount_anomaly(
            transaction["account_id"],
            transaction["amount"]
        )
        fraud_score += amount_score
        flags.extend(amount_flags)

        # Rule 3: Geographic anomaly
        geo_score, geo_flags = self._check_geographic_anomaly(
            transaction["account_id"],
            transaction["geo_location"]
        )
        fraud_score += geo_score
        flags.extend(geo_flags)

        # Rule 4: Merchant risk
        merchant_score = self._check_merchant_risk(
            transaction["merchant_category"]
        )
        fraud_score += merchant_score

        # Rule 5: Device fingerprint
        device_score, device_flags = self._check_device_anomaly(
            transaction["account_id"],
            transaction["device_id"]
        )
        fraud_score += device_score
        flags.extend(device_flags)

        # Normalize score to 0-1
        fraud_score = min(1.0, fraud_score)

        # Determine risk level and action
        if fraud_score >= 0.80:
            risk_level = "critical"
            action = "block"
        elif fraud_score >= 0.60:
            risk_level = "high"
            action = "review"
        elif fraud_score >= 0.40:
            risk_level = "medium"
            action = "review"
        else:
            risk_level = "low"
            action = "approve"

        # Alert if high risk
        if risk_level in ["high", "critical"]:
            self.alerts.send_alert(
                severity=risk_level,
                message=f"Fraud detected: Account {transaction['account_id']}, "
                        f"Score {fraud_score:.2f}, Flags: {flags}",
                transaction_id=transaction["transaction_id"]
            )

        # Audit log
        self.audit.record(
            event_type="fraud_detection",
            actor="fraud_detection_agent",
            action="transaction_analysis",
            data={
                "transaction_id": transaction["transaction_id"],
                "account_id": transaction["account_id"],
                "fraud_score": round(fraud_score, 3),
                "risk_level": risk_level,
                "action": action,
                "flags": flags
            },
            outcome=action
        )

        # Store transaction in history
        self.transaction_history[transaction["account_id"]].append(transaction)

        return {
            "fraud_score": round(fraud_score, 3),
            "risk_level": risk_level,
            "flags": flags,
            "action": action,
            "transaction_id": transaction["transaction_id"]
        }

    def _check_velocity(self, account_id: str, timestamp: str) -> tuple[float, list[str]]:
        """
        Check transaction frequency (velocity rule).
        >5 transactions in 10 minutes = suspicious.
        """
        current_time = datetime.fromisoformat(timestamp)
        recent_window = current_time - timedelta(minutes=10)

        recent_txns = [
            txn for txn in self.transaction_history.get(account_id, [])
            if datetime.fromisoformat(txn["timestamp"]) > recent_window
        ]

        txn_count = len(recent_txns)
        flags = []

        if txn_count > 10:
            flags.append(f"VELOCITY_CRITICAL: {txn_count} transactions in 10 minutes")
            return 0.40, flags
        elif txn_count > 5:
            flags.append(f"VELOCITY_HIGH: {txn_count} transactions in 10 minutes")
            return 0.25, flags
        elif txn_count > 3:
            flags.append(f"VELOCITY_MEDIUM: {txn_count} transactions in 10 minutes")
            return 0.10, flags

        return 0.0, flags

    def _check_amount_anomaly(self, account_id: str, amount: float) -> tuple[float, list[str]]:
        """
        Check if amount deviates significantly from historical pattern.
        """
        history = self.transaction_history.get(account_id, [])
        if len(history) < 10:
            # Insufficient history
            return 0.0, []

        historical_amounts = [txn["amount"] for txn in history[-30:]]  # Last 30 txns
        mean_amount = np.mean(historical_amounts)
        std_amount = np.std(historical_amounts)

        if std_amount == 0:
            return 0.0, []

        z_score = abs((amount - mean_amount) / std_amount)
        flags = []

        if z_score > 5:
            flags.append(f"AMOUNT_ANOMALY: {z_score:.1f}σ from mean (${amount:,.0f} vs avg ${mean_amount:,.0f})")
            return 0.35, flags
        elif z_score > 3:
            flags.append(f"AMOUNT_UNUSUAL: {z_score:.1f}σ from mean")
            return 0.20, flags

        return 0.0, flags

    def _check_geographic_anomaly(self, account_id: str, geo_location: dict) -> tuple[float, list[str]]:
        """
        Check if transaction location deviates from historical pattern.
        """
        history = self.transaction_history.get(account_id, [])
        if len(history) < 5:
            return 0.0, []

        recent_countries = [
            txn["geo_location"]["country"]
            for txn in history[-10:]
            if "geo_location" in txn
        ]

        current_country = geo_location.get("country")
        flags = []

        if current_country not in recent_countries:
            flags.append(f"GEO_ANOMALY: New country {current_country}")
            return 0.25, flags

        return 0.0, flags

    def _check_merchant_risk(self, merchant_category: str) -> float:
        """
        High-risk merchant categories (gambling, crypto exchanges, etc.).
        """
        high_risk_categories = {
            "gambling": 0.20,
            "crypto_exchange": 0.15,
            "money_transfer": 0.15,
            "adult_services": 0.25,
            "prepaid_cards": 0.10
        }
        return high_risk_categories.get(merchant_category, 0.0)

    def _check_device_anomaly(self, account_id: str, device_id: str) -> tuple[float, list[str]]:
        """
        Check if device is new/unknown for this account.
        """
        history = self.transaction_history.get(account_id, [])
        known_devices = {
            txn["device_id"]
            for txn in history
            if "device_id" in txn
        }

        flags = []
        if device_id not in known_devices:
            flags.append(f"DEVICE_NEW: Unknown device {device_id[:8]}...")
            return 0.15, flags

        return 0.0, flags

Пример использования:

# Initialize fraud detection agent
audit_trail = ImmutableAuditTrail(storage=S3WORMStorage())
alert_system = AlertSystem(slack_webhook=SLACK_URL)
fraud_agent = FraudDetectionAgent(audit_trail, alert_system)

# Incoming transaction
transaction = {
    "transaction_id": "txn_abc123",
    "account_id": "acct_456",
    "amount": 2500.00,
    "currency": "USD",
    "merchant": "Online Retailer XYZ",
    "merchant_category": "retail",
    "timestamp": "2026-03-01T14:32:15Z",
    "ip_address": "203.0.113.45",
    "device_id": "device_789",
    "geo_location": {"country": "US", "city": "New York"}
}

# Analyze for fraud
result = fraud_agent.analyze_transaction(transaction)

print(f"Fraud Score: {result['fraud_score']}")
print(f"Risk Level: {result['risk_level']}")
print(f"Action: {result['action']}")
print(f"Flags: {result['flags']}")

if result["action"] == "block":
    # Auto-block transaction
    print("Transaction BLOCKED due to fraud risk")
elif result["action"] == "review":
    # Route to human review
    approval = approval_workflow.create_approval_request(
        agent_task_id=transaction["transaction_id"],
        decision_type="fraud_review",
        data_to_review=result,
        risk_level=result["risk_level"],
        context=f"Transaction ${transaction['amount']:,.0f} flagged: {result['flags']}"
    )
    print(f"Transaction sent to human review: {approval['approval_id']}")
else:
    # Auto-approve
    print("Transaction APPROVED")

Производительность и бенчмарки

Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.

Характеристики задержки

Получение рыночных данных:

  • Alpha Vantage API: 200–500ms на символ
  • Bloomberg BLPAPI (локальный Terminal): 50–150ms на запрос
  • Polygon.io REST API: 100–300ms
  • Yahoo Finance (yfinance): 300–800ms (включая парсинг)

Расчёт риска:

  • Исторический VaR (252 дня, один актив): 10–30ms
  • Параметрический VaR (портфель, 10 активов): 20–50ms
  • VaR методом Монте-Карло (10 000 симуляций, 10 активов): 500–1 500ms
  • Стресс-тест (4 сценария, 20 позиций): 100–300ms

Вызовы LLM API (Claude Opus 4.6):

  • Простая проверка соответствия: 1 500–3 000ms
  • Генерация отчёта о рисках портфеля: 3 000–8 000ms
  • Многошаговый агентный рабочий процесс (5 итераций): 10 000–25 000ms

Сквозной рабочий процесс (Отчёт о рисках портфеля):

  1. Получение рыночных данных (10 символов): 2 000ms
  2. Расчёт VaR + стресс-тесты: 1 000ms
  3. LLM-анализ (3 вызова инструментов): 8 000ms
  4. Генерация отчёта: 2 000ms
  5. Итого: ~13 секунд

Анализ стоимости (Claude Opus 4.6)

Тарификация: $15/MTok на вход, $75/MTok на выход

Типичный отчёт о рисках портфеля:

  • Системный промпт: 500 токенов
  • Контекст рыночных данных: 2 000 токенов
  • Определения инструментов: 800 токенов
  • История диалога (3 витка): 1 500 токенов
  • Результаты вызовов инструментов: 2 000 токенов
  • Вывод (отчёт): 1 200 токенов
  • Итого на вход: ~6 800 токенов → $0.102
  • Итого на выход: ~1 200 токенов → $0.090
  • Стоимость одного отчёта: ~$0.19

Ежемесячные расходы (100 отчётов/день):

  • Отчёты: $0.19 × 100 × 30 = $570/мес
  • Неудачные запуски / повторы (+10%): $57/мес
  • Итого расходы на LLM: ~$627/мес

Расходы на Data API:

  • Alpha Vantage Premium: $50/мес (75 вызовов/мин)
  • Polygon.io Stocks: $99/мес (неограниченные API-вызовы)
  • Bloomberg Terminal: ~$2 000/мес (обязателен для институциональных участников)

Стратегии оптимизации

1. Кэширование:

import redis
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def fetch_market_data_cached(symbol: str, ttl: int = 60) -> dict:
    """Fetch with Redis cache (60s TTL for real-time quotes)."""
    cache_key = f"quote:{symbol}"
    cached = redis_client.get(cache_key)

    if cached:
        return json.loads(cached)

    # Fetch from API
    data = api.fetch_quote(symbol)
    redis_client.setex(cache_key, ttl, json.dumps(data))
    return data

2. Параллельные вызовы инструментов:

import asyncio

async def fetch_multiple_quotes(symbols: list[str]) -> dict:
    """Fetch quotes in parallel (not sequential)."""
    tasks = [api.fetch_quote_async(symbol) for symbol in symbols]
    results = await asyncio.gather(*tasks)
    return {symbol: result for symbol, result in zip(symbols, results)}

3. Предварительное вычисление:

  • Предварительно вычислять VaR ежедневно (в нерабочее время) для крупных портфелей
  • Хранить результаты в базе данных; агент запрашивает кэшированные значения
  • Сокращает время реального расчёта с 1 500ms до 50ms (запрос к базе данных)

4. Выбор модели:

  • Использовать Claude Sonnet 4 для рутинных расчётов (в 3 раза дешевле Opus)
  • Резервировать Claude Opus для сложного комплаенс-рассуждения
  • Использовать Claude Haiku для простых преобразований данных

5. Оптимизация промптов:

  • Удалять ненужные определения инструментов (включать только инструменты, актуальные для текущей задачи)
  • Сжимать системный промпт (убирать примеры, использовать лаконичные формулировки)
  • Использовать режим использования инструментов (не extended thinking) для детерминированных рабочих процессов

Оценки пропускной способности

Однопоточный агент:

  • Отчёты о рисках портфеля: ~5 отчётов/мин (13s на отчёт)
  • Обнаружение мошенничества: ~200 транзакций/мин (300ms на транзакцию)
  • Проверки соответствия (пригодность MiFID II): ~30 проверок/мин (2s на проверку)

Горизонтально масштабированный (10 рабочих экземпляров):

  • Отчёты о рисках портфеля: ~50 отчётов/мин
  • Обнаружение мошенничества: ~2 000 транзакций/мин
  • Проверки соответствия: ~300 проверок/мин