LangChain in Production: Patterns and Pitfalls

Lessons learned from deploying LangChain-based AI agents in production, covering chain design, error handling, observability, and performance optimization patterns.

Technical · 8 min read · By Klivvr Engineering

LangChain is one of the most popular frameworks for building LLM-powered applications, and for good reason. It provides composable abstractions for chains, agents, retrievers, and memory that let you prototype quickly. But the gap between a working notebook demo and a reliable production system is vast. Prototypes tolerate flaky API calls, unbounded token usage, and opaque failures. Production systems cannot.

When we built Data Whispal Agent on LangChain, we spent roughly thirty percent of our engineering time on the initial prototype and seventy percent on making it production-ready. The framework gave us velocity in the early stages, but we had to layer on significant infrastructure for reliability, observability, and cost control. This article distills the patterns that worked, the pitfalls we encountered, and the code we wrote to bridge the gap.

Chain Design: Composition Over Complexity

The first pitfall teams hit with LangChain is building monolithic chains. A single chain that retrieves documents, reformulates the query, calls an LLM, parses the output, validates the result, and formats the response becomes impossible to test and debug. When something goes wrong, and it will, you cannot tell which step failed or why.

We decompose every workflow into small, single-responsibility runnables and compose them with LangChain Expression Language (LCEL). Each runnable does one thing, has typed inputs and outputs, and can be tested in isolation.

from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
 
# Step 1: Query reformulation
def reformulate_query(state: dict) -> dict:
    """Rewrite the user query for better retrieval."""
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    rewrite_prompt = (
        "Rewrite this question to be more specific for "
        "data retrieval. Keep the same intent.\n\n"
        f"Original: {state['question']}\nRewritten:"
    )
    rewritten = llm.invoke(rewrite_prompt).content
    return {**state, "rewritten_query": rewritten}
 
# Step 2: Retrieval (separate runnable)
def retrieve_context(state: dict) -> dict:
    """Fetch relevant documents from the vector store."""
    docs = state["retriever"].invoke(state["rewritten_query"])
    return {**state, "context": docs}
 
# Step 3: Generation (separate runnable)
def generate_answer(state: dict) -> dict:
    """Produce the final answer from context."""
    llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
    context_text = "\n\n".join(d.page_content for d in state["context"])
    prompt = f"Context:\n{context_text}\n\nQuestion: {state['question']}"
    answer = llm.invoke(prompt).content
    return {**state, "answer": answer}
 
# Compose the chain
chain = (
    RunnableLambda(reformulate_query)
    | RunnableLambda(retrieve_context)
    | RunnableLambda(generate_answer)
)

This decomposition has three concrete benefits. First, you can unit test each step with mocked inputs. Second, you can swap implementations without touching the rest of the pipeline. For instance, replacing OpenAI with Anthropic for the generation step requires changing only the generate_answer function. Third, you can add observability hooks between steps without modifying the business logic.

A subtlety worth noting: avoid instantiating LLM clients inside runnables in production. The examples above do so for clarity, but in practice we inject clients through a dependency container to enable connection pooling and centralized configuration.
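The injection itself can be as simple as a closure that binds a shared client at construction time. A minimal sketch of that pattern, using a stand-in FakeLLM in place of a pooled ChatOpenAI instance (the names here are illustrative, not our actual container):

```python
from types import SimpleNamespace

class FakeLLM:
    """Stand-in for a pooled ChatOpenAI client, exposing invoke().content."""
    def invoke(self, prompt: str):
        return SimpleNamespace(content="rewritten: " + prompt)

def make_reformulate_query(llm):
    """Bind one shared client into the runnable at construction time."""
    def reformulate_query(state: dict) -> dict:
        rewritten = llm.invoke(state["question"]).content
        return {**state, "rewritten_query": rewritten}
    return reformulate_query

shared_llm = FakeLLM()  # in production: one configured client, reused everywhere
step = make_reformulate_query(shared_llm)
```

Because the client is bound once, swapping providers or tuning timeouts happens in one place instead of inside every runnable.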

Error Handling and Retry Logic

LLM APIs are unreliable. Rate limits, timeouts, malformed responses, and transient server errors are not edge cases; they are the norm. A production LangChain application needs robust error handling at every layer.

LangChain's built-in retry support through the with_retry method covers basic transient failures, but it is not sufficient for production. We wrap it with custom logic that distinguishes between retryable and terminal errors, implements exponential backoff with jitter, and logs structured error data for post-incident analysis.

import time
import random
import logging
from openai import RateLimitError, APITimeoutError, APIConnectionError
 
logger = logging.getLogger("data_whispal.chains")
 
RETRYABLE_ERRORS = (RateLimitError, APITimeoutError, APIConnectionError)
 
def retry_with_backoff(func, max_retries: int = 3, base_delay: float = 1.0):
    """Execute a function with exponential backoff and jitter."""
    def wrapper(*args, **kwargs):
        last_exception = None
        for attempt in range(max_retries + 1):
            try:
                return func(*args, **kwargs)
            except RETRYABLE_ERRORS as e:
                last_exception = e
                if attempt == max_retries:
                    logger.error(
                        "Max retries exceeded",
                        extra={
                            "function": func.__name__,
                            "attempt": attempt,
                            "error_type": type(e).__name__,
                        },
                    )
                    raise
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                logger.warning(
                    "Retrying after transient error",
                    extra={
                        "function": func.__name__,
                        "attempt": attempt,
                        "delay_seconds": round(delay, 2),
                        "error_type": type(e).__name__,
                    },
                )
                time.sleep(delay)
            except Exception as e:
                logger.error(
                    "Non-retryable error in chain step",
                    extra={
                        "function": func.__name__,
                        "error_type": type(e).__name__,
                        "error_message": str(e),
                    },
                )
                raise
        raise last_exception
    return wrapper
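To see the wrapper's behavior without hitting a real API, here is a self-contained variant with the retryable exception set made injectable and a stand-in TransientError in place of the OpenAI exception classes (delays shortened so the demo runs instantly):

```python
import random
import time

class TransientError(Exception):
    """Stand-in for RateLimitError / APITimeoutError in this demo."""

def retry_with_backoff(func, max_retries=3, base_delay=0.01,
                       retryable=(TransientError,)):
    """Same backoff-with-jitter shape as above, minus the logging."""
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries + 1):
            try:
                return func(*args, **kwargs)
            except retryable:
                if attempt == max_retries:
                    raise
                time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.01))
    return wrapper

calls = {"count": 0}

def flaky_step(state: dict) -> dict:
    """Fails twice with a transient error, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("simulated rate limit")
    return {**state, "answer": "ok"}

resilient_step = retry_with_backoff(flaky_step)
result = resilient_step({"question": "q"})  # succeeds on the third attempt
```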

Beyond API errors, you must handle LLM output parsing failures. When you expect JSON from the model and receive malformed text, silently returning garbage to the user is worse than failing loudly. We use Pydantic output parsers with fallback chains:

from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
 
class AnalyticsResponse(BaseModel):
    answer: str = Field(description="The natural language answer")
    confidence: float = Field(description="Confidence score 0-1")
    sources: list[str] = Field(description="Source datasets used")
 
parser = PydanticOutputParser(pydantic_object=AnalyticsResponse)
 
def safe_parse(raw_output: str) -> AnalyticsResponse:
    try:
        return parser.parse(raw_output)
    except Exception:
        logger.warning(
            "Structured parsing failed, falling back to plain text",
            extra={"raw_output_length": len(raw_output)},
        )
        return AnalyticsResponse(
            answer=raw_output,
            confidence=0.5,
            sources=["parsing_fallback"],
        )

This fallback ensures the user always receives a response, even when the LLM deviates from the expected format. The reduced confidence score signals to downstream systems that the result may need human review.
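Downstream consumers can use that lowered confidence to gate a human review queue. A minimal sketch (the 0.8 cutoff and queue shape are illustrative, not values from our system):

```python
REVIEW_THRESHOLD = 0.8  # illustrative cutoff; tune against observed quality

def route_response(resp: dict, review_queue: list) -> str:
    """Auto-reply on confident answers; queue fallbacks for human review."""
    low_confidence = resp["confidence"] < REVIEW_THRESHOLD
    from_fallback = "parsing_fallback" in resp["sources"]
    if low_confidence or from_fallback:
        review_queue.append(resp)
        return "queued_for_review"
    return "auto_reply"
```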

Observability: Tracing Every Token

In a traditional web application, a slow endpoint can be diagnosed with request logs and APM traces. In an LLM application, the "endpoint" is a multi-step chain where latency, cost, and quality are all functions of nondeterministic model behavior. You need specialized observability.

We use LangSmith for chain tracing and export structured logs to our existing observability stack. Every chain invocation produces a trace that captures the input, output, latency, token usage, and model parameters at each step.

from langchain_core.tracers import LangChainTracer
from langchain_core.callbacks import CallbackManager
import os
 
def get_callback_manager(session_name: str = "production"):
    callbacks = []
 
    if os.getenv("LANGCHAIN_TRACING_V2") == "true":
        tracer = LangChainTracer(project_name=session_name)
        callbacks.append(tracer)
 
    return CallbackManager(callbacks)
 
# Custom callback for structured logging
import time

from langchain_core.callbacks import BaseCallbackHandler
 
class MetricsCallbackHandler(BaseCallbackHandler):
    def __init__(self):
        self.step_timings = {}
        self.total_tokens = 0
 
    def on_chain_start(self, serialized: dict, inputs: dict, **kwargs):
        run_id = kwargs.get("run_id", "unknown")
        self.step_timings[str(run_id)] = time.time()
 
    def on_chain_end(self, outputs: dict, **kwargs):
        run_id = str(kwargs.get("run_id", "unknown"))
        start = self.step_timings.pop(run_id, None)
        if start:
            duration = time.time() - start
            logger.info(
                "Chain step completed",
                extra={
                    "run_id": run_id,
                    "duration_ms": round(duration * 1000),
                },
            )
 
    def on_llm_end(self, response, **kwargs):
        if hasattr(response, "llm_output") and response.llm_output:
            usage = response.llm_output.get("token_usage", {})
            self.total_tokens += usage.get("total_tokens", 0)
            logger.info(
                "LLM call completed",
                extra={
                    "prompt_tokens": usage.get("prompt_tokens", 0),
                    "completion_tokens": usage.get("completion_tokens", 0),
                    "model": response.llm_output.get("model_name", "unknown"),
                },
            )

We track four key metrics for every chain invocation: end-to-end latency, total token count, retrieval precision (measured by comparing retrieved chunks against a ground-truth relevance set for sampled queries), and answer quality (measured by LLM-as-judge scoring on a weekly sample). These metrics feed dashboards that alert on regressions before users notice them.
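The alerting side reduces to comparing a rolling window of observations against a baseline percentile. A dependency-free sketch (the 20% tolerance is an illustrative choice, not our production value):

```python
import statistics

def p95_ms(samples_ms: list) -> float:
    """95th-percentile latency of a window of samples."""
    return statistics.quantiles(samples_ms, n=20)[-1]

def latency_regressed(samples_ms: list, baseline_p95_ms: float,
                      tolerance: float = 1.2) -> bool:
    """Alert when observed p95 exceeds the baseline by more than 20%."""
    return p95_ms(samples_ms) > baseline_p95_ms * tolerance
```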

Performance Optimization

LLM calls dominate latency in any LangChain application. A single GPT-4o call takes 2-8 seconds depending on prompt length and output length. A chain with three LLM calls can easily exceed 15 seconds. Users expect sub-five-second responses for analytics queries.

Our primary optimization strategies are parallelism, caching, and model routing.

Parallelism. When chain steps are independent, run them concurrently. LangChain's RunnableParallel makes this straightforward:

from langchain_core.runnables import RunnableParallel
 
# Run retrieval and query classification in parallel
parallel_step = RunnableParallel(
    context=retriever_chain,
    query_type=classification_chain,
    metadata=metadata_extraction_chain,
)
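Conceptually, RunnableParallel fans the same input out to every branch and merges the results into a dict keyed by branch name. A dependency-free sketch of that behavior, with stand-in branch functions:

```python
from concurrent.futures import ThreadPoolExecutor

def run_parallel(branches: dict, state: dict) -> dict:
    """Run each branch on the same input concurrently; merge outputs by key."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(fn, state) for name, fn in branches.items()}
        return {name: fut.result() for name, fut in futures.items()}

merged = run_parallel(
    {
        "context": lambda s: ["doc-1", "doc-2"],   # stand-in retriever
        "query_type": lambda s: "analytics",       # stand-in classifier
    },
    {"question": "Q3 revenue?"},
)
```

With independent branches, total latency is that of the slowest branch rather than the sum of all of them.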

Caching. Identical queries should not trigger redundant LLM calls. We use a semantic cache backed by Redis that matches queries by embedding similarity rather than exact string match:

from langchain_community.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings
from langchain_core.globals import set_llm_cache
 
def initialize_cache(redis_url: str, threshold: float = 0.92):
    set_llm_cache(
        RedisSemanticCache(
            redis_url=redis_url,
            embedding=OpenAIEmbeddings(model="text-embedding-3-small"),
            score_threshold=threshold,
        )
    )

The threshold of 0.92 means that a cached response will be returned if the new query is at least 92% similar to a previously seen query. This catches paraphrases like "What was Q3 revenue?" and "How much revenue did we make in Q3?" without returning stale results for genuinely different questions.
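The gate itself is embedding similarity. A toy illustration of threshold-based matching (the exact semantics of score_threshold vary by cache backend, so the direct mapping of 0.92 to cosine similarity is an assumption for illustration, and the vectors below are made up):

```python
import math

def cosine_similarity(a: list, b: list) -> float:
    """Cosine similarity between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def cache_hit(query_vec: list, cached_vec: list, threshold: float = 0.92) -> bool:
    """Return a cached answer only when the queries embed close together."""
    return cosine_similarity(query_vec, cached_vec) >= threshold
```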

Model routing. Not every step needs the most capable model. Query reformulation and metadata extraction work well with GPT-4o-mini at a fraction of the cost and latency. Only the final generation step uses GPT-4o. This tiered approach cut our median latency by 40% and our API costs by 55%.

MODELS = {
    "fast": ChatOpenAI(model="gpt-4o-mini", temperature=0),
    "capable": ChatOpenAI(model="gpt-4o", temperature=0.1),
}
 
def get_model(task: str) -> ChatOpenAI:
    fast_tasks = {"reformulation", "classification", "extraction"}
    return MODELS["fast"] if task in fast_tasks else MODELS["capable"]
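To sanity-check the routing's savings, it helps to model per-request cost. A rough estimator; the per-million-token prices below are assumptions for illustration, not quoted pricing:

```python
# Assumed prices per 1M tokens (illustrative; check current provider pricing)
PRICE_PER_M = {
    "gpt-4o":      {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def call_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Dollar cost of one call under the assumed price table."""
    price = PRICE_PER_M[model]
    return (prompt_tokens * price["input"]
            + completion_tokens * price["output"]) / 1_000_000
```

Under these assumed prices, routing a reformulation prompt's input tokens to the fast tier costs 6% of what the capable tier would charge for the same tokens.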

Managing Dependencies and Versioning

LangChain evolves rapidly. Breaking changes between minor versions are common, and each integration package (langchain-community, langchain-openai, langchain-chroma) follows its own release cadence. Pinning versions is non-negotiable.

We maintain a requirements.txt with exact pins and use a dedicated CI job to test upgrades weekly:

# requirements.txt (pinned for reproducibility)
langchain-core==0.3.15
langchain==0.3.7
langchain-openai==0.2.14
langchain-community==0.3.7
langchain-chroma==0.2.0
chromadb==0.5.23
pydantic==2.10.1

When upgrading, we run our full evaluation suite (200+ question-answer pairs with expected retrieval results and answer quality scores) against the new version before merging. This has caught regressions in prompt formatting, retriever behavior, and output parsing that would have been invisible to unit tests.
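The upgrade gate itself is simple to sketch: run the fixtures against the candidate version, score, and block the merge below a baseline pass rate. A minimal version (the fixture shape, substring scoring, and 95% threshold are illustrative; our real suite also scores retrieval and uses LLM-as-judge):

```python
def run_eval(chain, fixtures: list, min_pass_rate: float = 0.95):
    """Return (passes_gate, pass_rate) for a candidate chain version."""
    passed = 0
    for case in fixtures:
        answer = chain(case["question"])
        if case["expected_substring"].lower() in answer.lower():
            passed += 1
    rate = passed / len(fixtures)
    return rate >= min_pass_rate, rate

fixtures = [
    {"question": "What was Q3 revenue?", "expected_substring": "revenue"},
    {"question": "How many active users?", "expected_substring": "users"},
]

# Stand-in chain that echoes the question back in a sentence
ok, rate = run_eval(lambda q: f"Here is the answer to: {q}", fixtures)
```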

Conclusion

LangChain provides excellent building blocks for LLM applications, but production readiness requires deliberate engineering beyond what the framework offers out of the box. Decompose chains into testable runnables. Implement retry logic that distinguishes transient from terminal errors. Trace every invocation with structured metrics. Optimize latency through parallelism, caching, and model routing. Pin your dependencies and validate upgrades with evaluation suites.

These patterns are not specific to Data Whispal Agent. They apply to any LangChain application that needs to serve real users reliably. The framework handles the happy path well; your job as an engineer is to handle everything else.
