title: "Финансовые AI-агенты: оценка рисков, проверка соответствия, генерация отчётов" slug: financial-agents-risk-compliance-2026-ru date: 2026-02-24 lang: ru
Финансовые AI-агенты: оценка рисков, проверка соответствия, генерация отчётов
Ключевые факты
- Основные версии Python SDK:
anthropic==0.84.0,openai==2.24.0,langchain==1.2.10,langgraph==1.0.10 - API финансовых данных: Alpha Vantage (бесплатный тариф: 5 вызовов/мин, премиум: 75 вызовов/мин),
yfinance==1.2.0(Yahoo Finance), Polygon.io (REST + WebSocket), Bloomberg BLPAPI (требует лицензию Terminal), Refinitiv Eikon Data API - Регуляторные фреймворки — MiFID II: требует оценку пригодности для розничных клиентов при торговле сложными продуктами, документацию наилучшего исполнения (хранение 5 лет), отчётность по сделкам в NCA (T+1), раскрытие затрат при >1% от суммарных расходов
- Регуляторные фреймворки — SOX: Section 302 (сертификация финансовых отчётов CEO/CFO), Section 404 (оценка системы внутреннего контроля), разделение обязанностей (один человек не может одновременно инициировать И утверждать транзакции)
- Регуляторные фреймворки — Basel III: 10-дневный VaR при доверительном интервале 99% для требований к рыночному капиталу, стресс-тестирование при неблагоприятных сценариях, расчёт кредитного риска контрагента (CVA)
- Требования к аудиторскому следу: логировать каждый вызов LLM API (модель, токены, временная метка), каждый вызов инструмента (имя, параметры, результат), каждую точку принятия решения (проверенное условие, выбранная ветка), каждую обработанную финансовую сумму
- Хранение аудиторского следа: MiFID II требует 5 лет для записей о сделках, SOX требует бессрочное хранение для существенных позиций, использовать неизменяемые хранилища (S3 WORM, append-only базы данных)
- Контрольные точки с участием человека — пороги транзакций: <$10K (автоматически), $10K–$100K (требует одобрение старшего аналитика), $100K–$1M (требует одобрение риск-менеджера), >$1M (требует одобрение C-level)
- Контрольные точки с участием человека — пороги риска: VaR >5% от стоимости портфеля, убыток при стресс-тесте >20%, концентрация позиции >25% в одном активе, валовое плечо >5x
- Расчёт риска — методы VaR: историческое моделирование (непараметрическое, использует реальное распределение доходностей), параметрический VaR (предполагает нормальное распределение, быстрее, но менее точен для тяжёлых хвостов), метод Монте-Карло (наиболее гибкий, вычислительно затратный)
- Расчёт риска — ключевые метрики: VaR (Value at Risk — максимальный ожидаемый убыток при заданном уровне доверия), CVaR/ES (Conditional VaR/Expected Shortfall — средний убыток сверх порога VaR), стресс-тестирование (применение исторических кризисных сценариев), маржинальный VaR (вклад каждого актива в риск)
- PII в финансовом контексте — идентификаторы: SSN (regex:
\b\d{3}-\d{2}-\d{4}\b), номера банковских счетов (8–17 цифр), номера кредитных карт (16 цифр с возможными дефисами/пробелами), IBAN (2-буквенный код страны + контрольные цифры + номер счёта) - Требования к маскированию PII: никогда не отправлять сырые PII в LLM API, маскировать номера счетов до последних 4 цифр (
••••••7890), полностью редактировать SSN ([REDACTED_SSN]), использовать токенизацию для обратимого маскирования при необходимости авторизованного доступа - Время проверки соответствия: проводить проверки ПЕРЕД выполнением любой сделки, ПОСЛЕ изменений в портфеле, по расписанию (ежедневно для лимитов риска, ежемесячно для обзоров пригодности), по запросу человека-ревьюера
- Ограничения вызовов инструментов: реализовать максимальное число итераций (обычно 10–15) для предотвращения бесконечных циклов, таймаут на вызов инструмента (30s для запросов данных, 120s для сложных вычислений), паттерн circuit breaker при сбоях внешних API
- Производственные соображения: кэшировать статические справочные данные в Redis (метаданные инструментов, регуляторные правила), предварительно вычислять метрики риска по расписанию для больших портфелей, использовать streaming для длительных отчётов, параллелизировать независимые вызовы инструментов
- Пример стоимости (Claude Opus 4.6): ~7K токенов на отчёт о рисках портфеля при $15/MTok на вход + $75/MTok на выход ≈ $0.10 за отчёт, 100 отчётов/день ≈ $10/день операционных расходов
- Восстановление после ошибок: повторять при временных сбоях (3 попытки с экспоненциальным backoff 2s→4s→8s), создавать контрольные точки для долгих агентов (сохранять состояние каждые 5 итераций), хранить историю диалога для отладки упавших запусков
- Средства защиты: API-ключи в переменных окружения (никогда в коде), ротировать учётные данные Bloomberg/Reuters каждые 90 дней, IP-белый список для production API, rate limiting на пользователя/клиента (предотвращение злоупотреблений)
- Выбор модели: Claude Opus 4.6 для сложного комплаенс-рассуждения, Claude Sonnet 4 для рутинных расчётов рисков (в 3 раза дешевле), GPT-4 Turbo для комментариев к рыночной ситуации в реальном времени, избегать старых моделей для финансового анализа
Что такое финансовый агент
Финансовый AI-агент — это автономная система, в которой большая языковая модель оркестрирует выполнение специализированных инструментов для решения многошаговых задач финансового анализа в рамках регуляторных ограничений. В отличие от простых чат-ботов, отвечающих на вопросы, финансовые агенты:
- Сохраняют персистентное состояние между несколькими API-вызовами, отслеживая контекст портфеля, промежуточные расчёты и историю принятых решений
- Выполняют многошаговые рабочие процессы: получение рыночных данных → расчёт риска → проверка соответствия → генерация отчёта
- Вызывают предметно-специфичные инструменты для количественных финансов (расчёт VaR, стресс-тестирование, оптимизация портфеля) и регуляторных проверок (пригодность по MiFID II, разделение обязанностей по SOX)
- Приостанавливаются для проверки человеком при определённых порогах риска перед выполнением высокорисковых действий
- Генерируют неизменяемые аудиторские следы, документирующие каждое решение, вызов инструмента и преобразование данных для регуляторной проверки
Типовые сценарии применения финансовых агентов:
- Мониторинг риска портфеля: непрерывный расчёт VaR, отслеживание лимитов позиций, оповещения при превышении порогов
- Проверка соответствия: валидация наилучшего исполнения по MiFID II, разделения обязанностей по SOX, требований к капиталу по Basel III до подачи заявки на сделку
- Регуляторная отчётность: генерация отчётов по транзакциям MiFID II, оценок системы внутреннего контроля по SOX Section 404, отчётов о достаточности капитала по Basel III
- Обнаружение мошенничества: мониторинг транзакций в реальном времени на аномалии (проверки частоты, сопоставление паттернов, ML-скоринг)
- Инвестиционное исследование: агрегация аналитических отчётов, данных о прибыли, рыночных настроений; синтез в практические выводы с указанием источников
Финансовые агенты НЕ делают:
- Не совершают сделки автономно без одобрения человека (регуляторный запрет в большинстве юрисдикций)
- Не принимают дискреционных инвестиционных решений (требует зарегистрированного инвестиционного советника)
- Не предоставляют персонализированных инвестиционных советов розничным клиентам без оценки пригодности
- Не переопределяют жёстко заданные лимиты риска или регуляторные правила (агенты предлагают, люди одобряют исключения)
Фреймворк принятия решений
Финансовые агенты работают в трёх режимах управления в зависимости от уровня риска задачи и регуляторных ограничений:
1. Полностью автономный (без одобрения человека)
Когда применять: низкорисковые, повторяющиеся задачи с детерминированными результатами, не требующие регуляторного одобрения.
Примеры:
- Получение рыночных данных из утверждённых источников
- Вычисление стандартных метрик риска (VaR, волатильность) для существующих портфелей
- Генерация отчётов только для чтения для внутреннего использования
- Рутинные проверки соответствия, которые только фиксируют проблемы (не изменяют данные)
Реализация:
def run_autonomous_agent(task: str) -> dict:
result = agent.run(task, max_iterations=10)
audit_trail.record(event="autonomous_task", outcome=result)
return result
Средства управления рисками: жёстко заданные лимиты итераций, доступ к базе данных только на чтение, отсутствие записей во внешние API, исчерпывающее логирование.
2. Человек в контуре (одобрение на ключевых контрольных точках)
Когда применять: задачи среднего-высокого риска, где агент готовит анализ, но человек проверяет и одобряет перед выполнением.
Примеры:
- Подготовка торгового ордера >$10K (агент составляет черновик, человек проверяет соответствие/риск, человек подаёт заявку)
- Рекомендации по ребалансировке портфеля (агент предлагает изменения аллокации, человек одобряет)
- Финализация регуляторного отчёта (агент генерирует черновик, комплаенс-офицер проверяет, CCO подписывает)
- Оценка пригодности клиента для сложных продуктов (агент выставляет оценку, советник проверяет, советник одобряет)
Реализация:
async def run_human_in_loop_agent(task: str, risk_level: str) -> dict:
# Agent performs analysis
analysis = agent.run(task, pause_on=["needs_approval"])
# Create review request
review = approval_system.create_review(
data=analysis,
risk_level=risk_level,
timeout_hours=24
)
# Wait for human decision
decision = await approval_system.wait_for_review(review.id)
if decision.status == "approved":
# Agent proceeds with approved action
result = agent.continue_from_checkpoint(analysis.checkpoint)
return result
else:
audit_trail.record(event="human_rejection", reason=decision.notes)
return {"status": "rejected", "reason": decision.notes}
Средства управления рисками: разделение обязанностей (агент готовит ≠ человек одобряет), истечение таймаута (автоотклонение через 24ч), пути эскалации (высокий риск → старший ревьюер), аудиторский след решений об одобрении.
3. Правило-базированный (детерминированный, без дискреции LLM)
Когда применять: критически важные контроли, которые должны выполняться идентично каждый раз без какой-либо вариативности — как правило, для жёстких регуляторных ограничений.
Примеры:
- Остановка торговли при превышении лимита позиции (если позиция > лимит → отклонить ордер, без рассуждений LLM)
- Редактирование PII (если поле соответствует regex SSN → маскировать до
[REDACTED], без суждения LLM) - Срабатывание circuit breaker (если VaR > 10% → остановить всю торговлю, уведомить CRO)
- Проверки разделения обязанностей (если пользователь имеет роли инициатора + утверждающего → заблокировать транзакцию)
Реализация:
def rule_based_compliance_check(transaction: dict) -> dict:
violations = []
# Hard rule: no transaction >$1M without executive approval
if transaction["amount"] > 1_000_000:
if "executive_approver" not in transaction["approvals"]:
violations.append("HARD_LIMIT: $1M+ requires executive approval")
# Hard rule: VaR limit
if transaction["portfolio_var"] > 0.10: # 10% portfolio value
violations.append("HARD_LIMIT: VaR exceeds 10% threshold")
return {
"compliant": len(violations) == 0,
"violations": violations,
"rules_checked": ["amount_limit", "var_limit"],
"timestamp": datetime.utcnow().isoformat()
}
Средства управления рисками: отсутствие LLM (нулевой риск галлюцинаций), логика правил под версионным контролем, автоматическое тестирование изменений правил, неизменяемый журнал оценок правил.
Матрица решений
| Тип задачи | Уровень риска | Режим управления | Роль человека | Пример |
|---|---|---|---|---|
| Получение рыночных данных | Низкий | Автономный | Нет (только мониторинг) | Ежедневное обновление цен |
| Расчёт метрик риска | Низкий–средний | Автономный | Просматривает оповещения | Вычисление VaR портфеля |
| Торговый ордер >$10K | Средний | Человек в контуре | Одобряет после проверки | Ордер на покупку акций |
| Торговый ордер >$1M | Высокий | Человек в контуре | Одобрение старшего/C-level | Крупная институциональная сделка |
| Превышение лимита позиции | Критический | Правило-базированный | Получает оповещение, расследует | Автоостановка торговли |
| Пригодность по MiFID II (розница + сложный продукт) | Высокий | Человек в контуре | Советник проверяет + одобряет | Продажа структурированного продукта |
| Редактирование PII | Критический | Правило-базированный | Нет (автоматически) | Маскирование SSN перед логированием |
Справочная таблица параметров
| Параметр | Значение | Примечания |
|---|---|---|
| Модель | claude-opus-4-6 |
Основная модель для сложного финансового рассуждения |
| Модель (оптимизированная по стоимости) | claude-sonnet-4 |
Для рутинных расчётов (в 3 раза дешевле Opus) |
| Макс. токены | 4096 |
Достаточно для большинства финансовых отчётов; увеличить до 8K для детального анализа |
| Макс. итерации | 10 |
Предотвращение бесконечных циклов при выполнении агента |
| Температура | 0 |
Детерминированный вывод для расчётов соответствия/риска |
| Температура (исследование) | 0.3 |
Небольшое творчество для инвестиционных комментариев, всё же консервативно |
| Таймаут вызова инструмента | 30000ms |
30s для запросов рыночных данных |
| Таймаут вызова инструмента (расчёт риска) | 120000ms |
2 мин для сложных расчётов VaR/стресс-тестов |
| Таймаут одобрения человеком | 86400s |
24 часа; автоотклонение при отсутствии ответа |
| Хранение аудита | indefinite |
SOX: существенные позиции; MiFID II: не менее 5 лет |
| Доверительный интервал VaR | 0.95 |
95% для внутреннего управления рисками |
| Доверительный интервал VaR (Basel III) | 0.99 |
99% для расчёта регуляторного капитала |
| Период хранения VaR | 1 day |
Внутренняя отчётность по рискам |
| Период хранения VaR (Basel III) | 10 days |
Требование к регуляторному капиталу |
| Порог убытка при стресс-тесте | 0.20 |
Убыток 20% инициирует проверку человеком |
| Лимит концентрации позиции | 0.25 |
Не более 25% портфеля в одном активе |
| Лимит валового плеча | 5.0 |
Сумма длинных + коротких позиций / NAV |
| Сумма транзакции (автоодобрение) | $10,000 |
Ниже порога: одобрение не требуется |
| Сумма транзакции (одобрение старшего) | $100,000 |
$10K–$100K: требует старшего аналитика |
| Сумма транзакции (одобрение руководства) | $1,000,000 |
Свыше $1M: требует C-level |
| Rate limit API (Alpha Vantage бесплатный) | 5 calls/min |
Перейти на премиум для 75 вызовов/мин |
| Rate limit API (production) | 100 calls/min |
Реализовать circuit breaker при 80% порога |
| Cache TTL (рыночные данные) | 60s |
Котировки в реальном времени; увеличить до 300s для задержанных данных |
| Cache TTL (справочные данные) | 86400s |
Метаданные инструментов, статические регуляторные правила |
Типичные ошибки
Ошибка 1: Отправка PII в LLM API без редактирования
Последствия: нарушения GDPR (штрафы до 4% глобального оборота), регуляторные взыскания, потеря доверия клиентов, потенциальные обязательства по раскрытию утечки данных.
❌ Неверно:
client_profile = db.query(
"SELECT name, ssn, account_number, net_worth FROM clients WHERE id = ?",
client_id
)
# DANGER: Sending raw SSN to LLM
analysis = anthropic_client.messages.create(
model="claude-opus-4-6",
messages=[{
"role": "user",
"content": f"Analyze suitability for client: {client_profile}"
}]
)
✅ Верно:
client_profile = db.query(
"SELECT name, ssn, account_number, net_worth FROM clients WHERE id = ?",
client_id
)
# Redact PII before LLM call
redactor = PIIRedactor()
safe_profile = redactor.redact_dict(client_profile)
# Result: {"name": "[REDACTED]", "ssn": "[REDACTED_SSN]",
# "account_number": "••••••7890", "net_worth": 500000}
analysis = anthropic_client.messages.create(
model="claude-opus-4-6",
messages=[{
"role": "user",
"content": f"Analyze suitability for client profile: {safe_profile}"
}]
)
# Log redacted version to audit trail
audit_trail.record(
event="suitability_analysis",
data=safe_profile, # Never log original PII
llm_response=analysis
)
Ошибка 2: Отсутствие разделения обязанностей в рабочем процессе одобрения
Последствия: нарушение SOX Section 404 (существенный недостаток внутреннего контроля), провал аудита, повышенный риск мошенничества, потенциальные меры принуждения со стороны SEC.
❌ Неверно:
def process_payment(user_id: str, amount: float):
# DANGER: Same user can initiate AND approve
payment = create_payment(user_id, amount)
approve_payment(payment.id, approved_by=user_id) # Same user!
execute_payment(payment.id)
✅ Верно:
def process_payment(initiator_id: str, amount: float):
# Create payment (initiator role)
payment = create_payment(initiator_id, amount)
audit_trail.record(event="payment_created", actor=initiator_id, amount=amount)
# Route to approval queue
if amount > 10_000:
approval = approval_system.create_review(
payment_id=payment.id,
initiator=initiator_id,
amount=amount,
requires_role="senior_approver"
)
# Workflow pauses here; different user must approve
return {"status": "pending_approval", "approval_id": approval.id}
else:
# Small amounts: dual control not required
execute_payment(payment.id)
return {"status": "executed"}
async def approve_payment(approval_id: str, approver_id: str):
approval = db.get_approval(approval_id)
# SOX control: initiator != approver
if approval.initiator_id == approver_id:
raise ComplianceViolation(
"SOX_SOD_001: Initiator and approver cannot be the same person"
)
# Check approver has required role
if "senior_approver" not in get_user_roles(approver_id):
raise InsufficientPermissions("Requires senior_approver role")
# Execute payment
execute_payment(approval.payment_id)
audit_trail.record(
event="payment_approved",
initiator=approval.initiator_id,
approver=approver_id,
amount=approval.amount
)
Ошибка 3: Проверка соответствия ПОСЛЕ выполнения сделки
Последствия: регуляторное нарушение (требование пригодности MiFID II), потенциальные расходы на откат сделки, вред клиенту (продан неподходящий продукт), штрафы и репутационный ущерб.
❌ Неверно:
async def execute_trade(client_id: str, instrument: str, quantity: int):
# DANGER: Execute first, check compliance later
trade_result = trading_api.submit_order(
client=client_id,
symbol=instrument,
qty=quantity
)
# Too late! Trade already executed
compliance_result = mifid_checker.check_suitability(
client_id=client_id,
product=instrument
)
if not compliance_result.is_compliant:
# Now we have a problem - trade is live but non-compliant
logger.error(f"Post-trade compliance failure: {compliance_result.violations}")
✅ Верно:
async def execute_trade(client_id: str, instrument: str, quantity: int):
# Step 1: Fetch client profile and product details
client = db.get_client(client_id)
product = db.get_instrument(instrument)
# Step 2: COMPLIANCE CHECK BEFORE EXECUTION
compliance_result = mifid_checker.check_suitability(
client=client,
product=product,
investment_objective=client.objective
)
if not compliance_result.is_compliant:
audit_trail.record(
event="trade_blocked",
reason="compliance_violation",
violations=compliance_result.violations
)
return {
"status": "rejected",
"reason": "MiFID II compliance check failed",
"violations": compliance_result.violations
}
# Step 3: Risk check
risk_result = risk_engine.check_limits(
client_id=client_id,
new_position={instrument: quantity}
)
if not risk_result.within_limits:
return {
"status": "rejected",
"reason": "Risk limit breach",
"details": risk_result.breaches
}
# Step 4: Execute trade (only if compliance + risk checks pass)
trade_result = trading_api.submit_order(
client=client_id,
symbol=instrument,
qty=quantity
)
# Step 5: Log successful compliant trade
audit_trail.record(
event="trade_executed",
client=client_id,
instrument=instrument,
qty=quantity,
compliance_checks_passed=True,
trade_id=trade_result.trade_id
)
return {"status": "executed", "trade_id": trade_result.trade_id}
Ошибка 4: Использование параметрического VaR для ненормально распределённых доходностей
Последствия: существенное занижение хвостового риска (игнорируются тяжёлые хвосты и асимметрия), недостаточные резервы капитала, потенциальные маржин-коллы в условиях рыночного стресса, недостаточность капитала по Basel III.
❌ Неверно:
def calculate_var(returns: pd.Series) -> float:
# DANGER: Assumes normal distribution (invalid for crypto, options, emerging markets)
mean = returns.mean()
std = returns.std()
z_score = stats.norm.ppf(0.95) # 95% confidence
var = -(mean + z_score * std)
return var
# Applied to Bitcoin returns (highly non-normal: kurtosis ~10, skew ~-0.5)
btc_var = calculate_var(btc_returns) # Severely underestimates risk!
✅ Верно:
def calculate_var_robust(returns: pd.Series, method: str = "historical") -> dict:
if method == "historical":
# Non-parametric: uses actual distribution (handles fat tails, skewness)
sorted_returns = np.sort(returns.dropna())
var_index = int(0.05 * len(sorted_returns)) # 95% VaR = 5th percentile
var = -sorted_returns[var_index]
# CVaR (Expected Shortfall): average loss beyond VaR
tail_losses = sorted_returns[:var_index]
cvar = -np.mean(tail_losses)
elif method == "parametric":
# Use ONLY if returns are approximately normal (test first!)
from scipy.stats import jarque_bera
jb_stat, jb_pvalue = jarque_bera(returns)
if jb_pvalue < 0.05:
logger.warning(
f"Returns are non-normal (JB test p={jb_pvalue:.4f}). "
"Parametric VaR may underestimate risk."
)
mean = returns.mean()
std = returns.std()
var = -(mean + stats.norm.ppf(0.95) * std)
cvar = -(mean - std * stats.norm.pdf(stats.norm.ppf(0.95)) / 0.05)
elif method == "cornish_fisher":
# Adjusts for skewness and kurtosis (better for moderate non-normality)
from scipy.stats import skew, kurtosis
mean = returns.mean()
std = returns.std()
s = skew(returns)
k = kurtosis(returns)
# Cornish-Fisher expansion
z = stats.norm.ppf(0.95)
z_cf = z + (z**2 - 1)*s/6 + (z**3 - 3*z)*(k-3)/24 - (2*z**3 - 5*z)*s**2/36
var = -(mean + z_cf * std)
cvar = None # CF doesn't provide closed-form CVaR
return {
"var": round(float(var), 6),
"cvar": round(float(cvar), 6) if cvar else None,
"method": method,
"num_observations": len(returns),
"returns_skewness": round(float(skew(returns)), 3),
"returns_kurtosis": round(float(kurtosis(returns)), 3)
}
# Usage: detect non-normality and choose appropriate method
var_result = calculate_var_robust(btc_returns, method="historical")
# Or: var_result = calculate_var_robust(btc_returns, method="cornish_fisher")
Ошибка 5: Отсутствие circuit breaker при сбоях внешнего API
Последствия: каскадные сбои (повторные попытки агента перегружают падающий API), взрывной рост расходов (цикл повторов потребляет токены), запоздалое обнаружение сбоя, ухудшение пользовательского опыта.
❌ Неверно:
def fetch_market_data(symbol: str) -> dict:
# DANGER: Infinite retries on API failure
while True:
try:
response = requests.get(
f"https://api.example.com/quote/{symbol}",
timeout=5
)
return response.json()
except requests.exceptions.RequestException:
time.sleep(1) # Retry forever!
✅ Верно:
import tenacity
from circuitbreaker import circuit
class MarketDataAPI:
def __init__(self):
self.failure_count = 0
self.circuit_open = False
@circuit(failure_threshold=5, recovery_timeout=60, expected_exception=APIError)
@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
retry=tenacity.retry_if_exception_type(requests.exceptions.RequestException),
reraise=True
)
def fetch_market_data(self, symbol: str) -> dict:
"""
Fetches market data with:
- 3 retry attempts with exponential backoff (2s, 4s, 8s)
- Circuit breaker: opens after 5 failures, auto-recovers after 60s
"""
try:
response = requests.get(
f"https://api.example.com/quote/{symbol}",
timeout=5
)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
logger.warning(f"Timeout fetching {symbol}")
raise APIError(f"Timeout for {symbol}")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
# Rate limit hit
logger.error(f"Rate limit exceeded for {symbol}")
raise RateLimitError("API rate limit")
elif e.response.status_code >= 500:
# Server error - retry
logger.error(f"Server error for {symbol}: {e}")
raise APIError(f"Server error for {symbol}")
else:
# Client error (4xx) - don't retry
logger.error(f"Client error for {symbol}: {e}")
raise ValueError(f"Invalid symbol or request: {symbol}")
def fetch_with_fallback(self, symbol: str) -> dict:
"""
Fetch from primary API; fallback to secondary on failure.
"""
try:
return self.fetch_market_data(symbol)
except CircuitBreakerError:
logger.warning(f"Circuit open for primary API, using fallback")
return self._fetch_from_fallback_api(symbol)
except APIError:
logger.warning(f"Primary API failed for {symbol}, using fallback")
return self._fetch_from_fallback_api(symbol)
def _fetch_from_fallback_api(self, symbol: str) -> dict:
# Fallback to cached data or secondary API
cached = redis_client.get(f"quote:{symbol}")
if cached:
logger.info(f"Returning cached data for {symbol}")
return json.loads(cached)
else:
raise DataUnavailableError(f"No data available for {symbol}")
Агент мониторинга рисков
import numpy as np
import pandas as pd
from scipy import stats
from typing import Literal
class RiskMonitoringAgent:
"""
Production-grade portfolio risk monitoring agent.
Computes VaR, CVaR, stress tests, position limits.
"""
def __init__(self, audit_trail, alert_system):
self.audit = audit_trail
self.alerts = alert_system
def compute_portfolio_var(
self,
positions: dict[str, float], # {symbol: market_value}
returns_history: pd.DataFrame, # columns = symbols, index = dates
confidence_level: float = 0.95,
holding_period: int = 1,
method: Literal["historical", "parametric", "monte_carlo"] = "historical"
) -> dict:
"""
Compute portfolio VaR using specified method.
Args:
positions: Current portfolio positions {symbol: market_value_usd}
returns_history: Historical returns DataFrame (daily)
confidence_level: 0.95 for 95% VaR, 0.99 for Basel III
holding_period: Days to hold (1 for daily, 10 for Basel III)
method: VaR calculation method
Returns:
{
"var_pct": float,
"var_dollar": float,
"cvar_dollar": float,
"method": str,
"breaches_limit": bool
}
"""
portfolio_value = sum(positions.values())
# Calculate portfolio weights
weights = np.array([
positions.get(symbol, 0) / portfolio_value
for symbol in returns_history.columns
])
if method == "historical":
# Historical simulation VaR
portfolio_returns = (returns_history @ weights).dropna()
sorted_returns = np.sort(portfolio_returns)
var_index = int((1 - confidence_level) * len(sorted_returns))
var_1day = -sorted_returns[var_index]
var_scaled = var_1day * np.sqrt(holding_period)
# CVaR (Expected Shortfall)
tail_losses = sorted_returns[:var_index]
cvar_scaled = -np.mean(tail_losses) * np.sqrt(holding_period)
elif method == "parametric":
# Variance-covariance VaR
cov_matrix = returns_history.cov().values
portfolio_variance = weights @ cov_matrix @ weights
portfolio_std = np.sqrt(portfolio_variance)
z_score = stats.norm.ppf(confidence_level)
var_scaled = z_score * portfolio_std * np.sqrt(holding_period)
# CVaR for normal distribution
cvar_scaled = (portfolio_std * stats.norm.pdf(z_score) /
(1 - confidence_level)) * np.sqrt(holding_period)
elif method == "monte_carlo":
# Monte Carlo simulation (10,000 scenarios)
cov_matrix = returns_history.cov().values
mean_returns = returns_history.mean().values
n_simulations = 10_000
simulated_returns = np.random.multivariate_normal(
mean_returns * holding_period,
cov_matrix * holding_period,
n_simulations
)
portfolio_sim_returns = simulated_returns @ weights
var_scaled = -np.percentile(portfolio_sim_returns, (1 - confidence_level) * 100)
cvar_scaled = -np.mean(portfolio_sim_returns[
portfolio_sim_returns <= -var_scaled
])
var_dollar = var_scaled * portfolio_value
cvar_dollar = cvar_scaled * portfolio_value
# Check if VaR breaches threshold (5% of portfolio value)
var_limit = 0.05 * portfolio_value
breaches_limit = var_dollar > var_limit
if breaches_limit:
self.alerts.send_alert(
severity="high",
message=f"VaR ${var_dollar:,.0f} exceeds limit ${var_limit:,.0f}",
portfolio_id=self._get_portfolio_id(positions)
)
# Log to audit trail
self.audit.record(
event="var_calculation",
data={
"portfolio_value": portfolio_value,
"var_dollar": round(var_dollar, 2),
"cvar_dollar": round(cvar_dollar, 2),
"method": method,
"confidence_level": confidence_level,
"holding_period": holding_period
},
outcome="completed"
)
return {
"var_pct": round(var_scaled, 6),
"var_dollar": round(var_dollar, 2),
"cvar_dollar": round(cvar_dollar, 2),
"portfolio_value": portfolio_value,
"method": method,
"confidence_level": confidence_level,
"holding_period_days": holding_period,
"breaches_limit": breaches_limit,
"limit_threshold": var_limit
}
def run_stress_tests(
self,
positions: dict[str, float],
scenario: Literal["2008_crisis", "covid_2020", "rate_hike_2022", "custom"] = "2008_crisis",
custom_shocks: dict[str, float] = None
) -> dict:
"""
Apply historical stress scenarios to portfolio.
Args:
positions: {symbol: market_value_usd}
scenario: Pre-defined scenario or "custom"
custom_shocks: If scenario="custom", provide {symbol: shock_pct}
Returns:
{
"scenario": str,
"total_pnl": float,
"total_pnl_pct": float,
"breaches_threshold": bool,
"asset_impacts": dict
}
"""
scenarios = {
"2008_crisis": {
"description": "Lehman Brothers collapse (Sep 2008)",
"shocks": {
"SPY": -0.40, "QQQ": -0.45, "IWM": -0.50, # Equities
"TLT": +0.10, "IEF": +0.08, # Bonds rallied
"GLD": -0.05, "USO": -0.60, # Commodities
"UUP": +0.15 # USD strengthened
}
},
"covid_2020": {
"description": "COVID market crash (Mar 2020)",
"shocks": {
"SPY": -0.34, "QQQ": -0.30, "IWM": -0.40,
"TLT": +0.05, "GLD": +0.10, "USO": -0.50
}
},
"rate_hike_2022": {
"description": "Fed rate hike cycle (2022)",
"shocks": {
"SPY": -0.18, "QQQ": -0.33, "TLT": -0.20,
"GLD": -0.08, "UUP": +0.12
}
}
}
if scenario == "custom":
if not custom_shocks:
raise ValueError("Must provide custom_shocks when scenario='custom'")
shock_dict = custom_shocks
description = "Custom scenario"
else:
shock_dict = scenarios[scenario]["shocks"]
description = scenarios[scenario]["description"]
portfolio_value = sum(positions.values())
total_pnl = 0
asset_impacts = {}
for symbol, position_value in positions.items():
shock = shock_dict.get(symbol, -0.15) # Default -15% if not specified
pnl = position_value * shock
total_pnl += pnl
asset_impacts[symbol] = {
"current_value": position_value,
"shock_applied_pct": f"{shock * 100:.1f}%",
"pnl": round(pnl, 2),
"post_shock_value": round(position_value * (1 + shock), 2)
}
total_pnl_pct = total_pnl / portfolio_value
# Check threshold: 20% loss triggers human review
loss_threshold = -0.20
breaches_threshold = total_pnl_pct < loss_threshold
if breaches_threshold:
self.alerts.send_alert(
severity="critical",
message=(
f"Stress test {scenario}: Portfolio loss "
f"{total_pnl_pct*100:.1f}% exceeds {loss_threshold*100}% threshold"
),
portfolio_id=self._get_portfolio_id(positions)
)
self.audit.record(
event="stress_test",
data={
"scenario": scenario,
"description": description,
"total_pnl": round(total_pnl, 2),
"total_pnl_pct": round(total_pnl_pct, 4),
"breaches_threshold": breaches_threshold
},
outcome="completed"
)
return {
"scenario": scenario,
"description": description,
"total_pnl": round(total_pnl, 2),
"total_pnl_pct": f"{total_pnl_pct * 100:.2f}%",
"portfolio_value": portfolio_value,
"breaches_threshold": breaches_threshold,
"threshold": f"{loss_threshold * 100}%",
"asset_impacts": asset_impacts
}
def check_position_limits(
self,
positions: dict[str, float],
concentration_limit: float = 0.25,
leverage_limit: float = 5.0
) -> dict:
"""
Check portfolio concentration and leverage limits.
Args:
positions: {symbol: market_value_usd} (negative for shorts)
concentration_limit: Max % of portfolio in single position
leverage_limit: Max gross leverage (long + short) / NAV
Returns:
{
"concentration_violations": list,
"leverage_ratio": float,
"leverage_violation": bool,
"compliant": bool
}
"""
portfolio_value = sum(abs(v) for v in positions.values()) # Gross value
nav = sum(positions.values()) # Net asset value
concentration_violations = []
for symbol, position_value in positions.items():
concentration = abs(position_value) / nav
if concentration > concentration_limit:
concentration_violations.append({
"symbol": symbol,
"concentration_pct": f"{concentration * 100:.1f}%",
"limit_pct": f"{concentration_limit * 100}%",
"position_value": position_value
})
gross_leverage = portfolio_value / nav if nav > 0 else 0
leverage_violation = gross_leverage > leverage_limit
compliant = len(concentration_violations) == 0 and not leverage_violation
if not compliant:
self.alerts.send_alert(
severity="high",
message=f"Position limit breach: {len(concentration_violations)} concentration violations, leverage {gross_leverage:.2f}x",
portfolio_id=self._get_portfolio_id(positions)
)
self.audit.record(
event="position_limit_check",
data={
"concentration_violations": len(concentration_violations),
"gross_leverage": round(gross_leverage, 2),
"compliant": compliant
},
outcome="completed"
)
return {
"concentration_violations": concentration_violations,
"gross_leverage": round(gross_leverage, 2),
"leverage_limit": leverage_limit,
"leverage_violation": leverage_violation,
"compliant": compliant
}
def _get_portfolio_id(self, positions: dict) -> str:
# In production: lookup portfolio ID from positions
return "portfolio_" + hashlib.md5(
json.dumps(positions, sort_keys=True).encode()
).hexdigest()[:8]
Пример использования:
# Initialize agent
audit_trail = ImmutableAuditTrail(storage=S3WORMStorage())
alert_system = AlertSystem(slack_webhook=SLACK_URL)
risk_agent = RiskMonitoringAgent(audit_trail, alert_system)
# Current portfolio
portfolio = {
"SPY": 500_000, # S&P 500 ETF
"QQQ": 300_000, # Nasdaq ETF
"TLT": 150_000, # 20Y Treasury ETF
"GLD": 50_000 # Gold ETF
}
# Fetch historical returns (252 trading days)
returns_df = fetch_historical_returns(
symbols=list(portfolio.keys()),
days=252
)
# Compute VaR
var_result = risk_agent.compute_portfolio_var(
positions=portfolio,
returns_history=returns_df,
confidence_level=0.95,
holding_period=1,
method="historical"
)
print(f"95% 1-day VaR: ${var_result['var_dollar']:,.0f}")
print(f"Breaches limit: {var_result['breaches_limit']}")
# Run stress tests
stress_result = risk_agent.run_stress_tests(
positions=portfolio,
scenario="2008_crisis"
)
print(f"2008 crisis scenario P&L: ${stress_result['total_pnl']:,.0f}")
# Check position limits
limits_result = risk_agent.check_position_limits(
positions=portfolio,
concentration_limit=0.25,
leverage_limit=5.0
)
print(f"Position limits compliant: {limits_result['compliant']}")
Соответствие требованиям и аудиторский след
import hashlib
import json
import uuid
from datetime import datetime
from enum import Enum
class ComplianceFramework(Enum):
MIFID_II = "mifid_ii"
SOX = "sox"
BASEL_III = "basel_iii"
GDPR = "gdpr"
class ImmutableAuditTrail:
"""
Hash-chained audit trail for tamper detection.
Each entry's hash includes the previous entry's hash (blockchain-style).
"""
def __init__(self, storage_backend):
self.storage = storage_backend
self._last_hash = self._load_last_hash()
def record(
self,
event_type: str,
actor: str, # "financial_agent_v2" or user ID
action: str,
data: dict,
outcome: str,
metadata: dict = None
) -> str:
"""
Record immutable audit event.
Args:
event_type: "llm_call", "tool_call", "compliance_check", "human_approval", etc.
actor: Agent ID or user ID
action: Specific action taken
data: Relevant data (PII must be redacted before calling this)
outcome: "success", "failure", "pending", etc.
metadata: Additional context
Returns:
event_id: UUID of recorded event
"""
event_id = str(uuid.uuid4())
timestamp = datetime.utcnow().isoformat() + "Z"
event = {
"event_id": event_id,
"timestamp": timestamp,
"event_type": event_type,
"actor": actor,
"action": action,
"outcome": outcome,
"data": data,
"metadata": metadata or {}
}
# Hash chaining for tamper detection
event_json = json.dumps(event, sort_keys=True, default=str)
current_hash = hashlib.sha256(
f"{self._last_hash}{event_json}".encode()
).hexdigest()
event["hash"] = current_hash
event["previous_hash"] = self._last_hash
# Persist to immutable storage
self.storage.append(event)
self._last_hash = current_hash
return event_id
def verify_chain_integrity(self, from_event_id: str = None) -> dict:
"""
Verify audit trail has not been tampered with.
Recomputes hashes and checks chain linkage.
"""
events = self.storage.read_all(from_event_id=from_event_id)
violations = []
prev_hash = events[0]["previous_hash"] if events else "GENESIS"
for event in events:
# Reconstruct hash
event_copy = {k: v for k, v in event.items()
if k not in ("hash", "previous_hash")}
event_json = json.dumps(event_copy, sort_keys=True, default=str)
expected_hash = hashlib.sha256(
f"{event['previous_hash']}{event_json}".encode()
).hexdigest()
if event["hash"] != expected_hash:
violations.append({
"event_id": event["event_id"],
"timestamp": event["timestamp"],
"issue": "Hash mismatch - potential tampering"
})
if event["previous_hash"] != prev_hash:
violations.append({
"event_id": event["event_id"],
"timestamp": event["timestamp"],
"issue": "Chain break - event sequence tampered"
})
prev_hash = event["hash"]
return {
"integrity_verified": len(violations) == 0,
"events_checked": len(events),
"violations": violations,
"verification_timestamp": datetime.utcnow().isoformat() + "Z"
}
def _load_last_hash(self) -> str:
last_event = self.storage.get_latest()
return last_event["hash"] if last_event else "GENESIS"
class MiFIDIIComplianceAgent:
"""
MiFID II compliance checker for financial agents.
Enforces suitability, best execution, transaction reporting.
"""
def __init__(self, audit_trail):
self.audit = audit_trail
self.COST_DISCLOSURE_THRESHOLD = 0.01 # 1%
def check_suitability(
self,
client: dict,
product: dict,
investment_objective: str
) -> dict:
"""
MiFID II Article 25: Suitability assessment.
Required for investment advice and portfolio management.
Args:
client: {
"client_id": str,
"category": "retail" | "professional" | "eligible_counterparty",
"risk_tolerance_score": int (1-10),
"net_worth_eur": float,
"has_relevant_experience": bool,
"objective": "growth" | "income" | "capital_preservation"
}
product: {
"isin": str,
"complexity": "simple" | "non_complex" | "complex",
"risk_score": int (1-10),
"min_recommended_net_worth": float,
"max_loss_scenario_pct": float,
"total_costs_pct": float
}
investment_objective: Client's stated objective for this trade
Returns:
{
"is_compliant": bool,
"violations": list[str],
"required_disclosures": list[str],
"suitability_score": float
}
"""
violations = []
disclosures = []
# Rule 1: Retail + Complex = Full suitability required
if (client["category"] == "retail" and
product["complexity"] == "complex"):
# Check knowledge & experience
if not client.get("has_relevant_experience", False):
violations.append(
"MIFID2_SUITABILITY_001: Retail client lacks required "
"knowledge/experience for complex product"
)
# Check financial situation
if client["net_worth_eur"] < product.get("min_recommended_net_worth", 0):
violations.append(
f"MIFID2_SUITABILITY_002: Client net worth "
f"€{client['net_worth_eur']:,.0f} below product minimum "
f"€{product['min_recommended_net_worth']:,.0f}"
)
# Check risk tolerance
client_risk = client.get("risk_tolerance_score", 0)
product_risk = product.get("risk_score", 10)
if client_risk < product_risk - 2:
violations.append(
f"MIFID2_SUITABILITY_003: Risk mismatch - client {client_risk}/10 "
f"vs product {product_risk}/10"
)
# Rule 2: Investment objective alignment
if investment_objective == "capital_preservation":
if product.get("max_loss_scenario_pct", 0) > 0.20:
violations.append(
f"MIFID2_SUITABILITY_004: Product max loss "
f"{product['max_loss_scenario_pct']*100:.0f}% incompatible "
"with capital preservation objective"
)
# Rule 3: Cost disclosure (Article 50)
if product.get("total_costs_pct", 0) > self.COST_DISCLOSURE_THRESHOLD:
disclosures.append(
f"COST_DISCLOSURE: Total costs "
f"{product['total_costs_pct']*100:.2f}% must be disclosed "
"to client before execution (MiFID II Art. 50)"
)
# Rule 4: Best interest obligation for retail
if client["category"] == "retail":
disclosures.append(
"BEST_INTEREST: Firm must act in client's best interest (MiFID II Art. 24.1)"
)
# Compute suitability score (0-1)
suitability_score = self._compute_suitability_score(client, product)
# Audit log
self.audit.record(
event_type="compliance_check",
actor="mifid_compliance_agent",
action="suitability_assessment",
data={
"client_id": client["client_id"],
"product_isin": product["isin"],
"is_compliant": len(violations) == 0,
"violations_count": len(violations),
"suitability_score": suitability_score
},
outcome="completed"
)
return {
"is_compliant": len(violations) == 0,
"violations": violations,
"required_disclosures": disclosures,
"suitability_score": round(suitability_score, 2),
"framework": "MiFID II Article 25"
}
def check_best_execution(
self,
order: dict,
execution_venues: list[dict]
) -> dict:
"""
MiFID II Article 27: Best execution obligation.
Args:
order: {"side": "buy" | "sell", "quantity": int, "client_category": str}
execution_venues: [
{
"venue_name": str,
"bid_price": float,
"ask_price": float,
"fill_probability": float,
"avg_latency_ms": float,
"reliability_score": float (0-1)
}
]
Returns:
{
"best_execution_met": bool,
"selected_venue": str,
"selection_rationale": str
}
"""
if not execution_venues:
return {
"best_execution_met": False,
"reason": "No execution venues provided",
"selected_venue": None
}
# Score venues (price = primary factor for retail)
scored_venues = []
for venue in execution_venues:
# Price score
if order["side"] == "buy":
price_score = -venue["ask_price"] # Lower ask = better
else:
price_score = venue["bid_price"] # Higher bid = better
# Composite score (MiFID II factor weights)
score = (
0.60 * price_score +
0.20 * venue["fill_probability"] +
0.10 * (-venue["avg_latency_ms"] / 1000) +
0.10 * venue["reliability_score"]
)
scored_venues.append({**venue, "best_exec_score": score})
best_venue = max(scored_venues, key=lambda x: x["best_exec_score"])
# Audit log
self.audit.record(
event_type="compliance_check",
actor="mifid_compliance_agent",
action="best_execution_check",
data={
"order_side": order["side"],
"selected_venue": best_venue["venue_name"],
"venues_considered": len(execution_venues)
},
outcome="completed"
)
return {
"best_execution_met": True,
"selected_venue": best_venue["venue_name"],
"selection_rationale": "Best price with adequate fill probability and low latency",
"all_venues_considered": [v["venue_name"] for v in execution_venues],
"documentation_required": True, # 5-year retention
"framework": "MiFID II Article 27"
}
def _compute_suitability_score(self, client: dict, product: dict) -> float:
"""Compute 0-1 suitability score."""
score = 1.0
# Risk tolerance alignment
risk_diff = abs(client.get("risk_tolerance_score", 5) -
product.get("risk_score", 5))
score -= risk_diff * 0.10 # -0.10 per point of mismatch
# Net worth adequacy
if product.get("min_recommended_net_worth", 0) > 0:
adequacy_ratio = (client["net_worth_eur"] /
product["min_recommended_net_worth"])
if adequacy_ratio < 1.0:
score -= (1.0 - adequacy_ratio) * 0.30
# Experience
if not client.get("has_relevant_experience") and product["complexity"] == "complex":
score -= 0.25
return max(0.0, score)
class HumanApprovalWorkflow:
"""
Human-in-the-loop approval system for high-risk decisions.
Integrates with ticketing systems (Jira, ServiceNow) and Slack.
"""
def __init__(self, db_client, notification_client, audit_trail):
self.db = db_client
self.notifier = notification_client
self.audit = audit_trail
def create_approval_request(
self,
agent_task_id: str,
decision_type: str,
data_to_review: dict,
risk_level: Literal["low", "medium", "high", "critical"],
context: str,
timeout_hours: int = 24
) -> dict:
"""
Create human approval request and pause agent execution.
Args:
agent_task_id: ID of the paused agent task
decision_type: "trade_execution", "compliance_override", etc.
data_to_review: Data requiring human judgment (PII redacted)
risk_level: Determines reviewer assignment
context: Human-readable explanation
timeout_hours: Auto-reject if no response
Returns:
{"approval_id": str, "status": "pending"}
"""
approval_id = str(uuid.uuid4())
# Assign reviewers based on risk level
reviewer_pool = self._assign_reviewers(risk_level)
approval_request = {
"approval_id": approval_id,
"agent_task_id": agent_task_id,
"decision_type": decision_type,
"data": data_to_review,
"context": context,
"risk_level": risk_level,
"status": "pending",
"assigned_to": reviewer_pool,
"created_at": datetime.utcnow().isoformat(),
"expires_at": (
datetime.utcnow() + timedelta(hours=timeout_hours)
).isoformat()
}
# Persist to database
self.db.insert("approval_requests", approval_request)
# Notify reviewers (Slack, email, SMS for critical)
self.notifier.send_review_notification(
reviewer_pool=reviewer_pool,
approval_id=approval_id,
risk_level=risk_level,
summary=f"{decision_type}: {context[:200]}",
urgency="high" if risk_level == "critical" else "normal"
)
# Audit log
self.audit.record(
event_type="human_approval",
actor="approval_workflow",
action="approval_request_created",
data={
"approval_id": approval_id,
"risk_level": risk_level,
"assigned_to": reviewer_pool
},
outcome="pending"
)
return {"approval_id": approval_id, "status": "pending"}
async def wait_for_approval(
self,
approval_id: str,
poll_interval_seconds: int = 30
) -> dict:
"""
Poll for approval decision (agent suspends here).
In production: use webhooks or message queues instead of polling.
Returns:
{
"status": "approved" | "rejected" | "expired",
"reviewer_notes": str,
"reviewed_by": str,
"reviewed_at": str
}
"""
import asyncio
start_time = datetime.utcnow()
approval = self.db.find_one("approval_requests", {"approval_id": approval_id})
timeout = datetime.fromisoformat(approval["expires_at"])
while True:
if datetime.utcnow() > timeout:
# Timeout expired
self.db.update(
"approval_requests",
{"approval_id": approval_id},
{"status": "expired"}
)
self.audit.record(
event_type="human_approval",
actor="approval_workflow",
action="approval_expired",
data={"approval_id": approval_id},
outcome="expired"
)
return {"status": "expired", "approval_id": approval_id}
# Check status
approval = self.db.find_one("approval_requests", {"approval_id": approval_id})
if approval["status"] in ["approved", "rejected"]:
self.audit.record(
event_type="human_approval",
actor=approval.get("reviewed_by", "unknown"),
action=f"approval_{approval['status']}",
data={"approval_id": approval_id},
outcome=approval["status"]
)
return {
"status": approval["status"],
"approval_id": approval_id,
"reviewer_notes": approval.get("reviewer_notes"),
"reviewed_by": approval.get("reviewed_by"),
"reviewed_at": approval.get("reviewed_at")
}
await asyncio.sleep(poll_interval_seconds)
def _assign_reviewers(self, risk_level: str) -> list[str]:
"""Map risk level to reviewer team."""
pools = {
"low": ["junior_analyst_team"],
"medium": ["senior_analyst_team"],
"high": ["risk_manager_team"],
"critical": ["cro_team", "cfo_team"] # C-level for critical
}
return pools.get(risk_level, ["senior_analyst_team"])
Сквозной пример:
async def execute_compliant_trade(
client_id: str,
isin: str,
quantity: int,
side: Literal["buy", "sell"]
):
"""
Full compliance workflow: suitability → risk → approval → execution.
"""
# Initialize systems
audit = ImmutableAuditTrail(storage=S3WORMStorage())
compliance_agent = MiFIDIIComplianceAgent(audit)
approval_workflow = HumanApprovalWorkflow(
db_client=PostgresDB(),
notification_client=SlackNotifier(),
audit_trail=audit
)
# Step 1: Fetch client and product data
client = db.get_client(client_id)
product = db.get_instrument(isin)
# Step 2: MiFID II suitability check
suitability = compliance_agent.check_suitability(
client=client,
product=product,
investment_objective=client["objective"]
)
if not suitability["is_compliant"]:
return {
"status": "rejected",
"reason": "MiFID II suitability check failed",
"violations": suitability["violations"]
}
# Step 3: Best execution check
venues = fetch_execution_venues(isin)
best_exec = compliance_agent.check_best_execution(
order={"side": side, "quantity": quantity, "client_category": client["category"]},
execution_venues=venues
)
# Step 4: Calculate trade value and check approval threshold
estimated_price = venues[0]["ask_price"] if side == "buy" else venues[0]["bid_price"]
trade_value = estimated_price * quantity
if trade_value > 10_000:
# Step 5: Human approval required
approval = approval_workflow.create_approval_request(
agent_task_id=f"trade_{uuid.uuid4()}",
decision_type="trade_execution",
data_to_review={
"client_id": client_id,
"isin": isin,
"quantity": quantity,
"side": side,
"estimated_value": trade_value,
"suitability_score": suitability["suitability_score"]
},
risk_level="high" if trade_value > 100_000 else "medium",
context=f"{side.upper()} {quantity} units of {isin} for €{trade_value:,.0f}",
timeout_hours=24
)
decision = await approval_workflow.wait_for_approval(approval["approval_id"])
if decision["status"] != "approved":
return {
"status": "rejected",
"reason": f"Human approval {decision['status']}",
"notes": decision.get("reviewer_notes")
}
# Step 6: Execute trade
trade_result = trading_api.submit_order(
client=client_id,
isin=isin,
quantity=quantity,
side=side,
venue=best_exec["selected_venue"]
)
# Step 7: Log to audit trail
audit.record(
event_type="trade_execution",
actor="financial_agent",
action=f"{side}_order_executed",
data={
"client_id": client_id,
"isin": isin,
"quantity": quantity,
"trade_id": trade_result["trade_id"],
"compliance_checks_passed": True,
"human_approved": trade_value > 10_000
},
outcome="success"
)
return {
"status": "executed",
"trade_id": trade_result["trade_id"],
"venue": best_exec["selected_venue"]
}
Агент обнаружения мошенничества
import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
from typing import Literal
class FraudDetectionAgent:
"""
Real-time transaction fraud detection using rule-based and ML approaches.
"""
def __init__(self, audit_trail, alert_system):
self.audit = audit_trail
self.alerts = alert_system
self.transaction_history = defaultdict(list) # {account_id: [transactions]}
def analyze_transaction(
self,
transaction: dict
) -> dict:
"""
Analyze transaction for fraud indicators.
Args:
transaction: {
"transaction_id": str,
"account_id": str,
"amount": float,
"currency": str,
"merchant": str,
"merchant_category": str,
"timestamp": str (ISO format),
"ip_address": str,
"device_id": str,
"geo_location": {"country": str, "city": str}
}
Returns:
{
"fraud_score": float (0-1),
"risk_level": "low" | "medium" | "high" | "critical",
"flags": list[str],
"action": "approve" | "review" | "block"
}
"""
flags = []
fraud_score = 0.0
# Rule 1: Velocity check (transaction frequency)
velocity_score, velocity_flags = self._check_velocity(
transaction["account_id"],
transaction["timestamp"]
)
fraud_score += velocity_score
flags.extend(velocity_flags)
# Rule 2: Amount anomaly
amount_score, amount_flags = self._check_amount_anomaly(
transaction["account_id"],
transaction["amount"]
)
fraud_score += amount_score
flags.extend(amount_flags)
# Rule 3: Geographic anomaly
geo_score, geo_flags = self._check_geographic_anomaly(
transaction["account_id"],
transaction["geo_location"]
)
fraud_score += geo_score
flags.extend(geo_flags)
# Rule 4: Merchant risk
merchant_score = self._check_merchant_risk(
transaction["merchant_category"]
)
fraud_score += merchant_score
# Rule 5: Device fingerprint
device_score, device_flags = self._check_device_anomaly(
transaction["account_id"],
transaction["device_id"]
)
fraud_score += device_score
flags.extend(device_flags)
# Normalize score to 0-1
fraud_score = min(1.0, fraud_score)
# Determine risk level and action
if fraud_score >= 0.80:
risk_level = "critical"
action = "block"
elif fraud_score >= 0.60:
risk_level = "high"
action = "review"
elif fraud_score >= 0.40:
risk_level = "medium"
action = "review"
else:
risk_level = "low"
action = "approve"
# Alert if high risk
if risk_level in ["high", "critical"]:
self.alerts.send_alert(
severity=risk_level,
message=f"Fraud detected: Account {transaction['account_id']}, "
f"Score {fraud_score:.2f}, Flags: {flags}",
transaction_id=transaction["transaction_id"]
)
# Audit log
self.audit.record(
event_type="fraud_detection",
actor="fraud_detection_agent",
action="transaction_analysis",
data={
"transaction_id": transaction["transaction_id"],
"account_id": transaction["account_id"],
"fraud_score": round(fraud_score, 3),
"risk_level": risk_level,
"action": action,
"flags": flags
},
outcome=action
)
# Store transaction in history
self.transaction_history[transaction["account_id"]].append(transaction)
return {
"fraud_score": round(fraud_score, 3),
"risk_level": risk_level,
"flags": flags,
"action": action,
"transaction_id": transaction["transaction_id"]
}
def _check_velocity(self, account_id: str, timestamp: str) -> tuple[float, list[str]]:
"""
Check transaction frequency (velocity rule).
>5 transactions in 10 minutes = suspicious.
"""
current_time = datetime.fromisoformat(timestamp)
recent_window = current_time - timedelta(minutes=10)
recent_txns = [
txn for txn in self.transaction_history.get(account_id, [])
if datetime.fromisoformat(txn["timestamp"]) > recent_window
]
txn_count = len(recent_txns)
flags = []
if txn_count > 10:
flags.append(f"VELOCITY_CRITICAL: {txn_count} transactions in 10 minutes")
return 0.40, flags
elif txn_count > 5:
flags.append(f"VELOCITY_HIGH: {txn_count} transactions in 10 minutes")
return 0.25, flags
elif txn_count > 3:
flags.append(f"VELOCITY_MEDIUM: {txn_count} transactions in 10 minutes")
return 0.10, flags
return 0.0, flags
def _check_amount_anomaly(self, account_id: str, amount: float) -> tuple[float, list[str]]:
"""
Check if amount deviates significantly from historical pattern.
"""
history = self.transaction_history.get(account_id, [])
if len(history) < 10:
# Insufficient history
return 0.0, []
historical_amounts = [txn["amount"] for txn in history[-30:]] # Last 30 txns
mean_amount = np.mean(historical_amounts)
std_amount = np.std(historical_amounts)
if std_amount == 0:
return 0.0, []
z_score = abs((amount - mean_amount) / std_amount)
flags = []
if z_score > 5:
flags.append(f"AMOUNT_ANOMALY: {z_score:.1f}σ from mean (${amount:,.0f} vs avg ${mean_amount:,.0f})")
return 0.35, flags
elif z_score > 3:
flags.append(f"AMOUNT_UNUSUAL: {z_score:.1f}σ from mean")
return 0.20, flags
return 0.0, flags
def _check_geographic_anomaly(self, account_id: str, geo_location: dict) -> tuple[float, list[str]]:
"""
Check if transaction location deviates from historical pattern.
"""
history = self.transaction_history.get(account_id, [])
if len(history) < 5:
return 0.0, []
recent_countries = [
txn["geo_location"]["country"]
for txn in history[-10:]
if "geo_location" in txn
]
current_country = geo_location.get("country")
flags = []
if current_country not in recent_countries:
flags.append(f"GEO_ANOMALY: New country {current_country}")
return 0.25, flags
return 0.0, flags
def _check_merchant_risk(self, merchant_category: str) -> float:
"""
High-risk merchant categories (gambling, crypto exchanges, etc.).
"""
high_risk_categories = {
"gambling": 0.20,
"crypto_exchange": 0.15,
"money_transfer": 0.15,
"adult_services": 0.25,
"prepaid_cards": 0.10
}
return high_risk_categories.get(merchant_category, 0.0)
def _check_device_anomaly(self, account_id: str, device_id: str) -> tuple[float, list[str]]:
"""
Check if device is new/unknown for this account.
"""
history = self.transaction_history.get(account_id, [])
known_devices = {
txn["device_id"]
for txn in history
if "device_id" in txn
}
flags = []
if device_id not in known_devices:
flags.append(f"DEVICE_NEW: Unknown device {device_id[:8]}...")
return 0.15, flags
return 0.0, flags
Пример использования:
# Initialize fraud detection agent
audit_trail = ImmutableAuditTrail(storage=S3WORMStorage())
alert_system = AlertSystem(slack_webhook=SLACK_URL)
fraud_agent = FraudDetectionAgent(audit_trail, alert_system)
# Incoming transaction
transaction = {
"transaction_id": "txn_abc123",
"account_id": "acct_456",
"amount": 2500.00,
"currency": "USD",
"merchant": "Online Retailer XYZ",
"merchant_category": "retail",
"timestamp": "2026-03-01T14:32:15Z",
"ip_address": "203.0.113.45",
"device_id": "device_789",
"geo_location": {"country": "US", "city": "New York"}
}
# Analyze for fraud
result = fraud_agent.analyze_transaction(transaction)
print(f"Fraud Score: {result['fraud_score']}")
print(f"Risk Level: {result['risk_level']}")
print(f"Action: {result['action']}")
print(f"Flags: {result['flags']}")
if result["action"] == "block":
# Auto-block transaction
print("Transaction BLOCKED due to fraud risk")
elif result["action"] == "review":
# Route to human review
approval = approval_workflow.create_approval_request(
agent_task_id=transaction["transaction_id"],
decision_type="fraud_review",
data_to_review=result,
risk_level=result["risk_level"],
context=f"Transaction ${transaction['amount']:,.0f} flagged: {result['flags']}"
)
print(f"Transaction sent to human review: {approval['approval_id']}")
else:
# Auto-approve
print("Transaction APPROVED")
Производительность и бенчмарки
Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.
Характеристики задержки
Получение рыночных данных:
- Alpha Vantage API: 200–500ms на символ
- Bloomberg BLPAPI (локальный Terminal): 50–150ms на запрос
- Polygon.io REST API: 100–300ms
- Yahoo Finance (yfinance): 300–800ms (включая парсинг)
Расчёт риска:
- Исторический VaR (252 дня, один актив): 10–30ms
- Параметрический VaR (портфель, 10 активов): 20–50ms
- VaR методом Монте-Карло (10 000 симуляций, 10 активов): 500–1 500ms
- Стресс-тест (4 сценария, 20 позиций): 100–300ms
Вызовы LLM API (Claude Opus 4.6):
- Простая проверка соответствия: 1 500–3 000ms
- Генерация отчёта о рисках портфеля: 3 000–8 000ms
- Многошаговый агентный рабочий процесс (5 итераций): 10 000–25 000ms
Сквозной рабочий процесс (Отчёт о рисках портфеля):
- Получение рыночных данных (10 символов): 2 000ms
- Расчёт VaR + стресс-тесты: 1 000ms
- LLM-анализ (3 вызова инструментов): 8 000ms
- Генерация отчёта: 2 000ms
- Итого: ~13 секунд
Анализ стоимости (Claude Opus 4.6)
Тарификация: $15/MTok на вход, $75/MTok на выход
Типичный отчёт о рисках портфеля:
- Системный промпт: 500 токенов
- Контекст рыночных данных: 2 000 токенов
- Определения инструментов: 800 токенов
- История диалога (3 витка): 1 500 токенов
- Результаты вызовов инструментов: 2 000 токенов
- Вывод (отчёт): 1 200 токенов
- Итого на вход: ~6 800 токенов → $0.102
- Итого на выход: ~1 200 токенов → $0.090
- Стоимость одного отчёта: ~$0.19
Ежемесячные расходы (100 отчётов/день):
- Отчёты: $0.19 × 100 × 30 = $570/мес
- Неудачные запуски / повторы (+10%): $57/мес
- Итого расходы на LLM: ~$627/мес
Расходы на Data API:
- Alpha Vantage Premium: $50/мес (75 вызовов/мин)
- Polygon.io Stocks: $99/мес (неограниченные API-вызовы)
- Bloomberg Terminal: ~$2 000/мес (обязателен для институциональных участников)
Стратегии оптимизации
1. Кэширование:
import redis
import json
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def fetch_market_data_cached(symbol: str, ttl: int = 60) -> dict:
"""Fetch with Redis cache (60s TTL for real-time quotes)."""
cache_key = f"quote:{symbol}"
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# Fetch from API
data = api.fetch_quote(symbol)
redis_client.setex(cache_key, ttl, json.dumps(data))
return data
2. Параллельные вызовы инструментов:
import asyncio
async def fetch_multiple_quotes(symbols: list[str]) -> dict:
"""Fetch quotes in parallel (not sequential)."""
tasks = [api.fetch_quote_async(symbol) for symbol in symbols]
results = await asyncio.gather(*tasks)
return {symbol: result for symbol, result in zip(symbols, results)}
3. Предварительное вычисление:
- Предварительно вычислять VaR ежедневно (в нерабочее время) для крупных портфелей
- Хранить результаты в базе данных; агент запрашивает кэшированные значения
- Сокращает время реального расчёта с 1 500ms до 50ms (запрос к базе данных)
4. Выбор модели:
- Использовать Claude Sonnet 4 для рутинных расчётов (в 3 раза дешевле Opus)
- Резервировать Claude Opus для сложного комплаенс-рассуждения
- Использовать Claude Haiku для простых преобразований данных
5. Оптимизация промптов:
- Удалять ненужные определения инструментов (включать только инструменты, актуальные для текущей задачи)
- Сжимать системный промпт (убирать примеры, использовать лаконичные формулировки)
- Использовать режим использования инструментов (не extended thinking) для детерминированных рабочих процессов
Оценки пропускной способности
Однопоточный агент:
- Отчёты о рисках портфеля: ~5 отчётов/мин (13s на отчёт)
- Обнаружение мошенничества: ~200 транзакций/мин (300ms на транзакцию)
- Проверки соответствия (пригодность MiFID II): ~30 проверок/мин (2s на проверку)
Горизонтально масштабированный (10 рабочих экземпляров):
- Отчёты о рисках портфеля: ~50 отчётов/мин
- Обнаружение мошенничества: ~2 000 транзакций/мин
- Проверки соответствия: ~300 проверок/мин