Building Real-Time Analytics Pipelines

A technical guide to building real-time analytics pipelines using change data capture, stream processing, and materialized views — with patterns from Klivvr's Data Platform.

technical · 5 min read · By Klivvr Engineering

Batch processing handles most analytics workloads well. Running dbt models every hour or every day gives stakeholders data that is fresh enough for strategic decisions. But some use cases demand lower latency. Fraud detection needs sub-second response times. Real-time dashboards for operations teams lose value if the data is an hour old. Customer-facing features like spending summaries need to reflect the latest transactions.

This article covers the architecture patterns Klivvr's Data Platform uses to deliver real-time analytics, from change data capture at the source to materialized views at the consumption layer.

Change Data Capture

Change Data Capture (CDC) is the foundation of real-time data pipelines. Instead of periodically querying source databases for changes, CDC captures every insert, update, and delete as it happens by reading the database's transaction log.

# CDC consumer using Debezium format
import json
from dataclasses import dataclass
from datetime import datetime
from nats.aio.client import Client as NATS
 
@dataclass
class ChangeEvent:
    operation: str  # "c" create, "u" update, "d" delete
    table: str
    before: dict | None
    after: dict | None
    timestamp: datetime
    transaction_id: str
 
class CDCConsumer:
    def __init__(self, nats_url: str, subjects: list[str]):
        self.nats_url = nats_url
        self.subjects = subjects
        self.nc = NATS()
        self.handlers: dict[str, callable] = {}
 
    async def connect(self):
        await self.nc.connect(self.nats_url)
        for subject in self.subjects:
            await self.nc.subscribe(subject, cb=self._handle_message)
 
    async def _handle_message(self, msg):
        payload = json.loads(msg.data.decode())
        event = ChangeEvent(
            operation=payload["op"],
            table=payload["source"]["table"],
            before=payload.get("before"),
            after=payload.get("after"),
            timestamp=datetime.fromtimestamp(
                payload["ts_ms"] / 1000
            ),
            transaction_id=payload["source"].get("txId", "")
        )
 
        handler = self.handlers.get(event.table)
        if handler:
            await handler(event)
 
    def on_change(self, table: str):
        """Decorator to register a handler for a table."""
        def decorator(func):
            self.handlers[table] = func
            return func
        return decorator
 
# Usage
consumer = CDCConsumer(
    nats_url="nats://localhost:4222",
    subjects=["cdc.payments.*", "cdc.customers.*"]
)
 
@consumer.on_change("payments")
async def handle_payment_change(event: ChangeEvent):
    if event.operation in ("c", "u"):
        payment = event.after
        await update_real_time_metrics(payment)
        await check_fraud_rules(payment)

CDC provides three guarantees that batch extraction cannot: completeness (no changes are missed between extraction windows), ordering (changes to a given row arrive in commit order), and timeliness (changes are available within seconds of the commit).
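For reference, a trimmed Debezium-style envelope for a payment update looks roughly like this. The field values are illustrative, and real envelopes carry additional schema and source metadata, but the fields below are the ones the consumer above extracts:

```python
import json
from datetime import datetime, timezone

# A trimmed Debezium-style change event for an update to "payments".
envelope = json.loads("""
{
  "op": "u",
  "before": {"id": "pay_123", "status": "pending", "amount": 4500},
  "after":  {"id": "pay_123", "status": "succeeded", "amount": 4500},
  "source": {"table": "payments", "txId": "789"},
  "ts_ms": 1700000000000
}
""")

operation = envelope["op"]                        # "u" -> update
table = envelope["source"]["table"]               # routing key for handlers
committed_at = datetime.fromtimestamp(
    envelope["ts_ms"] / 1000, tz=timezone.utc     # ts_ms is epoch millis (UTC)
)

print(operation, table, committed_at.year)
```

The "before" image is what makes deletes and updates auditable downstream: the pipeline can see not just the new state but what it replaced.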

Stream Processing Architecture

Raw CDC events need processing before they are useful for analytics. Stream processing transforms, enriches, and aggregates events in real time.

# Stream processor for real-time payment analytics
from collections import defaultdict
from datetime import datetime, timedelta
import asyncio
 
class PaymentStreamProcessor:
    def __init__(self):
        self.windows: dict[str, list] = defaultdict(list)
        self.aggregates: dict[str, dict] = {}
 
    async def process_payment(self, event: ChangeEvent):
        payment = event.after
        window_key = self._get_window_key(event.timestamp)
 
        # Add to current window
        self.windows[window_key].append(payment)
 
        # Update running aggregates
        await self._update_aggregates(window_key, payment)
 
        # Emit aggregated metrics
        await self._emit_metrics(window_key)
 
    def _get_window_key(self, timestamp: datetime) -> str:
        """1-minute tumbling window."""
        truncated = timestamp.replace(second=0, microsecond=0)
        return truncated.isoformat()
 
    async def _update_aggregates(self, window_key: str, payment: dict):
        if window_key not in self.aggregates:
            self.aggregates[window_key] = {
                "total_amount": 0,
                "transaction_count": 0,
                "unique_customers": set(),
                "currency_breakdown": defaultdict(float),
                "status_breakdown": defaultdict(int),
            }
 
        agg = self.aggregates[window_key]
        amount = float(payment["amount"]) / 100  # stored in minor units (e.g. cents)
 
        agg["total_amount"] += amount
        agg["transaction_count"] += 1
        agg["unique_customers"].add(payment["customer_id"])
        agg["currency_breakdown"][payment["currency"]] += amount
        agg["status_breakdown"][payment["status"]] += 1
 
    async def _emit_metrics(self, window_key: str):
        agg = self.aggregates[window_key]
        metrics = {
            "window": window_key,
            "total_amount": agg["total_amount"],
            "transaction_count": agg["transaction_count"],
            "unique_customers": len(agg["unique_customers"]),
            "avg_transaction": (
                agg["total_amount"] / agg["transaction_count"]
                if agg["transaction_count"] > 0 else 0
            ),
            "currency_breakdown": dict(agg["currency_breakdown"]),
            "success_rate": (
                agg["status_breakdown"].get("succeeded", 0)
                / agg["transaction_count"]
                if agg["transaction_count"] > 0 else 0
            ),
        }
 
        await publish_metrics("analytics.payments.realtime", metrics)
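One gap in the processor above: it keeps every window in memory forever. A minimal eviction pass might drop windows once a watermark has passed them. This is a sketch, assuming a fixed two-minute allowed lateness and the same ISO-format minute keys `_get_window_key` produces; the helper name and lateness value are illustrative, not part of the processor:

```python
from datetime import datetime, timedelta, timezone

# Windows older than (latest event time - allowed lateness) are considered
# closed and can be flushed and discarded. Two minutes is an assumed value.
ALLOWED_LATENESS = timedelta(minutes=2)

def evict_closed_windows(windows: dict, aggregates: dict,
                         latest_event_time: datetime) -> list[str]:
    """Remove 1-minute windows whose close time is behind the watermark.

    Returns the evicted keys so the caller can flush their final
    aggregates downstream before the state is dropped.
    """
    watermark = latest_event_time - ALLOWED_LATENESS
    evicted = [
        key for key in list(windows)
        if datetime.fromisoformat(key) + timedelta(minutes=1) < watermark
    ]
    for key in evicted:
        windows.pop(key, None)
        aggregates.pop(key, None)
    return evicted

# Usage: at 12:10, the 12:00 window is closed; the 12:09 window is still open
now = datetime(2024, 1, 1, 12, 10, tzinfo=timezone.utc)
windows = {"2024-01-01T12:00:00+00:00": [], "2024-01-01T12:09:00+00:00": []}
aggs = {key: {} for key in windows}
evicted = evict_closed_windows(windows, aggs, now)
print(evicted)
```

Production stream processors derive the watermark from observed event times rather than a constant, but the state-cleanup shape is the same.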

Materialized Views for Low-Latency Queries

Real-time aggregates need to be queryable by dashboards and APIs without scanning raw event streams. Materialized views provide pre-computed, incrementally updated query results.

-- Materialized view over the real-time payments table.
-- Refreshes automatically only in engines with incremental view
-- maintenance; in vanilla PostgreSQL, schedule an explicit REFRESH.
CREATE MATERIALIZED VIEW realtime.mv_payment_metrics AS
SELECT
    DATE_TRUNC('minute', created_at) AS minute_window,
    currency,
    COUNT(*) AS transaction_count,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount,
    COUNT(DISTINCT customer_id) AS unique_customers,
    SUM(CASE WHEN status = 'succeeded' THEN 1 ELSE 0 END)::FLOAT
        / COUNT(*) AS success_rate
FROM silver.payments_realtime
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY 1, 2;
 
-- Unique index: serves dashboard queries and enables
-- REFRESH MATERIALIZED VIEW CONCURRENTLY in PostgreSQL
CREATE UNIQUE INDEX idx_payment_metrics_window
    ON realtime.mv_payment_metrics (minute_window DESC, currency);
 
-- API query: last hour's metrics by minute
SELECT
    minute_window,
    SUM(transaction_count) AS transactions,
    SUM(total_amount) AS revenue,
    AVG(success_rate) AS avg_success_rate
FROM realtime.mv_payment_metrics
WHERE minute_window >= NOW() - INTERVAL '1 hour'
GROUP BY minute_window
ORDER BY minute_window DESC;

Bridging Real-Time and Batch

Real-time and batch pipelines serve different purposes, but their outputs must be consistent. A real-time dashboard showing today's revenue must eventually agree with the batch-processed daily revenue report. We achieve this through a lambda-inspired architecture where the real-time layer handles the most recent data and the batch layer provides the canonical historical record.

# Reconciliation: ensure real-time and batch agree
class PipelineReconciler:
    def __init__(self, batch_store, realtime_store):
        self.batch = batch_store
        self.realtime = realtime_store
 
    async def reconcile(self, date: str):
        batch_total = await self.batch.query(
            "SELECT SUM(amount) FROM gold.fct_daily_revenue "
            "WHERE revenue_date = %s", [date]
        )
 
        realtime_total = await self.realtime.query(
            "SELECT SUM(total_amount) FROM realtime.mv_payment_metrics "
            "WHERE DATE(minute_window) = %s", [date]
        )
 
        # Guard against an empty batch total before computing relative drift
        drift = (
            abs(batch_total - realtime_total) / batch_total
            if batch_total else 0.0
        )

        if drift > 0.01:  # More than 1% drift
            await alert(
                f"Pipeline drift detected for {date}: "
                f"batch={batch_total}, realtime={realtime_total}, "
                f"drift={drift:.2%}"
            )
            # Trigger reprocessing of real-time layer
            await self.realtime.reprocess(date)
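To make the tolerance concrete, here is the same drift math on stub numbers (illustrative values, not production data):

```python
# Batch reports 100,000.00 in daily revenue; the real-time view reports
# 98,700.00. The 1.3% relative discrepancy exceeds the 1% threshold,
# so the reconciler would alert and reprocess the real-time layer.
batch_total = 100_000.00
realtime_total = 98_700.00

drift = abs(batch_total - realtime_total) / batch_total
should_reprocess = drift > 0.01

print(f"{drift:.2%}", should_reprocess)
```

The 1% threshold is a tuning knob: tight enough to catch dropped or duplicated events, loose enough to tolerate late-arriving data that the batch layer has seen but the real-time layer has already windowed out.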

Conclusion

Real-time analytics pipelines add complexity but unlock use cases that batch processing cannot serve. CDC provides the foundation by capturing changes as they happen. Stream processing transforms raw events into meaningful aggregates. Materialized views make those aggregates queryable with low latency. And reconciliation ensures that real-time and batch layers tell the same story. At Klivvr, we apply real-time processing selectively — only where the business value of low-latency data justifies the operational complexity — while keeping batch processing as the reliable backbone of the Data Platform.
