title: "Наблюдаемость агентов: трейсинг Langfuse, OpenTelemetry и метрики production" slug: agent-observability-langfuse-opentelemetry-2026-ru date: 2026-02-21 lang: ru tags: [llm, agents, observability, langfuse, opentelemetry, prometheus] description: "Полное руководство по наблюдаемости LLM-агентов: трейсинг Langfuse с spans и tool_calls, семантические соглашения OpenTelemetry, метрики Prometheus, распределённый трейсинг и алертинг."
Наблюдаемость агентов: трейсинг Langfuse, OpenTelemetry и метрики production
Ключевые факты
- Версии пакетов (PyPI, март 2026):
langfuse==3.14.5,opentelemetry-sdk==1.39.1,opentelemetry-api==1.39.1,opentelemetry-exporter-otlp==1.39.1,langsmith==0.7.9 - Опции развёртывания Langfuse: self-hosted через Docker (требуется Postgres backend), облачный SaaS на cloud.langfuse.com
- Ценообразование Langfuse: Self-hosted бесплатен и безлимитен; Cloud Free tier включает 50K observations/month, Team tier начинается с $59/month за 500K observations
- Семантические соглашения OpenTelemetry для LLM:
gen_ai.*namespace (spec version 1.28.0) —gen_ai.system,gen_ai.request.model,gen_ai.usage.input_tokens,gen_ai.usage.output_tokens - Хранение трейсов в LangSmith: Developer tier 14 дней, Team tier 1 год, Enterprise неограниченно; стоимость примерно $1 за 1,000 трейсов на Team tier
- Основные метрики отслеживания: latency (p50, p95, p99), стоимость за трейс в USD, количество input/output токенов, error rate на агента, время выполнения инструментов, ratio успехов/неудач
- Паттерн Correlation ID: используйте заголовок
traceparent(W3C Trace Context standard) для связи workflow trace → LLM trace → downstream service traces в распределённых системах - Модель трейсов Langfuse: Trace (один запуск агента) → Span (единица работы) → Generation (конкретный LLM-вызов с подсчётом токенов и стоимостью)
- Группировка сессий в Langfuse: все трейсы с одинаковым
session_idгруппируются в UI для replay беседы и анализа multi-turn - OTLP-экспорт OpenTelemetry: port 4317 по умолчанию для gRPC, 4318 для HTTP; поддерживает Jaeger, Grafana Tempo, Honeycomb, Datadog
- Buckets гистограмм Prometheus: для latency агентов используйте
[0.5, 1, 2, 5, 10, 20, 30, 60, 120, 300]секунд; для LLM-вызовов используйте[0.1, 0.5, 1, 2, 5, 10, 30]секунд - Декоратор
@observeLangfuse: zero-boilerplate инструментация — автоматически создаёт spans, поддерживает nested calls, обновление черезlangfuse_context.update_current_observation() - Расчёт стоимости токенов: GPT-4o (март 2026): $2.50/1M input токенов, $10/1M output токенов; GPT-4o-mini: $0.15/1M input, $0.60/1M output
- Механизм распространения трейсов:
inject(headers)OpenTelemetry добавляет traceparent в исходящие HTTP-запросы;extract(headers)продолжает span из входящего запроса - Поведение flush в Langfuse: SDK группирует observations, auto-flush каждые 10 секунд или при 20 очередных items; вызовите
langfuse.flush()перед exit процесса - Traceloop SDK: обеспечивает автоматическую инструментацию OpenAI, Anthropic, Cohere, LangChain, LlamaIndex без изменений кода через
Traceloop.init() - Linkage Grafana dashboard: используйте template variables
${__value.raw}для создания clickable trace ID links на Langfuse/Jaeger из Prometheus panels - Пороги алертинга для агентов: error rate > 5% на 2 минуты, p95 latency > 120 секунд на 5 минут, бюджет превышен > 10 событий/час
- Гранулярность отслеживания стоимости: отслеживайте per user, per session, per agent type, per model — включайте детектирование аномалий стоимости путём сравнения почасовых трат к 7-дневному rolling average
- Span kinds распределённого трейсинга:
SpanKind.INTERNALдля логики агента,SpanKind.CLIENTдля исходящих LLM-вызовов,SpanKind.SERVERдля получения запросов агента
Почему наблюдаемость агентов отличается
LLM-агенты требуют принципиально другие примитивы наблюдаемости, чем традиционное ПО:
Недетерминированные пути выполнения: один и тот же запрос пользователя может инициировать разные последовательности инструментов в зависимости от выходов модели. Традиционный APM предполагает детерминированные пути кода; агенты нуждаются в наблюдаемости, которая захватывает каждое решение ветвления, выбор инструмента и шаг рассуждения.
Multi-step причинно-следственные трейсы: один запуск агента включает 5-15 шагов (reasoning → tool call → reasoning → final answer). Каждый шаг имеет latency, стоимость и implications качества. Вы должны трассировать причинно-следственную цепь, чтобы понять, почему агент успел или неудачно завершился.
Стоимость токенов как метрика first-class: в отличие от CPU/memory в традиционных приложениях, LLM-агенты тратят деньги на каждый вывод. Стоимость — это не ancillary — это primary operational metric, который должен отслеживаться per-trace, per-user, per-model и агрегироваться для бюджетирования.
Prompt/completion как debugging data: stack traces заменяются контентом prompt и completions модели. Вы не можете debug агентский failure без просмотра точного prompt, который его инициировал, ответа модели и результатов инструментов, возвращённых обратно в контекст.
Оценка качества как operational signal: корректность агента субъективна. Наблюдаемость должна поддерживать quality scores (feedback пользователя, LLM-as-judge, heuristic checks), привязанные к трейсам, позволяя обнаруживать quality regressions наряду с performance regressions.
Видимость выполнения инструментов: агенты вызывают external APIs, databases, search engines. Каждый tool call — это потенциальная point failure с собственным latency distribution. Наблюдаемость должна инструментировать tool calls как first-class spans.
Decision Framework
Когда использовать Langfuse
- Вам нужна LLM-native trace structure (generation objects с подсчётом токенов, prompts, completions)
- Вы хотите web UI для trace replay, session analysis и prompt comparison
- Вы инструментируете LangChain/LangGraph/CrewAI приложение (native callback support)
- Вам нужна коллекция user feedback (thumbs up/down, quality scores)
- Self-hosting приемлемый (требуется Postgres + Next.js server)
- Выбирайте облако если: Team < 10 engineers, < 500K traces/month, предпочитаете zero-ops SaaS
- Выбирайте self-hosted если: требуется data sovereignty, > 5M traces/month, custom retention policies
Когда использовать LangSmith
- Вы уже используете LangChain и хотите zero-config трейсинг
- Вам нужна dataset management для prompt engineering (test sets, few-shot examples)
- Вы хотите LLM-based evaluations (LangSmith имеет built-in eval framework)
- Бюджет допускает SaaS pricing ($1/1K traces на Team tier)
- Недостаток: vendor lock-in к LangChain ecosystem, no self-hosted option
Когда использовать custom OpenTelemetry
- У вас есть существующая OTel infrastructure (Jaeger, Tempo, Honeycomb)
- Вам нужно соотнести LLM traces с backend service traces в одной системе
- Вы хотите vendor-neutral instrumentation (можете переключать backends без changes кода)
- Вы строите multi-modal agent (LLM + vision + audio) и нужны custom span attributes
- Недостаток: No LLM-specific UI (Jaeger не понимает "tokens" или "cost"), требуется manual token tracking
Рекомендуемый hybrid approach
- Langfuse для development и debugging: используйте
@observedecorator для detailed trace capture, prompt iteration и quality analysis - OpenTelemetry для production correlation: инструментируйте agent service с OTel, экспортируйте в Tempo/Jaeger для distributed tracing через microservices
- Prometheus для SLO monitoring: записывайте aggregate metrics (request rate, error rate, latency, cost) для alerting и dashboards
- Link traces через системы: используйте trace ID из OTel как
correlation_idв Langfuse metadata; linkage из Grafana panels на Langfuse trace UI
Таблица ссылок параметров
| Parameter | Value | Notes |
|---|---|---|
LANGFUSE_PUBLIC_KEY |
Project-specific key из UI | Safe to commit в code, используется для trace submission |
LANGFUSE_SECRET_KEY |
Project-specific secret | Store в environment, никогда не commit |
LANGFUSE_HOST |
http://localhost:3000 или https://cloud.langfuse.com |
Self-hosted использует localhost, cloud использует SaaS endpoint |
langfuse.Langfuse(threads=N) |
Default: 1 | Увеличьте до 4-8 для high-throughput services |
langfuse.Langfuse(flush_at=N) |
Default: 15 | Batch size перед auto-flush; уменьшьте до 5 для low-latency apps |
langfuse.Langfuse(flush_interval=N) |
Default: 10 секунд | Time перед auto-flush; уменьшьте до 1 для real-time debugging |
OTEL_EXPORTER_OTLP_ENDPOINT |
http://localhost:4317 |
gRPC endpoint для OTLP collector |
OTEL_EXPORTER_OTLP_PROTOCOL |
grpc или http/protobuf |
gRPC является default и более efficient |
gen_ai.system |
openai, anthropic, cohere |
LLM provider name согласно semantic conventions |
gen_ai.request.model |
gpt-4o, claude-opus-4-6 |
Model identifier requested |
gen_ai.response.model |
Actual model used | Может отличаться от request (e.g., gpt-4o-2024-05-13) |
gen_ai.usage.input_tokens |
Integer | Prompt tokens consumed |
gen_ai.usage.output_tokens |
Integer | Completion tokens generated |
agent_run_duration_seconds |
Histogram | Buckets: [0.5, 1, 2, 5, 10, 20, 30, 60, 120, 300] |
agent_llm_call_duration_seconds |
Histogram | Buckets: [0.1, 0.5, 1, 2, 5, 10, 30] |
agent_cost_usd_total |
Counter | Cumulative cost; label by model и agent name |
langfuse_context.update_current_trace() |
Обновляет top-level trace | Используйте для session_id, user_id, tags, output |
langfuse_context.update_current_observation() |
Обновляет current span | Используйте для input, output, metadata в функции @observe |
trace.set_attribute(key, value) |
OTel span attribute | Используйте для custom metadata в distributed traces |
inject(headers) / extract(headers) |
Trace propagation | W3C Trace Context standard для distributed tracing |
Common Pitfalls
Pitfall 1: Забывание flush Langfuse перед выходом процесса
Impact: Трейсы потеряны в буфере, никогда не отправлены на сервер. Intermittent missing data в UI.
❌ Неправильно:
@observe(name="agent_run")
def run_agent(query: str):
# ... agent logic
return result
# FastAPI endpoint
@app.post("/agent")
def agent_endpoint(query: str):
return run_agent(query)
# Process может выйти перед flush
✅ Правильно:
from langfuse import Langfuse
langfuse = Langfuse()
@observe(name="agent_run")
def run_agent(query: str):
# ... agent logic
return result
@app.post("/agent")
def agent_endpoint(query: str):
result = run_agent(query)
langfuse.flush() # Force send перед response
return result
Pitfall 2: Не распространение trace context через границы сервисов
Impact: Distributed traces появляются как disconnected fragments. Невозможно соотнести agent trace с downstream database/API calls.
❌ Неправильно:
import httpx
async def call_sub_agent(payload: dict):
async with httpx.AsyncClient() as client:
response = await client.post(
"http://sub-agent:8000/execute",
json=payload,
)
return response.json()
✅ Правильно:
from opentelemetry.propagate import inject
from opentelemetry import trace
async def call_sub_agent(payload: dict):
with trace.get_tracer(__name__).start_as_current_span("sub_agent_call") as span:
headers = {}
inject(headers) # Adds traceparent header
async with httpx.AsyncClient() as client:
response = await client.post(
"http://sub-agent:8000/execute",
json=payload,
headers=headers,
)
return response.json()
Pitfall 3: Использование @observe без обновления observation metadata
Impact: Трейсы появляются в UI но не содержат useful data (no input, output, token counts). Debugging требует угадывания, что случилось.
❌ Неправильно:
@observe(name="llm_call")
def call_llm(prompt: str) -> str:
response = llm.invoke(prompt)
return response.content
# No metadata recorded
✅ Правильно:
from langfuse.decorators import langfuse_context
@observe(name="llm_call")
def call_llm(prompt: str) -> str:
response = llm.invoke(prompt)
langfuse_context.update_current_observation(
input={"prompt": prompt},
output={"completion": response.content},
model="gpt-4o",
usage={
"input": response.usage_metadata["input_tokens"],
"output": response.usage_metadata["output_tokens"],
"total": response.usage_metadata["total_tokens"],
"unit": "TOKENS",
},
metadata={
"temperature": 0,
"max_tokens": 4096,
},
)
return response.content
Pitfall 4: Запись стоимости как строкового типа вместо numeric type
Impact: Невозможно агрегировать cost metrics в Prometheus или создавать cost alerts. Grafana queries fail с type errors.
❌ Неправильно:
agent_cost_usd_total.labels(model="gpt-4o").inc("0.05") # String value
span.set_attribute("llm.cost_usd", "$0.05") # String with currency symbol
✅ Правильно:
cost = 0.05 # Numeric type
agent_cost_usd_total.labels(model="gpt-4o").inc(cost)
span.set_attribute("llm.cost_usd", cost)
Pitfall 5: Не установка buckets гистограмм Prometheus appropriate для agent latency
Impact: Percentiles (p95, p99) неточны или отсутствуют. Alerting на latency thresholds использует неправильные данные.
❌ Неправильно:
# Default buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
# Agent runs берут 10-60 секунд, buckets слишком маленькие
agent_run_duration = Histogram("agent_run_duration_seconds", "Agent runtime")
✅ Правильно:
# Buckets aligned с agent behavior
agent_run_duration = Histogram(
"agent_run_duration_seconds",
"Agent runtime",
buckets=[0.5, 1, 2, 5, 10, 20, 30, 60, 120, 300], # До 5 минут
)
Интеграция Langfuse
Installation и initialization
pip install langfuse==3.14.5
import os
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
langfuse = Langfuse(
public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
secret_key=os.environ["LANGFUSE_SECRET_KEY"],
host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
release=os.environ.get("GIT_COMMIT", "dev"), # Track deployments
enabled=os.environ.get("LANGFUSE_ENABLED", "true").lower() == "true",
threads=4, # Parallel flush для high throughput
flush_at=20, # Batch size
flush_interval=10, # Seconds
)
Tracing с @observe decorator
Декоратор @observe является primary instrumentation mechanism. Он автоматически создаёт spans для decorated functions и поддерживает nested calls.
from langfuse.decorators import observe, langfuse_context
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import time
llm = ChatOpenAI(model="gpt-4o", temperature=0)
@observe(name="web_search_tool")
def search_web(query: str) -> str:
"""Execute web search tool call."""
langfuse_context.update_current_observation(
input={"query": query},
metadata={"tool": "web_search", "engine": "serpapi"},
)
# Simulated search execution
result = f"Search results for: {query}\n[Result 1]\n[Result 2]"
langfuse_context.update_current_observation(
output={"result_length": len(result), "preview": result[:200]},
)
return result
@observe(name="llm_generation")
def call_llm(messages: list, model: str = "gpt-4o") -> str:
"""Make an LLM call и record as generation."""
start = time.monotonic()
response = llm.invoke(messages)
duration = time.monotonic() - start
usage = response.usage_metadata
# Update observation с generation-specific data
langfuse_context.update_current_observation(
type="generation", # Special type для LLM calls
model=model,
input=messages,
output=response.content,
usage={
"input": usage["input_tokens"],
"output": usage["output_tokens"],
"total": usage["total_tokens"],
"unit": "TOKENS",
},
metadata={
"duration_seconds": round(duration, 3),
"tokens_per_second": round(usage["output_tokens"] / duration, 1),
},
)
return response.content
@observe(name="agent_run")
def run_agent(user_query: str, session_id: str = None) -> dict:
"""Main agent entry point — creates top-level trace."""
# Tag trace с session info для grouping
langfuse_context.update_current_trace(
session_id=session_id,
user_id="user_123",
tags=["production", "v2.1"],
metadata={"query_length": len(user_query)},
)
messages = [
SystemMessage(content="You are a helpful research assistant."),
HumanMessage(content=user_query),
]
step = 0
tool_calls = []
while step < 10:
step += 1
# First LLM call (creates nested generation span)
response = call_llm(messages, model="gpt-4o")
messages.append({"role": "assistant", "content": response})
# Check для tool call request
if "SEARCH:" in response:
query = response.split("SEARCH:")[1].strip().split("\n")[0]
# Execute tool (creates nested span)
search_result = search_web(query)
tool_calls.append({"tool": "web_search", "query": query})
messages.append({
"role": "tool",
"content": search_result,
"tool_call_id": f"call_{step}",
})
else:
# Final answer received
break
# Update trace с final output
langfuse_context.update_current_trace(
output={"response": response, "steps": step},
metadata={"tool_calls": tool_calls},
)
return {"response": response, "steps": step, "tool_calls": tool_calls}
Создание spans и nested tracing
Для complex agents используйте manual API для full control над span hierarchy:
from langfuse import Langfuse
from datetime import datetime
langfuse = Langfuse()
def run_complex_agent(query: str, user_id: str):
# Create top-level trace
trace = langfuse.trace(
name="complex_agent",
user_id=user_id,
input={"query": query},
tags=["production"],
)
# Create planning span
planning_span = trace.span(
name="planning",
input={"task": "analyze query and create plan"},
start_time=datetime.utcnow(),
)
# Create generation в span
planning_gen = planning_span.generation(
name="planning_llm_call",
model="gpt-4o",
input=[{"role": "user", "content": f"Create a plan for: {query}"}],
output="1. Search for X\n2. Analyze results\n3. Summarize",
usage={"input": 120, "output": 45, "unit": "TOKENS"},
)
planning_gen.end()
planning_span.end(
output={"plan_steps": 3},
end_time=datetime.utcnow(),
)
# Create execution span
exec_span = trace.span(
name="execution",
input={"plan_steps": 3},
start_time=datetime.utcnow(),
)
# ... execution logic
exec_span.end(
output={"results": "completed"},
end_time=datetime.utcnow(),
)
# Finalize trace
trace.update(
output={"status": "success"},
)
langfuse.flush()
Score submission для quality tracking
Присоедините quality scores к трейсам для regression detection:
from langfuse import Langfuse
langfuse = Langfuse()
def submit_user_feedback(trace_id: str, thumbs_up: bool, comment: str = None):
"""Record user feedback как score."""
langfuse.score(
trace_id=trace_id,
name="user_feedback",
value=1.0 if thumbs_up else 0.0,
data_type="BOOLEAN",
comment=comment,
)
def submit_llm_judge_score(trace_id: str, quality_score: float):
"""Record LLM-as-judge evaluation."""
langfuse.score(
trace_id=trace_id,
name="llm_judge_quality",
value=quality_score, # 0.0 к 1.0
data_type="NUMERIC",
comment="GPT-4 evaluated response quality",
)
def submit_heuristic_score(trace_id: str, passed: bool, rule_name: str):
"""Record heuristic check result."""
langfuse.score(
trace_id=trace_id,
name=f"heuristic_{rule_name}",
value=1.0 if passed else 0.0,
data_type="BOOLEAN",
comment=f"Heuristic rule: {rule_name}",
)
OpenTelemetry для LLM-агентов
Installation и setup
pip install opentelemetry-api==1.39.1 opentelemetry-sdk==1.39.1 opentelemetry-exporter-otlp==1.39.1
pip install traceloop-sdk # Для automatic OpenAI/Anthropic instrumentation
import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
# Configure resource attributes
resource = Resource.create({
"service.name": "llm-agent-service",
"service.version": "2.1.0",
"deployment.environment": "production",
})
provider = TracerProvider(resource=resource)
# Export в OTLP collector (Jaeger, Tempo, Honeycomb)
otlp_exporter = OTLPSpanExporter(
endpoint=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
headers={"Authorization": f"Bearer {os.environ.get('OTEL_AUTH_TOKEN', '')}"},
)
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agent-service", "2.1.0")
gen_ai семантические соглашения
OpenTelemetry определяет standard span attributes для LLM calls под gen_ai.* namespace (semantic conventions v1.28.0):
from opentelemetry import trace
from opentelemetry.semconv.ai import SpanAttributes
import time
tracer = trace.get_tracer(__name__)
def instrument_llm_call(model: str, messages: list):
"""Instrument LLM call с gen_ai semantic conventions."""
with tracer.start_as_current_span("llm.generate") as span:
# Set request attributes
span.set_attribute(SpanAttributes.LLM_SYSTEM, "openai")
span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, model)
span.set_attribute(SpanAttributes.LLM_REQUEST_TEMPERATURE, 0)
span.set_attribute(SpanAttributes.LLM_REQUEST_MAX_TOKENS, 4096)
# Record prompt content
for i, msg in enumerate(messages):
span.set_attribute(f"gen_ai.prompt.{i}.role", msg["role"])
span.set_attribute(f"gen_ai.prompt.{i}.content", msg["content"])
start = time.monotonic()
# Execute LLM call
response = llm.invoke(messages)
duration = time.monotonic() - start
usage = response.usage_metadata
# Set response attributes
span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, "gpt-4o-2024-05-13")
span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage["input_tokens"])
span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, usage["output_tokens"])
span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, usage["total_tokens"])
# Record completion content
span.set_attribute("gen_ai.completion.0.content", response.content)
# Custom metrics
span.set_attribute("llm.duration_seconds", round(duration, 3))
span.set_attribute("llm.cost_usd", calculate_cost(usage, model))
span.set_status(trace.StatusCode.OK)
return response
def calculate_cost(usage: dict, model: str) -> float:
"""Calculate cost в USD based на token usage."""
pricing = {
"gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
"gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
}
if model not in pricing:
return 0.0
input_cost = usage["input_tokens"] * pricing[model]["input"]
output_cost = usage["output_tokens"] * pricing[model]["output"]
return round(input_cost + output_cost, 6)
OTLP экспорт в Grafana Tempo
Сконфигурируйте Tempo как OTLP backend для long-term trace storage:
# tempo-config.yaml
server:
http_listen_port: 3200
distributor:
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
storage:
trace:
backend: s3
s3:
bucket: traces
endpoint: s3.amazonaws.com
access_key: ${S3_ACCESS_KEY}
secret_key: ${S3_SECRET_KEY}
# docker-compose.yaml
version: '3.8'
services:
tempo:
image: grafana/tempo:latest
command: ["-config.file=/etc/tempo.yaml"]
volumes:
- ./tempo-config.yaml:/etc/tempo.yaml
- tempo_data:/var/tempo
ports:
- "3200:3200" # Tempo HTTP
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
volumes:
tempo_data:
Grafana dashboard для trace visualization
Создайте Grafana dashboard, который linkает Prometheus metrics к Tempo traces:
{
"dashboard": {
"title": "LLM Agent Observability",
"panels": [
{
"title": "Agent Request Rate",
"type": "timeseries",
"targets": [
{
"expr": "rate(agent_requests_total[5m])",
"legendFormat": "{{agent_name}} - {{status}}"
}
]
},
{
"title": "Trace ID Lookup",
"type": "table",
"targets": [
{
"expr": "agent_requests_total",
"format": "table"
}
],
"transformations": [
{
"id": "organize",
"options": {
"renameByName": {
"trace_id": "Trace ID"
}
}
}
],
"fieldConfig": {
"overrides": [
{
"matcher": {"id": "byName", "options": "Trace ID"},
"properties": [
{
"id": "links",
"value": [
{
"title": "View in Tempo",
"url": "http://localhost:3200/trace/${__value.raw}"
}
]
}
]
}
]
}
}
]
}
}
Отслеживание стоимости и алертинг
Per-user budget tracking
Отслеживайте token usage и стоимость per user для enforcement quotas:
from prometheus_client import Counter, Gauge
from datetime import datetime, timedelta
from typing import Dict
import redis
# Prometheus metrics
user_cost_usd_total = Counter(
"user_cost_usd_total",
"Total cost per user в USD",
["user_id", "model"],
)
user_tokens_total = Counter(
"user_tokens_total",
"Total tokens per user",
["user_id", "direction"], # input, output
)
# Redis для real-time budget tracking
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
class UserBudgetTracker:
"""Track и enforce per-user spending limits."""
def __init__(self, daily_limit_usd: float = 10.0):
self.daily_limit_usd = daily_limit_usd
def get_user_spend_today(self, user_id: str) -> float:
"""Get user's spending для current day."""
today = datetime.utcnow().strftime("%Y-%m-%d")
key = f"user_spend:{user_id}:{today}"
spend = redis_client.get(key)
return float(spend) if spend else 0.0
def record_spend(self, user_id: str, cost_usd: float, model: str) -> None:
"""Record spending и update metrics."""
today = datetime.utcnow().strftime("%Y-%m-%d")
key = f"user_spend:{user_id}:{today}"
# Increment Redis counter с 24h expiry
redis_client.incrbyfloat(key, cost_usd)
redis_client.expire(key, 86400)
# Update Prometheus metric
user_cost_usd_total.labels(user_id=user_id, model=model).inc(cost_usd)
def check_budget(self, user_id: str) -> tuple[bool, float]:
"""Check если user в budget."""
current_spend = self.get_user_spend_today(user_id)
within_budget = current_spend < self.daily_limit_usd
remaining = self.daily_limit_usd - current_spend
return within_budget, max(0, remaining)
def enforce_budget(self, user_id: str):
"""Decorator для enforce budget перед function execution."""
def decorator(func):
def wrapper(*args, **kwargs):
within_budget, remaining = self.check_budget(user_id)
if not within_budget:
raise RuntimeError(
f"User {user_id} exceeded daily budget "
f"(${self.daily_limit_usd}). Current spend: "
f"${self.get_user_spend_today(user_id):.2f}"
)
return func(*args, **kwargs)
return wrapper
return decorator
budget_tracker = UserBudgetTracker(daily_limit_usd=50.0)
@observe(name="agent_run")
def run_agent_with_budget(user_id: str, query: str):
"""Run agent с budget enforcement."""
# Check budget перед execution
within_budget, remaining = budget_tracker.check_budget(user_id)
if not within_budget:
raise RuntimeError(f"Budget exceeded для user {user_id}")
# Execute agent
result = run_agent(query)
# Calculate и record cost
total_cost = sum(
calculate_cost(gen["usage"], gen["model"])
for gen in result.get("generations", [])
)
budget_tracker.record_spend(user_id, total_cost, model="gpt-4o")
# Warn если approaching limit
if remaining < 5.0:
import logging
logging.warning(
f"User {user_id} approaching budget limit: ${remaining:.2f} remaining"
)
return result
Детектирование аномалий стоимости
Обнаруживайте unusual spending patterns используя statistical methods:
from prometheus_client import Gauge
from datetime import datetime, timedelta
from typing import List
import statistics
cost_anomaly_detected = Gauge(
"cost_anomaly_detected",
"Binary indicator cost anomaly (1 = anomaly, 0 = normal)",
["time_window"],
)
class CostAnomalyDetector:
"""Detect anomalous spending patterns."""
def __init__(self, threshold_std_devs: float = 3.0):
self.threshold_std_devs = threshold_std_devs
def get_hourly_costs(self, hours: int = 168) -> List[float]:
"""Query Prometheus для hourly cost над N hours (default 7 days)."""
# This would use Prometheus API в production
# Simplified для example
return [1.2, 1.5, 1.3, 1.4, 25.6, 1.2, 1.1] # Spike в index 4
def detect_anomaly(self, current_hour_cost: float) -> bool:
"""Detect если current hour cost anomalous."""
historical_costs = self.get_hourly_costs(hours=168)
if len(historical_costs) < 24:
return False # Not enough data
mean = statistics.mean(historical_costs)
std_dev = statistics.stdev(historical_costs)
# Z-score method
z_score = (current_hour_cost - mean) / std_dev if std_dev > 0 else 0
is_anomaly = abs(z_score) > self.threshold_std_devs
if is_anomaly:
cost_anomaly_detected.labels(time_window="1h").set(1)
else:
cost_anomaly_detected.labels(time_window="1h").set(0)
return is_anomaly
detector = CostAnomalyDetector(threshold_std_devs=3.0)
def monitor_cost_anomalies():
"""Periodic task для check для cost anomalies."""
current_hour_cost = get_current_hour_cost() # Query Prometheus
if detector.detect_anomaly(current_hour_cost):
send_alert(
severity="warning",
message=f"Cost anomaly detected: ${current_hour_cost:.2f} this hour "
f"(historical mean: ${statistics.mean(detector.get_hourly_costs()):.2f})"
)
Slack alerts для budget events
Отправляйте real-time notifications когда budget thresholds превышены:
import requests
import os
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL")
def send_slack_alert(message: str, severity: str = "warning"):
"""Send alert в Slack channel."""
color_map = {
"info": "#36a64f",
"warning": "#ff9900",
"critical": "#ff0000",
}
payload = {
"attachments": [
{
"color": color_map.get(severity, "#999999"),
"title": f"LLM Agent Alert - {severity.upper()}",
"text": message,
"footer": "Agent Observability System",
"ts": int(time.time()),
}
]
}
if SLACK_WEBHOOK_URL:
requests.post(SLACK_WEBHOOK_URL, json=payload)
def alert_on_budget_exceeded(user_id: str, spend: float, limit: float):
"""Alert когда user превышает budget."""
send_slack_alert(
message=f"User `{user_id}` exceeded daily budget\n"
f"Spend: ${spend:.2f} / Limit: ${limit:.2f}",
severity="warning",
)
def alert_on_cost_anomaly(current_cost: float, mean_cost: float, std_dev: float):
"""Alert когда cost anomaly detected."""
z_score = (current_cost - mean_cost) / std_dev
send_slack_alert(
message=f"Cost anomaly detected this hour\n"
f"Current: ${current_cost:.2f}\n"
f"Expected: ${mean_cost:.2f} ± ${std_dev:.2f}\n"
f"Z-score: {z_score:.2f}",
severity="critical" if z_score > 5 else "warning",
)
Производительность и бенчмарки
Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.
Langfuse trace ingestion throughput
Langfuse SDK с threads=4 и flush_at=20 конфигурацией может обрабатывать trace submission без блокирования application threads. Batch uploads снижают network overhead по сравнению с synchronous per-trace submission.
Ожидаемое поведение: Трейсы queued в memory и flushed в background thread каждые 10 секунд или когда accumulate 20 observations, в зависимости от того, что наступит первым.
OpenTelemetry span overhead
OTLP export через gRPC вводит minimal overhead к span creation. Spans buffered в BatchSpanProcessor и exported в batches 512 spans или каждые 5 секунд.
Ожидаемое поведение: Span creation добавляет sub-millisecond overhead per operation. Export batching предотвращает per-span network calls.
Prometheus metrics collection cost
Histogram observations включают bucket lookups и atomic counter increments. Histograms с 10 buckets имеют примерно linear overhead относительно bucket count.
Ожидаемое поведение: Recording histogram observation берёт single-digit microseconds. Scrape endpoint generation (/metrics) масштабируется с cardinality label combinations.
Distributed trace latency
Trace context propagation через W3C Trace Context добавляет один HTTP header (traceparent) к исходящим requests. Header parsing и span context extraction на receiving side является negligible.
Ожидаемое поведение: Trace propagation overhead доминируется HTTP serialization, не trace context handling.
Agent run latency breakdown
Для типичного multi-step agent run с 3 LLM calls и 2 tool executions:
- LLM calls: Majority latency (p50: 2-5 секунд per call, p95: 8-15 секунд)
- Tool execution: Зависит от tool type (database query: 50-200ms, web search: 500-2000ms, file I/O: 10-100ms)
- Agent orchestration logic: Negligible (sub-10ms per step)
- Observability overhead: Sub-1% total runtime (span creation, metric recording)
Ожидаемое поведение: LLM API latency доминирует agent runtime. Оптимизации должны сосредоточиться на prompt efficiency, streaming responses и parallel tool execution.
Cost per trace
Token consumption drives cost больше, чем infrastructure:
- GPT-4o agent run (3 turns, 4K input tokens, 1K output tokens): $0.02-0.04 per trace
- GPT-4o-mini agent run (same workload): $0.001-0.002 per trace
- Langfuse cloud storage: $0.0001 per trace (amortized через Team tier)
- OpenTelemetry/Tempo storage: Negligible (S3 storage costs < $0.00001 per trace)
Ожидаемое поведение: Model inference cost является 100-1000x выше, чем observability infrastructure cost.
Trace retention и storage
Langfuse cloud Team tier включает 500K observations/month с 1-year retention. Self-hosted Postgres storage grows примерно 5-10KB per trace в зависимости от prompt/completion length.
Ожидаемое поведение: 1M traces/month требует 5-10GB Postgres storage. Archive old traces на S3 для cost efficiency.
Alert latency
Prometheus scrapes metrics каждые 15-30 секунд. AlertManager evaluates rules на scrape interval и fires alerts после for duration threshold.
Ожидаемое поведение: Budget exceeded alert fires в течение 1-2 минут threshold breach. Cost anomaly alert fires в течение 5-10 минут (depends на rule for: 5m конфигурацию).