title: "Управление состоянием агента: Redis сессии, LangGraph PostgresSaver, контрольные точки" slug: agent-state-management-redis-langgraph-2026-ru date: 2026-02-24 lang: ru
Управление состоянием агента: Redis сессии, LangGraph PostgresSaver, контрольные точки
Ключевые факты
- LangGraph 1.0.10 (последний стабильный релиз на март 2026) предоставляет persistence состояния на основе контрольных точек
- langgraph-checkpoint-postgres 3.0.4 предлагает AsyncPostgresSaver для production развёртываний
- redis 7.2.1 клиент Python поддерживает async операции через модуль redis.asyncio
- psycopg2-binary 2.9.11 требуется для синхронных соединений с PostgreSQL; psycopg 3.x рекомендуется для async
- MemorySaver хранит контрольные точки в-процессе (словарь Python); состояние теряется при перезагрузке, подходит только для разработки
- AsyncPostgresSaver записывает в PostgreSQL; ACID гарантии, сохраняется после перезагрузки, поддерживает concurrent workers
- RedisSaver не включен в ядро LangGraph; требуется custom implementation с использованием BaseCheckpointSaver
- Redis режимы persistence: RDB (снимки в точке времени), AOF (append-only log), hybrid (RDB+AOF), или без persistence
- PostgreSQL LISTEN/NOTIFY типичная latency 1-5ms на localhost, 10-50ms по сети
- State serialization: JsonPlusSerializer (default) обрабатывает primitives, lists, dicts, Pydantic models; PickleSerializer обрабатывает произвольные Python объекты
- Maximum checkpoint size: PostgreSQL JSONB колонка ограничена 1GB; практический лимит ~10MB для производительности
- Thread ID является основной единицей изоляции; один thread_id = один изолированный namespace состояния
- checkpoint_ns включает sub-graph изоляцию состояния внутри одного thread_id
- Checkpoint таблицы: checkpoints (метаданные), checkpoint_blobs (данные состояния как BYTEA), checkpoint_writes (pending операции)
- State reducer функции: operator.add (append lists), operator.replace (default для scalars), custom reducers для merge логики
- interrupt() функция останавливает execution графа в узле; состояние persisted; возобновляется при следующем invoke с Command(resume={...})
- TTL в Redis: SET key value EX seconds; automatic expiry предотвращает memory bloat для ephemeral сессий
- Connection pooling: AsyncConnectionPool из psycopg_pool рекомендуется; min_size=2, max_size=10 для типичных workloads
- Schema versioning: сохраняйте schema_version в checkpoint metadata; применяйте миграции при загрузке для backward compatibility
- Tiered storage pattern: Redis (hot tier, sub-millisecond reads) + PostgreSQL (cold tier, durable archival)
Что такое состояние агента
Состояние агента — это данные, которые агент должен поддерживать на протяжении нескольких шагов выполнения. Оно делится на две категории:
Ephemeral state существует только во время одного выполнения. Примеры: промежуточные результаты вычислений, временные выходы инструментов, in-flight API ответы. Это состояние исчезает, когда процесс завершается. Для stateless агентов (one-shot запросы) ephemeral состояния достаточно.
Persistent state сохраняется за пределами времени жизни процесса. Примеры: история разговора, накопленные факты из multi-step исследования, предпочтения пользователя, статус задачи, ожидающий feedback от человека. Это состояние должно храниться в external системах (Redis, PostgreSQL, S3) для включения:
- Conversation continuity: Пользователь задает follow-up вопросы часами позже; агент вспоминает предыдущий контекст
- Resumable workflows: Долгоживущие задачи (multi-day исследование, human-in-the-loop одобрение), которые паузируются и возобновляются
- Multi-tenant isolation: Состояние пользователя A никогда не просачивается в сессию пользователя B
- Crash recovery: Процесс агента перезагружается mid-task; выполнение возобновляется с последней контрольной точки
- Audit trails: Полная история решений, tool calls, state transitions для отладки и compliance
LangGraph'а архитектура checkpointing обрабатывает все состояние как persistent по умолчанию. Каждое выполнение узла создает контрольную точку — снимок полного состояния на тот момент. Backend checkpointer (MemorySaver, PostgreSQL, Redis) определяет, сохраняются ли контрольные точки после перезагрузки процесса.
Почему это важно для долгоживущих агентов: Без persistent состояния агент, который падает после 45 минут исследования, теряет весь прогресс. С контрольными точками он возобновляется с последнего сохраненного состояния, сохраняя выходы инструментов, собранные факты и историю выполнения. Human-in-the-loop workflows зависят от этого: агент паузируется, сохраняет состояние, ждет input от человека (который может прийти часами позже), затем возобновляется ровно с того же места.
Framework решения
Выберите checkpointer на основе lifetime сессии, требований durability и operational constraints:
MemorySaver
Используйте когда:
- Разработка и тестирование локально
- Запуск агентов в single-process окружении (Jupyter notebooks, scripts)
- Persistent состояние за пределами lifetime процесса не требуется
- Простота приоритизируется над durability
Гарантии:
- Состояние сохраняется на протяжении lifetime Python процесса
- Ноль external зависимостей
- Sub-microsecond read/write latency (in-memory dict)
- Нет network overhead
Ограничения:
- Состояние теряется при перезагрузке процесса
- Не может разделять состояние между несколькими workers
- Нет backup или recovery механизма
- Memory растет unbounded без manual cleanup
RedisSaver (Custom Implementation)
Используйте когда:
- Сессии short-lived (минуты до часов)
- Sub-millisecond state access требуется
- Automatic TTL-based expiry желательна
- Durability вторична скорости
Гарантии:
- Состояние persists после перезагрузки процесса (если Redis persistence включена)
- Shared состояние между несколькими workers (single Redis instance)
- Automatic expiry через TTL (предотвращает memory bloat)
- Типичная latency: <1ms для localhost, <5ms по локальной сети
Ограничения:
- Нет built-in LangGraph implementation (must implement BaseCheckpointSaver)
- Redis eviction policies могут discard данные под memory pressure
- Persistence режимы (RDB/AOF) trade off durability vs performance
- Нет ACID transactions для checkpoint consistency
PostgresSaver (AsyncPostgresSaver)
Используйте когда:
- Сессии long-lived (часы до недель)
- Durability и ACID гарантии требуются
- Несколько concurrent агентов access shared состояние
- Complex queries на checkpoint metadata нужны (find все сессии пользователя X, сессии старше 30 дней)
Гарантии:
- Состояние survives перезагрузку процесса, crashes серверов и network failures
- ACID transactions обеспечивают checkpoint consistency
- Поддерживает concurrent writers через PostgreSQL locking
- Full audit trail через checkpoint history
Ограничения:
- Выше latency чем MemorySaver или Redis (1-10ms per checkpoint write)
- Требует PostgreSQL server setup и maintenance
- Database размер растет без cleanup policies
- Connection pooling требуется для high-concurrency workloads
Tiered Strategy (Redis + PostgreSQL)
Используйте когда:
- Нужны и скорость (active сессии) и durability (completed сессии)
- Session lifecycle имеет distinct "active" и "archived" фазы
- Cost optimization важна (Redis memory дорогая)
Pattern:
- Write все checkpoints в оба Redis (hot tier) и PostgreSQL (cold tier)
- Active сессии читаются из Redis (fast)
- Archived сессии purged из Redis, читаются из PostgreSQL
- Automatic promotion/demotion на основе session activity
Таблица параметров
| Параметр | Значение | Примечания |
|---|---|---|
thread_id |
String (e.g., "user-123-session-456") |
Primary изоляционный ключ; один thread = один state namespace |
checkpoint_ns |
String (default "") |
Sub-graph namespace внутри thread; включает isolated sub-workflows |
checkpoint_id |
String (auto-generated UUID) | Unique identifier конкретной контрольной точки; используется для resumption |
ttl_seconds |
Integer (e.g., 3600 для 1 часа) |
Redis TTL для automatic expiry; не применимо к PostgreSQL |
min_size |
Integer (e.g., 2) |
Минимум соединений в psycopg AsyncConnectionPool |
max_size |
Integer (e.g., 10) |
Максимум соединений в pool; tune на основе concurrency |
autocommit |
Boolean (True) |
PostgreSQL connection режим; True для checkpointer compatibility |
prepare_threshold |
Integer (0) |
Disable prepared statements в psycopg (0 = disabled) |
filter |
Dict (e.g., {"user_id": "123"}) |
Metadata фильтр для checkpoint queries через alist() |
limit |
Integer (e.g., 100) |
Max checkpoints возвращается alist() |
before |
Dict (checkpoint config) | Return checkpoints до этого checkpoint_id (pagination) |
schema_version |
Integer (e.g., 3) |
Stored в checkpoint metadata; используется для state migrations |
redis_url |
String (e.g., "redis://localhost:6379/0") |
Connection string для Redis; поддерживает redis://, rediss:// (TLS) |
conninfo |
String (e.g., "postgresql://user:pass@host/db") |
PostgreSQL connection string для psycopg |
decode_responses |
Boolean (False) |
Redis client setting; False требуется для binary data (pickled checkpoints) |
Common Pitfalls
❌ Pitfall 1: Storing Non-Serializable Objects в State
Problem: JsonPlusSerializer (default) не может serializable datetime, set или произвольные Python объекты.
Impact: Checkpoint save fails с TypeError: Object of type datetime is not JSON serializable, состояние теряется.
# ❌ WRONG: datetime и set не JSON-serializable
from typing import TypedDict
from datetime import datetime
class BadState(TypedDict):
messages: list
created_at: datetime # Will crash on checkpoint save
tags: set[str] # Will crash on checkpoint save
# ✅ CORRECT: Convert в JSON-compatible типы
class GoodState(TypedDict):
messages: list
created_at: str # ISO format: "2026-03-01T12:34:56"
tags: list[str] # Lists JSON-serializable
def create_state(task: str) -> GoodState:
return GoodState(
messages=[{"role": "user", "content": task}],
created_at=datetime.utcnow().isoformat(),
tags=["research", "urgent"],
)
❌ Pitfall 2: Exposing Raw User IDs как Thread IDs
Problem: Использование predictable thread IDs (e.g., thread_id = user_id) включает users access к другим состояниям пользователей.
Impact: Security breach; пользователь A может читать conversation history пользователя B, угадав thread_id.
# ❌ WRONG: User может guess другие thread IDs
config = {"configurable": {"thread_id": user_id}} # user_id = "12345"
# ✅ CORRECT: Hash user_id + session_id для prevent guessing
import hashlib
def make_thread_id(user_id: str, session_id: str) -> str:
combined = f"{user_id}:{session_id}"
return hashlib.sha256(combined.encode()).hexdigest()[:32]
config = {"configurable": {"thread_id": make_thread_id(user_id, session_id)}}
❌ Pitfall 3: Forgetting to Call setup() на PostgresSaver
Problem: AsyncPostgresSaver требует await checkpointer.setup() для создания таблиц.
Impact: First checkpoint write fails с relation "checkpoints" does not exist.
# ❌ WRONG: Missing setup() call
async with get_checkpointer() as checkpointer:
app = build_graph(checkpointer)
await app.ainvoke(state, config) # Crashes: table doesn't exist
# ✅ CORRECT: Call setup() перед first use
async with get_checkpointer() as checkpointer:
await checkpointer.setup() # Creates tables
app = build_graph(checkpointer)
await app.ainvoke(state, config)
❌ Pitfall 4: Unbounded Checkpoint Growth
Problem: Каждое выполнение узла создает контрольную точку; long-running сессии accumulate сотни контрольных точек.
Impact: Database/memory bloat, slow queries, увеличенные storage costs.
# ❌ WRONG: Нет cleanup policy; checkpoints растут forever
# После 1000 agent invocations, database содержит 1000+ checkpoints per thread
# ✅ CORRECT: Periodic cleanup старых checkpoints
from datetime import datetime, timedelta
async def cleanup_old_checkpoints(checkpointer, days: int = 30):
cutoff = datetime.utcnow() - timedelta(days=days)
async with checkpointer.conn.cursor() as cursor:
await cursor.execute(
"""
DELETE FROM checkpoints
WHERE (metadata->>'created_at')::timestamptz < %s
""",
(cutoff.isoformat(),)
)
return cursor.rowcount
# Run cleanup ежедневно via cron или scheduled task
❌ Pitfall 5: Using MemorySaver в Production
Problem: MemorySaver сохраняет состояние in-process; не shared между workers, теряется при restart.
Impact: Load balancer посылает пользователя на Worker 2, состояние от Worker 1 недоступно; conversation context теряется.
# ❌ WRONG: MemorySaver в multi-worker production
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver() # State живет в этом процессе только
app = build_graph(checkpointer)
# User's second request попадает на different worker → state not found
# ✅ CORRECT: Используйте PostgreSQL или Redis для shared state
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
async with get_postgres_checkpointer() as checkpointer:
await checkpointer.setup()
app = build_graph(checkpointer)
# All workers share same PostgreSQL backend
Redis State Implementation
LangGraph не включает Redis checkpointer в ядро. Реализуйте custom saver расширением BaseCheckpointSaver:
import pickle
from typing import Optional, AsyncIterator
from langgraph.checkpoint.base import (
BaseCheckpointSaver,
Checkpoint,
CheckpointTuple,
CheckpointMetadata,
)
import redis.asyncio as aioredis
class RedisCheckpointSaver(BaseCheckpointSaver):
"""
Redis-backed checkpoint saver with TTL-based expiry.
Suitable for short-lived agent sessions (chat, web agents).
"""
def __init__(
self,
redis_client: aioredis.Redis,
ttl_seconds: int = 3600,
key_prefix: str = "lgraph:ckpt:",
):
super().__init__()
self.redis = redis_client
self.ttl = ttl_seconds
self.prefix = key_prefix
@classmethod
async def from_url(cls, redis_url: str, ttl_seconds: int = 3600):
client = await aioredis.from_url(redis_url, decode_responses=False)
return cls(client, ttl_seconds)
def _checkpoint_key(self, thread_id: str, checkpoint_ns: str, checkpoint_id: str) -> str:
return f"{self.prefix}{thread_id}:{checkpoint_ns}:{checkpoint_id}"
def _index_key(self, thread_id: str, checkpoint_ns: str) -> str:
"""Key для хранения ordered list checkpoint IDs."""
return f"{self.prefix}idx:{thread_id}:{checkpoint_ns}"
async def aget_tuple(self, config: dict) -> Optional[CheckpointTuple]:
"""Retrieve latest или specific контрольную точку."""
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
checkpoint_id = config["configurable"].get("checkpoint_id")
if not checkpoint_id:
# Get latest checkpoint ID из index
index_key = self._index_key(thread_id, checkpoint_ns)
latest = await self.redis.lindex(index_key, -1)
if not latest:
return None
checkpoint_id = latest.decode()
key = self._checkpoint_key(thread_id, checkpoint_ns, checkpoint_id)
data = await self.redis.get(key)
if not data:
return None
stored = pickle.loads(data)
return CheckpointTuple(
config={
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
"checkpoint_id": checkpoint_id,
}
},
checkpoint=stored["checkpoint"],
metadata=stored["metadata"],
parent_config=stored.get("parent_config"),
)
async def aput(
self,
config: dict,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: dict,
) -> dict:
"""Store контрольную точку с TTL."""
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
checkpoint_id = checkpoint["id"]
key = self._checkpoint_key(thread_id, checkpoint_ns, checkpoint_id)
index_key = self._index_key(thread_id, checkpoint_ns)
data = pickle.dumps({
"checkpoint": checkpoint,
"metadata": metadata,
"parent_config": config,
})
# Store контрольную точку + update index, оба с TTL
pipe = self.redis.pipeline()
pipe.set(key, data, ex=self.ttl)
pipe.rpush(index_key, checkpoint_id)
pipe.expire(index_key, self.ttl)
await pipe.execute()
return {
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
"checkpoint_id": checkpoint_id,
}
}
async def aput_writes(self, config: dict, writes: list, task_id: str) -> None:
"""Store pending writes для resumption."""
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
checkpoint_id = config["configurable"]["checkpoint_id"]
writes_key = f"{self.prefix}writes:{thread_id}:{checkpoint_ns}:{checkpoint_id}"
data = pickle.dumps({"writes": writes, "task_id": task_id})
await self.redis.set(writes_key, data, ex=self.ttl)
async def alist(
self,
config: Optional[dict],
*,
filter: Optional[dict] = None,
before: Optional[dict] = None,
limit: Optional[int] = None,
) -> AsyncIterator[CheckpointTuple]:
"""List контрольные точки для thread в reverse chronological order."""
if not config:
return
thread_id = config["configurable"]["thread_id"]
checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
index_key = self._index_key(thread_id, checkpoint_ns)
checkpoint_ids = await self.redis.lrange(index_key, 0, -1)
count = 0
for cid_bytes in reversed(checkpoint_ids):
if limit and count >= limit:
break
cid = cid_bytes.decode()
key = self._checkpoint_key(thread_id, checkpoint_ns, cid)
data = await self.redis.get(key)
if data:
stored = pickle.loads(data)
yield CheckpointTuple(
config={
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
"checkpoint_id": cid,
}
},
checkpoint=stored["checkpoint"],
metadata=stored["metadata"],
parent_config=stored.get("parent_config"),
)
count += 1
Usage Example
import asyncio
async def main():
# Create Redis checkpointer с 30-minute TTL
saver = await RedisCheckpointSaver.from_url(
"redis://localhost:6379/0",
ttl_seconds=1800,
)
app = build_graph(saver)
config = {"configurable": {"thread_id": "session-abc123"}}
# First invocation: creates контрольную точку
result = await app.ainvoke(
{"messages": [{"role": "user", "content": "Hello"}]},
config=config,
)
# Second invocation: loads из Redis контрольной точки
result2 = await app.ainvoke(
{"messages": [{"role": "user", "content": "Follow-up question"}]},
config=config,
)
# После 30 минут, Redis expires контрольную точку automatically
Redis Persistence Configuration
Redis persistence режимы контролируют durability гарантии:
Нет persistence (cache режим):
# redis.conf
save ""
appendonly no
# Самый быстрый; данные теряются при Redis restart
RDB snapshots (periodic dumps):
# redis.conf
save 900 1 # Save если 1 key changed в 15 минутах
save 300 10 # Save если 10 keys changed в 5 минутах
save 60 10000 # Save если 10000 keys changed в 1 минуте
# Хорошо для recovery из crashes; может потерять recent writes
AOF log (append-only file):
# redis.conf
appendonly yes
appendfsync everysec # Fsync каждую секунду (хороший баланс)
# Более durable; максимум 1 секунда data loss
Hybrid (RDB + AOF):
# redis.conf
save 900 1
appendonly yes
appendfsync everysec
# Лучшая durability; RDB для fast restarts, AOF для minimal data loss
PostgreSQL State Implementation
AsyncPostgresSaver предоставляет production-grade persistence с ACID гарантиями и concurrent access.
Setup и Connection Pooling
from contextlib import asynccontextmanager
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from psycopg_pool import AsyncConnectionPool
DATABASE_URL = "postgresql://agent_user:password@localhost:5432/agent_db"
@asynccontextmanager
async def get_checkpointer():
"""Create AsyncPostgresSaver с connection pooling."""
async with AsyncConnectionPool(
conninfo=DATABASE_URL,
min_size=2, # Keep 2 соединений открытых
max_size=10, # Max 10 concurrent соединений
kwargs={
"autocommit": True, # Required для LangGraph
"prepare_threshold": 0, # Disable prepared statements
},
) as pool:
checkpointer = AsyncPostgresSaver(pool)
await checkpointer.setup() # Create таблицы если не exist
yield checkpointer
async def main():
async with get_checkpointer() as saver:
app = build_graph(saver)
config = {"configurable": {"thread_id": "prod-thread-456"}}
result = await app.ainvoke(
{"messages": [{"role": "user", "content": "Research Q4 earnings"}]},
config=config,
)
# State persisted к PostgreSQL; survives процесс restart
Database Schema
После setup(), AsyncPostgresSaver создает эти таблицы:
-- Checkpoint метаданные и структура
CREATE TABLE checkpoints (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT NOT NULL DEFAULT '',
checkpoint_id TEXT NOT NULL,
parent_checkpoint_id TEXT,
type TEXT,
checkpoint JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);
-- Actual state данные (BYTEA для serialized content)
CREATE TABLE checkpoint_blobs (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT NOT NULL DEFAULT '',
channel TEXT NOT NULL,
version TEXT NOT NULL,
type TEXT NOT NULL,
blob BYTEA,
PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
);
-- Pending writes для interrupted workflows
CREATE TABLE checkpoint_writes (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT NOT NULL DEFAULT '',
checkpoint_id TEXT NOT NULL,
task_id TEXT NOT NULL,
idx INTEGER NOT NULL,
channel TEXT NOT NULL,
type TEXT,
blob BYTEA,
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);
Querying Checkpoints
async def list_user_sessions(checkpointer, user_id: str) -> list[dict]:
"""Find все active сессии для пользователя."""
sessions = []
# Metadata filter ищет metadata JSONB колонку
async for checkpoint_tuple in checkpointer.alist(
config=None,
filter={"user_id": user_id},
limit=100,
):
sessions.append({
"thread_id": checkpoint_tuple.config["configurable"]["thread_id"],
"checkpoint_id": checkpoint_tuple.config["configurable"]["checkpoint_id"],
"created_at": checkpoint_tuple.metadata.get("created_at"),
"task_status": checkpoint_tuple.checkpoint.get("channel_values", {}).get("task_status"),
})
return sessions
async def get_checkpoint_count(pool) -> int:
"""Count total контрольные точки в database."""
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT COUNT(*) FROM checkpoints")
result = await cursor.fetchone()
return result[0] if result else 0
Concurrent Agent Handling
PostgreSQL MVCC (Multi-Version Concurrency Control) позволяет несколькими агентам access same thread_id безопасно:
# Worker 1 и Worker 2 оба могут read/write same thread_id
# PostgreSQL обеспечивает serializable изоляцию
# Worker 1
config = {"configurable": {"thread_id": "shared-thread"}}
await app1.ainvoke(state_update_1, config)
# Worker 2 (concurrent)
config = {"configurable": {"thread_id": "shared-thread"}}
await app2.ainvoke(state_update_2, config)
# PostgreSQL решает conflicts; последний write wins
Best practice: Для critical workflows требующих strict ordering, используйте application-level locking:
import asyncio
class ThreadLock:
def __init__(self):
self._locks = {}
async def acquire(self, thread_id: str):
if thread_id not in self._locks:
self._locks[thread_id] = asyncio.Lock()
await self._locks[thread_id].acquire()
def release(self, thread_id: str):
if thread_id in self._locks:
self._locks[thread_id].release()
lock_manager = ThreadLock()
async def invoke_with_lock(app, state, config):
thread_id = config["configurable"]["thread_id"]
await lock_manager.acquire(thread_id)
try:
return await app.ainvoke(state, config)
finally:
lock_manager.release(thread_id)
Schema Migrations
# Create миграционная система для PostgreSQL контрольных точек
from typing import Callable
class CheckpointMigrations:
def __init__(self, pool):
self.pool = pool
self.migrations: list[Callable] = []
def register(self, func: Callable):
self.migrations.append(func)
return func
async def run_migrations(self):
"""Apply все registered миграции."""
async with self.pool.connection() as conn:
async with conn.cursor() as cursor:
# Create миграции таблицу если не exists
await cursor.execute("""
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMPTZ DEFAULT NOW()
)
""")
# Get текущей версии
await cursor.execute("SELECT COALESCE(MAX(version), 0) FROM schema_migrations")
current_version = (await cursor.fetchone())[0]
# Apply pending миграции
for i, migration in enumerate(self.migrations[current_version:], start=current_version + 1):
await migration(cursor)
await cursor.execute("INSERT INTO schema_migrations (version) VALUES (%s)", (i,))
await conn.commit()
migrations = CheckpointMigrations(pool)
@migrations.register
async def migration_001_add_indexes(cursor):
"""Add индексы для common queries."""
await cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_checkpoints_metadata_user_id
ON checkpoints ((metadata->>'user_id'))
""")
await cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_checkpoints_created_at
ON checkpoints ((metadata->>'created_at'))
""")
@migrations.register
async def migration_002_add_cleanup_function(cursor):
"""Add stored procedure для checkpoint cleanup."""
await cursor.execute("""
CREATE OR REPLACE FUNCTION cleanup_old_checkpoints(days INTEGER)
RETURNS INTEGER AS $$
DECLARE
deleted_count INTEGER;
BEGIN
DELETE FROM checkpoints
WHERE (metadata->>'created_at')::timestamptz < NOW() - (days || ' days')::INTERVAL;
GET DIAGNOSTICS deleted_count = ROW_COUNT;
RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;
""")
State Schema Design
State schemas используют TypedDict с reducer функциями контролировать как updates merge.
Basic Pattern с Reducers
from typing import TypedDict, Annotated, Optional
import operator
class ConversationState(TypedDict):
# Annotated с operator.add: новые messages append к list
messages: Annotated[list, operator.add]
# Plain fields: replaced на каждый update
current_task: Optional[str]
task_status: str # "pending" | "in_progress" | "completed"
# Accumulated данные с deduplication
gathered_facts: Annotated[list[str], operator.add]
# Metadata (immutable после creation)
user_id: str
session_id: str
created_at: str # ISO format
updated_at: str
# Tool результаты (dict replaced, не merged)
tool_results: dict[str, any]
# Human-in-the-loop состояние
human_feedback: Optional[str]
awaiting_feedback: bool
Custom Reducers
def merge_tool_results(existing: dict, update: dict) -> dict:
"""Merge dicts; newer values выигрывают на key collision."""
merged = {**existing}
merged.update(update)
return merged
def deduplicate_facts(existing: list[str], update: list[str]) -> list[str]:
"""Append только unique facts."""
seen = set(existing)
new_facts = [f for f in update if f not in seen]
return existing + new_facts
def increment_counter(a: int, b: int) -> int:
"""Increment counter на b."""
return a + b
class AdvancedState(TypedDict):
messages: Annotated[list, operator.add]
tool_results: Annotated[dict, merge_tool_results]
facts: Annotated[list[str], deduplicate_facts]
step_count: Annotated[int, increment_counter]
Versioning для Backward Compatibility
from datetime import datetime
STATE_VERSION = 3 # Increment на schema changes
class StateV3(TypedDict):
schema_version: int # Всегда включайте version field
messages: Annotated[list[dict], operator.add]
current_task: Optional[str]
task_status: str
user_id: str
session_id: str
created_at: str
updated_at: str
def create_initial_state(user_id: str, session_id: str, task: str) -> StateV3:
now = datetime.utcnow().isoformat()
return StateV3(
schema_version=STATE_VERSION,
messages=[{"role": "user", "content": task}],
current_task=task,
task_status="pending",
user_id=user_id,
session_id=session_id,
created_at=now,
updated_at=now,
)
def migrate_state(state: dict) -> StateV3:
"""Migrate старые state schemas к текущей версии."""
version = state.get("schema_version", 1)
# V1 → V2: добавляет task_status field
if version < 2:
state["task_status"] = "unknown"
# V2 → V3: преобразует messages из string к dict
if version < 3:
if "messages" in state:
state["messages"] = [
{"role": "user", "content": msg} if isinstance(msg, str) else msg
for msg in state["messages"]
]
state["schema_version"] = STATE_VERSION
return state
Nested State Structures
class ResearchSubState(TypedDict):
query: str
sources: Annotated[list[str], operator.add]
findings: Annotated[list[str], operator.add]
completed: bool
class WritingSubState(TypedDict):
outline: Optional[str]
draft: Optional[str]
final: Optional[str]
class MultiStepWorkflowState(TypedDict):
messages: Annotated[list, operator.add]
research: ResearchSubState
writing: WritingSubState
overall_status: str # "researching" | "writing" | "completed"
Performance & Benchmarks
Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.
Checkpoint Write Latency
MemorySaver:
- In-memory dict write: sub-microsecond (0.001ms)
- Нет serialization overhead
- Нет network или disk I/O
RedisSaver (localhost):
- SET command latency: 0.2-1ms
- Pickle serialization overhead: 0.1-0.5ms per контрольная точка
- Total: 0.3-1.5ms per checkpoint write
RedisSaver (remote, same datacenter):
- Network RTT: 1-5ms
- Serialization + Redis write: 1-6ms total
AsyncPostgresSaver (localhost):
- Connection pool acquisition: 0.1ms
- JSONB write + index update: 1-5ms
- Transaction commit: 1-3ms
- Total: 2-10ms per checkpoint write
AsyncPostgresSaver (remote, same datacenter):
- Network RTT: 2-10ms
- Database write: 3-15ms total
Checkpoint Read Latency
MemorySaver:
- Dict lookup: sub-microsecond
RedisSaver:
- GET command: 0.2-1ms (localhost)
- Unpickle deserialization: 0.1-0.5ms
- Total: 0.3-1.5ms
AsyncPostgresSaver:
- Query execution: 1-5ms
- JSONB deserialization: 0.5-2ms
- Total: 1.5-7ms
Storage Efficiency
Checkpoint размер примеры (serialized):
- Simple chat состояние (10 messages): 2-5 KB
- Research agent состояние (50 tool outputs): 20-50 KB
- Complex workflow (nested sub-states): 100-500 KB
PostgreSQL storage:
- JSONB compression: typically 30-50% меньше чем raw JSON
- BYTEA blobs: pickled состояние, ~10% overhead vs raw Python objects
Redis storage:
- Pickle serialization: efficient для Python-native structures
- Memory overhead: Redis добавляет ~50-100 bytes per key для metadata
Concurrent Access Performance
PostgreSQL (10 concurrent агентов, same thread_id):
- Throughput: moderate (serialized writes due к locking)
- Latency: увеличивается slightly под contention (5-20ms)
Redis (10 concurrent агентов, same thread_id):
- Throughput: high (single-threaded Redis обрабатывает concurrency efficiently)
- Latency: stable (<2ms даже под load)
MemorySaver (10 concurrent агентов, same thread_id):
- Throughput: highest (in-memory, нет locking)
- Latency: sub-millisecond
- Caveat: нет process isolation; только works в single-process deployments
Scalability Limits
MemorySaver:
- Limited процессом memory
- 10,000 контрольные точки × 10 KB каждый = 100 MB
- Нет практического limit для short-lived сессий
Redis:
- Limited доступный memory
- 1 million контрольные точки × 10 KB каждый = 10 GB
- Используйте TTL предотвратить unbounded growth
PostgreSQL:
- Limited disk space (практически unlimited)
- 100 million контрольные точки × 10 KB каждый = 1 TB
- Требуется indexing strategy для fast queries в scale