Агенты + Базы данных: генерация SQL, интроспекция схемы, безопасное выполнение запросов


title: "Агенты + Базы данных: генерация SQL, интроспекция схемы, безопасное выполнение запросов" slug: agents-databases-sql-generation-2026-ru date: 2026-02-22 lang: ru

Агенты + Базы данных: генерация SQL, интроспекция схемы, безопасное выполнение запросов

Ключевые факты

  • Версии пакетов Python (PyPI, март 2026): langchain==1.2.10, langgraph==1.0.10, sqlalchemy==2.0.47, psycopg2-binary==2.9.11, pymysql==1.1.2, sqlglot==29.0.1, anthropic==0.84.0, pgvector==0.4.2
  • Классы интеграции LangChain SQL: SQLDatabase (обёртка вокруг SQLAlchemy engine), SQLDatabaseToolkit (коллекция инструментов для интроспекции схемы), create_sql_agent() (фабричная функция для предварительно настроенного агента)
  • SQLAlchemy 2.0 vs 1.x ключевые различия: 2.0 требует явного connection.commit(), использует обёртку text() для raw SQL, engine.connect() возвращает контекстный менеджер по умолчанию, удалена неявная коерция строк для имён таблиц
  • Сигнатуры методов инструментов LangChain: SQLDatabaseToolkit.get_tools() возвращает список экземпляров BaseTool: QuerySQLDataBaseTool, InfoSQLDatabaseTool, ListSQLDatabaseTool, QuerySQLCheckerTool
  • Принудительное чтение на уровне БД: PostgreSQL GRANT SELECT только, MySQL GRANT SELECT ON db.*, SQLite PRAGMA query_only = ON, параметр подключения -c default_transaction_read_only=on
  • Оценка токенов контекста схемы: каждая таблица в среднем 150-300 токенов (имя таблицы, колонки, типы, ограничения); БД из 100 таблиц потребляет 15,000-30,000 токенов перед генерацией запроса
  • Максимальный безопасный размер схемы: полная схема может быть внедрена при количестве до 50 таблиц; выше 100 таблиц требуется предварительный отбор таблиц или инкрементальные инструменты интроспекции
  • Векторы SQL injection в LLM-генерируемых запросах: пользовательский ввод без параметризации в WHERE, имена таблиц из пользовательского ввода в FROM, обфускация комментариями (--, /**/), цепочка выражений с точкой с запятой, UNION ALL утечка данных
  • AST валидация sqlglot: парсит SQL в дерево выражений, обнаруживает DDL (Create, Drop, Alter, Truncate), DML записи (Insert, Update, Delete, Merge), отсутствие LIMIT, глубину вложенных подзапросов
  • Принудительное использование LIMIT: агенты должны добавлять LIMIT во все SELECT запросы; типичные значения по умолчанию: 100 для исследования, 1000 для отчётов, 10000 жёсткий потолок; валидация отклоняет неограниченные SELECT'ы
  • Конфигурация timeout выражения: PostgreSQL SET statement_timeout = '30s', MySQL SET max_execution_time = 30000, предотвращает бесконечные аналитические запросы от блокировки БД
  • Оценка стоимости запроса: EXPLAIN (FORMAT JSON) перед выполнением; отклонить запросы со стоимостью > 10000, полные сканирования таблиц > 1M строк, отсутствие индексов на JOIN колонках
  • Маршрутизация мультибазы данных: агент получает реестр БД (имя, описание, примеры таблиц); использует LLM для классификации вопроса и маршрутизации в правильную БД; откат к стандартной если неоднозначно
  • Гибридный паттерн векторного + SQL поиска: сгенерировать embedding для семантического компонента, извлечь SQL фильтр для точного совпадения, объединить через WHERE filter AND ORDER BY embedding <=> query_vector LIMIT k
  • Connection pooling для агентов: использовать QueuePool с pool_size=5, max_overflow=10, pool_recycle=3600 для обработки конкурентных запросов агентов без истощения соединений БД
  • Кеширование интроспекции схемы: кешировать запросы information_schema на 5-15 минут, инвалидировать на DDL операциях, снижает повторные запросы метаданных во время разговора агента
  • Стратегии восстановления ошибок: UnsafeSQLError → объяснить нарушение и повторить, SchemaNotFoundError → список похожих таблиц через расстояние Левенштейна, QueryTimeoutError → предложить добавить WHERE фильтры или уменьшить LIMIT
  • Суммирование результатов: запрос возвращает > 20 строк → отправить первые 20 в LLM с общим количеством; использовать более дешёвую модель (claude-haiku) для суммирования; включить статистику колонок (мин, макс, среднее) для числовых данных
  • Стратегия тестирования: unit тесты с SQLite in-memory БД, интеграционные тесты против dockerized PostgreSQL, регрессионные тесты валидатора безопасности с корпусом вредоносного SQL
  • Инструкция в prompt для безопасности: системный prompt должен явно запретить встраивание raw пользовательских строк в SQL, требовать параметризованные запросы для фильтров, инструктировать модель возвращать структурированный JSON с SQL + объяснением

Что такое SQL агент для генерации

SQL агент для генерации - это естественный языковой интерфейс к структурированным БД, работающий в непрерывном цикле запрос-выполнение-интерпретация:

  1. Пользователь вводит вопрос на естественном языке ("Какие клиенты потратили более 10,000 $ в прошлом квартале?")
  2. Агент интроспектирует схему через вызываемые инструменты (список таблиц, описание колонок, проверка внешних ключей)
  3. Агент генерирует SQL запрос из контекста вопроса и понимания схемы
  4. Валидатор безопасности проверяет SQL (парсинг AST, блокировка DDL/DML, принудительное использование LIMIT)
  5. Запрос выполняется на соединении только для чтения с ограничениями timeout
  6. Результаты постобработаны и суммированы (первые N строк, статистика агрегирования, суммирование на естественном языке)
  7. Агент представляет ответ с прозрачностью SQL (пользователь видит сгенерированный запрос для верификации/коррекции)

Это отличается от традиционного ORM или построителя запросов потому что слой трансляции - это LLM, который понимает намерение, справляется с неоднозначностью и может исследовать незнакомую схему автономно. Агент не выполняет произвольный код - он генерирует декларативный SQL, который валидирован перед выполнением.

Фреймворк принятия решений

Когда использовать SQL агент

  • Ad-hoc аналитика на известной схеме: бизнес-аналитики запрашивают хранилище данных без знания SQL
  • Исследование схемы: новые члены команды открывают, какие данные существуют и отношения между таблицами
  • Многоэтапные аналитические рабочие потоки: "Показать топ продукты, потом отфильтровать по регионам, потом вычислить темп роста" требует последовательных запросов
  • Кросс-таблические агрегации: вопросы, охватывающие несколько JOIN'ов, которые требовали бы сложного ручного SQL

Когда использовать предварительно определённые запросы

  • Высокочастотные операционные запросы: дашборды, эндпоинты метрик, запланированные отчёты
  • Производительно-критичные пути: планы запросов должны быть кешированы и оптимизированы; LLM задержка (500ms-2s) неприемлема
  • Регуляторные/аудитные требования: SQL должен быть проверен и одобрен; не может быть делегирована генерация модели
  • Операции записи: любой INSERT/UPDATE/DELETE должен быть явным кодом приложения, никогда генерируемый

Когда использовать RAG вместо результатов SQL

  • Неструктурированные данные в БД: TEXT колонки с документами, JSON блобы, пользовательский контент
  • Поиски семантического сходства: "Найти похожие продукты" требует embeddings, не точного SQL совпадения
  • Гибридные структурированные + неструктурированные: метаданные продуктов (SQL) + отзывы (векторные embeddings) комбинированный поиск
  • Низкая кардинальность, высокая размерность: фильтрирование по 50+ атрибутам лучше обрабатывается векторным сходством, чем WHERE предложениями

Таблица справочника параметров

Параметр Значение Примечания
max_tokens 4096 Генерация SQL + объяснение; увеличить до 8192 для сложных multi-JOIN запросов
model claude-opus-4-5 Генерация запросов; понизить до claude-sonnet-4-5 для более простых схем
model (суммирование) claude-haiku-4-5 Суммирование результатов; дешевле, быстрее для постобработки
temperature 0.0 Детерминированная генерация SQL; увеличить до 0.3 для исследовательских запросов
LIMIT по умолчанию 100 Применяется ко всем SELECT'ам если не обнаружена агрегация
LIMIT максимум 10000 Жёсткий потолок, принудительно применяемый валидатором безопасности
statement_timeout 30000ms PostgreSQL/MySQL timeout запроса; 30s предотвращает бесконечные аналитические запросы
pool_size 5 SQLAlchemy пул соединений; обрабатывает конкурентные сессии агентов
max_overflow 10 Временные соединения сверх pool_size во время пиков
pool_recycle 3600s Переиспользовать соединения каждый час для предотвращения ошибок устаревших соединений
schema_cache_ttl 300s Кешировать результаты интроспекции information_schema на 5 минут
explain_cost_threshold 10000 Отклонить запросы с предполагаемой стоимостью выше этого значения
sample_rows 3 Количество примеров строк, показанных агенту для понимания формата данных
result_summary_rows 20 Максимум строк, отправляемых в LLM для суммирования
tool_call_timeout 60s Максимальное время для одного вызова инструмента (интроспекция схемы)

Общие ошибки

Ошибка 1: Внедрение всей схемы в системный prompt

Влияние: переполнение контекста на больших БД, устаревшая схема после DDL изменений, агент не может исследовать незнакомые таблицы.

Плохо:

# Запекание схемы в prompt
schema_snapshot = get_all_tables_and_columns()  # 50,000 токенов
system_prompt = f"Database schema:\n{schema_snapshot}\n\nGenerate SQL for: {{question}}"

Хорошо:

# Схема как вызываемые инструменты
tools = [
    {
        "name": "list_tables",
        "description": "List all tables with row counts and descriptions",
        "input_schema": {"type": "object", "properties": {"schema": {"type": "string"}}}
    },
    {
        "name": "describe_table",
        "description": "Get columns, types, foreign keys for a specific table",
        "input_schema": {
            "type": "object",
            "properties": {"table_name": {"type": "string"}},
            "required": ["table_name"]
        }
    }
]

Ошибка 2: Доверие к SQL, генерируемому LLM, без AST валидации

Влияние: SQL injection через обфускацию комментариями, выполнение DDL, неограниченные запросы вызывающие блокировку БД.

Плохо:

# Прямое выполнение сгенерированного SQL
sql = generate_sql(user_question)
cursor.execute(sql)  # Нет валидации!

Хорошо:

# AST парсинг и валидация
import sqlglot

def validate_sql_safety(sql: str) -> tuple[bool, str]:
    try:
        parsed = sqlglot.parse_one(sql)
    except Exception as e:
        return False, f"Parse error: {e}"

    # Блокировать DDL
    if isinstance(parsed, (sqlglot.expressions.Create, sqlglot.expressions.Drop)):
        return False, "DDL not allowed"

    # Блокировать записи
    if isinstance(parsed, (sqlglot.expressions.Insert, sqlglot.expressions.Update)):
        return False, "Write operations not allowed"

    # Требовать LIMIT
    if isinstance(parsed, sqlglot.expressions.Select):
        if not parsed.args.get("limit"):
            return False, "SELECT must have LIMIT"

    return True, ""

is_safe, reason = validate_sql_safety(sql)
if not is_safe:
    raise UnsafeSQLError(reason)

Ошибка 3: Использование флага только-чтение на уровне приложения без принудительного применения на БД

Влияние: обход через ошибки пула соединений, уязвимости библиотеки или инъекция вредоносного кода.

Плохо:

# Доверие флагу на уровне приложения
READ_ONLY_MODE = True

def execute_query(sql):
    if READ_ONLY_MODE:
        if sql.strip().upper().startswith("SELECT"):
            return cursor.execute(sql)
    raise ValueError("Read-only mode")

Хорошо:

# Принудительное применение только-чтение на уровне БД
conn = psycopg2.connect(
    host="db.example.com",
    database="analytics",
    user="agent_readonly",  # GRANT SELECT только
    password=os.environ["DB_READONLY_PASSWORD"],
    options="-c default_transaction_read_only=on"  # Принудительное применение на уровне сессии
)
conn.set_session(readonly=True)

Ошибка 4: Пропуск валидации имён таблиц в динамическом SQL

Влияние: SQL injection через вредоносные имена таблиц содержащие разделители выражений или комментарии.

Плохо:

# Небезопасная интерполяция имени таблицы
def get_sample_rows(table_name: str):
    sql = f"SELECT * FROM {table_name} LIMIT 3"  # Injection!
    cursor.execute(sql)

Хорошо:

# Валидировать имя таблицы против каталога схемы
def get_sample_rows(table_name: str):
    cursor.execute("""
        SELECT EXISTS (
            SELECT 1 FROM information_schema.tables
            WHERE table_name = %s
        )
    """, (table_name,))

    if not cursor.fetchone()[0]:
        raise ValueError(f"Table '{table_name}' does not exist")

    # Безопасно использовать после валидации
    cursor.execute(f"SELECT * FROM {table_name} LIMIT 3")

Ошибка 5: Отсутствие принудительного timeout запроса

Влияние: один дорогой запрос (Cartesian JOIN, отсутствие WHERE) блокирует БД, предотвращает конкурентные запросы агентов.

Плохо:

# Нет защиты от timeout
cursor.execute(generated_sql)  # Может выполняться часами
results = cursor.fetchall()

Хорошо:

# Timeout на уровне выражения
cursor.execute("SET statement_timeout = '30s'")
try:
    cursor.execute(generated_sql)
    results = cursor.fetchall()
except psycopg2.errors.QueryCanceled:
    raise QueryTimeoutError("Query exceeded 30s limit")
finally:
    cursor.execute("SET statement_timeout = DEFAULT")

Реализация SQL агента

Архитектура ядра с SQLDatabaseToolkit

from anthropic import Anthropic
import psycopg2
import sqlglot
import json
from typing import Any, Dict, List

client = Anthropic()

class DatabaseIntrospectionTools:
    def __init__(self, connection_string: str):
        self.conn_string = connection_string

    def _get_connection(self):
        return psycopg2.connect(self.conn_string)

    def list_tables(self, schema: str = "public") -> List[Dict]:
        """List all tables with row counts and descriptions."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT
                        t.table_name,
                        t.table_type,
                        obj_description(c.oid, 'pg_class') as description,
                        (SELECT reltuples::bigint
                         FROM pg_class
                         WHERE oid = c.oid) as approx_row_count
                    FROM information_schema.tables t
                    LEFT JOIN pg_class c ON c.relname = t.table_name
                    WHERE t.table_schema = %s
                    ORDER BY t.table_name
                """, (schema,))

                columns = [desc[0] for desc in cur.description]
                return [dict(zip(columns, row)) for row in cur.fetchall()]
        finally:
            conn.close()

    def describe_table(self, table_name: str, schema: str = "public") -> Dict:
        """Get detailed column information for a table."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cur:
                # Validate table exists
                cur.execute("""
                    SELECT EXISTS (
                        SELECT 1 FROM information_schema.tables
                        WHERE table_name = %s AND table_schema = %s
                    )
                """, (table_name, schema))

                if not cur.fetchone()[0]:
                    raise ValueError(f"Table '{table_name}' does not exist")

                # Get columns
                cur.execute("""
                    SELECT
                        c.column_name,
                        c.data_type,
                        c.is_nullable,
                        c.column_default,
                        c.character_maximum_length,
                        col_description(
                            (table_schema || '.' || table_name)::regclass::oid,
                            ordinal_position
                        ) as description
                    FROM information_schema.columns c
                    WHERE c.table_name = %s AND c.table_schema = %s
                    ORDER BY c.ordinal_position
                """, (table_name, schema))

                cols = [desc[0] for desc in cur.description]
                columns = [dict(zip(cols, row)) for row in cur.fetchall()]

                # Get foreign keys
                cur.execute("""
                    SELECT
                        kcu.column_name,
                        ccu.table_name AS foreign_table,
                        ccu.column_name AS foreign_column
                    FROM information_schema.table_constraints tc
                    JOIN information_schema.key_column_usage kcu
                        ON tc.constraint_name = kcu.constraint_name
                    JOIN information_schema.constraint_column_usage ccu
                        ON tc.constraint_name = ccu.constraint_name
                    WHERE tc.constraint_type = 'FOREIGN KEY'
                        AND tc.table_name = %s
                """, (table_name,))

                fk_cols = [desc[0] for desc in cur.description]
                foreign_keys = [dict(zip(fk_cols, row)) for row in cur.fetchall()]

                return {
                    "table_name": table_name,
                    "columns": columns,
                    "foreign_keys": foreign_keys
                }
        finally:
            conn.close()

    def get_sample_rows(self, table_name: str, limit: int = 3) -> List[Dict]:
        """Get sample rows to understand data format."""
        conn = self._get_connection()
        try:
            with conn.cursor() as cur:
                # Validate table name
                cur.execute("""
                    SELECT EXISTS (
                        SELECT 1 FROM information_schema.tables
                        WHERE table_name = %s
                    )
                """, (table_name,))

                if not cur.fetchone()[0]:
                    raise ValueError(f"Table '{table_name}' does not exist")

                cur.execute(f"SELECT * FROM {table_name} LIMIT %s", (limit,))
                cols = [desc[0] for desc in cur.description]
                return [dict(zip(cols, row)) for row in cur.fetchall()]
        finally:
            conn.close()

def get_schema_tools(db_tools: DatabaseIntrospectionTools) -> List[Dict]:
    """Return tool definitions for schema introspection."""
    return [
        {
            "name": "list_tables",
            "description": "List all tables in the database with approximate row counts. Use this first to discover available tables.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "schema": {
                        "type": "string",
                        "description": "Database schema name (default: public)",
                        "default": "public"
                    }
                }
            }
        },
        {
            "name": "describe_table",
            "description": "Get detailed schema for a specific table including columns, data types, foreign keys. Use this to understand table structure before generating SQL.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "table_name": {"type": "string", "description": "Name of the table to describe"},
                    "schema": {"type": "string", "default": "public"}
                },
                "required": ["table_name"]
            }
        },
        {
            "name": "get_sample_rows",
            "description": "Get 3 sample rows from a table to understand actual data format and values.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "table_name": {"type": "string", "description": "Name of the table"}
                },
                "required": ["table_name"]
            }
        },
        {
            "name": "execute_read_query",
            "description": "Execute a SELECT query and return results. Query must include LIMIT clause.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "sql": {"type": "string", "description": "SQL SELECT statement to execute"}
                },
                "required": ["sql"]
            }
        }
    ]

Цикл генерации запроса с вызовом инструментов

def validate_sql_safety(sql: str) -> tuple[bool, str]:
    """Validate SQL using AST parsing."""
    try:
        parsed = sqlglot.parse_one(sql)
    except Exception as e:
        return False, f"SQL parse error: {e}"

    # Block DDL
    ddl_types = (
        sqlglot.expressions.Create,
        sqlglot.expressions.Drop,
        sqlglot.expressions.Alter,
        sqlglot.expressions.Truncate,
    )
    if isinstance(parsed, ddl_types):
        return False, f"DDL statements not allowed: {type(parsed).__name__}"

    # Block DML writes
    dml_types = (
        sqlglot.expressions.Insert,
        sqlglot.expressions.Update,
        sqlglot.expressions.Delete,
        sqlglot.expressions.Merge,
    )
    if isinstance(parsed, dml_types):
        return False, f"Write statements not allowed: {type(parsed).__name__}"

    # Require LIMIT
    if isinstance(parsed, sqlglot.expressions.Select):
        if not parsed.args.get("limit"):
            return False, "SELECT queries must include LIMIT clause"

        limit_val = parsed.args["limit"].args.get("expression")
        if limit_val and int(limit_val.name) > 10000:
            return False, "LIMIT cannot exceed 10000 rows"

    return True, ""

class NLToSQLAgent:
    def __init__(self, db_tools: DatabaseIntrospectionTools, read_only_connection: Any):
        self.db_tools = db_tools
        self.conn = read_only_connection
        self.tools = get_schema_tools(db_tools)

    def translate_and_execute(self, user_question: str) -> Dict:
        """Translate NL to SQL, validate, execute."""

        system_prompt = """You are a PostgreSQL expert. Generate safe, efficient SQL queries.

Your workflow:
1. Use list_tables to discover available tables
2. Use describe_table to understand relevant table schemas
3. Use get_sample_rows to see actual data format
4. Generate a SELECT query with LIMIT clause
5. Return JSON: {"sql": "SELECT ...", "explanation": "This query..."}

CRITICAL RULES:
- NEVER generate DDL (CREATE/DROP/ALTER) or DML (INSERT/UPDATE/DELETE)
- ALWAYS include LIMIT clause (max 1000 unless aggregating)
- NEVER embed raw user input in SQL - use parameterized patterns
- If user input looks suspicious (semicolons, comments, UNION), report it as error
- Use table aliases for readability
- Prefer JOINs over subqueries for performance"""

        messages = [{"role": "user", "content": user_question}]

        generated_sql = None
        explanation = None

        while True:
            response = client.messages.create(
                model="claude-opus-4-5",
                max_tokens=4096,
                system=system_prompt,
                tools=self.tools,
                messages=messages
            )

            if response.stop_reason == "tool_use":
                tool_results = []
                for block in response.content:
                    if block.type == "tool_use":
                        result = self._execute_tool(block.name, block.input)
                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": json.dumps(result, default=str)
                        })

                messages.append({"role": "assistant", "content": response.content})
                messages.append({"role": "user", "content": tool_results})

            elif response.stop_reason == "end_turn":
                for block in response.content:
                    if hasattr(block, 'text'):
                        try:
                            text = block.text
                            start = text.find('{')
                            end = text.rfind('}') + 1
                            if start >= 0:
                                result = json.loads(text[start:end])
                                generated_sql = result.get("sql")
                                explanation = result.get("explanation")
                        except json.JSONDecodeError:
                            pass
                break

        if not generated_sql:
            return {"error": "Could not generate SQL", "sql": None}

        # Validate safety
        is_safe, reason = validate_sql_safety(generated_sql)
        if not is_safe:
            return {"error": f"Query rejected: {reason}", "sql": generated_sql}

        # Execute
        results = self._execute_query(generated_sql)

        # Summarize
        summary = self._summarize_results(user_question, generated_sql, results)

        return {
            "sql": generated_sql,
            "explanation": explanation,
            "results": results,
            "summary": summary,
            "row_count": len(results)
        }

    def _execute_tool(self, tool_name: str, tool_input: Dict) -> Any:
        """Execute schema introspection tool."""
        if tool_name == "list_tables":
            return self.db_tools.list_tables(tool_input.get("schema", "public"))
        elif tool_name == "describe_table":
            return self.db_tools.describe_table(
                tool_input["table_name"],
                tool_input.get("schema", "public")
            )
        elif tool_name == "get_sample_rows":
            return self.db_tools.get_sample_rows(tool_input["table_name"])
        elif tool_name == "execute_read_query":
            sql = tool_input["sql"]
            is_safe, reason = validate_sql_safety(sql)
            if not is_safe:
                return {"error": f"Query rejected: {reason}"}
            return self._execute_query(sql)
        return {"error": "Unknown tool"}

    def _execute_query(self, sql: str) -> List[Dict]:
        """Execute validated SQL with timeout."""
        with self.conn.cursor() as cur:
            cur.execute("SET statement_timeout = '30s'")
            try:
                cur.execute(sql)
                if cur.description:
                    columns = [desc[0] for desc in cur.description]
                    return [dict(zip(columns, row)) for row in cur.fetchall()]
            except psycopg2.errors.QueryCanceled:
                raise QueryTimeoutError("Query exceeded 30s limit")
            finally:
                cur.execute("SET statement_timeout = DEFAULT")
        return []

    def _summarize_results(self, question: str, sql: str, results: List[Dict]) -> str:
        """Generate natural language summary."""
        if not results:
            return "The query returned no results."

        sample = results[:20] if len(results) > 20 else results

        prompt = f"""User asked: "{question}"

SQL executed:
```sql
{sql}

Results ({len(results)} total rows, showing first {len(sample)}): {json.dumps(sample, indent=2, default=str)}

Provide a 2-3 sentence summary that directly answers the user's question."""

    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}]
    )

    return response.content[0].text

## Управление контекстом схемы

### Стратегия усечения больших схем

Для БД с 100+ таблицами, внедрение полной схемы превышает лимиты контекста и тратит токены на нерелевантные таблицы. Используйте предварительный отбор таблиц:

```python
def select_relevant_tables(user_question: str, all_tables: List[Dict]) -> List[str]:
    """Use LLM to identify relevant tables before introspection."""

    table_list = "\n".join([
        f"- {t['table_name']}: {t.get('description', 'no description')} ({t.get('approx_row_count', 0)} rows)"
        for t in all_tables
    ])

    prompt = f"""Available tables:
{table_list}

User question: "{user_question}"

Which tables are likely needed to answer this question? Return ONLY table names, one per line."""

    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=256,
        messages=[{"role": "user", "content": prompt}]
    )

    selected = [line.strip() for line in response.content[0].text.split('\n') if line.strip()]
    return selected[:5]  # Limit to 5 tables maximum

# Usage in agent
all_tables = db_tools.list_tables()
relevant_tables = select_relevant_tables(user_question, all_tables)

# Only introspect selected tables
for table in relevant_tables:
    schema = db_tools.describe_table(table)
    # Inject into context

Паттерн инкрементального открытия схемы

Вместо предзагрузки схемы, позволить агенту открывать таблицы по требованию:

# System prompt instructs agent to use tools incrementally
system_prompt = """You have access to schema introspection tools. Use them strategically:

1. Start with list_tables to see available tables
2. Identify 2-3 tables most likely relevant to the question
3. Use describe_table on ONLY those tables
4. If query fails or returns empty results, explore additional tables
5. Avoid calling describe_table on all tables - this wastes time and tokens

This incremental approach keeps context focused and efficient."""

Кеширование схемы для повторных запросов

Кешировать результаты information_schema для избежания повторных запросов метаданных:

import time
from functools import lru_cache

class CachedDatabaseTools(DatabaseIntrospectionTools):
    def __init__(self, connection_string: str, cache_ttl: int = 300):
        super().__init__(connection_string)
        self.cache_ttl = cache_ttl
        self._cache_timestamp = {}

    def list_tables(self, schema: str = "public") -> List[Dict]:
        cache_key = f"tables_{schema}"

        if cache_key in self._cache_timestamp:
            if time.time() - self._cache_timestamp[cache_key] < self.cache_ttl:
                return self._get_cached_tables(schema)

        tables = super().list_tables(schema)
        self._cache_timestamp[cache_key] = time.time()
        self._store_cached_tables(schema, tables)
        return tables

    @lru_cache(maxsize=100)
    def _get_cached_tables(self, schema: str) -> List[Dict]:
        return super().list_tables(schema)

    def invalidate_cache(self):
        """Call after DDL operations."""
        self._cache_timestamp.clear()
        self._get_cached_tables.cache_clear()

Безопасность и паттерны только-чтение

Принудительное применение разрешений на уровне БД

-- PostgreSQL: Create read-only user
CREATE USER agent_readonly WITH PASSWORD 'secure_password_here';
GRANT CONNECT ON DATABASE analytics TO agent_readonly;
GRANT USAGE ON SCHEMA public TO agent_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO agent_readonly;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO agent_readonly;

-- Verify permissions
SELECT grantee, privilege_type
FROM information_schema.role_table_grants
WHERE grantee = 'agent_readonly';
# Connection with session-level read-only enforcement
def create_read_only_connection(host: str, database: str, port: int = 5432):
    conn = psycopg2.connect(
        host=host,
        database=database,
        port=port,
        user="agent_readonly",
        password=os.environ["DB_READONLY_PASSWORD"],
        options="-c default_transaction_read_only=on"  # Session-level protection
    )
    conn.set_session(readonly=True, autocommit=True)
    return conn

Очистка запросов с AST парсингом

def detect_injection_patterns(sql: str) -> tuple[bool, str]:
    """Additional heuristic checks beyond AST parsing."""

    sql_upper = sql.upper()

    # Check for statement separators
    if ';' in sql and not sql.strip().endswith(';'):
        return False, "Multiple statements detected (semicolon not at end)"

    # Check for comment-based obfuscation
    if '--' in sql or '/*' in sql:
        return False, "Comments in SQL not allowed (injection risk)"

    # Check for UNION-based exfiltration
    if 'UNION' in sql_upper:
        # UNION is sometimes legitimate, but log it
        import logging
        logging.warning(f"UNION query detected: {sql}")

    # Check for dynamic SQL execution
    forbidden = ['EXECUTE', 'EXEC(', 'sp_executesql']
    for pattern in forbidden:
        if pattern in sql_upper:
            return False, f"Dynamic SQL execution not allowed: {pattern}"

    return True, ""

def validate_sql_safety(sql: str) -> tuple[bool, str]:
    """Combined AST + heuristic validation."""

    # AST validation
    try:
        parsed = sqlglot.parse_one(sql)
    except Exception as e:
        return False, f"Parse error: {e}"

    # Type checks
    if isinstance(parsed, (sqlglot.expressions.Create, sqlglot.expressions.Drop,
                          sqlglot.expressions.Alter, sqlglot.expressions.Truncate)):
        return False, "DDL not allowed"

    if isinstance(parsed, (sqlglot.expressions.Insert, sqlglot.expressions.Update,
                          sqlglot.expressions.Delete, sqlglot.expressions.Merge)):
        return False, "Write operations not allowed"

    # LIMIT enforcement
    if isinstance(parsed, sqlglot.expressions.Select):
        if not parsed.args.get("limit"):
            return False, "SELECT must have LIMIT"

    # Heuristic checks
    is_safe, reason = detect_injection_patterns(sql)
    if not is_safe:
        return False, reason

    return True, ""

Режим dry-run с EXPLAIN

def dry_run_query(sql: str, conn) -> Dict:
    """Execute EXPLAIN to check query plan without running query."""

    with conn.cursor() as cur:
        cur.execute(f"EXPLAIN (FORMAT JSON, ANALYZE FALSE) {sql}")
        plan = cur.fetchone()[0][0]

        total_cost = plan.get('Plan', {}).get('Total Cost', 0)

        # Reject expensive queries
        if total_cost > 10000:
            raise ValueError(f"Query cost too high: {total_cost}")

        return plan

# Use in agent
plan = dry_run_query(generated_sql, conn)
if plan['Plan'].get('Node Type') == 'Seq Scan':
    # Warn about full table scan
    logging.warning(f"Sequential scan detected on {plan['Plan']['Relation Name']}")

Производительность и бенчмарки

Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.

Задержка генерации запроса

  • Простой single-table SELECT: агент интроспектирует 1 таблицу, генерирует SQL за 1-2 секунды (1 вызов инструмента + 1 генерация LLM)
  • Multi-table JOIN: агент интроспектирует 3-4 таблицы, генерирует SQL за 3-5 секунд (4 вызова инструментов + 1 генерация LLM)
  • Исследовательский запрос (незнакомая схема): агент перечисляет все таблицы, делает выборку 2-3, интроспектирует 5+ таблиц, генерирует SQL за 5-10 секунд
  • Кешированная схема: последующие запросы на же таблицы сокращаются до 1-2 секунд (интроспекция не требуется)

Паттерны потребления токенов

  • Интроспекция схемы: каждый вызов describe_table возвращает 200-500 токенов; 5 таблиц = 1000-2500 токенов
  • Overhead вызова инструмента: каждый вызов инструмента добавляет 100-150 токенов (вызов функции + обёртка результата)
  • Генерация SQL: типичный системный prompt (500 токенов) + разговор (1000-3000 токенов) + контекст схемы (2000-5000 токенов)
  • Всего за запрос: 3000-8000 токенов для входа, 200-800 токенов для выхода (SQL + объяснение)

Выполнение запроса БД

  • Простая агрегация (SELECT COUNT(*)): выполняется за 10-50ms на индексированной колонке
  • Multi-table JOIN (3 таблицы, правильные индексы): выполняется за 50-200ms на датасетах < 1M строк
  • Полное сканирование таблицы (отсутствие WHERE): блокируется принудительным LIMIT; типичное выполнение 100-500ms для LIMIT 1000
  • Сложный аналитический запрос (оконные функции, подзапросы): может достичь 5-30 секунд; принудительный timeout на 30s предотвращает бесконечные запросы

Опыт пользователя от начала до конца

  • Лучший случай (простой запрос, кешированная схема): 1.5-2 секунды (вопрос пользователя → ответ на естественном языке)
  • Типичный случай (средняя сложность): 4-6 секунд (интроспекция + генерация + выполнение + суммирование)
  • Худший случай (исследовательский запрос на незнакомой схеме): 8-12 секунд (несколько раундов интроспекции)

Стратегии оптимизации стоимости

  • Использовать claude-haiku-4-5 для суммирования результатов (10x дешевле чем Opus)
  • Кешировать результаты интроспекции схемы на 5-15 минут
  • Предварительно отбирать релевантные таблицы перед полной интроспекцией (снижает вызовы инструментов с 50+ до 5)
  • Ограничить суммирование результатов первыми 20 строками (снижает токены выхода)

Конкурентность и пул соединений

Для развёртываний агентов multi-user:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "postgresql://agent_readonly:password@db.example.com/analytics",
    poolclass=QueuePool,
    pool_size=5,           # 5 persistent connections
    max_overflow=10,       # 10 additional connections during spikes
    pool_recycle=3600,     # Recycle connections after 1 hour
    pool_pre_ping=True     # Verify connection health before use
)

# Usage
with engine.connect() as conn:
    agent = NLToSQLAgent(db_tools, conn)
    result = agent.translate_and_execute(user_question)

Эта конфигурация справляется с 5 конкурентными сессиями агентов комфортно, с пиковой ёмкостью до 15 во время пиковой нагрузки.