title: "Производительность агентов: параллельное выполнение, маршрутизация моделей и сжатие контекста" slug: agents-performance-parallel-routing-2026-ru date: 2026-02-24 lang: ru
Производительность агентов: параллельное выполнение, маршрутизация моделей и сжатие контекста
Ключевые факты
- Версии пакетов (PyPI, по состоянию на март 2026):
langchain==1.2.10,langgraph==1.0.10,httpx==0.28.1,anyio==4.12.1,asyncio(стандартная библиотека Python, версия следует версии Python) - asyncio.gather() vs asyncio.wait():
gather()немедленно выбрасывает первое исключение, останавливается на ошибке;wait()возвращает наборы (done, pending), позволяя проверить частичный успех - asyncio.gather() с return_exceptions=True: собирает все результаты, включая исключения как значения; обеспечивает изоляцию ошибок при параллельном выполнении инструментов
- LangGraph Send() API: отправляет несколько копий следующего узла с разными входами; обеспечивает fan-out параллелизм (один узел → многие параллельные выполнения → fan-in агрегирование)
- Вариант использования LangGraph Send(): параллельная обработка документов (один узел "analyze" получает Send() для каждого фрагмента документа, все выполняются параллельно, результаты объединяются downstream)
- Параметр httpx AsyncClient limits:
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)управляет размером пула соединений; по умолчаниюmax_connections=100 - Timeout httpx AsyncClient:
timeout=httpx.Timeout(10.0)устанавливает timeout чтения/записи/подключения; по умолчанию 5 секунд - Формула закона Амдала: Speedup = 1 / ((1 - P) + P/N), где P = доля параллелизируемых операций, N = количество процессоров; если P=0.9 (90% параллель), максимальное ускорение с бесконечными ядрами составляет 10x
- Потолок закона Амдала для агентов: рассуждение LLM — последовательное (30-50% времени агента); вызовы инструментов — параллелизируемые (50-70%); теоретический максимум ускорения ~2-3x даже при бесконечном параллелизме
- Пропускная способность токенов в секунду (GPT-4o): OpenAI заявляет ~60-100 токен/сек при генерации вывода GPT-4o (варьируется в зависимости от нагрузки)
- Пропускная способность токенов в секунду (Claude 3.5 Sonnet): Anthropic заявляет ~50-80 токен/сек для Claude 3.5 Sonnet (варьируется в зависимости от нагрузки)
- Пропускная способность токенов в секунду (Gemini 1.5 Pro): Google заявляет ~80-120 токен/сек для Gemini 1.5 Pro (варьируется в зависимости от нагрузки)
- Потоковая передача vs буферизованный ответ: потоковая передача доставляет первый токен за <1 секунду, буферизованный ответ ждёт полного завершения (добавляет 5-15 секунд для ответа из 500 токенов)
- Масштабирование стоимости размера контекста: контекст из 128K обходится в 128 раз дороже по входным токенам, чем контекст из 1K; сжатие контекста при 75-80% ёмкости пропорционально снижает стоимость
- Паттерн Semaphore для ограничения частоты:
asyncio.Semaphore(5)позволяет максимум 5 одновременных операций; предотвращает перегрузку внешних API параллельными вызовами инструментов - Влияние маршрутизации моделей на стоимость: использование GPT-4o-mini ($0.15/1M входных) для простых задач vs GPT-4o ($2.50/1M входных) экономит ~94% на этих вызовах; маршрутизация 60% вызовов на mini модель = ~56% общее снижение стоимости
- Overhead холодного старта для Docker sandbox: запуск контейнера добавляет 200-500ms latency; pooling контейнеров снижает это до <50ms для прогретых контейнеров
- Параметр LangGraph recursion_limit: по умолчанию 25; устанавливает максимум итераций графика перед принудительным завершением; предотвращает бесконечные циклы в рассуждении агента
- Размер батча для embeddings: OpenAI embedding API принимает до 2048 входов на батч; батчирование 100 документов vs 1-за-раз снижает API вызовы в 100 раз
- Компромисс Cache TTL: короткий TTL (5 мин) обеспечивает свежесть результатов, но низкий hit rate; длинный TTL (1 час) увеличивает hit rate, но рискует устаревшими данными; production агенты обычно используют 15-30 мин
Узкие места производительности в агентах
Основные узкие места производительности в LLM агентах:
Latency LLM доминирует в общем времени выполнения: один вызов LLM занимает 2-10 секунд в зависимости от длины вывода и модели. Агенты, выполняющие 5-10 вызовов LLM на задачу, большую часть времени ждут ответов модели. Это самое крупное узкое место и самое сложное для устранения (присуще автрегрессивной генерации).
Последовательное выполнение инструментов: реализации агентов по умолчанию вызывают инструменты последовательно. Если три инструмента каждый занимают 5 секунд, последовательное выполнение занимает 15 секунд. Параллельное выполнение занимает 5 секунд (длительность самого медленного инструмента). Это самое простое узкое место для исправления с помощью asyncio.
Overhead сериализации состояния: LangGraph контрольные точки состояния после каждого узла. Сериализация больших историй сообщений (100+ сообщений с длинными результатами инструментов) добавляет 50-200ms на контрольную точку. Этот overhead обычно мал по сравнению с latency LLM, но становится заметным в сценариях высокой пропускной способности.
Latency холодного старта: первый запрос к агенту вызывает загрузку модели, инициализацию пула соединений и запуск контейнера (если в sandbox). Это добавляет 500ms до 2 секунд. Смягчается путём сохранения агентов в прогретом состоянии или использования pooling контейнеров.
Переполнение окна контекста: когда контекст превышает пределы модели, агенты выходят из строя или требуют дорогостоящего сжатия. Проактивное сжатие при 75-80% ёмкости предотвращает сбои и снижает стоимость входных токенов.
Отсутствие кэширования: повторные идентичные или похожие запросы повторно выполняют полный цикл агента. Кэширование результатов для частых запросов может снизить latency на несколько порядков величины (с секунд до миллисекунд).
Сетевой I/O для внешних инструментов: инструменты, вызывающие внешние API (веб-поиск, запросы к базе данных, хранилище файлов), привязаны к I/O. Однопоточное выполнение тратит время впустую на ожидание. Асинхронный I/O и параллельное выполнение восстанавливают это время.
Фреймворк принятия решений
Когда использовать последовательное vs параллельное vs батч-выполнение:
| Сценарий | Паттерн выполнения | Обоснование |
|---|---|---|
| Независимые вызовы инструментов (fetch 3 URLs) | Параллель (asyncio.gather) | Нет зависимостей данных; максимальное ускорение = N инструментов |
| Вызовы инструментов с зависимостями (fetch URL, потом parse результат) | Последовательно | Второй вызов зависит от первого результата; невозможно параллелизировать |
| Много похожих операций (embed 100 docs) | Батч (batch API) | Снизить API overhead; один запрос для N элементов |
| Задачи простой классификации | Дешёвая модель (GPT-4o-mini) | Рассуждение не нужно; сэкономить 90%+ на стоимости |
| Задачи сложного рассуждения | Дорогая модель (GPT-4o, o3-mini) | Качество критично; стоимость оправдана |
| Потоковые пользовательские ответы | Streaming LLM | Первый токен за <1s; снижение воспринимаемой latency |
| Background батч-обработка | Buffered LLM | Нет ожидания пользователя; более простой код |
| Высокое разнообразие запросов | No caching | Низкий hit rate; overhead кэширования не стоит |
| Повторяющиеся частые запросы | Exact + semantic cache | Высокий hit rate; снижение latency на 90%+ |
| Context < 50% окна | No compression | Overhead не нужен |
| Context > 75% окна | Compression (truncate or summarize) | Предотвратить переполнение; снизить стоимость входных токенов |
Когда параллелизм вредит:
- Крошечные задачи: overhead параллелизма превышает длительность задачи (напр., параллелизация 3 операций по 10ms каждая)
- API с ограничением частоты: параллельные запросы попадают в лимиты, вызывая сбои; использовать semaphore для дросселирования
- Операции привязанные к памяти: параллельные embeddings 1000 документов могут исчерпать память; использовать батчирование
- Последовательные зависимости: параллелизация зависимых операций вызывает ошибки или потраченную работу
Когда параллелизм помогает:
- Инструменты привязанные к I/O: сетевые вызовы, чтение файлов, запросы к БД все выигрывают от асинхронного параллелизма
- Независимые операции: нет общего состояния или зависимостей; идеальный параллелизм
- High task latency: если каждая задача занимает >1 секунду, overhead параллелизма negligible
Таблица ссылок параметров
| Параметр | Значение | Заметки |
|---|---|---|
max_concurrency (asyncio.Semaphore) |
5 |
Параллельные вызовы инструментов; предотвращает перегрузку API |
recursion_limit (LangGraph) |
25 |
Максимум итераций графика; предотвращает бесконечные циклы |
max_context_tokens |
100000 |
Бюджет окна контекста (зарезервировать для вывода) |
reserve_for_output |
4000 |
Токены зарезервированные для ответа LLM |
compression_threshold |
0.75 to 0.8 |
Сжать когда контекст достигает 75-80% максимума |
cache_ttl |
1800 секунд (30 мин) |
Срок истечения кэша для результатов агента |
streaming_chunk_size |
512 байт |
Размер SSE chunk для потоковых ответов |
batch_size (embeddings) |
100 |
Документы на батч для embedding API |
httpx.Limits.max_connections |
100 |
Максимум одновременных HTTP соединений |
httpx.Limits.max_keepalive_connections |
20 |
Размер keep-alive пула |
httpx.Timeout |
10.0 секунд |
Read/write/connect timeout для HTTP |
tool_result_max_tokens |
2000 |
Truncate результаты инструментов до этой длины |
keep_recent_messages |
20 |
Сообщения для сохранения при сжатии |
similarity_threshold (semantic cache) |
0.95 |
Cosine similarity для cache hit |
max_cached_result_size |
50000 байт |
Не кэшировать результаты больше этого |
Частые ошибки
Ошибка 1: Последовательное выполнение инструментов
Симптом: агент занимает 30 секунд для вызова трёх инструментов, каждый из которых занимает 10 секунд.
Влияние: 3x замедление vs параллельное выполнение; плохой пользовательский опыт; потраченные ресурсы.
❌ Уязвимо:
# Последовательное выполнение - ждёт каждого инструмента перед следующим
async def execute_tools_sequential(tool_calls: list[dict]) -> list:
results = []
for call in tool_calls:
result = await fetch_url(call["url"]) # Blocks for 10s
results.append(result)
# Total time: 10s + 10s + 10s = 30s
return results
✅ Правильно:
# Параллельное выполнение - все инструменты работают одновременно
async def execute_tools_parallel(tool_calls: list[dict]) -> list:
tasks = [fetch_url(call["url"]) for call in tool_calls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Total time: max(10s, 10s, 10s) = 10s (3x faster)
return results
Ошибка 2: Использование дорогой модели для простых задач
Симптом: каждый шаг агента использует GPT-4o, даже для "Содержит ли эта строка сообщение об ошибке?"
Влияние: 10-20x увеличение стоимости для простых задач классификации/извлечения; медленнее latency.
❌ Уязвимо:
# Always use expensive model
async def classify_sentiment(text: str) -> str:
response = await expensive_model.ainvoke([
{"role": "user", "content": f"Is this positive or negative? {text}"}
])
# Cost: $2.50 per 1M tokens
return response.content
✅ Правильно:
# Route to cheap model for simple tasks
async def classify_sentiment(text: str) -> str:
response = await cheap_model.ainvoke([
{"role": "user", "content": f"Is this positive or negative? {text}"}
])
# Cost: $0.15 per 1M tokens (94% savings)
return response.content
Ошибка 3: Нет изоляции ошибок при параллельном выполнении
Симптом: один неудачный вызов инструмента крушит весь параллельный батч; результаты не возвращаются.
Влияние: режим отказа "всё или ничего"; потраченная работа для успешных инструментов.
❌ Уязвимо:
# gather() without return_exceptions - first error crashes all
async def execute_tools(tool_calls: list[dict]) -> list:
tasks = [call_tool(tc) for tc in tool_calls]
results = await asyncio.gather(*tasks)
# If any tool raises exception, entire gather() fails
return results
✅ Правильно:
# gather() with return_exceptions - collect all results and errors
async def execute_tools(tool_calls: list[dict]) -> list:
tasks = [call_tool(tc) for tc in tool_calls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results, log errors, continue with successes
processed = []
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Tool {i} failed: {result}")
processed.append({"error": str(result)})
else:
processed.append(result)
return processed
Ошибка 4: Неограниченный рост контекста
Симптом: контекст агента растёт до 150K токенов, потом падает с ошибкой "context length exceeded".
Влияние: агент крушится после длинного диалога; высокие стоимости входных токенов перед сбоем.
❌ Уязвимо:
# No context management - messages accumulate forever
async def agent_loop(state: dict) -> dict:
messages = state["messages"]
# messages grows: 1K → 10K → 50K → 150K → CRASH
response = await llm.ainvoke(messages)
return {"messages": messages + [response]}
✅ Правильно:
# Compress context before it overflows
async def agent_loop(state: dict) -> dict:
messages = state["messages"]
token_count = count_tokens(messages)
if token_count > 80000: # 80% of 100K limit
messages = compress_messages(messages, keep_recent=20)
response = await llm.ainvoke(messages)
return {"messages": messages + [response]}
Ошибка 5: Нет ограничения частоты для параллельных вызовов API
Симптом: агент запускает 50 параллельных HTTP запросов, все падают с ошибками 429 rate limit.
Влияние: каскадные сбои; потраченная квота API; нет результатов несмотря на работу.
❌ Уязвимо:
# No rate limiting - overwhelms API
async def fetch_all_urls(urls: list[str]) -> list:
tasks = [httpx.AsyncClient().get(url) for url in urls]
# 50 parallel requests → API rate limit → all fail
return await asyncio.gather(*tasks)
✅ Правильно:
# Semaphore limits concurrency
async def fetch_all_urls(urls: list[str]) -> list:
semaphore = asyncio.Semaphore(5) # Max 5 concurrent
async def fetch_one(url: str):
async with semaphore:
async with httpx.AsyncClient() as client:
return await client.get(url)
tasks = [fetch_one(url) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
Параллельное выполнение инструментов
Независимые вызовы инструментов всегда должны выполняться concurrently. Основная техника — asyncio.gather() с изоляцией ошибок.
import asyncio
from typing import Any, Callable
from langchain_core.messages import ToolMessage, AIMessage
async def execute_tools_parallel(
ai_message: AIMessage,
tools_registry: dict[str, Callable],
max_concurrency: int = 5,
) -> list[ToolMessage]:
"""
Выполнить все вызовы инструментов из AI сообщения параллельно.
Использует semaphore для соблюдения max_concurrency и предотвращения перегрузки сервисов.
"""
tool_calls = ai_message.tool_calls
if not tool_calls:
return []
semaphore = asyncio.Semaphore(max_concurrency)
async def execute_one(tool_call: dict) -> ToolMessage:
async with semaphore:
tool_name = tool_call["name"]
tool_args = tool_call["args"]
tool_id = tool_call["id"]
tool_func = tools_registry.get(tool_name)
if not tool_func:
return ToolMessage(
content=f"Error: tool '{tool_name}' not found",
tool_call_id=tool_id,
)
try:
# Handle both async and sync tools
if asyncio.iscoroutinefunction(tool_func):
result = await tool_func(**tool_args)
else:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, lambda: tool_func(**tool_args))
return ToolMessage(
content=str(result),
tool_call_id=tool_id,
)
except Exception as e:
# Error isolation - one failed tool doesn't crash others
return ToolMessage(
content=f"Error in {tool_name}: {type(e).__name__}: {e}",
tool_call_id=tool_id,
)
# All tool calls run concurrently, bounded by semaphore
# return_exceptions=True ensures partial success
results = await asyncio.gather(
*[execute_one(tc) for tc in tool_calls],
return_exceptions=True
)
# Filter out any exceptions that escaped (shouldn't happen with try/except above)
return [r for r in results if isinstance(r, ToolMessage)]
# Integration with LangGraph
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class ParallelToolState(TypedDict):
messages: Annotated[list, operator.add]
async def parallel_tool_node(state: ParallelToolState) -> dict:
"""LangGraph node that executes tools in parallel."""
last_message = state["messages"][-1]
if not hasattr(last_message, "tool_calls") or not last_message.tool_calls:
return {}
results = await execute_tools_parallel(
last_message,
tools_registry=get_tools_registry(),
max_concurrency=5,
)
return {"messages": results}
def get_tools_registry() -> dict[str, Callable]:
"""Return registry of available tools."""
return {
"fetch_url": fetch_url_tool,
"search_web": search_web_tool,
"query_database": query_database_tool,
}
Паттерны обработки ошибок
# Pattern 1: Collect all results, handle errors downstream
async def pattern_collect_all(tool_calls: list[dict]) -> list:
results = await asyncio.gather(
*[call_tool(tc) for tc in tool_calls],
return_exceptions=True
)
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
return {"successes": successes, "failures": failures}
# Pattern 2: Retry failed tools once
async def pattern_retry_once(tool_calls: list[dict]) -> list:
results = await asyncio.gather(
*[call_tool(tc) for tc in tool_calls],
return_exceptions=True
)
# Identify failures
retry_calls = [
tool_calls[i] for i, r in enumerate(results)
if isinstance(r, Exception)
]
if retry_calls:
retry_results = await asyncio.gather(
*[call_tool(tc) for tc in retry_calls],
return_exceptions=True
)
# Merge retry results back
retry_idx = 0
for i, r in enumerate(results):
if isinstance(r, Exception):
results[i] = retry_results[retry_idx]
retry_idx += 1
return results
LangGraph Fan-Out / Fan-In
LangGraph Send() API обеспечивает параллельное выполнение подграфика: отправить несколько копий узла с разными входами, потом агрегировать результаты.
from langgraph.graph import StateGraph, END, START
from langgraph.types import Send
from typing import TypedDict, Annotated, Literal
import operator
class FanOutState(TypedDict):
documents: list[str]
analyses: Annotated[list[dict], operator.add]
final_summary: str
# Individual document analysis (runs in parallel)
async def analyze_document(state: dict) -> dict:
"""Analyze a single document. Many copies of this run in parallel."""
doc_text = state["document"]
doc_id = state["doc_id"]
# Expensive LLM call per document
analysis = await llm.ainvoke([
{"role": "user", "content": f"Analyze this document:\n{doc_text}"}
])
return {
"analyses": [{
"doc_id": doc_id,
"analysis": analysis.content,
}]
}
# Fan-out router: dispatch one analyze_document per document
def fan_out_router(state: FanOutState) -> list[Send]:
"""Create parallel Send() for each document."""
return [
Send("analyze_document", {
"document": doc,
"doc_id": i,
})
for i, doc in enumerate(state["documents"])
]
# Fan-in aggregator: collect all analyses
async def aggregate_analyses(state: FanOutState) -> dict:
"""Aggregate parallel results into final summary."""
all_analyses = state["analyses"]
# Synthesize with cheap model
combined = "\n\n".join([a["analysis"] for a in all_analyses])
summary = await cheap_llm.ainvoke([
{"role": "user", "content": f"Summarize these analyses:\n{combined}"}
])
return {"final_summary": summary.content}
# Build graph
graph = StateGraph(FanOutState)
graph.add_node("fan_out", fan_out_router)
graph.add_node("analyze_document", analyze_document)
graph.add_node("aggregate", aggregate_analyses)
graph.add_edge(START, "fan_out")
# fan_out returns list[Send("analyze_document", ...)]
# LangGraph automatically runs all in parallel
graph.add_edge("analyze_document", "aggregate")
graph.add_edge("aggregate", END)
agent = graph.compile()
# Usage
result = await agent.ainvoke({
"documents": ["doc1 text...", "doc2 text...", "doc3 text..."],
"analyses": [],
})
# All 3 analyze_document nodes run in parallel
# aggregate waits for all to complete
Conditional Fan-Out
# Fan-out only for large document sets
def conditional_fan_out(state: FanOutState) -> Literal["fan_out", "direct"]:
if len(state["documents"]) > 3:
return "fan_out" # Parallel processing
else:
return "direct" # Sequential processing (not worth overhead)
graph.add_conditional_edges(
START,
conditional_fan_out,
{
"fan_out": "fan_out_node",
"direct": "sequential_node",
}
)
Маршрутизация моделей по сложности задачи
Не каждый шаг агента требует самую мощную модель. Маршрутизировать простые задачи на дешёвые модели, сложные задачи на дорогие модели.
from enum import Enum
from dataclasses import dataclass
from typing import Optional
from langchain_core.language_models import BaseChatModel
from langchain_openai import ChatOpenAI
class TaskComplexity(Enum):
SIMPLE = "simple" # Classification, extraction, yes/no
MODERATE = "moderate" # Summarization, translation, analysis
COMPLEX = "complex" # Multi-step reasoning, synthesis, code
CRITICAL = "critical" # High-stakes decisions
@dataclass
class ModelConfig:
model_id: str
provider: str
cost_per_1m_input: float
cost_per_1m_output: float
MODEL_REGISTRY = {
"fast": ModelConfig(
model_id="gpt-4o-mini",
provider="openai",
cost_per_1m_input=0.15,
cost_per_1m_output=0.60,
),
"balanced": ModelConfig(
model_id="gpt-4o",
provider="openai",
cost_per_1m_input=2.50,
cost_per_1m_output=10.00,
),
"reasoning": ModelConfig(
model_id="o3-mini",
provider="openai",
cost_per_1m_input=1.10,
cost_per_1m_output=4.40,
),
}
TASK_TO_MODEL = {
TaskComplexity.SIMPLE: "fast",
TaskComplexity.MODERATE: "balanced",
TaskComplexity.COMPLEX: "balanced",
TaskComplexity.CRITICAL: "reasoning",
}
class ModelRouter:
"""Routes LLM calls to appropriate models based on task complexity."""
def __init__(self):
self._models: dict[str, BaseChatModel] = {}
self._call_counts: dict[str, int] = {}
def _get_model(self, complexity: TaskComplexity) -> BaseChatModel:
model_key = TASK_TO_MODEL[complexity]
if model_key not in self._models:
config = MODEL_REGISTRY[model_key]
self._models[model_key] = ChatOpenAI(
model=config.model_id,
temperature=0,
)
return self._models[model_key]
def classify_task(self, prompt: str) -> TaskComplexity:
"""Heuristically classify task complexity from prompt."""
prompt_lower = prompt.lower()
# SIMPLE indicators
simple_patterns = [
"is this", "does this contain", "true or false",
"yes or no", "extract the", "classify as",
]
if any(p in prompt_lower for p in simple_patterns) and len(prompt) < 200:
return TaskComplexity.SIMPLE
# CRITICAL indicators
critical_patterns = [
"legal advice", "medical advice", "financial advice",
"safety critical", "compliance review",
]
if any(p in prompt_lower for p in critical_patterns):
return TaskComplexity.CRITICAL
# COMPLEX indicators
complex_patterns = [
"synthesize", "analyze all", "write code",
"step by step", "comprehensive analysis",
]
if any(p in prompt_lower for p in complex_patterns):
return TaskComplexity.COMPLEX
return TaskComplexity.MODERATE
async def invoke(
self,
messages: list,
complexity: Optional[TaskComplexity] = None,
**kwargs
):
"""Invoke LLM with automatic model routing."""
if complexity is None:
last_content = str(messages[-1].content if hasattr(messages[-1], "content") else "")
complexity = self.classify_task(last_content)
model = self._get_model(complexity)
model_key = TASK_TO_MODEL[complexity]
self._call_counts[model_key] = self._call_counts.get(model_key, 0) + 1
return await model.ainvoke(messages, **kwargs)
def routing_stats(self) -> dict:
"""Return routing statistics for cost analysis."""
total = sum(self._call_counts.values())
return {
"total_calls": total,
"by_model": {
model: {
"calls": count,
"percentage": round(100 * count / max(total, 1), 1),
}
for model, count in self._call_counts.items()
}
}
# Usage
router = ModelRouter()
# Simple task → routed to gpt-4o-mini
response1 = await router.invoke([
{"role": "user", "content": "Is this sentiment positive or negative? I love this!"}
])
# Complex task → routed to gpt-4o
response2 = await router.invoke([
{"role": "user", "content": "Write a Python function that implements binary search with detailed comments"}
])
# Check routing stats
print(router.routing_stats())
# {"total_calls": 2, "by_model": {"fast": {"calls": 1, "percentage": 50.0}, "balanced": {"calls": 1, "percentage": 50.0}}}
Потоковая передача и прогрессивный ответ
Потоковая передача доставляет первый токен менее чем за 1 секунду vs 5-15 секунд для буферизованных ответов. Критична для пользовательских агентов.
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
# Streaming LLM
async def stream_response(prompt: str):
"""Stream LLM response token by token."""
llm = ChatOpenAI(model="gpt-4o", streaming=True)
async for chunk in llm.astream([HumanMessage(content=prompt)]):
if chunk.content:
yield chunk.content
# Usage with SSE (Server-Sent Events)
from starlette.responses import StreamingResponse
async def agent_endpoint(request):
prompt = await request.json()["prompt"]
async def event_stream():
async for token in stream_response(prompt):
yield f"data: {token}\n\n"
return StreamingResponse(
event_stream(),
media_type="text/event-stream"
)
Потоковая передача с вызовами инструментов
async def stream_with_tools(prompt: str):
"""Stream response including tool call indicators."""
llm = ChatOpenAI(model="gpt-4o", streaming=True)
accumulated_tool_calls = []
async for chunk in llm.astream([HumanMessage(content=prompt)]):
# Stream text content
if chunk.content:
yield {"type": "text", "content": chunk.content}
# Accumulate tool calls (sent at end)
if hasattr(chunk, "tool_calls") and chunk.tool_calls:
accumulated_tool_calls.extend(chunk.tool_calls)
# Execute tool calls (parallel)
if accumulated_tool_calls:
yield {"type": "tool_calls", "calls": accumulated_tool_calls}
results = await execute_tools_parallel(
accumulated_tool_calls,
tools_registry=get_tools_registry()
)
for result in results:
yield {"type": "tool_result", "result": result}
Обработка обратного давления
import asyncio
async def stream_with_backpressure(prompt: str, queue_size: int = 10):
"""Stream with backpressure to prevent overwhelming consumer."""
queue = asyncio.Queue(maxsize=queue_size)
async def producer():
async for token in stream_response(prompt):
await queue.put(token) # Blocks if queue full
await queue.put(None) # Sentinel
async def consumer():
while True:
token = await queue.get()
if token is None:
break
yield token
# Start producer in background
producer_task = asyncio.create_task(producer())
# Yield from consumer
async for token in consumer():
yield token
await producer_task
Производительность и бенчмарки
Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.
Ускорение параллельного выполнения инструментов
Для N независимых инструментов, каждый занимающий T секунд:
- Последовательное выполнение: общее время = N × T
- Параллельное выполнение: общее время ≈ T (длительность самого медленного инструмента)
- Теоретическое ускорение: N× (линейное с количеством инструментов)
- Практическое ускорение: немного меньше из-за overhead (asyncio scheduling, semaphore contention)
Пример с 5 инструментами, каждый занимающий 6 секунд:
- Последовательное: 30 секунд
- Параллельное (без лимита): ~6 секунд (5× ускорение)
- Параллельное (semaphore=3): ~12 секунд (3× ускорение, соблюдает лимиты)
Снижение стоимости маршрутизации моделей
Предполагая агент делает 10 вызовов LLM на задачу:
- Базовое (все GPT-4o): 10 вызовов × $2.50/1M входных = $25.00 за миллион входных токенов
- Маршрутизирование (6 простых → mini, 4 сложных → GPT-4o): (6 × $0.15) + (4 × $2.50) = $10.90 за миллион входных токенов
- Сбережения: ~56% снижение стоимости
Влияние варьируется по микс задач. Агенты с много шагов классификации/извлечения видят более высокие сбережения; агенты делающие в основном сложное рассуждение видят более низкие сбережения.
Влияние сжатия контекста
Сжатие контекста при 75% пороге vs без сжатия:
- Снижение входных токенов: сжатие типично снижает контекст на 30-50% (зависит от стратегии: truncate vs summarize)
- Сбережения стоимости: пропорционально снижению токенов (40% меньше токенов = 40% ниже стоимость входных)
- Влияние на latency: summarization добавляет один вызов LLM (типично 1-3 секунды с дешёвой моделью); truncation negligible (<10ms)
- Компромисс качества: truncation может потерять важный контекст; summarization сохраняет ключевые факты но теряет детали
Потоковая передача vs буферизованный ответ
Время до первого токена:
- Потоковая: <1 секунда (доставка токен-за-токеном)
- Буферизованная: 5-15 секунд для 500-токенного ответа (ждёт завершения)
Снижение воспринимаемой latency для пользователей существенно несмотря на то что общее время генерации идентично.
Hit rate кэша и latency
Производительность кэша зависит от разнообразия запросов:
- Высокое разнообразие (уникальные запросы): hit rate <10%; overhead кэширования не стоит
- Умеренное разнообразие (частые паттерны): hit rate 30-50%; стоит
- Низкое разнообразие (повторяющиеся запросы): hit rate 70-90%; массивные сбережения
Latency для cache hit vs miss:
- Cache hit: <50ms (Redis lookup)
- Cache miss: 5-15 секунд (полное выполнение агента)
- Ускорение на hit: 100-300×
Закон Амдала применённый к агентам
Учитывая типичное разбиение runtime агента:
- Параллелизируемое (вызовы инструментов, API запросы): 60% runtime
- Последовательное (LLM рассуждение, обновления состояния): 40% runtime
Максимальное теоретическое ускорение при бесконечном параллелизме:
Speedup = 1 / ((1 - 0.6) + 0.6/∞) = 1 / 0.4 = 2.5×
Практическое ускорение с 5-кратным параллелизмом:
Speedup = 1 / (0.4 + 0.6/5) = 1 / 0.52 ≈ 1.9×
Вывод: даже идеальная параллелизация инструментов даёт только ~2-3× общее ускорение потому что LLM рассуждение доминирует и не может быть параллелизировано.