Управление состоянием агента: Redis сессии, LangGraph PostgresSaver, контрольные точки


title: "Управление состоянием агента: Redis сессии, LangGraph PostgresSaver, контрольные точки" slug: agent-state-management-redis-langgraph-2026-ru date: 2026-02-24 lang: ru

Управление состоянием агента: Redis сессии, LangGraph PostgresSaver, контрольные точки

Ключевые факты

  • LangGraph 1.0.10 (последний стабильный релиз на март 2026) предоставляет persistence состояния на основе контрольных точек
  • langgraph-checkpoint-postgres 3.0.4 предлагает AsyncPostgresSaver для production развёртываний
  • redis 7.2.1 клиент Python поддерживает async операции через модуль redis.asyncio
  • psycopg2-binary 2.9.11 требуется для синхронных соединений с PostgreSQL; psycopg 3.x рекомендуется для async
  • MemorySaver хранит контрольные точки в-процессе (словарь Python); состояние теряется при перезагрузке, подходит только для разработки
  • AsyncPostgresSaver записывает в PostgreSQL; ACID гарантии, сохраняется после перезагрузки, поддерживает concurrent workers
  • RedisSaver не включен в ядро LangGraph; требуется custom implementation с использованием BaseCheckpointSaver
  • Redis режимы persistence: RDB (снимки в точке времени), AOF (append-only log), hybrid (RDB+AOF), или без persistence
  • PostgreSQL LISTEN/NOTIFY типичная latency 1-5ms на localhost, 10-50ms по сети
  • State serialization: JsonPlusSerializer (default) обрабатывает primitives, lists, dicts, Pydantic models; PickleSerializer обрабатывает произвольные Python объекты
  • Maximum checkpoint size: PostgreSQL JSONB колонка ограничена 1GB; практический лимит ~10MB для производительности
  • Thread ID является основной единицей изоляции; один thread_id = один изолированный namespace состояния
  • checkpoint_ns включает sub-graph изоляцию состояния внутри одного thread_id
  • Checkpoint таблицы: checkpoints (метаданные), checkpoint_blobs (данные состояния как BYTEA), checkpoint_writes (pending операции)
  • State reducer функции: operator.add (append lists), operator.replace (default для scalars), custom reducers для merge логики
  • interrupt() функция останавливает execution графа в узле; состояние persisted; возобновляется при следующем invoke с Command(resume={...})
  • TTL в Redis: SET key value EX seconds; automatic expiry предотвращает memory bloat для ephemeral сессий
  • Connection pooling: AsyncConnectionPool из psycopg_pool рекомендуется; min_size=2, max_size=10 для типичных workloads
  • Schema versioning: сохраняйте schema_version в checkpoint metadata; применяйте миграции при загрузке для backward compatibility
  • Tiered storage pattern: Redis (hot tier, sub-millisecond reads) + PostgreSQL (cold tier, durable archival)

Что такое состояние агента

Состояние агента — это данные, которые агент должен поддерживать на протяжении нескольких шагов выполнения. Оно делится на две категории:

Ephemeral state существует только во время одного выполнения. Примеры: промежуточные результаты вычислений, временные выходы инструментов, in-flight API ответы. Это состояние исчезает, когда процесс завершается. Для stateless агентов (one-shot запросы) ephemeral состояния достаточно.

Persistent state сохраняется за пределами времени жизни процесса. Примеры: история разговора, накопленные факты из multi-step исследования, предпочтения пользователя, статус задачи, ожидающий feedback от человека. Это состояние должно храниться в external системах (Redis, PostgreSQL, S3) для включения:

  • Conversation continuity: Пользователь задает follow-up вопросы часами позже; агент вспоминает предыдущий контекст
  • Resumable workflows: Долгоживущие задачи (multi-day исследование, human-in-the-loop одобрение), которые паузируются и возобновляются
  • Multi-tenant isolation: Состояние пользователя A никогда не просачивается в сессию пользователя B
  • Crash recovery: Процесс агента перезагружается mid-task; выполнение возобновляется с последней контрольной точки
  • Audit trails: Полная история решений, tool calls, state transitions для отладки и compliance

LangGraph'а архитектура checkpointing обрабатывает все состояние как persistent по умолчанию. Каждое выполнение узла создает контрольную точку — снимок полного состояния на тот момент. Backend checkpointer (MemorySaver, PostgreSQL, Redis) определяет, сохраняются ли контрольные точки после перезагрузки процесса.

Почему это важно для долгоживущих агентов: Без persistent состояния агент, который падает после 45 минут исследования, теряет весь прогресс. С контрольными точками он возобновляется с последнего сохраненного состояния, сохраняя выходы инструментов, собранные факты и историю выполнения. Human-in-the-loop workflows зависят от этого: агент паузируется, сохраняет состояние, ждет input от человека (который может прийти часами позже), затем возобновляется ровно с того же места.

Framework решения

Выберите checkpointer на основе lifetime сессии, требований durability и operational constraints:

MemorySaver

Используйте когда:

  • Разработка и тестирование локально
  • Запуск агентов в single-process окружении (Jupyter notebooks, scripts)
  • Persistent состояние за пределами lifetime процесса не требуется
  • Простота приоритизируется над durability

Гарантии:

  • Состояние сохраняется на протяжении lifetime Python процесса
  • Ноль external зависимостей
  • Sub-microsecond read/write latency (in-memory dict)
  • Нет network overhead

Ограничения:

  • Состояние теряется при перезагрузке процесса
  • Не может разделять состояние между несколькими workers
  • Нет backup или recovery механизма
  • Memory растет unbounded без manual cleanup

RedisSaver (Custom Implementation)

Используйте когда:

  • Сессии short-lived (минуты до часов)
  • Sub-millisecond state access требуется
  • Automatic TTL-based expiry желательна
  • Durability вторична скорости

Гарантии:

  • Состояние persists после перезагрузки процесса (если Redis persistence включена)
  • Shared состояние между несколькими workers (single Redis instance)
  • Automatic expiry через TTL (предотвращает memory bloat)
  • Типичная latency: <1ms для localhost, <5ms по локальной сети

Ограничения:

  • Нет built-in LangGraph implementation (must implement BaseCheckpointSaver)
  • Redis eviction policies могут discard данные под memory pressure
  • Persistence режимы (RDB/AOF) trade off durability vs performance
  • Нет ACID transactions для checkpoint consistency

PostgresSaver (AsyncPostgresSaver)

Используйте когда:

  • Сессии long-lived (часы до недель)
  • Durability и ACID гарантии требуются
  • Несколько concurrent агентов access shared состояние
  • Complex queries на checkpoint metadata нужны (find все сессии пользователя X, сессии старше 30 дней)

Гарантии:

  • Состояние survives перезагрузку процесса, crashes серверов и network failures
  • ACID transactions обеспечивают checkpoint consistency
  • Поддерживает concurrent writers через PostgreSQL locking
  • Full audit trail через checkpoint history

Ограничения:

  • Выше latency чем MemorySaver или Redis (1-10ms per checkpoint write)
  • Требует PostgreSQL server setup и maintenance
  • Database размер растет без cleanup policies
  • Connection pooling требуется для high-concurrency workloads

Tiered Strategy (Redis + PostgreSQL)

Используйте когда:

  • Нужны и скорость (active сессии) и durability (completed сессии)
  • Session lifecycle имеет distinct "active" и "archived" фазы
  • Cost optimization важна (Redis memory дорогая)

Pattern:

  • Write все checkpoints в оба Redis (hot tier) и PostgreSQL (cold tier)
  • Active сессии читаются из Redis (fast)
  • Archived сессии purged из Redis, читаются из PostgreSQL
  • Automatic promotion/demotion на основе session activity

Таблица параметров

Параметр Значение Примечания
thread_id String (e.g., "user-123-session-456") Primary изоляционный ключ; один thread = один state namespace
checkpoint_ns String (default "") Sub-graph namespace внутри thread; включает isolated sub-workflows
checkpoint_id String (auto-generated UUID) Unique identifier конкретной контрольной точки; используется для resumption
ttl_seconds Integer (e.g., 3600 для 1 часа) Redis TTL для automatic expiry; не применимо к PostgreSQL
min_size Integer (e.g., 2) Минимум соединений в psycopg AsyncConnectionPool
max_size Integer (e.g., 10) Максимум соединений в pool; tune на основе concurrency
autocommit Boolean (True) PostgreSQL connection режим; True для checkpointer compatibility
prepare_threshold Integer (0) Disable prepared statements в psycopg (0 = disabled)
filter Dict (e.g., {"user_id": "123"}) Metadata фильтр для checkpoint queries через alist()
limit Integer (e.g., 100) Max checkpoints возвращается alist()
before Dict (checkpoint config) Return checkpoints до этого checkpoint_id (pagination)
schema_version Integer (e.g., 3) Stored в checkpoint metadata; используется для state migrations
redis_url String (e.g., "redis://localhost:6379/0") Connection string для Redis; поддерживает redis://, rediss:// (TLS)
conninfo String (e.g., "postgresql://user:pass@host/db") PostgreSQL connection string для psycopg
decode_responses Boolean (False) Redis client setting; False требуется для binary data (pickled checkpoints)

Common Pitfalls

❌ Pitfall 1: Storing Non-Serializable Objects в State

Problem: JsonPlusSerializer (default) не может serializable datetime, set или произвольные Python объекты.

Impact: Checkpoint save fails с TypeError: Object of type datetime is not JSON serializable, состояние теряется.

# ❌ WRONG: datetime и set не JSON-serializable
from typing import TypedDict
from datetime import datetime

class BadState(TypedDict):
    messages: list
    created_at: datetime  # Will crash on checkpoint save
    tags: set[str]        # Will crash on checkpoint save
# ✅ CORRECT: Convert в JSON-compatible типы
class GoodState(TypedDict):
    messages: list
    created_at: str       # ISO format: "2026-03-01T12:34:56"
    tags: list[str]       # Lists JSON-serializable

def create_state(task: str) -> GoodState:
    return GoodState(
        messages=[{"role": "user", "content": task}],
        created_at=datetime.utcnow().isoformat(),
        tags=["research", "urgent"],
    )

❌ Pitfall 2: Exposing Raw User IDs как Thread IDs

Problem: Использование predictable thread IDs (e.g., thread_id = user_id) включает users access к другим состояниям пользователей.

Impact: Security breach; пользователь A может читать conversation history пользователя B, угадав thread_id.

# ❌ WRONG: User может guess другие thread IDs
config = {"configurable": {"thread_id": user_id}}  # user_id = "12345"
# ✅ CORRECT: Hash user_id + session_id для prevent guessing
import hashlib

def make_thread_id(user_id: str, session_id: str) -> str:
    combined = f"{user_id}:{session_id}"
    return hashlib.sha256(combined.encode()).hexdigest()[:32]

config = {"configurable": {"thread_id": make_thread_id(user_id, session_id)}}

❌ Pitfall 3: Forgetting to Call setup() на PostgresSaver

Problem: AsyncPostgresSaver требует await checkpointer.setup() для создания таблиц.

Impact: First checkpoint write fails с relation "checkpoints" does not exist.

# ❌ WRONG: Missing setup() call
async with get_checkpointer() as checkpointer:
    app = build_graph(checkpointer)
    await app.ainvoke(state, config)  # Crashes: table doesn't exist
# ✅ CORRECT: Call setup() перед first use
async with get_checkpointer() as checkpointer:
    await checkpointer.setup()  # Creates tables
    app = build_graph(checkpointer)
    await app.ainvoke(state, config)

❌ Pitfall 4: Unbounded Checkpoint Growth

Problem: Каждое выполнение узла создает контрольную точку; long-running сессии accumulate сотни контрольных точек.

Impact: Database/memory bloat, slow queries, увеличенные storage costs.

# ❌ WRONG: Нет cleanup policy; checkpoints растут forever
# После 1000 agent invocations, database содержит 1000+ checkpoints per thread
# ✅ CORRECT: Periodic cleanup старых checkpoints
from datetime import datetime, timedelta

async def cleanup_old_checkpoints(checkpointer, days: int = 30):
    cutoff = datetime.utcnow() - timedelta(days=days)
    async with checkpointer.conn.cursor() as cursor:
        await cursor.execute(
            """
            DELETE FROM checkpoints
            WHERE (metadata->>'created_at')::timestamptz < %s
            """,
            (cutoff.isoformat(),)
        )
        return cursor.rowcount

# Run cleanup ежедневно via cron или scheduled task

❌ Pitfall 5: Using MemorySaver в Production

Problem: MemorySaver сохраняет состояние in-process; не shared между workers, теряется при restart.

Impact: Load balancer посылает пользователя на Worker 2, состояние от Worker 1 недоступно; conversation context теряется.

# ❌ WRONG: MemorySaver в multi-worker production
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()  # State живет в этом процессе только
app = build_graph(checkpointer)
# User's second request попадает на different worker → state not found
# ✅ CORRECT: Используйте PostgreSQL или Redis для shared state
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async with get_postgres_checkpointer() as checkpointer:
    await checkpointer.setup()
    app = build_graph(checkpointer)
    # All workers share same PostgreSQL backend

Redis State Implementation

LangGraph не включает Redis checkpointer в ядро. Реализуйте custom saver расширением BaseCheckpointSaver:

import pickle
from typing import Optional, AsyncIterator
from langgraph.checkpoint.base import (
    BaseCheckpointSaver,
    Checkpoint,
    CheckpointTuple,
    CheckpointMetadata,
)
import redis.asyncio as aioredis

class RedisCheckpointSaver(BaseCheckpointSaver):
    """
    Redis-backed checkpoint saver with TTL-based expiry.
    Suitable for short-lived agent sessions (chat, web agents).
    """

    def __init__(
        self,
        redis_client: aioredis.Redis,
        ttl_seconds: int = 3600,
        key_prefix: str = "lgraph:ckpt:",
    ):
        super().__init__()
        self.redis = redis_client
        self.ttl = ttl_seconds
        self.prefix = key_prefix

    @classmethod
    async def from_url(cls, redis_url: str, ttl_seconds: int = 3600):
        client = await aioredis.from_url(redis_url, decode_responses=False)
        return cls(client, ttl_seconds)

    def _checkpoint_key(self, thread_id: str, checkpoint_ns: str, checkpoint_id: str) -> str:
        return f"{self.prefix}{thread_id}:{checkpoint_ns}:{checkpoint_id}"

    def _index_key(self, thread_id: str, checkpoint_ns: str) -> str:
        """Key для хранения ordered list checkpoint IDs."""
        return f"{self.prefix}idx:{thread_id}:{checkpoint_ns}"

    async def aget_tuple(self, config: dict) -> Optional[CheckpointTuple]:
        """Retrieve latest или specific контрольную точку."""
        thread_id = config["configurable"]["thread_id"]
        checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
        checkpoint_id = config["configurable"].get("checkpoint_id")

        if not checkpoint_id:
            # Get latest checkpoint ID из index
            index_key = self._index_key(thread_id, checkpoint_ns)
            latest = await self.redis.lindex(index_key, -1)
            if not latest:
                return None
            checkpoint_id = latest.decode()

        key = self._checkpoint_key(thread_id, checkpoint_ns, checkpoint_id)
        data = await self.redis.get(key)
        if not data:
            return None

        stored = pickle.loads(data)
        return CheckpointTuple(
            config={
                "configurable": {
                    "thread_id": thread_id,
                    "checkpoint_ns": checkpoint_ns,
                    "checkpoint_id": checkpoint_id,
                }
            },
            checkpoint=stored["checkpoint"],
            metadata=stored["metadata"],
            parent_config=stored.get("parent_config"),
        )

    async def aput(
        self,
        config: dict,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: dict,
    ) -> dict:
        """Store контрольную точку с TTL."""
        thread_id = config["configurable"]["thread_id"]
        checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
        checkpoint_id = checkpoint["id"]

        key = self._checkpoint_key(thread_id, checkpoint_ns, checkpoint_id)
        index_key = self._index_key(thread_id, checkpoint_ns)

        data = pickle.dumps({
            "checkpoint": checkpoint,
            "metadata": metadata,
            "parent_config": config,
        })

        # Store контрольную точку + update index, оба с TTL
        pipe = self.redis.pipeline()
        pipe.set(key, data, ex=self.ttl)
        pipe.rpush(index_key, checkpoint_id)
        pipe.expire(index_key, self.ttl)
        await pipe.execute()

        return {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
            }
        }

    async def aput_writes(self, config: dict, writes: list, task_id: str) -> None:
        """Store pending writes для resumption."""
        thread_id = config["configurable"]["thread_id"]
        checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
        checkpoint_id = config["configurable"]["checkpoint_id"]

        writes_key = f"{self.prefix}writes:{thread_id}:{checkpoint_ns}:{checkpoint_id}"
        data = pickle.dumps({"writes": writes, "task_id": task_id})
        await self.redis.set(writes_key, data, ex=self.ttl)

    async def alist(
        self,
        config: Optional[dict],
        *,
        filter: Optional[dict] = None,
        before: Optional[dict] = None,
        limit: Optional[int] = None,
    ) -> AsyncIterator[CheckpointTuple]:
        """List контрольные точки для thread в reverse chronological order."""
        if not config:
            return

        thread_id = config["configurable"]["thread_id"]
        checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
        index_key = self._index_key(thread_id, checkpoint_ns)

        checkpoint_ids = await self.redis.lrange(index_key, 0, -1)

        count = 0
        for cid_bytes in reversed(checkpoint_ids):
            if limit and count >= limit:
                break

            cid = cid_bytes.decode()
            key = self._checkpoint_key(thread_id, checkpoint_ns, cid)
            data = await self.redis.get(key)

            if data:
                stored = pickle.loads(data)
                yield CheckpointTuple(
                    config={
                        "configurable": {
                            "thread_id": thread_id,
                            "checkpoint_ns": checkpoint_ns,
                            "checkpoint_id": cid,
                        }
                    },
                    checkpoint=stored["checkpoint"],
                    metadata=stored["metadata"],
                    parent_config=stored.get("parent_config"),
                )
                count += 1

Usage Example

import asyncio

async def main():
    # Create Redis checkpointer с 30-minute TTL
    saver = await RedisCheckpointSaver.from_url(
        "redis://localhost:6379/0",
        ttl_seconds=1800,
    )

    app = build_graph(saver)
    config = {"configurable": {"thread_id": "session-abc123"}}

    # First invocation: creates контрольную точку
    result = await app.ainvoke(
        {"messages": [{"role": "user", "content": "Hello"}]},
        config=config,
    )

    # Second invocation: loads из Redis контрольной точки
    result2 = await app.ainvoke(
        {"messages": [{"role": "user", "content": "Follow-up question"}]},
        config=config,
    )

    # После 30 минут, Redis expires контрольную точку automatically

Redis Persistence Configuration

Redis persistence режимы контролируют durability гарантии:

Нет persistence (cache режим):

# redis.conf
save ""
appendonly no
# Самый быстрый; данные теряются при Redis restart

RDB snapshots (periodic dumps):

# redis.conf
save 900 1      # Save если 1 key changed в 15 минутах
save 300 10     # Save если 10 keys changed в 5 минутах
save 60 10000   # Save если 10000 keys changed в 1 минуте
# Хорошо для recovery из crashes; может потерять recent writes

AOF log (append-only file):

# redis.conf
appendonly yes
appendfsync everysec  # Fsync каждую секунду (хороший баланс)
# Более durable; максимум 1 секунда data loss

Hybrid (RDB + AOF):

# redis.conf
save 900 1
appendonly yes
appendfsync everysec
# Лучшая durability; RDB для fast restarts, AOF для minimal data loss

PostgreSQL State Implementation

AsyncPostgresSaver предоставляет production-grade persistence с ACID гарантиями и concurrent access.

Setup и Connection Pooling

from contextlib import asynccontextmanager
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from psycopg_pool import AsyncConnectionPool

DATABASE_URL = "postgresql://agent_user:password@localhost:5432/agent_db"

@asynccontextmanager
async def get_checkpointer():
    """Create AsyncPostgresSaver с connection pooling."""
    async with AsyncConnectionPool(
        conninfo=DATABASE_URL,
        min_size=2,        # Keep 2 соединений открытых
        max_size=10,       # Max 10 concurrent соединений
        kwargs={
            "autocommit": True,           # Required для LangGraph
            "prepare_threshold": 0,       # Disable prepared statements
        },
    ) as pool:
        checkpointer = AsyncPostgresSaver(pool)
        await checkpointer.setup()  # Create таблицы если не exist
        yield checkpointer

async def main():
    async with get_checkpointer() as saver:
        app = build_graph(saver)
        config = {"configurable": {"thread_id": "prod-thread-456"}}

        result = await app.ainvoke(
            {"messages": [{"role": "user", "content": "Research Q4 earnings"}]},
            config=config,
        )

        # State persisted к PostgreSQL; survives процесс restart

Database Schema

После setup(), AsyncPostgresSaver создает эти таблицы:

-- Checkpoint метаданные и структура
CREATE TABLE checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    parent_checkpoint_id TEXT,
    type TEXT,
    checkpoint JSONB NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);

-- Actual state данные (BYTEA для serialized content)
CREATE TABLE checkpoint_blobs (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    channel TEXT NOT NULL,
    version TEXT NOT NULL,
    type TEXT NOT NULL,
    blob BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
);

-- Pending writes для interrupted workflows
CREATE TABLE checkpoint_writes (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    idx INTEGER NOT NULL,
    channel TEXT NOT NULL,
    type TEXT,
    blob BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);

Querying Checkpoints

async def list_user_sessions(checkpointer, user_id: str) -> list[dict]:
    """Find все active сессии для пользователя."""
    sessions = []

    # Metadata filter ищет metadata JSONB колонку
    async for checkpoint_tuple in checkpointer.alist(
        config=None,
        filter={"user_id": user_id},
        limit=100,
    ):
        sessions.append({
            "thread_id": checkpoint_tuple.config["configurable"]["thread_id"],
            "checkpoint_id": checkpoint_tuple.config["configurable"]["checkpoint_id"],
            "created_at": checkpoint_tuple.metadata.get("created_at"),
            "task_status": checkpoint_tuple.checkpoint.get("channel_values", {}).get("task_status"),
        })

    return sessions

async def get_checkpoint_count(pool) -> int:
    """Count total контрольные точки в database."""
    async with pool.connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT COUNT(*) FROM checkpoints")
            result = await cursor.fetchone()
            return result[0] if result else 0

Concurrent Agent Handling

PostgreSQL MVCC (Multi-Version Concurrency Control) позволяет несколькими агентам access same thread_id безопасно:

# Worker 1 и Worker 2 оба могут read/write same thread_id
# PostgreSQL обеспечивает serializable изоляцию

# Worker 1
config = {"configurable": {"thread_id": "shared-thread"}}
await app1.ainvoke(state_update_1, config)

# Worker 2 (concurrent)
config = {"configurable": {"thread_id": "shared-thread"}}
await app2.ainvoke(state_update_2, config)

# PostgreSQL решает conflicts; последний write wins

Best practice: Для critical workflows требующих strict ordering, используйте application-level locking:

import asyncio

class ThreadLock:
    def __init__(self):
        self._locks = {}

    async def acquire(self, thread_id: str):
        if thread_id not in self._locks:
            self._locks[thread_id] = asyncio.Lock()
        await self._locks[thread_id].acquire()

    def release(self, thread_id: str):
        if thread_id in self._locks:
            self._locks[thread_id].release()

lock_manager = ThreadLock()

async def invoke_with_lock(app, state, config):
    thread_id = config["configurable"]["thread_id"]
    await lock_manager.acquire(thread_id)
    try:
        return await app.ainvoke(state, config)
    finally:
        lock_manager.release(thread_id)

Schema Migrations

# Create миграционная система для PostgreSQL контрольных точек
from typing import Callable

class CheckpointMigrations:
    def __init__(self, pool):
        self.pool = pool
        self.migrations: list[Callable] = []

    def register(self, func: Callable):
        self.migrations.append(func)
        return func

    async def run_migrations(self):
        """Apply все registered миграции."""
        async with self.pool.connection() as conn:
            async with conn.cursor() as cursor:
                # Create миграции таблицу если не exists
                await cursor.execute("""
                    CREATE TABLE IF NOT EXISTS schema_migrations (
                        version INTEGER PRIMARY KEY,
                        applied_at TIMESTAMPTZ DEFAULT NOW()
                    )
                """)

                # Get текущей версии
                await cursor.execute("SELECT COALESCE(MAX(version), 0) FROM schema_migrations")
                current_version = (await cursor.fetchone())[0]

                # Apply pending миграции
                for i, migration in enumerate(self.migrations[current_version:], start=current_version + 1):
                    await migration(cursor)
                    await cursor.execute("INSERT INTO schema_migrations (version) VALUES (%s)", (i,))
                    await conn.commit()

migrations = CheckpointMigrations(pool)

@migrations.register
async def migration_001_add_indexes(cursor):
    """Add индексы для common queries."""
    await cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_checkpoints_metadata_user_id
        ON checkpoints ((metadata->>'user_id'))
    """)
    await cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_checkpoints_created_at
        ON checkpoints ((metadata->>'created_at'))
    """)

@migrations.register
async def migration_002_add_cleanup_function(cursor):
    """Add stored procedure для checkpoint cleanup."""
    await cursor.execute("""
        CREATE OR REPLACE FUNCTION cleanup_old_checkpoints(days INTEGER)
        RETURNS INTEGER AS $$
        DECLARE
            deleted_count INTEGER;
        BEGIN
            DELETE FROM checkpoints
            WHERE (metadata->>'created_at')::timestamptz < NOW() - (days || ' days')::INTERVAL;
            GET DIAGNOSTICS deleted_count = ROW_COUNT;
            RETURN deleted_count;
        END;
        $$ LANGUAGE plpgsql;
    """)

State Schema Design

State schemas используют TypedDict с reducer функциями контролировать как updates merge.

Basic Pattern с Reducers

from typing import TypedDict, Annotated, Optional
import operator

class ConversationState(TypedDict):
    # Annotated с operator.add: новые messages append к list
    messages: Annotated[list, operator.add]

    # Plain fields: replaced на каждый update
    current_task: Optional[str]
    task_status: str  # "pending" | "in_progress" | "completed"

    # Accumulated данные с deduplication
    gathered_facts: Annotated[list[str], operator.add]

    # Metadata (immutable после creation)
    user_id: str
    session_id: str
    created_at: str  # ISO format
    updated_at: str

    # Tool результаты (dict replaced, не merged)
    tool_results: dict[str, any]

    # Human-in-the-loop состояние
    human_feedback: Optional[str]
    awaiting_feedback: bool

Custom Reducers

def merge_tool_results(existing: dict, update: dict) -> dict:
    """Merge dicts; newer values выигрывают на key collision."""
    merged = {**existing}
    merged.update(update)
    return merged

def deduplicate_facts(existing: list[str], update: list[str]) -> list[str]:
    """Append только unique facts."""
    seen = set(existing)
    new_facts = [f for f in update if f not in seen]
    return existing + new_facts

def increment_counter(a: int, b: int) -> int:
    """Increment counter на b."""
    return a + b

class AdvancedState(TypedDict):
    messages: Annotated[list, operator.add]
    tool_results: Annotated[dict, merge_tool_results]
    facts: Annotated[list[str], deduplicate_facts]
    step_count: Annotated[int, increment_counter]

Versioning для Backward Compatibility

from datetime import datetime

STATE_VERSION = 3  # Increment на schema changes

class StateV3(TypedDict):
    schema_version: int  # Всегда включайте version field
    messages: Annotated[list[dict], operator.add]
    current_task: Optional[str]
    task_status: str
    user_id: str
    session_id: str
    created_at: str
    updated_at: str

def create_initial_state(user_id: str, session_id: str, task: str) -> StateV3:
    now = datetime.utcnow().isoformat()
    return StateV3(
        schema_version=STATE_VERSION,
        messages=[{"role": "user", "content": task}],
        current_task=task,
        task_status="pending",
        user_id=user_id,
        session_id=session_id,
        created_at=now,
        updated_at=now,
    )

def migrate_state(state: dict) -> StateV3:
    """Migrate старые state schemas к текущей версии."""
    version = state.get("schema_version", 1)

    # V1 → V2: добавляет task_status field
    if version < 2:
        state["task_status"] = "unknown"

    # V2 → V3: преобразует messages из string к dict
    if version < 3:
        if "messages" in state:
            state["messages"] = [
                {"role": "user", "content": msg} if isinstance(msg, str) else msg
                for msg in state["messages"]
            ]

    state["schema_version"] = STATE_VERSION
    return state

Nested State Structures

class ResearchSubState(TypedDict):
    query: str
    sources: Annotated[list[str], operator.add]
    findings: Annotated[list[str], operator.add]
    completed: bool

class WritingSubState(TypedDict):
    outline: Optional[str]
    draft: Optional[str]
    final: Optional[str]

class MultiStepWorkflowState(TypedDict):
    messages: Annotated[list, operator.add]
    research: ResearchSubState
    writing: WritingSubState
    overall_status: str  # "researching" | "writing" | "completed"

Performance & Benchmarks

Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.

Checkpoint Write Latency

MemorySaver:

  • In-memory dict write: sub-microsecond (0.001ms)
  • Нет serialization overhead
  • Нет network или disk I/O

RedisSaver (localhost):

  • SET command latency: 0.2-1ms
  • Pickle serialization overhead: 0.1-0.5ms per контрольная точка
  • Total: 0.3-1.5ms per checkpoint write

RedisSaver (remote, same datacenter):

  • Network RTT: 1-5ms
  • Serialization + Redis write: 1-6ms total

AsyncPostgresSaver (localhost):

  • Connection pool acquisition: 0.1ms
  • JSONB write + index update: 1-5ms
  • Transaction commit: 1-3ms
  • Total: 2-10ms per checkpoint write

AsyncPostgresSaver (remote, same datacenter):

  • Network RTT: 2-10ms
  • Database write: 3-15ms total

Checkpoint Read Latency

MemorySaver:

  • Dict lookup: sub-microsecond

RedisSaver:

  • GET command: 0.2-1ms (localhost)
  • Unpickle deserialization: 0.1-0.5ms
  • Total: 0.3-1.5ms

AsyncPostgresSaver:

  • Query execution: 1-5ms
  • JSONB deserialization: 0.5-2ms
  • Total: 1.5-7ms

Storage Efficiency

Checkpoint размер примеры (serialized):

  • Simple chat состояние (10 messages): 2-5 KB
  • Research agent состояние (50 tool outputs): 20-50 KB
  • Complex workflow (nested sub-states): 100-500 KB

PostgreSQL storage:

  • JSONB compression: typically 30-50% меньше чем raw JSON
  • BYTEA blobs: pickled состояние, ~10% overhead vs raw Python objects

Redis storage:

  • Pickle serialization: efficient для Python-native structures
  • Memory overhead: Redis добавляет ~50-100 bytes per key для metadata

Concurrent Access Performance

PostgreSQL (10 concurrent агентов, same thread_id):

  • Throughput: moderate (serialized writes due к locking)
  • Latency: увеличивается slightly под contention (5-20ms)

Redis (10 concurrent агентов, same thread_id):

  • Throughput: high (single-threaded Redis обрабатывает concurrency efficiently)
  • Latency: stable (<2ms даже под load)

MemorySaver (10 concurrent агентов, same thread_id):

  • Throughput: highest (in-memory, нет locking)
  • Latency: sub-millisecond
  • Caveat: нет process isolation; только works в single-process deployments

Scalability Limits

MemorySaver:

  • Limited процессом memory
  • 10,000 контрольные точки × 10 KB каждый = 100 MB
  • Нет практического limit для short-lived сессий

Redis:

  • Limited доступный memory
  • 1 million контрольные точки × 10 KB каждый = 10 GB
  • Используйте TTL предотвратить unbounded growth

PostgreSQL:

  • Limited disk space (практически unlimited)
  • 100 million контрольные точки × 10 KB каждый = 1 TB
  • Требуется indexing strategy для fast queries в scale