Natural Language to SQL: Building Text-to-Query Systems
How to build reliable natural language to SQL translation systems using LLMs, schema-aware prompting, query validation, and execution sandboxing in Python.
The promise of natural language to SQL is compelling: let business users query databases in plain English instead of learning SQL. The reality is treacherous. SQL is a precise language where a misplaced JOIN condition or a wrong aggregation silently produces incorrect results. An LLM that generates plausible-looking SQL is dangerous precisely because the errors are hard to detect. The query runs, returns numbers, and the user trusts them.
Data Whispal Agent includes a text-to-SQL capability for users who need direct database access. Building it taught us that the hard part is not generating SQL from natural language. Modern LLMs are surprisingly competent at that task. The hard part is making the system reliable enough that users can trust the results. This article covers the schema-aware prompting, multi-stage validation, safe execution environment, and error recovery mechanisms that make our text-to-SQL pipeline production-worthy.
Schema-Aware Prompt Design
The most impactful technique in text-to-SQL is providing the LLM with detailed schema information. Without knowing the table names, column names, data types, and relationships, the model has to guess. With complete schema context, it can write accurate queries even for complex schemas.
The challenge is that real schemas are large. An analytics database with hundreds of tables and thousands of columns cannot fit in a single prompt. We solve this with a two-stage approach: first identify the relevant tables, then generate SQL with only those tables in context.
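The arithmetic behind that constraint is easy to sketch. The numbers below (tokens per column header, tables, columns per table) are illustrative assumptions, not measurements from our system:

```python
def estimate_schema_tokens(
    num_tables: int,
    cols_per_table: int,
    tokens_per_column: int = 12,
    tokens_per_table_header: int = 20,
) -> int:
    """Back-of-envelope prompt size for a fully inlined schema description."""
    per_table = tokens_per_table_header + cols_per_table * tokens_per_column
    return num_tables * per_table

# A mid-sized analytics warehouse: 300 tables, ~20 columns each
print(estimate_schema_tokens(300, 20))  # -> 78000 tokens for schema alone
```

At roughly 78k tokens before the question, few-shot examples, or instructions are added, inlining everything is impractical, which is what motivates the table-selection stage.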
from dataclasses import dataclass


@dataclass
class ColumnInfo:
    name: str
    data_type: str
    description: str
    sample_values: list[str] | None = None
    is_primary_key: bool = False
    is_foreign_key: bool = False
    foreign_key_reference: str | None = None


@dataclass
class TableInfo:
    name: str
    description: str
    columns: list[ColumnInfo]
    row_count: int | None = None


def format_schema_for_prompt(tables: list[TableInfo]) -> str:
    """Format table schemas into a clear, LLM-friendly representation."""
    sections = []
    for table in tables:
        cols = []
        for col in table.columns:
            parts = [f" {col.name} ({col.data_type})"]
            if col.is_primary_key:
                parts.append(" [PK]")
            if col.is_foreign_key:
                parts.append(f" [FK -> {col.foreign_key_reference}]")
            if col.description:
                parts.append(f" -- {col.description}")
            if col.sample_values:
                samples = ", ".join(f"'{v}'" for v in col.sample_values[:3])
                parts.append(f" (e.g., {samples})")
            cols.append("".join(parts))
        header = f"TABLE: {table.name}"
        if table.description:
            header += f"\n-- {table.description}"
        if table.row_count:
            header += f"\n-- Approximate rows: {table.row_count:,}"
        section = header + "\n" + "\n".join(cols)
        sections.append(section)
    return "\n\n".join(sections)

Sample values are particularly important. When a user asks "What's the revenue in EMEA?", the model needs to know that the region column contains values like "EMEA", "APAC", and "NA" rather than full names like "Europe, Middle East, and Africa". We extract sample values during schema introspection:
import sqlalchemy
from sqlalchemy import inspect, text


def introspect_schema(
    engine: sqlalchemy.Engine,
    schema: str = "public",
    sample_size: int = 3,
) -> list[TableInfo]:
    """Extract detailed schema information including sample values."""
    inspector = inspect(engine)
    tables = []
    for table_name in inspector.get_table_names(schema=schema):
        columns = []
        pk_columns = set(
            inspector.get_pk_constraint(
                table_name, schema=schema
            ).get("constrained_columns", [])
        )
        fk_map = {}
        for fk in inspector.get_foreign_keys(table_name, schema=schema):
            for col in fk["constrained_columns"]:
                ref = f"{fk['referred_table']}.{fk['referred_columns'][0]}"
                fk_map[col] = ref
        for col in inspector.get_columns(table_name, schema=schema):
            # Get sample values for string and categorical columns
            sample_values = None
            col_type = str(col["type"])
            if any(t in col_type.upper() for t in ["VARCHAR", "TEXT", "CHAR"]):
                with engine.connect() as conn:
                    result = conn.execute(text(
                        f"SELECT DISTINCT {col['name']} "
                        f"FROM {schema}.{table_name} "
                        f"WHERE {col['name']} IS NOT NULL "
                        f"LIMIT {sample_size}"
                    ))
                    sample_values = [str(row[0]) for row in result]
            columns.append(ColumnInfo(
                name=col["name"],
                data_type=col_type,
                # "comment" can be present but None, so fall back explicitly
                description=col.get("comment") or "",
                sample_values=sample_values,
                is_primary_key=col["name"] in pk_columns,
                is_foreign_key=col["name"] in fk_map,
                foreign_key_reference=fk_map.get(col["name"]),
            ))
        # Get approximate row count from PostgreSQL planner statistics
        with engine.connect() as conn:
            result = conn.execute(text(
                "SELECT reltuples::bigint FROM pg_class "
                f"WHERE relname = '{table_name}'"
            ))
            row_count = result.scalar()
        tables.append(TableInfo(
            name=table_name,
            description=inspector.get_table_comment(
                table_name, schema=schema
            ).get("text") or "",
            columns=columns,
            row_count=row_count,
        ))
    return tables

Table Selection and Query Generation
With schema introspection complete, we build a two-stage pipeline. The first stage identifies which tables are relevant to the user's question. The second stage generates SQL using only those tables.
import json

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

TABLE_SELECTOR_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """You are a database expert. Given a natural language question
and a list of available tables with their descriptions, identify which
tables are needed to answer the question.

Available tables:
{table_list}

Return ONLY a JSON array of table names. Example: ["orders", "customers"]"""),
    ("human", "{question}"),
])

SQL_GENERATOR_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """You are an expert SQL query writer. Generate a PostgreSQL
query to answer the user's question.

RULES:
1. Use ONLY the tables and columns provided in the schema below
2. Always use explicit JOIN conditions (never implicit joins)
3. Use table aliases for readability
4. Add LIMIT 1000 unless the user asks for all results
5. Use appropriate aggregations (SUM, AVG, COUNT) for analytical questions
6. Handle NULL values explicitly with COALESCE where appropriate
7. Use date functions for time-based queries (DATE_TRUNC, EXTRACT)
8. Return ONLY the SQL query, no explanations

Schema:
{schema}

{few_shot_examples}"""),
    ("human", "{question}"),
])


def _strip_fences(content: str) -> str:
    """Remove markdown code fences the model sometimes wraps around output.

    Note: str.strip("```sql") would strip *characters* from both ends, not
    the prefix string, so we use removeprefix/removesuffix instead.
    """
    content = content.strip()
    content = content.removeprefix("```sql").removeprefix("```json")
    content = content.removeprefix("```").removesuffix("```")
    return content.strip()


async def generate_sql(
    question: str,
    all_tables: list[TableInfo],
    llm: ChatOpenAI | None = None,
) -> str:
    """Two-stage SQL generation: table selection then query writing."""
    if llm is None:
        llm = ChatOpenAI(model="gpt-4o", temperature=0)

    # Stage 1: Select relevant tables
    table_list = "\n".join(
        f"- {t.name}: {t.description}" for t in all_tables
    )
    selector_chain = TABLE_SELECTOR_PROMPT | llm
    selected_raw = await selector_chain.ainvoke({
        "table_list": table_list,
        "question": question,
    })
    selected_names = json.loads(_strip_fences(selected_raw.content))
    selected_tables = [
        t for t in all_tables if t.name in selected_names
    ]

    # Stage 2: Generate SQL with relevant schema only
    schema_text = format_schema_for_prompt(selected_tables)
    few_shots = get_few_shot_examples(question, selected_names)
    generator_chain = SQL_GENERATOR_PROMPT | llm
    sql_response = await generator_chain.ainvoke({
        "schema": schema_text,
        "question": question,
        "few_shot_examples": few_shots,
    })
    return _strip_fences(sql_response.content)

Few-shot examples are indexed by the tables they reference, so the model sees examples that use the same tables as the current query. This dramatically improves JOIN accuracy and aggregation correctness.
FEW_SHOT_LIBRARY = {
    frozenset(["orders", "customers"]): [
        {
            "question": "How many orders per customer segment last month?",
            "sql": """SELECT c.segment, COUNT(o.id) AS order_count
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.created_at >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month')
  AND o.created_at < DATE_TRUNC('month', CURRENT_DATE)
GROUP BY c.segment
ORDER BY order_count DESC""",
        },
    ],
    frozenset(["revenue", "products"]): [
        {
            "question": "What's the revenue breakdown by product category?",
            "sql": """SELECT p.category, SUM(r.amount) AS total_revenue
FROM revenue r
JOIN products p ON r.product_id = p.id
WHERE r.recorded_at >= DATE_TRUNC('quarter', CURRENT_DATE)
GROUP BY p.category
ORDER BY total_revenue DESC""",
        },
    ],
}


def get_few_shot_examples(question: str, table_names: list[str]) -> str:
    key = frozenset(table_names)
    examples = list(FEW_SHOT_LIBRARY.get(key, []))
    if not examples:
        # Fallback: find examples that share at least one table
        for example_key, example_list in FEW_SHOT_LIBRARY.items():
            if example_key & key:
                examples.extend(example_list)
    if not examples:
        return ""
    text = "Here are example queries for similar questions:\n\n"
    for ex in examples[:2]:
        text += f"Question: {ex['question']}\nSQL:\n{ex['sql']}\n\n"
    return text

Query Validation and Safety
Generated SQL must be validated before execution. We implement three layers of validation: syntactic, semantic, and safety.
Syntactic validation parses the SQL and checks for basic correctness. We use the sqlglot library for dialect-aware parsing:
import sqlglot
from sqlglot import errors as sqlglot_errors


def validate_syntax(sql: str, dialect: str = "postgres") -> tuple[bool, str]:
    """Parse SQL and check for syntax errors."""
    try:
        parsed = sqlglot.parse(sql, dialect=dialect)
        if not parsed or not parsed[0]:
            return False, "Empty or unparseable query"
        return True, ""
    except sqlglot_errors.ParseError as e:
        return False, f"Syntax error: {str(e)}"


def validate_safety(sql: str) -> tuple[bool, list[str]]:
    """Check for dangerous SQL operations."""
    violations = []
    sql_upper = sql.upper().strip()

    # Block data modification. This is a coarse textual check for keywords
    # at the start of the query or of a new line; the database role's
    # read-only permissions remain the real enforcement layer.
    dangerous_keywords = [
        "INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE",
        "TRUNCATE", "GRANT", "REVOKE", "EXEC", "EXECUTE",
    ]
    for keyword in dangerous_keywords:
        if sql_upper.startswith(keyword) or f"\n{keyword}" in sql_upper:
            violations.append(f"Blocked operation: {keyword}")

    # Block system table access
    system_schemas = ["pg_catalog", "information_schema", "pg_temp"]
    for schema in system_schemas:
        if schema in sql.lower():
            violations.append(f"Access to system schema: {schema}")

    # Check for subqueries that could be expensive
    subquery_count = sql_upper.count("SELECT") - 1
    if subquery_count > 3:
        violations.append(
            f"Too many subqueries ({subquery_count}): potential performance issue"
        )
    return len(violations) == 0, violations


def validate_tables_exist(
    sql: str,
    available_tables: set[str],
    dialect: str = "postgres",
) -> tuple[bool, list[str]]:
    """Verify all referenced tables exist in the schema."""
    try:
        parsed = sqlglot.parse_one(sql, dialect=dialect)
        referenced_tables = set()
        for table in parsed.find_all(sqlglot.exp.Table):
            referenced_tables.add(table.name)
        missing = referenced_tables - available_tables
        if missing:
            return False, [f"Unknown table: {t}" for t in missing]
        return True, []
    except Exception as e:
        return False, [f"Parse error during table validation: {str(e)}"]

Safe Execution Environment
Even validated queries can be expensive. A query that joins two large tables without proper filtering can consume all available database resources. We execute generated queries in a sandboxed environment with resource limits:
from sqlalchemy import create_engine, text
from sqlalchemy.pool import NullPool


class SafeQueryExecutor:
    def __init__(
        self,
        connection_string: str,
        statement_timeout_ms: int = 30000,
        max_rows: int = 10000,
    ):
        # Use NullPool to avoid connection leaks
        self.engine = create_engine(
            connection_string,
            poolclass=NullPool,
            connect_args={
                "options": f"-c statement_timeout={statement_timeout_ms}"
            },
        )
        self.statement_timeout_ms = statement_timeout_ms
        self.max_rows = max_rows

    def execute_query(self, sql: str) -> dict:
        """Execute a validated SQL query with resource limits."""
        # Drop any trailing semicolon so the query can be wrapped, then
        # add a row limit if one is not present
        sql = sql.strip().rstrip(";")
        if "LIMIT" not in sql.upper():
            sql = f"SELECT * FROM ({sql}) AS limited LIMIT {self.max_rows}"
        try:
            with self.engine.connect() as conn:
                # Set read-only transaction
                conn.execute(text("SET TRANSACTION READ ONLY"))
                result = conn.execute(text(sql))
                columns = list(result.keys())
                rows = [dict(zip(columns, row)) for row in result.fetchall()]
                return {
                    "success": True,
                    "columns": columns,
                    "rows": rows,
                    "row_count": len(rows),
                    "truncated": len(rows) >= self.max_rows,
                }
        except Exception as e:
            error_msg = str(e)
            if "statement timeout" in error_msg.lower():
                timeout_s = self.statement_timeout_ms // 1000
                return {
                    "success": False,
                    "error": f"Query exceeded the {timeout_s}-second time limit. "
                    "Try adding more specific filters.",
                    "error_type": "timeout",
                }
            return {
                "success": False,
                "error": f"Query execution failed: {error_msg}",
                "error_type": "execution_error",
            }

Error Recovery and Self-Correction
When SQL generation produces an error, whether syntactic, semantic, or at execution time, the agent should attempt to self-correct rather than simply reporting the failure. We implement a retry loop that feeds the error back to the LLM:
CORRECTION_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """You are fixing a SQL query that produced an error.

Original question: {question}

Schema:
{schema}

Previous SQL attempt:
{failed_sql}

Error message: {error}

Generate a corrected SQL query that fixes this error.
Return ONLY the SQL query."""),
    ("human", "Please fix the query."),
])


async def generate_sql_with_recovery(
    question: str,
    tables: list[TableInfo],
    executor: SafeQueryExecutor,
    max_attempts: int = 3,
) -> dict:
    """Generate and execute SQL with automatic error recovery."""
    schema_text = format_schema_for_prompt(tables)
    available_table_names = {t.name for t in tables}

    sql = await generate_sql(question, tables)
    attempts = [{"sql": sql, "attempt": 1}]

    for attempt in range(1, max_attempts + 1):
        # Validate
        syntax_ok, syntax_err = validate_syntax(sql)
        if not syntax_ok:
            error = f"Syntax error: {syntax_err}"
            attempts[-1]["error"] = error
            sql = await _correct_sql(question, schema_text, sql, error)
            attempts.append({"sql": sql, "attempt": attempt + 1})
            continue

        safety_ok, safety_violations = validate_safety(sql)
        if not safety_ok:
            # Safety violations are not retried: regenerating rarely helps
            return {
                "success": False,
                "error": f"Safety violation: {'; '.join(safety_violations)}",
                "attempts": attempts,
            }

        tables_ok, table_errs = validate_tables_exist(
            sql, available_table_names
        )
        if not tables_ok:
            error = f"Schema error: {'; '.join(table_errs)}"
            attempts[-1]["error"] = error
            sql = await _correct_sql(question, schema_text, sql, error)
            attempts.append({"sql": sql, "attempt": attempt + 1})
            continue

        # Execute
        result = executor.execute_query(sql)
        if result["success"]:
            return {
                "success": True,
                "sql": sql,
                "result": result,
                "attempts": attempts,
            }
        error = result["error"]
        attempts[-1]["error"] = error
        if result["error_type"] == "timeout":
            # Timeouts usually need a narrower question, not a retry
            return {"success": False, "error": error, "attempts": attempts}
        sql = await _correct_sql(question, schema_text, sql, error)
        attempts.append({"sql": sql, "attempt": attempt + 1})

    return {
        "success": False,
        "error": "Max correction attempts exceeded",
        "attempts": attempts,
    }


async def _correct_sql(
    question: str, schema: str, failed_sql: str, error: str
) -> str:
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    chain = CORRECTION_PROMPT | llm
    response = await chain.ainvoke({
        "question": question,
        "schema": schema,
        "failed_sql": failed_sql,
        "error": error,
    })
    # str.strip("```sql") strips characters, not a prefix, so remove
    # any markdown fences explicitly
    content = response.content.strip()
    content = content.removeprefix("```sql").removeprefix("```").removesuffix("```")
    return content.strip()

In production, the self-correction loop resolves approximately 60% of first-attempt failures. The most common corrections involve fixing column names, adding missing JOIN conditions, and adjusting date function syntax.
Conclusion
Natural language to SQL is a high-stakes capability. Users trust the numbers your system produces, and incorrect SQL can produce plausible but wrong results. The techniques that make text-to-SQL production-ready are not glamorous: exhaustive schema introspection with sample values, table-specific few-shot examples, multi-layer validation, resource-limited execution, and self-correcting error recovery. Each layer catches a different class of failure, and together they bring error rates low enough for business use.
The most important principle is defense in depth. No single validation step catches all errors. Syntactic validation misses semantic errors. Safety checks miss performance problems. Even execution success does not guarantee correctness, because a query can run successfully and return wrong results. For high-stakes queries, we add a final layer: presenting the generated SQL alongside the results so that data-literate users can verify the approach. Transparency is the ultimate safety net.
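One way to implement that transparency layer is to render the generated SQL directly above the result rows, using the executor's result dict (`columns`, `rows`, `truncated`). A minimal sketch; the exact formatting is illustrative:

```python
def render_answer(sql: str, result: dict) -> str:
    """Show the generated SQL next to the results so users can audit it."""
    lines = ["SQL used (please verify):", sql, ""]
    header = " | ".join(result["columns"])
    lines.append(header)
    lines.append("-" * len(header))
    for row in result["rows"]:
        lines.append(" | ".join(str(row[c]) for c in result["columns"]))
    if result.get("truncated"):
        lines.append(f"(results truncated at {len(result['rows'])} rows)")
    return "\n".join(lines)


print(render_answer(
    "SELECT segment, COUNT(*) AS count FROM customers GROUP BY segment",
    {"columns": ["segment", "count"],
     "rows": [{"segment": "SMB", "count": 42}],
     "truncated": False},
))
```

Even users who cannot write SQL can often spot a wrong table name or a missing filter when the query is shown alongside the numbers.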