Building Real-Time Analytics Pipelines
A technical guide to building real-time analytics pipelines using change data capture, stream processing, and materialized views — with patterns from Klivvr's Data Platform.
Batch processing handles most analytics workloads well. Running dbt models every hour or every day gives stakeholders data that is fresh enough for strategic decisions. But some use cases demand lower latency. Fraud detection needs sub-second response times. Real-time dashboards for operations teams lose value if the data is an hour old. Customer-facing features like spending summaries need to reflect the latest transactions.
This article covers the architecture patterns Klivvr's Data Platform uses to deliver real-time analytics, from change data capture at the source to materialized views at the consumption layer.
Change Data Capture
Change Data Capture (CDC) is the foundation of real-time data pipelines. Instead of periodically querying source databases for changes, CDC captures every insert, update, and delete as it happens by reading the database's transaction log.
# CDC consumer using Debezium format
import json
from dataclasses import dataclass
from datetime import datetime
from nats.aio.client import Client as NATS
@dataclass
class ChangeEvent:
operation: str # "c" create, "u" update, "d" delete
table: str
before: dict | None
after: dict | None
timestamp: datetime
transaction_id: str
class CDCConsumer:
def __init__(self, nats_url: str, subjects: list[str]):
self.nats_url = nats_url
self.subjects = subjects
self.nc = NATS()
self.handlers: dict[str, callable] = {}
async def connect(self):
await self.nc.connect(self.nats_url)
for subject in self.subjects:
await self.nc.subscribe(subject, cb=self._handle_message)
async def _handle_message(self, msg):
payload = json.loads(msg.data.decode())
event = ChangeEvent(
operation=payload["op"],
table=payload["source"]["table"],
before=payload.get("before"),
after=payload.get("after"),
timestamp=datetime.fromtimestamp(
payload["ts_ms"] / 1000
),
transaction_id=payload["source"].get("txId", "")
)
handler = self.handlers.get(event.table)
if handler:
await handler(event)
def on_change(self, table: str):
"""Decorator to register a handler for a table."""
def decorator(func):
self.handlers[table] = func
return func
return decorator
# Usage
consumer = CDCConsumer(
nats_url="nats://localhost:4222",
subjects=["cdc.payments.*", "cdc.customers.*"]
)
@consumer.on_change("payments")
async def handle_payment_change(event: ChangeEvent):
if event.operation in ("c", "u"):
payment = event.after
await update_real_time_metrics(payment)
        await check_fraud_rules(payment)

CDC provides three guarantees that batch extraction cannot: completeness (no changes are missed between extraction windows), ordering (events arrive in transaction order), and timeliness (changes are available within seconds of the commit).
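These guarantees are worth checking at runtime rather than assuming. The sketch below is illustrative, not part of the consumer above: a small monitor (the CDCLagMonitor name and the 5-second threshold are assumptions) that flags events arriving out of commit order or long after their commit.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone

@dataclass
class CDCLagMonitor:
    """Illustrative runtime check of the ordering and timeliness guarantees.

    Tracks the last commit timestamp seen per table: an event with an older
    commit timestamp than its predecessor violates ordering, and an event
    observed long after its commit indicates pipeline lag.
    """
    max_lag_seconds: float = 5.0
    _last_ts: dict = field(default_factory=dict)

    def observe(self, table: str, commit_ts: datetime,
                observed_at: datetime) -> dict:
        prev = self._last_ts.get(table)
        out_of_order = prev is not None and commit_ts < prev
        if not out_of_order:
            self._last_ts[table] = commit_ts
        lag = (observed_at - commit_ts).total_seconds()
        return {
            "out_of_order": out_of_order,
            "lag_seconds": lag,
            "lagging": lag > self.max_lag_seconds,
        }
```

In practice such a monitor would feed the same alerting channel as the reconciliation checks, so that a stalled connector is noticed before dashboards drift.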
Stream Processing Architecture
Raw CDC events need processing before they are useful for analytics. Stream processing transforms, enriches, and aggregates events in real time.
# Stream processor for real-time payment analytics
from collections import defaultdict
from datetime import datetime, timedelta
import asyncio
class PaymentStreamProcessor:
def __init__(self):
        # In-memory state keyed by window; a production processor would
        # evict windows older than the allowed lateness to bound memory
        self.windows: dict[str, list] = defaultdict(list)
        self.aggregates: dict[str, dict] = {}
async def process_payment(self, event: ChangeEvent):
payment = event.after
window_key = self._get_window_key(event.timestamp)
# Add to current window
self.windows[window_key].append(payment)
# Update running aggregates
await self._update_aggregates(window_key, payment)
# Emit aggregated metrics
await self._emit_metrics(window_key)
def _get_window_key(self, timestamp: datetime) -> str:
"""1-minute tumbling window."""
truncated = timestamp.replace(second=0, microsecond=0)
return truncated.isoformat()
async def _update_aggregates(self, window_key: str, payment: dict):
if window_key not in self.aggregates:
self.aggregates[window_key] = {
"total_amount": 0,
"transaction_count": 0,
"unique_customers": set(),
"currency_breakdown": defaultdict(float),
"status_breakdown": defaultdict(int),
}
agg = self.aggregates[window_key]
        # Amounts arrive in minor units (e.g. cents); convert to major units
        amount = float(payment["amount"]) / 100
agg["total_amount"] += amount
agg["transaction_count"] += 1
agg["unique_customers"].add(payment["customer_id"])
agg["currency_breakdown"][payment["currency"]] += amount
agg["status_breakdown"][payment["status"]] += 1
async def _emit_metrics(self, window_key: str):
agg = self.aggregates[window_key]
metrics = {
"window": window_key,
"total_amount": agg["total_amount"],
"transaction_count": agg["transaction_count"],
"unique_customers": len(agg["unique_customers"]),
"avg_transaction": (
agg["total_amount"] / agg["transaction_count"]
if agg["transaction_count"] > 0 else 0
),
"currency_breakdown": dict(agg["currency_breakdown"]),
"success_rate": (
agg["status_breakdown"].get("succeeded", 0)
/ agg["transaction_count"]
if agg["transaction_count"] > 0 else 0
),
}
        await publish_metrics("analytics.payments.realtime", metrics)

Materialized Views for Low-Latency Queries
Real-time aggregates need to be queryable by dashboards and APIs without scanning raw event streams. Materialized views provide pre-computed, incrementally updated query results.
-- Continuously updated materialized view
-- Refreshed as new data arrives; continuous refresh assumes an engine
-- with incremental view maintenance (vanilla PostgreSQL needs a
-- scheduled REFRESH MATERIALIZED VIEW)
CREATE MATERIALIZED VIEW realtime.mv_payment_metrics AS
SELECT
DATE_TRUNC('minute', created_at) AS minute_window,
currency,
COUNT(*) AS transaction_count,
SUM(amount) AS total_amount,
AVG(amount) AS avg_amount,
COUNT(DISTINCT customer_id) AS unique_customers,
SUM(CASE WHEN status = 'succeeded' THEN 1 ELSE 0 END)::FLOAT
/ COUNT(*) AS success_rate
FROM silver.payments_realtime
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY 1, 2;
-- Index for dashboard queries
CREATE INDEX idx_payment_metrics_window
ON realtime.mv_payment_metrics (minute_window DESC);
-- API query: last hour's metrics by minute
SELECT
minute_window,
SUM(transaction_count) AS transactions,
SUM(total_amount) AS revenue,
AVG(success_rate) AS avg_success_rate
FROM realtime.mv_payment_metrics
WHERE minute_window >= NOW() - INTERVAL '1 hour'
GROUP BY minute_window
ORDER BY minute_window DESC;

Bridging Real-Time and Batch
Real-time and batch pipelines serve different purposes, but their outputs must be consistent. A real-time dashboard showing today's revenue must eventually agree with the batch-processed daily revenue report. We achieve this through a lambda-inspired architecture where the real-time layer handles the most recent data and the batch layer provides the canonical historical record.
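One way to realize that split at the serving layer is sketched below. The function name and dict-based inputs are illustrative, not Klivvr's actual API: closed days come from the canonical batch layer, and only the still-open tail (typically today) comes from the real-time layer.

```python
from datetime import date

def merge_revenue_series(batch: dict[date, float],
                         realtime: dict[date, float],
                         cutoff: date) -> dict[date, float]:
    """Serve days before the cutoff from the canonical batch layer and
    days on or after it from the real-time layer.

    Batch values win for any day both layers cover before the cutoff,
    so finalized history is never overwritten by provisional numbers.
    """
    merged = {d: amount for d, amount in realtime.items() if d >= cutoff}
    merged.update({d: amount for d, amount in batch.items() if d < cutoff})
    return dict(sorted(merged.items()))
```

The cutoff is usually the start of the current day: everything the batch layer has already finalized is served verbatim, and the real-time layer only ever answers for data the batch layer has not yet processed.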
# Reconciliation: ensure real-time and batch agree
class PipelineReconciler:
def __init__(self, batch_store, realtime_store):
self.batch = batch_store
self.realtime = realtime_store
async def reconcile(self, date: str):
batch_total = await self.batch.query(
"SELECT SUM(amount) FROM gold.fct_daily_revenue "
"WHERE revenue_date = %s", [date]
)
realtime_total = await self.realtime.query(
"SELECT SUM(total_amount) FROM realtime.mv_payment_metrics "
"WHERE DATE(minute_window) = %s", [date]
)
        if not batch_total:
            return  # No batch data for this date yet; nothing to compare
        drift = abs(batch_total - realtime_total) / batch_total
        if drift > 0.01:  # More than 1% drift
await alert(
f"Pipeline drift detected for {date}: "
f"batch={batch_total}, realtime={realtime_total}, "
f"drift={drift:.2%}"
)
# Trigger reprocessing of real-time layer
            await self.realtime.reprocess(date)

Conclusion
Real-time analytics pipelines add complexity but unlock use cases that batch processing cannot serve. CDC provides the foundation by capturing changes as they happen. Stream processing transforms raw events into meaningful aggregates. Materialized views make those aggregates queryable with low latency. And reconciliation ensures that real-time and batch layers tell the same story. At Klivvr, we apply real-time processing selectively — only where the business value of low-latency data justifies the operational complexity — while keeping batch processing as the reliable backbone of the Data Platform.