title: "Агенты + Базы данных: генерация SQL, интроспекция схемы, безопасное выполнение запросов" slug: agents-databases-sql-generation-2026-ru date: 2026-02-22 lang: ru
Агенты + Базы данных: генерация SQL, интроспекция схемы, безопасное выполнение запросов
Ключевые факты
- Версии пакетов Python (PyPI, март 2026):
langchain==1.2.10,langgraph==1.0.10,sqlalchemy==2.0.47,psycopg2-binary==2.9.11,pymysql==1.1.2,sqlglot==29.0.1,anthropic==0.84.0,pgvector==0.4.2 - Классы интеграции LangChain SQL:
SQLDatabase(обёртка вокруг SQLAlchemy engine),SQLDatabaseToolkit(коллекция инструментов для интроспекции схемы),create_sql_agent()(фабричная функция для предварительно настроенного агента) - SQLAlchemy 2.0 vs 1.x ключевые различия: 2.0 требует явного
connection.commit(), использует обёрткуtext()для raw SQL,engine.connect()возвращает контекстный менеджер по умолчанию, удалена неявная коерция строк для имён таблиц - Сигнатуры методов инструментов LangChain:
SQLDatabaseToolkit.get_tools()возвращает список экземпляровBaseTool:QuerySQLDataBaseTool,InfoSQLDatabaseTool,ListSQLDatabaseTool,QuerySQLCheckerTool - Принудительное чтение на уровне БД: PostgreSQL
GRANT SELECTтолько, MySQLGRANT SELECT ON db.*, SQLitePRAGMA query_only = ON, параметр подключения-c default_transaction_read_only=on - Оценка токенов контекста схемы: каждая таблица в среднем 150-300 токенов (имя таблицы, колонки, типы, ограничения); БД из 100 таблиц потребляет 15,000-30,000 токенов перед генерацией запроса
- Максимальный безопасный размер схемы: полная схема может быть внедрена при количестве до 50 таблиц; выше 100 таблиц требуется предварительный отбор таблиц или инкрементальные инструменты интроспекции
- Векторы SQL injection в LLM-генерируемых запросах: пользовательский ввод без параметризации в WHERE, имена таблиц из пользовательского ввода в FROM, обфускация комментариями (
--,/**/), цепочка выражений с точкой с запятой,UNION ALLутечка данных - AST валидация sqlglot: парсит SQL в дерево выражений, обнаруживает DDL (
Create,Drop,Alter,Truncate), DML записи (Insert,Update,Delete,Merge), отсутствиеLIMIT, глубину вложенных подзапросов - Принудительное использование LIMIT: агенты должны добавлять
LIMITво все SELECT запросы; типичные значения по умолчанию: 100 для исследования, 1000 для отчётов, 10000 жёсткий потолок; валидация отклоняет неограниченные SELECT'ы - Конфигурация timeout выражения: PostgreSQL
SET statement_timeout = '30s', MySQLSET max_execution_time = 30000, предотвращает бесконечные аналитические запросы от блокировки БД - Оценка стоимости запроса:
EXPLAIN (FORMAT JSON)перед выполнением; отклонить запросы со стоимостью > 10000, полные сканирования таблиц > 1M строк, отсутствие индексов на JOIN колонках - Маршрутизация мультибазы данных: агент получает реестр БД (имя, описание, примеры таблиц); использует LLM для классификации вопроса и маршрутизации в правильную БД; откат к стандартной если неоднозначно
- Гибридный паттерн векторного + SQL поиска: сгенерировать embedding для семантического компонента, извлечь SQL фильтр для точного совпадения, объединить через
WHERE filter AND ORDER BY embedding <=> query_vector LIMIT k - Connection pooling для агентов: использовать
QueuePoolсpool_size=5,max_overflow=10,pool_recycle=3600для обработки конкурентных запросов агентов без истощения соединений БД - Кеширование интроспекции схемы: кешировать запросы
information_schemaна 5-15 минут, инвалидировать на DDL операциях, снижает повторные запросы метаданных во время разговора агента - Стратегии восстановления ошибок: UnsafeSQLError → объяснить нарушение и повторить, SchemaNotFoundError → список похожих таблиц через расстояние Левенштейна, QueryTimeoutError → предложить добавить WHERE фильтры или уменьшить LIMIT
- Суммирование результатов: запрос возвращает > 20 строк → отправить первые 20 в LLM с общим количеством; использовать более дешёвую модель (
claude-haiku) для суммирования; включить статистику колонок (мин, макс, среднее) для числовых данных - Стратегия тестирования: unit тесты с SQLite in-memory БД, интеграционные тесты против dockerized PostgreSQL, регрессионные тесты валидатора безопасности с корпусом вредоносного SQL
- Инструкция в prompt для безопасности: системный prompt должен явно запретить встраивание raw пользовательских строк в SQL, требовать параметризованные запросы для фильтров, инструктировать модель возвращать структурированный JSON с SQL + объяснением
Что такое SQL агент для генерации
SQL агент для генерации - это естественный языковой интерфейс к структурированным БД, работающий в непрерывном цикле запрос-выполнение-интерпретация:
- Пользователь вводит вопрос на естественном языке ("Какие клиенты потратили более 10,000 $ в прошлом квартале?")
- Агент интроспектирует схему через вызываемые инструменты (список таблиц, описание колонок, проверка внешних ключей)
- Агент генерирует SQL запрос из контекста вопроса и понимания схемы
- Валидатор безопасности проверяет SQL (парсинг AST, блокировка DDL/DML, принудительное использование LIMIT)
- Запрос выполняется на соединении только для чтения с ограничениями timeout
- Результаты постобработаны и суммированы (первые N строк, статистика агрегирования, суммирование на естественном языке)
- Агент представляет ответ с прозрачностью SQL (пользователь видит сгенерированный запрос для верификации/коррекции)
Это отличается от традиционного ORM или построителя запросов потому что слой трансляции - это LLM, который понимает намерение, справляется с неоднозначностью и может исследовать незнакомую схему автономно. Агент не выполняет произвольный код - он генерирует декларативный SQL, который валидирован перед выполнением.
Фреймворк принятия решений
Когда использовать SQL агент
- Ad-hoc аналитика на известной схеме: бизнес-аналитики запрашивают хранилище данных без знания SQL
- Исследование схемы: новые члены команды открывают, какие данные существуют и отношения между таблицами
- Многоэтапные аналитические рабочие потоки: "Показать топ продукты, потом отфильтровать по регионам, потом вычислить темп роста" требует последовательных запросов
- Кросс-таблические агрегации: вопросы, охватывающие несколько JOIN'ов, которые требовали бы сложного ручного SQL
Когда использовать предварительно определённые запросы
- Высокочастотные операционные запросы: дашборды, эндпоинты метрик, запланированные отчёты
- Производительно-критичные пути: планы запросов должны быть кешированы и оптимизированы; LLM задержка (500ms-2s) неприемлема
- Регуляторные/аудитные требования: SQL должен быть проверен и одобрен; не может быть делегирована генерация модели
- Операции записи: любой INSERT/UPDATE/DELETE должен быть явным кодом приложения, никогда генерируемый
Когда использовать RAG вместо результатов SQL
- Неструктурированные данные в БД:
TEXTколонки с документами, JSON блобы, пользовательский контент - Поиски семантического сходства: "Найти похожие продукты" требует embeddings, не точного SQL совпадения
- Гибридные структурированные + неструктурированные: метаданные продуктов (SQL) + отзывы (векторные embeddings) комбинированный поиск
- Низкая кардинальность, высокая размерность: фильтрирование по 50+ атрибутам лучше обрабатывается векторным сходством, чем WHERE предложениями
Таблица справочника параметров
| Параметр | Значение | Примечания |
|---|---|---|
max_tokens |
4096 | Генерация SQL + объяснение; увеличить до 8192 для сложных multi-JOIN запросов |
model |
claude-opus-4-5 |
Генерация запросов; понизить до claude-sonnet-4-5 для более простых схем |
model (суммирование) |
claude-haiku-4-5 |
Суммирование результатов; дешевле, быстрее для постобработки |
temperature |
0.0 | Детерминированная генерация SQL; увеличить до 0.3 для исследовательских запросов |
LIMIT по умолчанию |
100 | Применяется ко всем SELECT'ам если не обнаружена агрегация |
LIMIT максимум |
10000 | Жёсткий потолок, принудительно применяемый валидатором безопасности |
statement_timeout |
30000ms | PostgreSQL/MySQL timeout запроса; 30s предотвращает бесконечные аналитические запросы |
pool_size |
5 | SQLAlchemy пул соединений; обрабатывает конкурентные сессии агентов |
max_overflow |
10 | Временные соединения сверх pool_size во время пиков |
pool_recycle |
3600s | Переиспользовать соединения каждый час для предотвращения ошибок устаревших соединений |
schema_cache_ttl |
300s | Кешировать результаты интроспекции information_schema на 5 минут |
explain_cost_threshold |
10000 | Отклонить запросы с предполагаемой стоимостью выше этого значения |
sample_rows |
3 | Количество примеров строк, показанных агенту для понимания формата данных |
result_summary_rows |
20 | Максимум строк, отправляемых в LLM для суммирования |
tool_call_timeout |
60s | Максимальное время для одного вызова инструмента (интроспекция схемы) |
Общие ошибки
Ошибка 1: Внедрение всей схемы в системный prompt
Влияние: переполнение контекста на больших БД, устаревшая схема после DDL изменений, агент не может исследовать незнакомые таблицы.
❌ Плохо:
# Запекание схемы в prompt
schema_snapshot = get_all_tables_and_columns() # 50,000 токенов
system_prompt = f"Database schema:\n{schema_snapshot}\n\nGenerate SQL for: {{question}}"
✅ Хорошо:
# Схема как вызываемые инструменты
tools = [
{
"name": "list_tables",
"description": "List all tables with row counts and descriptions",
"input_schema": {"type": "object", "properties": {"schema": {"type": "string"}}}
},
{
"name": "describe_table",
"description": "Get columns, types, foreign keys for a specific table",
"input_schema": {
"type": "object",
"properties": {"table_name": {"type": "string"}},
"required": ["table_name"]
}
}
]
Ошибка 2: Доверие к SQL, генерируемому LLM, без AST валидации
Влияние: SQL injection через обфускацию комментариями, выполнение DDL, неограниченные запросы вызывающие блокировку БД.
❌ Плохо:
# Прямое выполнение сгенерированного SQL
sql = generate_sql(user_question)
cursor.execute(sql) # Нет валидации!
✅ Хорошо:
# AST парсинг и валидация
import sqlglot
def validate_sql_safety(sql: str) -> tuple[bool, str]:
try:
parsed = sqlglot.parse_one(sql)
except Exception as e:
return False, f"Parse error: {e}"
# Блокировать DDL
if isinstance(parsed, (sqlglot.expressions.Create, sqlglot.expressions.Drop)):
return False, "DDL not allowed"
# Блокировать записи
if isinstance(parsed, (sqlglot.expressions.Insert, sqlglot.expressions.Update)):
return False, "Write operations not allowed"
# Требовать LIMIT
if isinstance(parsed, sqlglot.expressions.Select):
if not parsed.args.get("limit"):
return False, "SELECT must have LIMIT"
return True, ""
is_safe, reason = validate_sql_safety(sql)
if not is_safe:
raise UnsafeSQLError(reason)
Ошибка 3: Использование флага только-чтение на уровне приложения без принудительного применения на БД
Влияние: обход через ошибки пула соединений, уязвимости библиотеки или инъекция вредоносного кода.
❌ Плохо:
# Доверие флагу на уровне приложения
READ_ONLY_MODE = True
def execute_query(sql):
if READ_ONLY_MODE:
if sql.strip().upper().startswith("SELECT"):
return cursor.execute(sql)
raise ValueError("Read-only mode")
✅ Хорошо:
# Принудительное применение только-чтение на уровне БД
conn = psycopg2.connect(
host="db.example.com",
database="analytics",
user="agent_readonly", # GRANT SELECT только
password=os.environ["DB_READONLY_PASSWORD"],
options="-c default_transaction_read_only=on" # Принудительное применение на уровне сессии
)
conn.set_session(readonly=True)
Ошибка 4: Пропуск валидации имён таблиц в динамическом SQL
Влияние: SQL injection через вредоносные имена таблиц содержащие разделители выражений или комментарии.
❌ Плохо:
# Небезопасная интерполяция имени таблицы
def get_sample_rows(table_name: str):
sql = f"SELECT * FROM {table_name} LIMIT 3" # Injection!
cursor.execute(sql)
✅ Хорошо:
# Валидировать имя таблицы против каталога схемы
def get_sample_rows(table_name: str):
cursor.execute("""
SELECT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_name = %s
)
""", (table_name,))
if not cursor.fetchone()[0]:
raise ValueError(f"Table '{table_name}' does not exist")
# Безопасно использовать после валидации
cursor.execute(f"SELECT * FROM {table_name} LIMIT 3")
Ошибка 5: Отсутствие принудительного timeout запроса
Влияние: один дорогой запрос (Cartesian JOIN, отсутствие WHERE) блокирует БД, предотвращает конкурентные запросы агентов.
❌ Плохо:
# Нет защиты от timeout
cursor.execute(generated_sql) # Может выполняться часами
results = cursor.fetchall()
✅ Хорошо:
# Timeout на уровне выражения
cursor.execute("SET statement_timeout = '30s'")
try:
cursor.execute(generated_sql)
results = cursor.fetchall()
except psycopg2.errors.QueryCanceled:
raise QueryTimeoutError("Query exceeded 30s limit")
finally:
cursor.execute("SET statement_timeout = DEFAULT")
Реализация SQL агента
Архитектура ядра с SQLDatabaseToolkit
from anthropic import Anthropic
import psycopg2
import sqlglot
import json
from typing import Any, Dict, List
client = Anthropic()
class DatabaseIntrospectionTools:
def __init__(self, connection_string: str):
self.conn_string = connection_string
def _get_connection(self):
return psycopg2.connect(self.conn_string)
def list_tables(self, schema: str = "public") -> List[Dict]:
"""List all tables with row counts and descriptions."""
conn = self._get_connection()
try:
with conn.cursor() as cur:
cur.execute("""
SELECT
t.table_name,
t.table_type,
obj_description(c.oid, 'pg_class') as description,
(SELECT reltuples::bigint
FROM pg_class
WHERE oid = c.oid) as approx_row_count
FROM information_schema.tables t
LEFT JOIN pg_class c ON c.relname = t.table_name
WHERE t.table_schema = %s
ORDER BY t.table_name
""", (schema,))
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in cur.fetchall()]
finally:
conn.close()
def describe_table(self, table_name: str, schema: str = "public") -> Dict:
"""Get detailed column information for a table."""
conn = self._get_connection()
try:
with conn.cursor() as cur:
# Validate table exists
cur.execute("""
SELECT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_name = %s AND table_schema = %s
)
""", (table_name, schema))
if not cur.fetchone()[0]:
raise ValueError(f"Table '{table_name}' does not exist")
# Get columns
cur.execute("""
SELECT
c.column_name,
c.data_type,
c.is_nullable,
c.column_default,
c.character_maximum_length,
col_description(
(table_schema || '.' || table_name)::regclass::oid,
ordinal_position
) as description
FROM information_schema.columns c
WHERE c.table_name = %s AND c.table_schema = %s
ORDER BY c.ordinal_position
""", (table_name, schema))
cols = [desc[0] for desc in cur.description]
columns = [dict(zip(cols, row)) for row in cur.fetchall()]
# Get foreign keys
cur.execute("""
SELECT
kcu.column_name,
ccu.table_name AS foreign_table,
ccu.column_name AS foreign_column
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage ccu
ON tc.constraint_name = ccu.constraint_name
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_name = %s
""", (table_name,))
fk_cols = [desc[0] for desc in cur.description]
foreign_keys = [dict(zip(fk_cols, row)) for row in cur.fetchall()]
return {
"table_name": table_name,
"columns": columns,
"foreign_keys": foreign_keys
}
finally:
conn.close()
def get_sample_rows(self, table_name: str, limit: int = 3) -> List[Dict]:
"""Get sample rows to understand data format."""
conn = self._get_connection()
try:
with conn.cursor() as cur:
# Validate table name
cur.execute("""
SELECT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_name = %s
)
""", (table_name,))
if not cur.fetchone()[0]:
raise ValueError(f"Table '{table_name}' does not exist")
cur.execute(f"SELECT * FROM {table_name} LIMIT %s", (limit,))
cols = [desc[0] for desc in cur.description]
return [dict(zip(cols, row)) for row in cur.fetchall()]
finally:
conn.close()
def get_schema_tools(db_tools: DatabaseIntrospectionTools) -> List[Dict]:
"""Return tool definitions for schema introspection."""
return [
{
"name": "list_tables",
"description": "List all tables in the database with approximate row counts. Use this first to discover available tables.",
"input_schema": {
"type": "object",
"properties": {
"schema": {
"type": "string",
"description": "Database schema name (default: public)",
"default": "public"
}
}
}
},
{
"name": "describe_table",
"description": "Get detailed schema for a specific table including columns, data types, foreign keys. Use this to understand table structure before generating SQL.",
"input_schema": {
"type": "object",
"properties": {
"table_name": {"type": "string", "description": "Name of the table to describe"},
"schema": {"type": "string", "default": "public"}
},
"required": ["table_name"]
}
},
{
"name": "get_sample_rows",
"description": "Get 3 sample rows from a table to understand actual data format and values.",
"input_schema": {
"type": "object",
"properties": {
"table_name": {"type": "string", "description": "Name of the table"}
},
"required": ["table_name"]
}
},
{
"name": "execute_read_query",
"description": "Execute a SELECT query and return results. Query must include LIMIT clause.",
"input_schema": {
"type": "object",
"properties": {
"sql": {"type": "string", "description": "SQL SELECT statement to execute"}
},
"required": ["sql"]
}
}
]
Цикл генерации запроса с вызовом инструментов
def validate_sql_safety(sql: str) -> tuple[bool, str]:
"""Validate SQL using AST parsing."""
try:
parsed = sqlglot.parse_one(sql)
except Exception as e:
return False, f"SQL parse error: {e}"
# Block DDL
ddl_types = (
sqlglot.expressions.Create,
sqlglot.expressions.Drop,
sqlglot.expressions.Alter,
sqlglot.expressions.Truncate,
)
if isinstance(parsed, ddl_types):
return False, f"DDL statements not allowed: {type(parsed).__name__}"
# Block DML writes
dml_types = (
sqlglot.expressions.Insert,
sqlglot.expressions.Update,
sqlglot.expressions.Delete,
sqlglot.expressions.Merge,
)
if isinstance(parsed, dml_types):
return False, f"Write statements not allowed: {type(parsed).__name__}"
# Require LIMIT
if isinstance(parsed, sqlglot.expressions.Select):
if not parsed.args.get("limit"):
return False, "SELECT queries must include LIMIT clause"
limit_val = parsed.args["limit"].args.get("expression")
if limit_val and int(limit_val.name) > 10000:
return False, "LIMIT cannot exceed 10000 rows"
return True, ""
class NLToSQLAgent:
def __init__(self, db_tools: DatabaseIntrospectionTools, read_only_connection: Any):
self.db_tools = db_tools
self.conn = read_only_connection
self.tools = get_schema_tools(db_tools)
def translate_and_execute(self, user_question: str) -> Dict:
"""Translate NL to SQL, validate, execute."""
system_prompt = """You are a PostgreSQL expert. Generate safe, efficient SQL queries.
Your workflow:
1. Use list_tables to discover available tables
2. Use describe_table to understand relevant table schemas
3. Use get_sample_rows to see actual data format
4. Generate a SELECT query with LIMIT clause
5. Return JSON: {"sql": "SELECT ...", "explanation": "This query..."}
CRITICAL RULES:
- NEVER generate DDL (CREATE/DROP/ALTER) or DML (INSERT/UPDATE/DELETE)
- ALWAYS include LIMIT clause (max 1000 unless aggregating)
- NEVER embed raw user input in SQL - use parameterized patterns
- If user input looks suspicious (semicolons, comments, UNION), report it as error
- Use table aliases for readability
- Prefer JOINs over subqueries for performance"""
messages = [{"role": "user", "content": user_question}]
generated_sql = None
explanation = None
while True:
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=4096,
system=system_prompt,
tools=self.tools,
messages=messages
)
if response.stop_reason == "tool_use":
tool_results = []
for block in response.content:
if block.type == "tool_use":
result = self._execute_tool(block.name, block.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": json.dumps(result, default=str)
})
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
elif response.stop_reason == "end_turn":
for block in response.content:
if hasattr(block, 'text'):
try:
text = block.text
start = text.find('{')
end = text.rfind('}') + 1
if start >= 0:
result = json.loads(text[start:end])
generated_sql = result.get("sql")
explanation = result.get("explanation")
except json.JSONDecodeError:
pass
break
if not generated_sql:
return {"error": "Could not generate SQL", "sql": None}
# Validate safety
is_safe, reason = validate_sql_safety(generated_sql)
if not is_safe:
return {"error": f"Query rejected: {reason}", "sql": generated_sql}
# Execute
results = self._execute_query(generated_sql)
# Summarize
summary = self._summarize_results(user_question, generated_sql, results)
return {
"sql": generated_sql,
"explanation": explanation,
"results": results,
"summary": summary,
"row_count": len(results)
}
def _execute_tool(self, tool_name: str, tool_input: Dict) -> Any:
"""Execute schema introspection tool."""
if tool_name == "list_tables":
return self.db_tools.list_tables(tool_input.get("schema", "public"))
elif tool_name == "describe_table":
return self.db_tools.describe_table(
tool_input["table_name"],
tool_input.get("schema", "public")
)
elif tool_name == "get_sample_rows":
return self.db_tools.get_sample_rows(tool_input["table_name"])
elif tool_name == "execute_read_query":
sql = tool_input["sql"]
is_safe, reason = validate_sql_safety(sql)
if not is_safe:
return {"error": f"Query rejected: {reason}"}
return self._execute_query(sql)
return {"error": "Unknown tool"}
def _execute_query(self, sql: str) -> List[Dict]:
"""Execute validated SQL with timeout."""
with self.conn.cursor() as cur:
cur.execute("SET statement_timeout = '30s'")
try:
cur.execute(sql)
if cur.description:
columns = [desc[0] for desc in cur.description]
return [dict(zip(columns, row)) for row in cur.fetchall()]
except psycopg2.errors.QueryCanceled:
raise QueryTimeoutError("Query exceeded 30s limit")
finally:
cur.execute("SET statement_timeout = DEFAULT")
return []
def _summarize_results(self, question: str, sql: str, results: List[Dict]) -> str:
"""Generate natural language summary."""
if not results:
return "The query returned no results."
sample = results[:20] if len(results) > 20 else results
prompt = f"""User asked: "{question}"
SQL executed:
```sql
{sql}
Results ({len(results)} total rows, showing first {len(sample)}): {json.dumps(sample, indent=2, default=str)}
Provide a 2-3 sentence summary that directly answers the user's question."""
response = client.messages.create(
model="claude-haiku-4-5",
max_tokens=512,
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
## Управление контекстом схемы
### Стратегия усечения больших схем
Для БД с 100+ таблицами, внедрение полной схемы превышает лимиты контекста и тратит токены на нерелевантные таблицы. Используйте предварительный отбор таблиц:
```python
def select_relevant_tables(user_question: str, all_tables: List[Dict]) -> List[str]:
"""Use LLM to identify relevant tables before introspection."""
table_list = "\n".join([
f"- {t['table_name']}: {t.get('description', 'no description')} ({t.get('approx_row_count', 0)} rows)"
for t in all_tables
])
prompt = f"""Available tables:
{table_list}
User question: "{user_question}"
Which tables are likely needed to answer this question? Return ONLY table names, one per line."""
response = client.messages.create(
model="claude-haiku-4-5",
max_tokens=256,
messages=[{"role": "user", "content": prompt}]
)
selected = [line.strip() for line in response.content[0].text.split('\n') if line.strip()]
return selected[:5] # Limit to 5 tables maximum
# Usage in agent
all_tables = db_tools.list_tables()
relevant_tables = select_relevant_tables(user_question, all_tables)
# Only introspect selected tables
for table in relevant_tables:
schema = db_tools.describe_table(table)
# Inject into context
Паттерн инкрементального открытия схемы
Вместо предзагрузки схемы, позволить агенту открывать таблицы по требованию:
# System prompt instructs agent to use tools incrementally
system_prompt = """You have access to schema introspection tools. Use them strategically:
1. Start with list_tables to see available tables
2. Identify 2-3 tables most likely relevant to the question
3. Use describe_table on ONLY those tables
4. If query fails or returns empty results, explore additional tables
5. Avoid calling describe_table on all tables - this wastes time and tokens
This incremental approach keeps context focused and efficient."""
Кеширование схемы для повторных запросов
Кешировать результаты information_schema для избежания повторных запросов метаданных:
import time
from functools import lru_cache
class CachedDatabaseTools(DatabaseIntrospectionTools):
def __init__(self, connection_string: str, cache_ttl: int = 300):
super().__init__(connection_string)
self.cache_ttl = cache_ttl
self._cache_timestamp = {}
def list_tables(self, schema: str = "public") -> List[Dict]:
cache_key = f"tables_{schema}"
if cache_key in self._cache_timestamp:
if time.time() - self._cache_timestamp[cache_key] < self.cache_ttl:
return self._get_cached_tables(schema)
tables = super().list_tables(schema)
self._cache_timestamp[cache_key] = time.time()
self._store_cached_tables(schema, tables)
return tables
@lru_cache(maxsize=100)
def _get_cached_tables(self, schema: str) -> List[Dict]:
return super().list_tables(schema)
def invalidate_cache(self):
"""Call after DDL operations."""
self._cache_timestamp.clear()
self._get_cached_tables.cache_clear()
Безопасность и паттерны только-чтение
Принудительное применение разрешений на уровне БД
-- PostgreSQL: Create read-only user
CREATE USER agent_readonly WITH PASSWORD 'secure_password_here';
GRANT CONNECT ON DATABASE analytics TO agent_readonly;
GRANT USAGE ON SCHEMA public TO agent_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO agent_readonly;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO agent_readonly;
-- Verify permissions
SELECT grantee, privilege_type
FROM information_schema.role_table_grants
WHERE grantee = 'agent_readonly';
# Connection with session-level read-only enforcement
def create_read_only_connection(host: str, database: str, port: int = 5432):
conn = psycopg2.connect(
host=host,
database=database,
port=port,
user="agent_readonly",
password=os.environ["DB_READONLY_PASSWORD"],
options="-c default_transaction_read_only=on" # Session-level protection
)
conn.set_session(readonly=True, autocommit=True)
return conn
Очистка запросов с AST парсингом
def detect_injection_patterns(sql: str) -> tuple[bool, str]:
"""Additional heuristic checks beyond AST parsing."""
sql_upper = sql.upper()
# Check for statement separators
if ';' in sql and not sql.strip().endswith(';'):
return False, "Multiple statements detected (semicolon not at end)"
# Check for comment-based obfuscation
if '--' in sql or '/*' in sql:
return False, "Comments in SQL not allowed (injection risk)"
# Check for UNION-based exfiltration
if 'UNION' in sql_upper:
# UNION is sometimes legitimate, but log it
import logging
logging.warning(f"UNION query detected: {sql}")
# Check for dynamic SQL execution
forbidden = ['EXECUTE', 'EXEC(', 'sp_executesql']
for pattern in forbidden:
if pattern in sql_upper:
return False, f"Dynamic SQL execution not allowed: {pattern}"
return True, ""
def validate_sql_safety(sql: str) -> tuple[bool, str]:
"""Combined AST + heuristic validation."""
# AST validation
try:
parsed = sqlglot.parse_one(sql)
except Exception as e:
return False, f"Parse error: {e}"
# Type checks
if isinstance(parsed, (sqlglot.expressions.Create, sqlglot.expressions.Drop,
sqlglot.expressions.Alter, sqlglot.expressions.Truncate)):
return False, "DDL not allowed"
if isinstance(parsed, (sqlglot.expressions.Insert, sqlglot.expressions.Update,
sqlglot.expressions.Delete, sqlglot.expressions.Merge)):
return False, "Write operations not allowed"
# LIMIT enforcement
if isinstance(parsed, sqlglot.expressions.Select):
if not parsed.args.get("limit"):
return False, "SELECT must have LIMIT"
# Heuristic checks
is_safe, reason = detect_injection_patterns(sql)
if not is_safe:
return False, reason
return True, ""
Режим dry-run с EXPLAIN
def dry_run_query(sql: str, conn) -> Dict:
"""Execute EXPLAIN to check query plan without running query."""
with conn.cursor() as cur:
cur.execute(f"EXPLAIN (FORMAT JSON, ANALYZE FALSE) {sql}")
plan = cur.fetchone()[0][0]
total_cost = plan.get('Plan', {}).get('Total Cost', 0)
# Reject expensive queries
if total_cost > 10000:
raise ValueError(f"Query cost too high: {total_cost}")
return plan
# Use in agent
plan = dry_run_query(generated_sql, conn)
if plan['Plan'].get('Node Type') == 'Seq Scan':
# Warn about full table scan
logging.warning(f"Sequential scan detected on {plan['Plan']['Relation Name']}")
Производительность и бенчмарки
Примечание: Приведённые ниже цифры являются иллюстративными оценками на основе типичных production-конфигураций, а не измерениями конкретной системы.
Задержка генерации запроса
- Простой single-table SELECT: агент интроспектирует 1 таблицу, генерирует SQL за 1-2 секунды (1 вызов инструмента + 1 генерация LLM)
- Multi-table JOIN: агент интроспектирует 3-4 таблицы, генерирует SQL за 3-5 секунд (4 вызова инструментов + 1 генерация LLM)
- Исследовательский запрос (незнакомая схема): агент перечисляет все таблицы, делает выборку 2-3, интроспектирует 5+ таблиц, генерирует SQL за 5-10 секунд
- Кешированная схема: последующие запросы на же таблицы сокращаются до 1-2 секунд (интроспекция не требуется)
Паттерны потребления токенов
- Интроспекция схемы: каждый вызов
describe_tableвозвращает 200-500 токенов; 5 таблиц = 1000-2500 токенов - Overhead вызова инструмента: каждый вызов инструмента добавляет 100-150 токенов (вызов функции + обёртка результата)
- Генерация SQL: типичный системный prompt (500 токенов) + разговор (1000-3000 токенов) + контекст схемы (2000-5000 токенов)
- Всего за запрос: 3000-8000 токенов для входа, 200-800 токенов для выхода (SQL + объяснение)
Выполнение запроса БД
- Простая агрегация (
SELECT COUNT(*)): выполняется за 10-50ms на индексированной колонке - Multi-table JOIN (3 таблицы, правильные индексы): выполняется за 50-200ms на датасетах < 1M строк
- Полное сканирование таблицы (отсутствие WHERE): блокируется принудительным LIMIT; типичное выполнение 100-500ms для LIMIT 1000
- Сложный аналитический запрос (оконные функции, подзапросы): может достичь 5-30 секунд; принудительный timeout на 30s предотвращает бесконечные запросы
Опыт пользователя от начала до конца
- Лучший случай (простой запрос, кешированная схема): 1.5-2 секунды (вопрос пользователя → ответ на естественном языке)
- Типичный случай (средняя сложность): 4-6 секунд (интроспекция + генерация + выполнение + суммирование)
- Худший случай (исследовательский запрос на незнакомой схеме): 8-12 секунд (несколько раундов интроспекции)
Стратегии оптимизации стоимости
- Использовать
claude-haiku-4-5для суммирования результатов (10x дешевле чем Opus) - Кешировать результаты интроспекции схемы на 5-15 минут
- Предварительно отбирать релевантные таблицы перед полной интроспекцией (снижает вызовы инструментов с 50+ до 5)
- Ограничить суммирование результатов первыми 20 строками (снижает токены выхода)
Конкурентность и пул соединений
Для развёртываний агентов multi-user:
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
"postgresql://agent_readonly:password@db.example.com/analytics",
poolclass=QueuePool,
pool_size=5, # 5 persistent connections
max_overflow=10, # 10 additional connections during spikes
pool_recycle=3600, # Recycle connections after 1 hour
pool_pre_ping=True # Verify connection health before use
)
# Usage
with engine.connect() as conn:
agent = NLToSQLAgent(db_tools, conn)
result = agent.translate_and_execute(user_question)
Эта конфигурация справляется с 5 конкурентными сессиями агентов комфортно, с пиковой ёмкостью до 15 во время пиковой нагрузки.