Scaling AI Agents: From Prototype to Production

A practical guide to scaling AI agent systems from initial prototype to production deployment, covering infrastructure architecture, cost management, reliability engineering, and team organization.

Business · 13 min read · By Klivvr Engineering

Every AI agent starts as a prototype. A few hundred lines of Python, a vector store running locally, and API calls to an LLM provider. The prototype works beautifully in demos. It answers questions intelligently, retrieves relevant context, and impresses stakeholders. Then someone asks: "Can we deploy this for the whole company?"

The gap between a working prototype and a production system that serves hundreds of users reliably is larger for AI agents than for most software. Traditional web applications scale along well-understood dimensions: request throughput, database connections, and compute resources. AI agents add dimensions that most teams have not encountered before: LLM API rate limits, token cost that scales with usage, nondeterministic latency, and quality degradation under load. Scaling Data Whispal Agent from an internal prototype to a production system taught us that the technical challenges are only half the battle. Cost management, reliability engineering, and organizational alignment are equally critical.

Infrastructure Architecture for Production

A prototype runs everything in a single process. Production requires separating concerns into independently scalable components. Here is the architecture we settled on for Data Whispal Agent.

The system has four components: the API layer that handles user requests, the retrieval service that manages vector search, the generation service that orchestrates LLM calls, and the indexing pipeline that keeps the vector store current.

from dataclasses import dataclass
 
@dataclass
class ServiceConfig:
    """Configuration for each service component."""
    api_replicas: int = 3
    retrieval_replicas: int = 2
    generation_replicas: int = 2
    indexing_workers: int = 1
 
    # Resource limits
    api_memory_mb: int = 512
    retrieval_memory_mb: int = 2048  # Vector store needs RAM
    generation_memory_mb: int = 1024
    indexing_memory_mb: int = 4096   # Embedding generation is memory-hungry
 
    # Scaling thresholds
    api_target_cpu_percent: int = 70
    retrieval_target_latency_p95_ms: int = 100
    generation_target_queue_depth: int = 10
 
def generate_deployment_config(config: ServiceConfig) -> dict:
    """Generate deployment configuration for the agent system."""
    return {
        "api_service": {
            "replicas": config.api_replicas,
            "resources": {
                "memory": f"{config.api_memory_mb}Mi",
                "cpu": "500m",
            },
            "autoscaling": {
                "min_replicas": config.api_replicas,
                "max_replicas": config.api_replicas * 3,
                "target_cpu_utilization": config.api_target_cpu_percent,
            },
            "health_check": "/health",
            "readiness_check": "/ready",
        },
        "retrieval_service": {
            "replicas": config.retrieval_replicas,
            "resources": {
                "memory": f"{config.retrieval_memory_mb}Mi",
                "cpu": "1000m",
            },
            "volumes": [
                {"name": "vector-store", "size": "50Gi", "type": "ssd"},
            ],
        },
        "generation_service": {
            "replicas": config.generation_replicas,
            "resources": {
                "memory": f"{config.generation_memory_mb}Mi",
                "cpu": "500m",
            },
            "autoscaling": {
                "min_replicas": config.generation_replicas,
                "max_replicas": config.generation_replicas * 5,
                "target_metric": "queue_depth",
                "target_value": config.generation_target_queue_depth,
            },
        },
        "indexing_pipeline": {
            "replicas": config.indexing_workers,
            "resources": {
                "memory": f"{config.indexing_memory_mb}Mi",
                "cpu": "2000m",
            },
            "schedule": "0 2 * * *",  # Daily at 2 AM
        },
    }

Separating the retrieval and generation services is the most important architectural decision. They have fundamentally different scaling characteristics. Retrieval is CPU and memory bound with predictable latency. Generation is I/O bound (waiting for LLM API responses) with highly variable latency. Coupling them in a single service means that a spike in LLM latency blocks retrieval for other requests.
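The isolation argument can be sketched with per-stage concurrency limits. This is illustrative, not our production configuration: the stage names, pool sizes, and sleep-based stand-ins are assumptions, but the structure shows why a slow generation call cannot exhaust the slots retrieval needs.

```python
import asyncio

# Sketch of per-stage concurrency isolation. Pool sizes and the sleep-based
# stand-ins for vector search and LLM calls are illustrative assumptions.

RETRIEVAL_SLOTS = asyncio.Semaphore(32)   # retrieval: fast, predictable
GENERATION_SLOTS = asyncio.Semaphore(8)   # generation: slow, rate-limited

async def retrieve(query: str) -> list[str]:
    async with RETRIEVAL_SLOTS:
        await asyncio.sleep(0.01)  # stand-in for vector search
        return [f"doc for {query}"]

async def generate(query: str, context: list[str]) -> str:
    async with GENERATION_SLOTS:
        await asyncio.sleep(0.05)  # stand-in for an LLM API call
        return f"answer({query}, {len(context)} docs)"

async def answer(query: str) -> str:
    # Retrieval proceeds even when every generation slot is occupied,
    # because the two stages draw from separate pools.
    context = await retrieve(query)
    return await generate(query, context)

async def main() -> list[str]:
    return list(await asyncio.gather(*(answer(f"q{i}") for i in range(5))))
```

In a single coupled service the equivalent would be one shared pool, where long LLM waits starve the cheap retrieval work.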

Request Queue and Backpressure

LLM APIs have rate limits. When concurrent users exceed the rate limit, requests must be queued, not dropped. We implement a priority queue that ensures time-sensitive queries are processed first while batch analytics requests wait during peak periods.

import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from enum import IntEnum
import heapq
 
class Priority(IntEnum):
    CRITICAL = 0    # Executive dashboard queries
    HIGH = 1        # Interactive user queries
    NORMAL = 2      # Standard queries
    LOW = 3         # Batch analytics, background tasks
 
@dataclass(order=True)
class QueuedRequest:
    priority: int
    submitted_at: float  # compared second, so equal priorities drain FIFO
    request_id: str = field(compare=False)
    query: str = field(compare=False)
    user_id: str = field(compare=False)
    future: asyncio.Future = field(compare=False, repr=False)
 
class RequestQueue:
    def __init__(
        self,
        max_concurrent: int = 20,
        max_queue_size: int = 500,
        timeout_seconds: float = 60.0,
    ):
        self.max_concurrent = max_concurrent
        self.max_queue_size = max_queue_size
        self.timeout_seconds = timeout_seconds
        self._queue: list[QueuedRequest] = []
        self._active_count = 0
        self._lock = asyncio.Lock()
 
    async def submit(
        self,
        request_id: str,
        query: str,
        user_id: str,
        priority: Priority = Priority.NORMAL,
    ) -> asyncio.Future:
        """Submit a request to the queue. Returns a future for the result."""
        async with self._lock:
            if len(self._queue) >= self.max_queue_size:
                raise QueueFullError(
                    f"Queue is full ({self.max_queue_size} pending requests). "
                    "Please try again later."
                )
 
            future = asyncio.get_running_loop().create_future()
            request = QueuedRequest(
                priority=priority.value,
                submitted_at=datetime.utcnow().timestamp(),
                request_id=request_id,
                query=query,
                user_id=user_id,
                future=future,
            )
            heapq.heappush(self._queue, request)
 
        # Trigger processing
        asyncio.create_task(self._process_queue())
        return future
 
    async def _process_queue(self):
        """Process queued requests up to the concurrency limit."""
        async with self._lock:
            while self._queue and self._active_count < self.max_concurrent:
                request = heapq.heappop(self._queue)
 
                # Check for timeout
                age = datetime.utcnow().timestamp() - request.submitted_at
                if age > self.timeout_seconds:
                    request.future.set_exception(
                        TimeoutError("Request expired in queue")
                    )
                    continue
 
                self._active_count += 1
                asyncio.create_task(self._execute(request))
 
    async def _execute(self, request: QueuedRequest):
        """Execute a single request and manage the concurrency counter."""
        try:
            result = await self._handle_query(request.query, request.user_id)
            if not request.future.done():
                request.future.set_result(result)
        except Exception as e:
            if not request.future.done():
                request.future.set_exception(e)
        finally:
            async with self._lock:
                self._active_count -= 1
            # Process more items from the queue
            asyncio.create_task(self._process_queue())
 
    async def _handle_query(self, query: str, user_id: str) -> dict:
        """Placeholder for actual query execution."""
        # This would call the retrieval and generation pipeline
        pass
 
    @property
    def stats(self) -> dict:
        return {
            "queue_depth": len(self._queue),
            "active_requests": self._active_count,
            "max_concurrent": self.max_concurrent,
            "utilization": self._active_count / self.max_concurrent,
        }
 
class QueueFullError(Exception):
    pass

The priority system ensures that an executive checking a metric during a board meeting is not blocked behind a batch job that submitted 200 queries. The timeout mechanism prevents stale requests from consuming resources after the user has given up waiting.
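The ordering that makes this work can be checked in isolation: with `order=True` and only `priority` participating in comparison, `heapq` pops the lowest priority value first, so CRITICAL (0) drains before LOW (3).

```python
import heapq
from dataclasses import dataclass, field

# Standalone check of the dataclass-ordering trick the queue relies on:
# only `priority` takes part in comparison, so heapq orders by it alone.

@dataclass(order=True)
class Item:
    priority: int
    label: str = field(compare=False)

heap: list[Item] = []
for priority, label in [(3, "batch"), (0, "executive"), (1, "interactive")]:
    heapq.heappush(heap, Item(priority, label))

drain_order = [heapq.heappop(heap).label for _ in range(len(heap))]
# drain_order == ["executive", "interactive", "batch"]
```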

Cost Management and Optimization

LLM API costs are the dominant operational expense for AI agent systems, and they scale linearly with usage. A system that costs $50 per month during testing can cost $5,000 per month in production. Without active cost management, budgets are blown quickly.

We track costs per query and per user, set budgets with automated alerts, and implement multiple cost-reduction strategies.

from dataclasses import dataclass
 
@dataclass
class CostTracker:
    """Track LLM API costs per query, user, and time period."""
 
    # Current pricing (per 1M tokens)
    MODEL_COSTS = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "text-embedding-3-small": {"input": 0.02, "output": 0.0},
    }
 
    def calculate_query_cost(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
    ) -> float:
        costs = self.MODEL_COSTS.get(model, {"input": 0, "output": 0})
        input_cost = (input_tokens / 1_000_000) * costs["input"]
        output_cost = (output_tokens / 1_000_000) * costs["output"]
        return input_cost + output_cost
 
    def estimate_monthly_cost(
        self,
        daily_queries: int,
        avg_input_tokens: int,
        avg_output_tokens: int,
        model_mix: dict[str, float],
    ) -> dict:
        """Estimate monthly costs based on usage patterns."""
        monthly_queries = daily_queries * 30
        total_cost = 0.0
        breakdown = {}
 
        for model, proportion in model_mix.items():
            model_queries = monthly_queries * proportion
            cost = self.calculate_query_cost(
                model,
                int(avg_input_tokens * model_queries),
                int(avg_output_tokens * model_queries),
            )
            breakdown[model] = round(cost, 2)
            total_cost += cost
 
        return {
            "total_monthly_estimate": round(total_cost, 2),
            "per_query_average": round(total_cost / monthly_queries, 4),
            "model_breakdown": breakdown,
        }
 
# Example cost estimation
tracker = CostTracker()
estimate = tracker.estimate_monthly_cost(
    daily_queries=500,
    avg_input_tokens=3000,
    avg_output_tokens=500,
    model_mix={
        "gpt-4o": 0.3,          # 30% of queries need the best model
        "gpt-4o-mini": 0.7,     # 70% handled by the faster model
    },
)

The most impactful cost optimization is model routing. Not every query needs the most expensive model. Simple factual lookups work perfectly well with smaller models, while complex analytical questions benefit from the full capabilities of larger models. Routing 70% of queries to GPT-4o-mini while reserving GPT-4o for complex queries reduced our monthly API costs by approximately 60%.
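A back-of-the-envelope check, using the per-1M-token prices from the tracker above and assumed averages of 3,000 input and 500 output tokens per query, shows where a reduction of that magnitude comes from; the exact figure depends on your traffic mix and token counts.

```python
# Back-of-the-envelope check of the routing savings. The token averages
# (3,000 input, 500 output per query) are illustrative assumptions.

PRICES = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def per_query_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    p = PRICES[model]
    return (input_tokens / 1_000_000) * p["input"] + (output_tokens / 1_000_000) * p["output"]

full = per_query_cost("gpt-4o", 3000, 500)  # every query on gpt-4o
blended = (
    0.3 * per_query_cost("gpt-4o", 3000, 500)
    + 0.7 * per_query_cost("gpt-4o-mini", 3000, 500)
)
savings = 1 - blended / full  # roughly two-thirds with these assumed averages
```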

class CostAwarePipeline:
    """Pipeline that routes queries to appropriate models based on complexity."""
 
    def __init__(self, budget_daily_usd: float = 50.0):
        self.budget_daily = budget_daily_usd
        self.spent_today = 0.0  # reset daily by a scheduled job (not shown here)
        self.cost_tracker = CostTracker()
 
    async def process_query(self, query: str, query_type: str) -> dict:
        # Select model based on query complexity and remaining budget
        model = self._select_model(query_type)
 
        # If approaching budget limit, downgrade all queries
        budget_utilization = self.spent_today / self.budget_daily
        if budget_utilization > 0.9:
            model = "gpt-4o-mini"  # Emergency cost control
 
        result = await self._execute_with_model(query, model)
 
        # Track cost
        cost = self.cost_tracker.calculate_query_cost(
            model,
            result["input_tokens"],
            result["output_tokens"],
        )
        self.spent_today += cost
        result["cost_usd"] = cost
        result["model_used"] = model
 
        return result
 
    def _select_model(self, query_type: str) -> str:
        complex_types = {
            "trend_analysis",
            "comparison",
            "multi_step",
            "data_quality",
        }
        if query_type in complex_types:
            return "gpt-4o"
        return "gpt-4o-mini"
 
    async def _execute_with_model(self, query: str, model: str) -> dict:
        """Execute the query pipeline with the specified model."""
        # Implementation depends on your pipeline architecture
        pass

Reliability and Graceful Degradation

Production AI agent systems must handle failures gracefully. LLM providers experience outages. Vector stores can become unavailable. Network partitions happen. The system should degrade gracefully rather than failing completely.

from enum import Enum
 
class DegradationLevel(str, Enum):
    FULL = "full"                   # All systems operational
    REDUCED_QUALITY = "reduced"     # Using fallback models
    CACHED_ONLY = "cached_only"     # Serving from cache only
    MAINTENANCE = "maintenance"     # System unavailable
 
class ResilienceManager:
    def __init__(self):
        self.llm_healthy = True
        self.vectorstore_healthy = True
        self.cache_healthy = True
 
    @property
    def degradation_level(self) -> DegradationLevel:
        if self.llm_healthy and self.vectorstore_healthy:
            return DegradationLevel.FULL
        elif self.cache_healthy and not self.llm_healthy:
            return DegradationLevel.CACHED_ONLY
        elif self.llm_healthy and not self.vectorstore_healthy:
            return DegradationLevel.REDUCED_QUALITY
        else:
            return DegradationLevel.MAINTENANCE
 
    async def health_check(self) -> dict:
        """Check health of all dependencies."""
        checks = {}
 
        # Check LLM provider
        try:
            from langchain_openai import ChatOpenAI
            llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
            await llm.ainvoke("ping")
            self.llm_healthy = True
            checks["llm"] = "healthy"
        except Exception as e:
            self.llm_healthy = False
            checks["llm"] = f"unhealthy: {str(e)}"
 
        # Check vector store
        try:
            # Attempt a simple query (stubbed in this sketch; always passes)
            checks["vectorstore"] = "healthy"
            self.vectorstore_healthy = True
        except Exception as e:
            self.vectorstore_healthy = False
            checks["vectorstore"] = f"unhealthy: {str(e)}"
 
        # Check cache
        try:
            # Round-trip a test key (stubbed in this sketch; always passes)
            checks["cache"] = "healthy"
            self.cache_healthy = True
        except Exception as e:
            self.cache_healthy = False
            checks["cache"] = f"unhealthy: {str(e)}"
 
        checks["degradation_level"] = self.degradation_level.value
        return checks
 
    async def execute_with_fallback(self, query: str, user_id: str) -> dict:
        """Execute a query with appropriate fallback behavior."""
        level = self.degradation_level
 
        if level == DegradationLevel.MAINTENANCE:
            return {
                "answer": "The analytics agent is temporarily unavailable. "
                          "Please try again in a few minutes.",
                "status": "maintenance",
            }
 
        if level == DegradationLevel.CACHED_ONLY:
            cached = await self._check_cache(query)
            if cached:
                return {
                    "answer": cached["answer"],
                    "status": "cached",
                    "cache_age_seconds": cached["age"],
                    "warning": "This answer is from cache and may not "
                               "reflect the latest data.",
                }
            return {
                "answer": "The analytics agent is experiencing reduced "
                          "capacity. Your question could not be answered "
                          "from cache. Please try again shortly.",
                "status": "degraded",
            }
 
        if level == DegradationLevel.REDUCED_QUALITY:
            return await self._execute_without_retrieval(query)
 
        # Full capacity
        return await self._execute_full_pipeline(query, user_id)
 
    async def _check_cache(self, query: str) -> dict | None:
        """Check semantic cache for a matching query."""
        pass  # Implementation depends on cache backend
 
    async def _execute_without_retrieval(self, query: str) -> dict:
        """Answer from model knowledge when vector store is down."""
        pass
 
    async def _execute_full_pipeline(self, query: str, user_id: str) -> dict:
        """Normal execution path."""
        pass

The degradation levels ensure users always receive a response, even if it is a message explaining reduced capacity. A system that returns an error page is worse than one that honestly reports its limitations.

Monitoring and Alerting

Production systems need comprehensive monitoring that goes beyond traditional application metrics. AI agent systems require monitoring of quality metrics, cost metrics, and provider health alongside standard infrastructure metrics.

from dataclasses import dataclass
 
@dataclass
class AlertRule:
    name: str
    metric: str
    threshold: float
    comparison: str  # "gt", "lt", "gte", "lte"
    window_minutes: int
    severity: str  # "critical", "warning", "info"
    notify_channels: list[str]
 
ALERT_RULES = [
    AlertRule(
        name="high_error_rate",
        metric="agent.error_rate",
        threshold=0.05,
        comparison="gt",
        window_minutes=5,
        severity="critical",
        notify_channels=["pagerduty", "slack"],
    ),
    AlertRule(
        name="high_latency",
        metric="agent.p95_latency_ms",
        threshold=8000,
        comparison="gt",
        window_minutes=10,
        severity="warning",
        notify_channels=["slack"],
    ),
    AlertRule(
        name="cost_spike",
        metric="agent.hourly_cost_usd",
        threshold=10.0,
        comparison="gt",
        window_minutes=60,
        severity="warning",
        notify_channels=["slack", "email"],
    ),
    AlertRule(
        name="quality_regression",
        metric="agent.answer_quality_score",
        threshold=3.5,
        comparison="lt",
        window_minutes=60,
        severity="warning",
        notify_channels=["slack"],
    ),
    AlertRule(
        name="hallucination_spike",
        metric="agent.hallucination_rate",
        threshold=0.08,
        comparison="gt",
        window_minutes=30,
        severity="critical",
        notify_channels=["pagerduty", "slack"],
    ),
    AlertRule(
        name="queue_depth_high",
        metric="agent.queue_depth",
        threshold=100,
        comparison="gt",
        window_minutes=5,
        severity="warning",
        notify_channels=["slack"],
    ),
]
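Rules like these need an evaluator in the metrics pipeline. A minimal sketch, re-declaring a trimmed AlertRule so the example stands alone:

```python
import operator
from dataclasses import dataclass

# Minimal rule evaluator. AlertRule is re-declared in trimmed form here
# so this sketch is self-contained.

@dataclass
class AlertRule:
    name: str
    metric: str
    threshold: float
    comparison: str  # "gt", "lt", "gte", "lte"

_OPS = {
    "gt": operator.gt,
    "lt": operator.lt,
    "gte": operator.ge,
    "lte": operator.le,
}

def should_fire(rule: AlertRule, observed_value: float) -> bool:
    """True when the observed metric value crosses the rule's threshold."""
    return _OPS[rule.comparison](observed_value, rule.threshold)
```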

Team Organization and Operational Readiness

Scaling an AI agent is not purely a technical challenge. The team needs new skills and new operational practices.

On-call rotation. AI agent incidents differ from traditional application incidents. A latency spike might be caused by a model provider issue, a vector store scaling event, or a change in query patterns. On-call engineers need familiarity with the entire pipeline, not just individual components.

Evaluation as a practice. Every code change that affects prompts, retrieval, or generation must be evaluated against the test suite before deployment. This is a cultural shift for teams accustomed to relying on unit tests and integration tests. Evaluation is not testing; it is a continuous measurement practice.
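One way to wire that discipline into CI is a gate that fails the build when evaluation results fall below baseline. The metric names and thresholds here are illustrative assumptions, not our actual baselines:

```python
# Sketch of a CI evaluation gate. Metric names and baseline values are
# illustrative; a real suite would load them from configuration.

BASELINES = {
    "answer_quality_score": 4.0,   # 1-5 scale from automated evaluation
    "retrieval_recall": 0.80,
    "hallucination_rate": 0.05,    # lower is better
}

LOWER_IS_BETTER = {"hallucination_rate"}

def evaluation_gate(results: dict[str, float]) -> list[str]:
    """Return a list of failed checks; an empty list means safe to deploy."""
    failures = []
    for metric, baseline in BASELINES.items():
        value = results.get(metric)
        if value is None:
            failures.append(f"{metric}: missing from evaluation results")
        elif metric in LOWER_IS_BETTER and value > baseline:
            failures.append(f"{metric}: {value} exceeds {baseline}")
        elif metric not in LOWER_IS_BETTER and value < baseline:
            failures.append(f"{metric}: {value} below {baseline}")
    return failures
```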

Cost review cadence. Review LLM API costs weekly. Usage patterns change as adoption grows, and what was cost-effective at 100 queries per day may not be at 1,000. Weekly reviews catch cost anomalies before they become budget crises.
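The weekly review can be backed by a simple anomaly flag over weekly spend totals; the 30% week-over-week threshold is an assumption to tune for your budget tolerance:

```python
# Flag weeks whose spend jumped sharply over the prior week. The 30%
# threshold is an illustrative default.

def flag_cost_anomalies(weekly_spend_usd: list[float], max_increase: float = 0.30) -> list[int]:
    """Return indices of weeks whose spend grew more than max_increase over the prior week."""
    flagged = []
    for i in range(1, len(weekly_spend_usd)):
        prev, curr = weekly_spend_usd[i - 1], weekly_spend_usd[i]
        if prev > 0 and (curr - prev) / prev > max_increase:
            flagged.append(i)
    return flagged
```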

Feedback loops. Establish a process for users to report incorrect answers. These reports are the most valuable signal for improving the system. Without a feedback mechanism, quality degrades silently as data and query patterns evolve.

# Operational readiness checklist
READINESS_CHECKLIST = {
    "infrastructure": [
        "Separate services for API, retrieval, generation, indexing",
        "Horizontal autoscaling configured for API and generation",
        "Persistent storage for vector store with backup schedule",
        "Load balancer with health checks",
    ],
    "reliability": [
        "Graceful degradation for LLM provider outages",
        "Semantic cache for common queries",
        "Request queue with priority and backpressure",
        "Circuit breakers for external dependencies",
    ],
    "cost_management": [
        "Per-query cost tracking",
        "Daily budget limits with automatic downgrade",
        "Model routing based on query complexity",
        "Cache hit rate monitoring",
    ],
    "monitoring": [
        "Latency percentiles (p50, p95, p99)",
        "Error rate by error type",
        "Cost per query and daily totals",
        "Quality score from automated evaluation",
        "Queue depth and wait times",
    ],
    "team": [
        "On-call rotation with AI agent runbooks",
        "Evaluation suite in CI/CD pipeline",
        "Weekly cost review meeting",
        "User feedback collection and triage process",
    ],
}
 
def assess_readiness(completed: dict[str, list[str]]) -> dict:
    """Assess production readiness against the checklist."""
    results = {}
    total_items = 0
    completed_items = 0
 
    for category, items in READINESS_CHECKLIST.items():
        done = completed.get(category, [])
        category_total = len(items)
        category_done = len([i for i in items if i in done])
        total_items += category_total
        completed_items += category_done
        results[category] = {
            "total": category_total,
            "completed": category_done,
            "percentage": round(category_done / category_total * 100),
            "missing": [i for i in items if i not in done],
        }
 
    results["overall_readiness"] = round(completed_items / total_items * 100)
    return results

Conclusion

Scaling an AI agent from prototype to production is a multidimensional challenge that extends well beyond writing better prompts or adding more compute. Infrastructure architecture must separate components with different scaling characteristics. Request queuing with priority and backpressure prevents the system from collapsing under load. Cost management requires model routing, caching, and daily budget controls to prevent runaway expenses. Reliability engineering demands graceful degradation for the many failure modes unique to LLM-based systems. And organizational readiness means new on-call practices, evaluation discipline, and feedback processes.

The teams that succeed at this transition are those that treat AI agent operations as a distinct discipline, borrowing from traditional SRE practices but adapting them for the unique characteristics of nondeterministic, cost-per-call, quality-sensitive systems. Data Whispal Agent is now a production system that serves hundreds of users daily with sub-five-second response times, 95%+ answer accuracy, and predictable operational costs. Getting here required every practice described in this article.
