Наблюдаемость агентов: трейсинг Langfuse, OpenTelemetry и метрики production


title: "Наблюдаемость агентов: трейсинг Langfuse, OpenTelemetry и метрики production" slug: agent-observability-langfuse-opentelemetry-2026-ru date: 2026-02-21 lang: ru tags: [llm, agents, observability, langfuse, opentelemetry, prometheus] description: "Полное руководство по наблюдаемости LLM-агентов: трейсинг Langfuse с spans и tool_calls, семантические соглашения OpenTelemetry, метрики Prometheus, распределённый трейсинг и алертинг."

Наблюдаемость агентов: трейсинг Langfuse, OpenTelemetry и метрики production

Ключевые факты

  • Версии пакетов (PyPI, март 2026): langfuse==3.14.5, opentelemetry-sdk==1.39.1, opentelemetry-api==1.39.1, opentelemetry-exporter-otlp==1.39.1, langsmith==0.7.9
  • Опции развёртывания Langfuse: self-hosted через Docker (требуется Postgres backend), облачный SaaS на cloud.langfuse.com
  • Ценообразование Langfuse: Self-hosted бесплатен и безлимитен; Cloud Free tier включает 50K observations/month, Team tier начинается с $59/month за 500K observations
  • Семантические соглашения OpenTelemetry для LLM: gen_ai.* namespace (spec version 1.28.0) — gen_ai.system, gen_ai.request.model, gen_ai.usage.input_tokens, gen_ai.usage.output_tokens
  • Хранение трейсов в LangSmith: Developer tier 14 дней, Team tier 1 год, Enterprise неограниченно; стоимость примерно $1 за 1,000 трейсов на Team tier
  • Основные метрики отслеживания: latency (p50, p95, p99), стоимость за трейс в USD, количество input/output токенов, error rate на агента, время выполнения инструментов, ratio успехов/неудач
  • Паттерн Correlation ID: используйте заголовок traceparent (W3C Trace Context standard) для связи workflow trace → LLM trace → downstream service traces в распределённых системах
  • Модель трейсов Langfuse: Trace (один запуск агента) → Span (единица работы) → Generation (конкретный LLM-вызов с подсчётом токенов и стоимостью)
  • Группировка сессий в Langfuse: все трейсы с одинаковым session_id группируются в UI для replay беседы и анализа multi-turn
  • OTLP-экспорт OpenTelemetry: port 4317 по умолчанию для gRPC, 4318 для HTTP; поддерживает Jaeger, Grafana Tempo, Honeycomb, Datadog
  • Buckets гистограмм Prometheus: для latency агентов используйте [0.5, 1, 2, 5, 10, 20, 30, 60, 120, 300] секунд; для LLM-вызовов используйте [0.1, 0.5, 1, 2, 5, 10, 30] секунд
  • Декоратор @observe Langfuse: zero-boilerplate инструментация — автоматически создаёт spans, поддерживает nested calls, обновление через langfuse_context.update_current_observation()
  • Расчёт стоимости токенов: GPT-4o (март 2026): $2.50/1M input токенов, $10/1M output токенов; GPT-4o-mini: $0.15/1M input, $0.60/1M output
  • Механизм распространения трейсов: inject(headers) OpenTelemetry добавляет traceparent в исходящие HTTP-запросы; extract(headers) продолжает span из входящего запроса
  • Поведение flush в Langfuse: SDK группирует observations, auto-flush каждые 10 секунд или при 20 очередных items; вызовите langfuse.flush() перед exit процесса
  • Traceloop SDK: обеспечивает автоматическую инструментацию OpenAI, Anthropic, Cohere, LangChain, LlamaIndex без изменений кода через Traceloop.init()
  • Linkage Grafana dashboard: используйте template variables ${__value.raw} для создания clickable trace ID links на Langfuse/Jaeger из Prometheus panels
  • Пороги алертинга для агентов: error rate > 5% на 2 минуты, p95 latency > 120 секунд на 5 минут, бюджет превышен > 10 событий/час
  • Гранулярность отслеживания стоимости: отслеживайте per user, per session, per agent type, per model — включайте детектирование аномалий стоимости путём сравнения почасовых трат к 7-дневному rolling average
  • Span kinds распределённого трейсинга: SpanKind.INTERNAL для логики агента, SpanKind.CLIENT для исходящих LLM-вызовов, SpanKind.SERVER для получения запросов агента

Почему наблюдаемость агентов отличается

LLM-агенты требуют принципиально другие примитивы наблюдаемости, чем традиционное ПО:

Недетерминированные пути выполнения: один и тот же запрос пользователя может инициировать разные последовательности инструментов в зависимости от выходов модели. Традиционный APM предполагает детерминированные пути кода; агенты нуждаются в наблюдаемости, которая захватывает каждое решение ветвления, выбор инструмента и шаг рассуждения.

Multi-step причинно-следственные трейсы: один запуск агента включает 5-15 шагов (reasoning → tool call → reasoning → final answer). Каждый шаг имеет latency, стоимость и implications качества. Вы должны трассировать причинно-следственную цепь, чтобы понять, почему агент успел или неудачно завершился.

Стоимость токенов как метрика first-class: в отличие от CPU/memory в традиционных приложениях, LLM-агенты тратят деньги на каждый вывод. Стоимость — это не ancillary — это primary operational metric, который должен отслеживаться per-trace, per-user, per-model и агрегироваться для бюджетирования.

Prompt/completion как debugging data: stack traces заменяются контентом prompt и completions модели. Вы не можете debug агентский failure без просмотра точного prompt, который его инициировал, ответа модели и результатов инструментов, возвращённых обратно в контекст.

Оценка качества как operational signal: корректность агента субъективна. Наблюдаемость должна поддерживать quality scores (feedback пользователя, LLM-as-judge, heuristic checks), привязанные к трейсам, позволяя обнаруживать quality regressions наряду с performance regressions.

Видимость выполнения инструментов: агенты вызывают external APIs, databases, search engines. Каждый tool call — это потенциальная point failure с собственным latency distribution. Наблюдаемость должна инструментировать tool calls как first-class spans.

Decision Framework

Когда использовать Langfuse

  • Вам нужна LLM-native trace structure (generation objects с подсчётом токенов, prompts, completions)
  • Вы хотите web UI для trace replay, session analysis и prompt comparison
  • Вы инструментируете LangChain/LangGraph/CrewAI приложение (native callback support)
  • Вам нужна коллекция user feedback (thumbs up/down, quality scores)
  • Self-hosting приемлемый (требуется Postgres + Next.js server)
  • Выбирайте облако если: Team < 10 engineers, < 500K traces/month, предпочитаете zero-ops SaaS
  • Выбирайте self-hosted если: требуется data sovereignty, > 5M traces/month, custom retention policies

Когда использовать LangSmith

  • Вы уже используете LangChain и хотите zero-config трейсинг
  • Вам нужна dataset management для prompt engineering (test sets, few-shot examples)
  • Вы хотите LLM-based evaluations (LangSmith имеет built-in eval framework)
  • Бюджет допускает SaaS pricing ($1/1K traces на Team tier)
  • Недостаток: vendor lock-in к LangChain ecosystem, no self-hosted option

Когда использовать custom OpenTelemetry

  • У вас есть существующая OTel infrastructure (Jaeger, Tempo, Honeycomb)
  • Вам нужно соотнести LLM traces с backend service traces в одной системе
  • Вы хотите vendor-neutral instrumentation (можете переключать backends без changes кода)
  • Вы строите multi-modal agent (LLM + vision + audio) и нужны custom span attributes
  • Недостаток: No LLM-specific UI (Jaeger не понимает "tokens" или "cost"), требуется manual token tracking

Рекомендуемый hybrid approach

  • Langfuse для development и debugging: используйте @observe decorator для detailed trace capture, prompt iteration и quality analysis
  • OpenTelemetry для production correlation: инструментируйте agent service с OTel, экспортируйте в Tempo/Jaeger для distributed tracing через microservices
  • Prometheus для SLO monitoring: записывайте aggregate metrics (request rate, error rate, latency, cost) для alerting и dashboards
  • Link traces через системы: используйте trace ID из OTel как correlation_id в Langfuse metadata; linkage из Grafana panels на Langfuse trace UI

Таблица ссылок параметров

Parameter Value Notes
LANGFUSE_PUBLIC_KEY Project-specific key из UI Safe to commit в code, используется для trace submission
LANGFUSE_SECRET_KEY Project-specific secret Store в environment, никогда не commit
LANGFUSE_HOST http://localhost:3000 или https://cloud.langfuse.com Self-hosted использует localhost, cloud использует SaaS endpoint
langfuse.Langfuse(threads=N) Default: 1 Увеличьте до 4-8 для high-throughput services
langfuse.Langfuse(flush_at=N) Default: 15 Batch size перед auto-flush; уменьшьте до 5 для low-latency apps
langfuse.Langfuse(flush_interval=N) Default: 10 секунд Time перед auto-flush; уменьшьте до 1 для real-time debugging
OTEL_EXPORTER_OTLP_ENDPOINT http://localhost:4317 gRPC endpoint для OTLP collector
OTEL_EXPORTER_OTLP_PROTOCOL grpc или http/protobuf gRPC является default и более efficient
gen_ai.system openai, anthropic, cohere LLM provider name согласно semantic conventions
gen_ai.request.model gpt-4o, claude-opus-4-6 Model identifier requested
gen_ai.response.model Actual model used Может отличаться от request (e.g., gpt-4o-2024-05-13)
gen_ai.usage.input_tokens Integer Prompt tokens consumed
gen_ai.usage.output_tokens Integer Completion tokens generated
agent_run_duration_seconds Histogram Buckets: [0.5, 1, 2, 5, 10, 20, 30, 60, 120, 300]
agent_llm_call_duration_seconds Histogram Buckets: [0.1, 0.5, 1, 2, 5, 10, 30]
agent_cost_usd_total Counter Cumulative cost; label by model и agent name
langfuse_context.update_current_trace() Обновляет top-level trace Используйте для session_id, user_id, tags, output
langfuse_context.update_current_observation() Обновляет current span Используйте для input, output, metadata в функции @observe
trace.set_attribute(key, value) OTel span attribute Используйте для custom metadata в distributed traces
inject(headers) / extract(headers) Trace propagation W3C Trace Context standard для distributed tracing

Common Pitfalls

Pitfall 1: Забывание flush Langfuse перед выходом процесса

Impact: Трейсы потеряны в буфере, никогда не отправлены на сервер. Intermittent missing data в UI.

Неправильно:

@observe(name="agent_run")
def run_agent(query: str):
    # ... agent logic
    return result

# FastAPI endpoint
@app.post("/agent")
def agent_endpoint(query: str):
    return run_agent(query)
    # Process может выйти перед flush

Правильно:

from langfuse import Langfuse

langfuse = Langfuse()

@observe(name="agent_run")
def run_agent(query: str):
    # ... agent logic
    return result

@app.post("/agent")
def agent_endpoint(query: str):
    result = run_agent(query)
    langfuse.flush()  # Force send перед response
    return result

Pitfall 2: Не распространение trace context через границы сервисов

Impact: Distributed traces появляются как disconnected fragments. Невозможно соотнести agent trace с downstream database/API calls.

Неправильно:

import httpx

async def call_sub_agent(payload: dict):
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://sub-agent:8000/execute",
            json=payload,
        )
        return response.json()

Правильно:

from opentelemetry.propagate import inject
from opentelemetry import trace

async def call_sub_agent(payload: dict):
    with trace.get_tracer(__name__).start_as_current_span("sub_agent_call") as span:
        headers = {}
        inject(headers)  # Adds traceparent header

        async with httpx.AsyncClient() as client:
            response = await client.post(
                "http://sub-agent:8000/execute",
                json=payload,
                headers=headers,
            )
            return response.json()

Pitfall 3: Использование @observe без обновления observation metadata

Impact: Трейсы появляются в UI но не содержат useful data (no input, output, token counts). Debugging требует угадывания, что случилось.

Неправильно:

@observe(name="llm_call")
def call_llm(prompt: str) -> str:
    response = llm.invoke(prompt)
    return response.content
    # No metadata recorded

Правильно:

from langfuse.decorators import langfuse_context

@observe(name="llm_call")
def call_llm(prompt: str) -> str:
    response = llm.invoke(prompt)

    langfuse_context.update_current_observation(
        input={"prompt": prompt},
        output={"completion": response.content},
        model="gpt-4o",
        usage={
            "input": response.usage_metadata["input_tokens"],
            "output": response.usage_metadata["output_tokens"],
            "total": response.usage_metadata["total_tokens"],
            "unit": "TOKENS",
        },
        metadata={
            "temperature": 0,
            "max_tokens": 4096,
        },
    )

    return response.content

Pitfall 4: Запись стоимости как строкового типа вместо numeric type

Impact: Невозможно агрегировать cost metrics в Prometheus или создавать cost alerts. Grafana queries fail с type errors.

Неправильно:

agent_cost_usd_total.labels(model="gpt-4o").inc("0.05")  # String value
span.set_attribute("llm.cost_usd", "$0.05")  # String with currency symbol

Правильно:

cost = 0.05  # Numeric type
agent_cost_usd_total.labels(model="gpt-4o").inc(cost)
span.set_attribute("llm.cost_usd", cost)

Pitfall 5: Не установка buckets гистограмм Prometheus appropriate для agent latency

Impact: Percentiles (p95, p99) неточны или отсутствуют. Alerting на latency thresholds использует неправильные данные.

Неправильно:

# Default buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
# Agent runs берут 10-60 секунд, buckets слишком маленькие
agent_run_duration = Histogram("agent_run_duration_seconds", "Agent runtime")

Правильно:

# Buckets aligned с agent behavior
agent_run_duration = Histogram(
    "agent_run_duration_seconds",
    "Agent runtime",
    buckets=[0.5, 1, 2, 5, 10, 20, 30, 60, 120, 300],  # До 5 минут
)

Интеграция Langfuse

Installation и initialization

pip install langfuse==3.14.5
import os
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context

langfuse = Langfuse(
    public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
    secret_key=os.environ["LANGFUSE_SECRET_KEY"],
    host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
    release=os.environ.get("GIT_COMMIT", "dev"),  # Track deployments
    enabled=os.environ.get("LANGFUSE_ENABLED", "true").lower() == "true",
    threads=4,  # Parallel flush для high throughput
    flush_at=20,  # Batch size
    flush_interval=10,  # Seconds
)

Tracing с @observe decorator

Декоратор @observe является primary instrumentation mechanism. Он автоматически создаёт spans для decorated functions и поддерживает nested calls.

from langfuse.decorators import observe, langfuse_context
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import time

llm = ChatOpenAI(model="gpt-4o", temperature=0)

@observe(name="web_search_tool")
def search_web(query: str) -> str:
    """Execute web search tool call."""
    langfuse_context.update_current_observation(
        input={"query": query},
        metadata={"tool": "web_search", "engine": "serpapi"},
    )

    # Simulated search execution
    result = f"Search results for: {query}\n[Result 1]\n[Result 2]"

    langfuse_context.update_current_observation(
        output={"result_length": len(result), "preview": result[:200]},
    )

    return result

@observe(name="llm_generation")
def call_llm(messages: list, model: str = "gpt-4o") -> str:
    """Make an LLM call и record as generation."""
    start = time.monotonic()
    response = llm.invoke(messages)
    duration = time.monotonic() - start

    usage = response.usage_metadata

    # Update observation с generation-specific data
    langfuse_context.update_current_observation(
        type="generation",  # Special type для LLM calls
        model=model,
        input=messages,
        output=response.content,
        usage={
            "input": usage["input_tokens"],
            "output": usage["output_tokens"],
            "total": usage["total_tokens"],
            "unit": "TOKENS",
        },
        metadata={
            "duration_seconds": round(duration, 3),
            "tokens_per_second": round(usage["output_tokens"] / duration, 1),
        },
    )

    return response.content

@observe(name="agent_run")
def run_agent(user_query: str, session_id: str = None) -> dict:
    """Main agent entry point — creates top-level trace."""

    # Tag trace с session info для grouping
    langfuse_context.update_current_trace(
        session_id=session_id,
        user_id="user_123",
        tags=["production", "v2.1"],
        metadata={"query_length": len(user_query)},
    )

    messages = [
        SystemMessage(content="You are a helpful research assistant."),
        HumanMessage(content=user_query),
    ]

    step = 0
    tool_calls = []

    while step < 10:
        step += 1

        # First LLM call (creates nested generation span)
        response = call_llm(messages, model="gpt-4o")
        messages.append({"role": "assistant", "content": response})

        # Check для tool call request
        if "SEARCH:" in response:
            query = response.split("SEARCH:")[1].strip().split("\n")[0]

            # Execute tool (creates nested span)
            search_result = search_web(query)
            tool_calls.append({"tool": "web_search", "query": query})

            messages.append({
                "role": "tool",
                "content": search_result,
                "tool_call_id": f"call_{step}",
            })
        else:
            # Final answer received
            break

    # Update trace с final output
    langfuse_context.update_current_trace(
        output={"response": response, "steps": step},
        metadata={"tool_calls": tool_calls},
    )

    return {"response": response, "steps": step, "tool_calls": tool_calls}

Создание spans и nested tracing

Для complex agents используйте manual API для full control над span hierarchy:

from langfuse import Langfuse
from datetime import datetime

langfuse = Langfuse()

def run_complex_agent(query: str, user_id: str):
    # Create top-level trace
    trace = langfuse.trace(
        name="complex_agent",
        user_id=user_id,
        input={"query": query},
        tags=["production"],
    )

    # Create planning span
    planning_span = trace.span(
        name="planning",
        input={"task": "analyze query and create plan"},
        start_time=datetime.utcnow(),
    )

    # Create generation в span
    planning_gen = planning_span.generation(
        name="planning_llm_call",
        model="gpt-4o",
        input=[{"role": "user", "content": f"Create a plan for: {query}"}],
        output="1. Search for X\n2. Analyze results\n3. Summarize",
        usage={"input": 120, "output": 45, "unit": "TOKENS"},
    )
    planning_gen.end()

    planning_span.end(
        output={"plan_steps": 3},
        end_time=datetime.utcnow(),
    )

    # Create execution span
    exec_span = trace.span(
        name="execution",
        input={"plan_steps": 3},
        start_time=datetime.utcnow(),
    )

    # ... execution logic

    exec_span.end(
        output={"results": "completed"},
        end_time=datetime.utcnow(),
    )

    # Finalize trace
    trace.update(
        output={"status": "success"},
    )

    langfuse.flush()

Score submission для quality tracking

Присоедините quality scores к трейсам для regression detection:

from langfuse import Langfuse

langfuse = Langfuse()

def submit_user_feedback(trace_id: str, thumbs_up: bool, comment: str = None):
    """Record user feedback как score."""
    langfuse.score(
        trace_id=trace_id,
        name="user_feedback",
        value=1.0 if thumbs_up else 0.0,
        data_type="BOOLEAN",
        comment=comment,
    )

def submit_llm_judge_score(trace_id: str, quality_score: float):
    """Record LLM-as-judge evaluation."""
    langfuse.score(
        trace_id=trace_id,
        name="llm_judge_quality",
        value=quality_score,  # 0.0 к 1.0
        data_type="NUMERIC",
        comment="GPT-4 evaluated response quality",
    )

def submit_heuristic_score(trace_id: str, passed: bool, rule_name: str):
    """Record heuristic check result."""
    langfuse.score(
        trace_id=trace_id,
        name=f"heuristic_{rule_name}",
        value=1.0 if passed else 0.0,
        data_type="BOOLEAN",
        comment=f"Heuristic rule: {rule_name}",
    )

OpenTelemetry для LLM-агентов

Installation и setup

pip install opentelemetry-api==1.39.1 opentelemetry-sdk==1.39.1 opentelemetry-exporter-otlp==1.39.1
pip install traceloop-sdk  # Для automatic OpenAI/Anthropic instrumentation
import os
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

# Configure resource attributes
resource = Resource.create({
    "service.name": "llm-agent-service",
    "service.version": "2.1.0",
    "deployment.environment": "production",
})

provider = TracerProvider(resource=resource)

# Export в OTLP collector (Jaeger, Tempo, Honeycomb)
otlp_exporter = OTLPSpanExporter(
    endpoint=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
    headers={"Authorization": f"Bearer {os.environ.get('OTEL_AUTH_TOKEN', '')}"},
)
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))

trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agent-service", "2.1.0")

gen_ai семантические соглашения

OpenTelemetry определяет standard span attributes для LLM calls под gen_ai.* namespace (semantic conventions v1.28.0):

from opentelemetry import trace
from opentelemetry.semconv.ai import SpanAttributes
import time

tracer = trace.get_tracer(__name__)

def instrument_llm_call(model: str, messages: list):
    """Instrument LLM call с gen_ai semantic conventions."""
    with tracer.start_as_current_span("llm.generate") as span:
        # Set request attributes
        span.set_attribute(SpanAttributes.LLM_SYSTEM, "openai")
        span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, model)
        span.set_attribute(SpanAttributes.LLM_REQUEST_TEMPERATURE, 0)
        span.set_attribute(SpanAttributes.LLM_REQUEST_MAX_TOKENS, 4096)

        # Record prompt content
        for i, msg in enumerate(messages):
            span.set_attribute(f"gen_ai.prompt.{i}.role", msg["role"])
            span.set_attribute(f"gen_ai.prompt.{i}.content", msg["content"])

        start = time.monotonic()

        # Execute LLM call
        response = llm.invoke(messages)

        duration = time.monotonic() - start
        usage = response.usage_metadata

        # Set response attributes
        span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, "gpt-4o-2024-05-13")
        span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage["input_tokens"])
        span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, usage["output_tokens"])
        span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, usage["total_tokens"])

        # Record completion content
        span.set_attribute("gen_ai.completion.0.content", response.content)

        # Custom metrics
        span.set_attribute("llm.duration_seconds", round(duration, 3))
        span.set_attribute("llm.cost_usd", calculate_cost(usage, model))

        span.set_status(trace.StatusCode.OK)

        return response

def calculate_cost(usage: dict, model: str) -> float:
    """Calculate cost в USD based на token usage."""
    pricing = {
        "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
        "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    }

    if model not in pricing:
        return 0.0

    input_cost = usage["input_tokens"] * pricing[model]["input"]
    output_cost = usage["output_tokens"] * pricing[model]["output"]

    return round(input_cost + output_cost, 6)

OTLP экспорт в Grafana Tempo

Сконфигурируйте Tempo как OTLP backend для long-term trace storage:

# tempo-config.yaml
server:
  http_listen_port: 3200

distributor:
  receivers:
    otlp:
      protocols:
        grpc:
          endpoint: 0.0.0.0:4317
        http:
          endpoint: 0.0.0.0:4318

storage:
  trace:
    backend: s3
    s3:
      bucket: traces
      endpoint: s3.amazonaws.com
      access_key: ${S3_ACCESS_KEY}
      secret_key: ${S3_SECRET_KEY}
# docker-compose.yaml
version: '3.8'
services:
  tempo:
    image: grafana/tempo:latest
    command: ["-config.file=/etc/tempo.yaml"]
    volumes:
      - ./tempo-config.yaml:/etc/tempo.yaml
      - tempo_data:/var/tempo
    ports:
      - "3200:3200"  # Tempo HTTP
      - "4317:4317"  # OTLP gRPC
      - "4318:4318"  # OTLP HTTP

volumes:
  tempo_data:

Grafana dashboard для trace visualization

Создайте Grafana dashboard, который linkает Prometheus metrics к Tempo traces:

{
  "dashboard": {
    "title": "LLM Agent Observability",
    "panels": [
      {
        "title": "Agent Request Rate",
        "type": "timeseries",
        "targets": [
          {
            "expr": "rate(agent_requests_total[5m])",
            "legendFormat": "{{agent_name}} - {{status}}"
          }
        ]
      },
      {
        "title": "Trace ID Lookup",
        "type": "table",
        "targets": [
          {
            "expr": "agent_requests_total",
            "format": "table"
          }
        ],
        "transformations": [
          {
            "id": "organize",
            "options": {
              "renameByName": {
                "trace_id": "Trace ID"
              }
            }
          }
        ],
        "fieldConfig": {
          "overrides": [
            {
              "matcher": {"id": "byName", "options": "Trace ID"},
              "properties": [
                {
                  "id": "links",
                  "value": [
                    {
                      "title": "View in Tempo",
                      "url": "http://localhost:3200/trace/${__value.raw}"
                    }
                  ]
                }
              ]
            }
          ]
        }
      }
    ]
  }
}

Отслеживание стоимости и алертинг

Per-user budget tracking

Отслеживайте token usage и стоимость per user для enforcement quotas:

from prometheus_client import Counter, Gauge
from datetime import datetime, timedelta
from typing import Dict
import redis

# Prometheus metrics
user_cost_usd_total = Counter(
    "user_cost_usd_total",
    "Total cost per user в USD",
    ["user_id", "model"],
)

user_tokens_total = Counter(
    "user_tokens_total",
    "Total tokens per user",
    ["user_id", "direction"],  # input, output
)

# Redis для real-time budget tracking
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

class UserBudgetTracker:
    """Track и enforce per-user spending limits."""

    def __init__(self, daily_limit_usd: float = 10.0):
        self.daily_limit_usd = daily_limit_usd

    def get_user_spend_today(self, user_id: str) -> float:
        """Get user's spending для current day."""
        today = datetime.utcnow().strftime("%Y-%m-%d")
        key = f"user_spend:{user_id}:{today}"
        spend = redis_client.get(key)
        return float(spend) if spend else 0.0

    def record_spend(self, user_id: str, cost_usd: float, model: str) -> None:
        """Record spending и update metrics."""
        today = datetime.utcnow().strftime("%Y-%m-%d")
        key = f"user_spend:{user_id}:{today}"

        # Increment Redis counter с 24h expiry
        redis_client.incrbyfloat(key, cost_usd)
        redis_client.expire(key, 86400)

        # Update Prometheus metric
        user_cost_usd_total.labels(user_id=user_id, model=model).inc(cost_usd)

    def check_budget(self, user_id: str) -> tuple[bool, float]:
        """Check если user в budget."""
        current_spend = self.get_user_spend_today(user_id)
        within_budget = current_spend < self.daily_limit_usd
        remaining = self.daily_limit_usd - current_spend

        return within_budget, max(0, remaining)

    def enforce_budget(self, user_id: str):
        """Decorator для enforce budget перед function execution."""
        def decorator(func):
            def wrapper(*args, **kwargs):
                within_budget, remaining = self.check_budget(user_id)

                if not within_budget:
                    raise RuntimeError(
                        f"User {user_id} exceeded daily budget "
                        f"(${self.daily_limit_usd}). Current spend: "
                        f"${self.get_user_spend_today(user_id):.2f}"
                    )

                return func(*args, **kwargs)

            return wrapper
        return decorator

budget_tracker = UserBudgetTracker(daily_limit_usd=50.0)

@observe(name="agent_run")
def run_agent_with_budget(user_id: str, query: str):
    """Run agent с budget enforcement."""

    # Check budget перед execution
    within_budget, remaining = budget_tracker.check_budget(user_id)
    if not within_budget:
        raise RuntimeError(f"Budget exceeded для user {user_id}")

    # Execute agent
    result = run_agent(query)

    # Calculate и record cost
    total_cost = sum(
        calculate_cost(gen["usage"], gen["model"])
        for gen in result.get("generations", [])
    )

    budget_tracker.record_spend(user_id, total_cost, model="gpt-4o")

    # Warn если approaching limit
    if remaining < 5.0:
        import logging
        logging.warning(
            f"User {user_id} approaching budget limit: ${remaining:.2f} remaining"
        )

    return result

Детектирование аномалий стоимости

Обнаруживайте unusual spending patterns используя statistical methods:

from prometheus_client import Gauge
from datetime import datetime, timedelta
from typing import List
import statistics

cost_anomaly_detected = Gauge(
    "cost_anomaly_detected",
    "Binary indicator cost anomaly (1 = anomaly, 0 = normal)",
    ["time_window"],
)

class CostAnomalyDetector:
    """Detect anomalous spending patterns."""

    def __init__(self, threshold_std_devs: float = 3.0):
        self.threshold_std_devs = threshold_std_devs

    def get_hourly_costs(self, hours: int = 168) -> List[float]:
        """Query Prometheus для hourly cost над N hours (default 7 days)."""
        # This would use Prometheus API в production
        # Simplified для example
        return [1.2, 1.5, 1.3, 1.4, 25.6, 1.2, 1.1]  # Spike в index 4

    def detect_anomaly(self, current_hour_cost: float) -> bool:
        """Detect если current hour cost anomalous."""
        historical_costs = self.get_hourly_costs(hours=168)

        if len(historical_costs) < 24:
            return False  # Not enough data

        mean = statistics.mean(historical_costs)
        std_dev = statistics.stdev(historical_costs)

        # Z-score method
        z_score = (current_hour_cost - mean) / std_dev if std_dev > 0 else 0

        is_anomaly = abs(z_score) > self.threshold_std_devs

        if is_anomaly:
            cost_anomaly_detected.labels(time_window="1h").set(1)
        else:
            cost_anomaly_detected.labels(time_window="1h").set(0)

        return is_anomaly

detector = CostAnomalyDetector(threshold_std_devs=3.0)

def monitor_cost_anomalies():
    """Periodic task для check для cost anomalies."""
    current_hour_cost = get_current_hour_cost()  # Query Prometheus

    if detector.detect_anomaly(current_hour_cost):
        send_alert(
            severity="warning",
            message=f"Cost anomaly detected: ${current_hour_cost:.2f} this hour "
                   f"(historical mean: ${statistics.mean(detector.get_hourly_costs()):.2f})"
        )

Slack alerts для budget events

Отправляйте real-time notifications когда budget thresholds превышены:

import requests
import os

SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL")

def send_slack_alert(message: str, severity: str = "warning"):
    """Send alert в Slack channel."""
    color_map = {
        "info": "#36a64f",
        "warning": "#ff9900",
        "critical": "#ff0000",
    }

    payload = {
        "attachments": [
            {
                "color": color_map.get(severity, "#999999"),
                "title": f"LLM Agent Alert - {severity.upper()}",
                "text": message,
                "footer": "Agent Observability System",
                "ts": int(time.time()),
            }
        ]
    }

    if SLACK_WEBHOOK_URL:
        requests.post(SLACK_WEBHOOK_URL, json=payload)

def alert_on_budget_exceeded(user_id: str, spend: float, limit: float):
    """Alert когда user превышает budget."""
    send_slack_alert(
        message=f"User `{user_id}` exceeded daily budget\n"
               f"Spend: ${spend:.2f} / Limit: ${limit:.2f}",
        severity="warning",
    )

def alert_on_cost_anomaly(current_cost: float, mean_cost: float, std_dev: float):
    """Alert когда cost anomaly detected."""
    z_score = (current_cost - mean_cost) / std_dev
    send_slack_alert(
        message=f"Cost anomaly detected this hour\n"
               f"Current: ${current_cost:.2f}\n"
               f"Expected: ${mean_cost:.2f} ± ${std_dev:.2f}\n"
               f"Z-score: {z_score:.2f}",
        severity="critical" if z_score > 5 else "warning",
    )

Производительность и бенчмарки

Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.

Langfuse trace ingestion throughput

Langfuse SDK с threads=4 и flush_at=20 конфигурацией может обрабатывать trace submission без блокирования application threads. Batch uploads снижают network overhead по сравнению с synchronous per-trace submission.

Ожидаемое поведение: Трейсы queued в memory и flushed в background thread каждые 10 секунд или когда accumulate 20 observations, в зависимости от того, что наступит первым.

OpenTelemetry span overhead

OTLP export через gRPC вводит minimal overhead к span creation. Spans buffered в BatchSpanProcessor и exported в batches 512 spans или каждые 5 секунд.

Ожидаемое поведение: Span creation добавляет sub-millisecond overhead per operation. Export batching предотвращает per-span network calls.

Prometheus metrics collection cost

Histogram observations включают bucket lookups и atomic counter increments. Histograms с 10 buckets имеют примерно linear overhead относительно bucket count.

Ожидаемое поведение: Recording histogram observation берёт single-digit microseconds. Scrape endpoint generation (/metrics) масштабируется с cardinality label combinations.

Distributed trace latency

Trace context propagation через W3C Trace Context добавляет один HTTP header (traceparent) к исходящим requests. Header parsing и span context extraction на receiving side является negligible.

Ожидаемое поведение: Trace propagation overhead доминируется HTTP serialization, не trace context handling.

Agent run latency breakdown

Для типичного multi-step agent run с 3 LLM calls и 2 tool executions:

  • LLM calls: Majority latency (p50: 2-5 секунд per call, p95: 8-15 секунд)
  • Tool execution: Зависит от tool type (database query: 50-200ms, web search: 500-2000ms, file I/O: 10-100ms)
  • Agent orchestration logic: Negligible (sub-10ms per step)
  • Observability overhead: Sub-1% total runtime (span creation, metric recording)

Ожидаемое поведение: LLM API latency доминирует agent runtime. Оптимизации должны сосредоточиться на prompt efficiency, streaming responses и parallel tool execution.

Cost per trace

Token consumption drives cost больше, чем infrastructure:

  • GPT-4o agent run (3 turns, 4K input tokens, 1K output tokens): $0.02-0.04 per trace
  • GPT-4o-mini agent run (same workload): $0.001-0.002 per trace
  • Langfuse cloud storage: $0.0001 per trace (amortized через Team tier)
  • OpenTelemetry/Tempo storage: Negligible (S3 storage costs < $0.00001 per trace)

Ожидаемое поведение: Model inference cost является 100-1000x выше, чем observability infrastructure cost.

Trace retention и storage

Langfuse cloud Team tier включает 500K observations/month с 1-year retention. Self-hosted Postgres storage grows примерно 5-10KB per trace в зависимости от prompt/completion length.

Ожидаемое поведение: 1M traces/month требует 5-10GB Postgres storage. Archive old traces на S3 для cost efficiency.

Alert latency

Prometheus scrapes metrics каждые 15-30 секунд. AlertManager evaluates rules на scrape interval и fires alerts после for duration threshold.

Ожидаемое поведение: Budget exceeded alert fires в течение 1-2 минут threshold breach. Cost anomaly alert fires в течение 5-10 минут (depends на rule for: 5m конфигурацию).