Monitoring Event-Driven Systems at Scale
A practical guide to building comprehensive monitoring and observability for event-driven systems, covering metrics, distributed tracing, alerting strategies, and operational dashboards for maintaining healthy event processing pipelines.
Monitoring an event-driven system is fundamentally different from monitoring a traditional request-response application. In a synchronous system, a request enters, processing occurs, and a response exits. The entire lifecycle is visible in a single trace. In an event-driven system, a single business operation may span multiple services, multiple event channels, and multiple time scales. An order placed at 10:00 AM might not be fully fulfilled until 10:15 AM, after events have flowed through inventory, payment, shipping, and notification services. Understanding what is happening --- and what is going wrong --- requires a different approach to observability.
This article outlines a comprehensive monitoring strategy for event-driven systems, with practical TypeScript implementations and operational guidance drawn from experience running real-time event processing platforms like Starburst at scale.
The Three Pillars of Event-Driven Observability
Traditional observability rests on three pillars: metrics, logs, and traces. For event-driven systems, these pillars remain relevant but need to be adapted, and a fourth pillar --- event flow visibility --- becomes equally important.
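The fourth pillar can be made concrete with a lightweight topology tracker that records which event types actually flow between which services, so the live event graph can be compared against the intended architecture. The following is a minimal sketch; the `EventFlowTracker` name and its shape are illustrative, not from any standard library:

```typescript
// Minimal event-flow topology tracker (illustrative sketch).
// Records producer -> eventType -> consumer edges so the system's
// actual event graph can be visualized and diffed against the
// intended architecture.
interface FlowEdge {
  producer: string;
  eventType: string;
  consumer: string;
  count: number;
  lastSeen: Date;
}

class EventFlowTracker {
  private edges = new Map<string, FlowEdge>();

  recordFlow(producer: string, eventType: string, consumer: string): void {
    const key = `${producer}|${eventType}|${consumer}`;
    const existing = this.edges.get(key);
    if (existing) {
      existing.count += 1;
      existing.lastSeen = new Date();
    } else {
      this.edges.set(key, {
        producer,
        eventType,
        consumer,
        count: 1,
        lastSeen: new Date(),
      });
    }
  }

  // Edges that have not been exercised recently may indicate a
  // silently broken subscription.
  staleEdges(maxAgeMs: number, now: Date = new Date()): FlowEdge[] {
    return [...this.edges.values()].filter(
      (e) => now.getTime() - e.lastSeen.getTime() > maxAgeMs
    );
  }

  topology(): FlowEdge[] {
    return [...this.edges.values()];
  }
}
```

A tracker like this can be fed from the same instrumentation hooks used for metrics; the stale-edge check is useful because a subscription that silently stops receiving events produces no errors at all.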
Metrics in event-driven systems focus on throughput, latency, and lag rather than request rates and response times. The most important metrics are events produced per second, events consumed per second, consumer lag (the difference between the latest produced event and the latest consumed event), and processing duration per event.
// Metrics collection for event processing
interface EventMetrics {
recordEventProduced(eventType: string, source: string): void;
recordEventConsumed(eventType: string, consumer: string): void;
recordProcessingDuration(
eventType: string,
consumer: string,
durationMs: number
): void;
recordConsumerLag(
consumer: string,
lagMs: number,
lagEvents: number
): void;
recordEventFailed(
eventType: string,
consumer: string,
errorType: string
): void;
}
// Simplified in-memory implementation that mimics Prometheus-style
// label keys; a production system would use a real client library
// such as prom-client.
class PrometheusEventMetrics implements EventMetrics {
private counters = new Map<string, number>();
private histograms = new Map<string, number[]>();
private gauges = new Map<string, number>();
recordEventProduced(eventType: string, source: string): void {
const key = `events_produced_total{type="${eventType}",source="${source}"}`;
this.increment(key);
}
recordEventConsumed(eventType: string, consumer: string): void {
const key = `events_consumed_total{type="${eventType}",consumer="${consumer}"}`;
this.increment(key);
}
recordProcessingDuration(
eventType: string,
consumer: string,
durationMs: number
): void {
const key = `event_processing_duration_ms{type="${eventType}",consumer="${consumer}"}`;
if (!this.histograms.has(key)) {
this.histograms.set(key, []);
}
this.histograms.get(key)!.push(durationMs);
}
recordConsumerLag(
consumer: string,
lagMs: number,
lagEvents: number
): void {
this.gauges.set(`consumer_lag_ms{consumer="${consumer}"}`, lagMs);
this.gauges.set(
`consumer_lag_events{consumer="${consumer}"}`,
lagEvents
);
}
recordEventFailed(
eventType: string,
consumer: string,
errorType: string
): void {
const key = `events_failed_total{type="${eventType}",consumer="${consumer}",error="${errorType}"}`;
this.increment(key);
}
getPercentile(key: string, percentile: number): number {
const values = this.histograms.get(key);
if (!values || values.length === 0) return 0;
const sorted = [...values].sort((a, b) => a - b);
const index = Math.ceil(sorted.length * (percentile / 100)) - 1;
return sorted[index] ?? 0;
}
private increment(key: string): void {
this.counters.set(key, (this.counters.get(key) ?? 0) + 1);
}
}
// Instrumented event processor
class InstrumentedEventProcessor {
constructor(
private processor: EventProcessor,
private metrics: EventMetrics,
private consumerName: string
) {}
async process(event: StoredEvent): Promise<void> {
const startTime = Date.now();
try {
await this.processor.process(event);
this.metrics.recordEventConsumed(event.eventType, this.consumerName);
this.metrics.recordProcessingDuration(
event.eventType,
this.consumerName,
Date.now() - startTime
);
} catch (error) {
const errorType =
error instanceof Error ? error.constructor.name : "UnknownError";
this.metrics.recordEventFailed(
event.eventType,
this.consumerName,
errorType
);
throw error;
}
}
}

Distributed tracing connects events across services. When an order is placed, a trace should follow the event as it flows through inventory reservation, payment processing, and shipping notification. Without tracing, understanding the end-to-end flow of a business operation is nearly impossible.
// Distributed tracing for event flows
interface TraceContext {
traceId: string;
spanId: string;
parentSpanId?: string;
baggage: Record<string, string>;
}
interface Span {
traceId: string;
spanId: string;
parentSpanId?: string;
operationName: string;
serviceName: string;
startTime: Date;
endTime?: Date;
tags: Record<string, string | number | boolean>;
logs: Array<{
timestamp: Date;
fields: Record<string, string>;
}>;
status: "ok" | "error";
}
class EventTracer {
private spans: Span[] = [];
// Extract trace context from an incoming event
extractContext(event: StoredEvent): TraceContext {
const metadata = event.metadata as Record<string, unknown>;
return {
traceId:
(metadata.traceId as string) ?? this.generateId(),
spanId: this.generateId(),
parentSpanId: metadata.spanId as string | undefined,
baggage: (metadata.baggage as Record<string, string>) ?? {},
};
}
// Inject trace context into an outgoing event
injectContext(
context: TraceContext,
metadata: Record<string, unknown>
): Record<string, unknown> {
return {
...metadata,
traceId: context.traceId,
spanId: context.spanId,
parentSpanId: context.parentSpanId,
baggage: context.baggage,
};
}
startSpan(
context: TraceContext,
operationName: string,
serviceName: string
): Span {
const span: Span = {
traceId: context.traceId,
spanId: context.spanId,
parentSpanId: context.parentSpanId,
operationName,
serviceName,
startTime: new Date(),
tags: {},
logs: [],
status: "ok",
};
this.spans.push(span);
return span;
}
finishSpan(span: Span, status: "ok" | "error" = "ok"): void {
span.endTime = new Date();
span.status = status;
}
addSpanTag(
span: Span,
key: string,
value: string | number | boolean
): void {
span.tags[key] = value;
}
addSpanLog(
span: Span,
fields: Record<string, string>
): void {
span.logs.push({ timestamp: new Date(), fields });
}
private generateId(): string {
return crypto.randomUUID().replace(/-/g, "").substring(0, 16);
}
}
// Usage in an event consumer
class TracedOrderProcessor {
constructor(
private tracer: EventTracer,
private metrics: EventMetrics,
private orderService: OrderService
) {}
async processOrderPlaced(event: StoredEvent): Promise<void> {
const context = this.tracer.extractContext(event);
const span = this.tracer.startSpan(
context,
"processOrderPlaced",
"order-service"
);
this.tracer.addSpanTag(span, "event.type", event.eventType);
this.tracer.addSpanTag(span, "order.id", event.aggregateId);
try {
this.tracer.addSpanLog(span, {
event: "validation_started",
orderId: event.aggregateId,
});
await this.orderService.validateOrder(event);
this.tracer.addSpanLog(span, {
event: "validation_completed",
orderId: event.aggregateId,
});
await this.orderService.reserveInventory(event);
this.tracer.addSpanLog(span, {
event: "inventory_reserved",
orderId: event.aggregateId,
});
this.tracer.finishSpan(span, "ok");
} catch (error) {
this.tracer.addSpanTag(
span,
"error",
error instanceof Error ? error.message : "unknown"
);
this.tracer.finishSpan(span, "error");
throw error;
}
}
}
interface OrderService {
validateOrder(event: StoredEvent): Promise<void>;
reserveInventory(event: StoredEvent): Promise<void>;
}

Consumer Lag: The Most Important Metric
In event-driven systems, consumer lag is the single most important metric to monitor. It tells you the distance between the latest event produced and the latest event consumed by each consumer. Growing lag means a consumer is falling behind, which leads to stale data, delayed processing, and eventually system failure.
// Consumer lag monitor
interface ConsumerLagMonitor {
measure(consumer: string): Promise<LagMeasurement>;
measureAll(): Promise<Map<string, LagMeasurement>>;
}
interface LagMeasurement {
consumer: string;
currentOffset: number;
latestOffset: number;
lagEvents: number;
lagMs: number;
estimatedCatchUpTimeMs: number;
isHealthy: boolean;
}
class EventStreamLagMonitor implements ConsumerLagMonitor {
private processingRates = new Map<string, number[]>();
constructor(
private eventStore: EventStoreMetrics,
private consumerRegistry: ConsumerRegistry,
private alertThresholds: LagThresholds
) {}
async measure(consumer: string): Promise<LagMeasurement> {
const consumerOffset = await this.consumerRegistry.getOffset(consumer);
const latestOffset = await this.eventStore.getLatestOffset();
const lagEvents = latestOffset - consumerOffset;
// Estimate time-based lag
const latestEventTime = await this.eventStore.getEventTime(latestOffset);
const consumerEventTime = await this.eventStore.getEventTime(
consumerOffset
);
const lagMs = latestEventTime.getTime() - consumerEventTime.getTime();
// Calculate estimated catch-up time based on recent processing rate
const recentRate = this.getRecentProcessingRate(consumer);
const estimatedCatchUpTimeMs =
recentRate > 0 ? (lagEvents / recentRate) * 1000 : Infinity;
const isHealthy =
lagEvents <= this.alertThresholds.maxLagEvents &&
lagMs <= this.alertThresholds.maxLagMs;
return {
consumer,
currentOffset: consumerOffset,
latestOffset,
lagEvents,
lagMs,
estimatedCatchUpTimeMs,
isHealthy,
};
}
async measureAll(): Promise<Map<string, LagMeasurement>> {
const consumers = await this.consumerRegistry.listConsumers();
const measurements = new Map<string, LagMeasurement>();
for (const consumer of consumers) {
const measurement = await this.measure(consumer);
measurements.set(consumer, measurement);
}
return measurements;
}
recordProcessingRate(consumer: string, eventsPerSecond: number): void {
if (!this.processingRates.has(consumer)) {
this.processingRates.set(consumer, []);
}
const rates = this.processingRates.get(consumer)!;
rates.push(eventsPerSecond);
// Keep only the last 60 measurements (one minute of data at 1Hz)
if (rates.length > 60) {
rates.shift();
}
}
private getRecentProcessingRate(consumer: string): number {
const rates = this.processingRates.get(consumer);
if (!rates || rates.length === 0) return 0;
const sum = rates.reduce((a, b) => a + b, 0);
return sum / rates.length;
}
}
interface LagThresholds {
maxLagEvents: number;
maxLagMs: number;
}
interface EventStoreMetrics {
getLatestOffset(): Promise<number>;
getEventTime(offset: number): Promise<Date>;
}
interface ConsumerRegistry {
getOffset(consumer: string): Promise<number>;
listConsumers(): Promise<string[]>;
}

Building Effective Alerts
Alerting in event-driven systems requires a different philosophy than in request-response systems. You cannot simply alert on error rates, because many "errors" in an event-driven system are transient and resolved by retries. Instead, alerts should focus on symptoms that indicate genuine problems.
// Alerting rules engine for event-driven systems
interface AlertRule {
name: string;
description: string;
severity: "info" | "warning" | "critical";
evaluate(context: AlertContext): Promise<AlertEvaluation>;
cooldownMinutes: number;
}
interface AlertContext {
lagMeasurements: Map<string, LagMeasurement>;
errorRates: Map<string, number>;
throughputRates: Map<string, number>;
deadLetterQueueSizes: Map<string, number>;
}
interface AlertEvaluation {
shouldFire: boolean;
message: string;
details: Record<string, unknown>;
}
// Consumer falling behind
const consumerLagAlert: AlertRule = {
name: "ConsumerLagHigh",
description: "A consumer is falling behind the event stream",
severity: "warning",
cooldownMinutes: 5,
async evaluate(context: AlertContext): Promise<AlertEvaluation> {
const unhealthyConsumers: string[] = [];
for (const [consumer, measurement] of context.lagMeasurements) {
if (!measurement.isHealthy) {
unhealthyConsumers.push(consumer);
}
}
return {
shouldFire: unhealthyConsumers.length > 0,
message:
`${unhealthyConsumers.length} consumer(s) have high lag: ` +
unhealthyConsumers.join(", "),
details: {
consumers: unhealthyConsumers,
measurements: Object.fromEntries(
unhealthyConsumers.map((c) => [
c,
context.lagMeasurements.get(c),
])
),
},
};
},
};
// Dead letter queue growing
const dlqGrowingAlert: AlertRule = {
name: "DeadLetterQueueGrowing",
description: "Events are accumulating in a dead letter queue",
severity: "critical",
cooldownMinutes: 15,
async evaluate(context: AlertContext): Promise<AlertEvaluation> {
const problematicQueues: Array<{
queue: string;
size: number;
}> = [];
for (const [queue, size] of context.deadLetterQueueSizes) {
if (size > 0) {
problematicQueues.push({ queue, size });
}
}
return {
shouldFire: problematicQueues.length > 0,
message:
`Dead letter queues contain unprocessed events: ` +
problematicQueues
.map((q) => `${q.queue} (${q.size} events)`)
.join(", "),
details: { queues: problematicQueues },
};
},
};
// Throughput drop (possible producer failure)
const throughputDropAlert: AlertRule = {
name: "ThroughputDrop",
description: "Event production rate has dropped significantly",
severity: "warning",
cooldownMinutes: 10,
async evaluate(context: AlertContext): Promise<AlertEvaluation> {
const drops: Array<{ source: string; rate: number }> = [];
for (const [source, rate] of context.throughputRates) {
// Placeholder threshold: fire when the rate falls below 1 event/sec.
// In practice, the baseline would be calculated from historical data
// (e.g., fire when the rate drops below 10% of normal).
if (rate < 1) {
drops.push({ source, rate });
}
}
return {
shouldFire: drops.length > 0,
message:
`Event production rate has dropped for: ` +
drops.map((d) => `${d.source} (${d.rate}/sec)`).join(", "),
details: { drops },
};
},
};
// Alert manager that evaluates rules and sends notifications
class AlertManager {
private lastFired = new Map<string, Date>();
private rules: AlertRule[] = [];
constructor(private notifier: AlertNotifier) {}
registerRule(rule: AlertRule): void {
this.rules.push(rule);
}
async evaluate(context: AlertContext): Promise<void> {
for (const rule of this.rules) {
// Check cooldown
const lastFiredAt = this.lastFired.get(rule.name);
if (lastFiredAt) {
const cooldownMs = rule.cooldownMinutes * 60 * 1000;
if (Date.now() - lastFiredAt.getTime() < cooldownMs) {
continue;
}
}
const evaluation = await rule.evaluate(context);
if (evaluation.shouldFire) {
await this.notifier.send({
ruleName: rule.name,
severity: rule.severity,
message: evaluation.message,
details: evaluation.details,
timestamp: new Date(),
});
this.lastFired.set(rule.name, new Date());
}
}
}
}
interface AlertNotifier {
send(alert: {
ruleName: string;
severity: string;
message: string;
details: Record<string, unknown>;
timestamp: Date;
}): Promise<void>;
}

Operational Dashboards
A well-designed operational dashboard provides at-a-glance visibility into the health of your event-driven system. The dashboard should answer four questions: Is the system producing events? Is the system consuming events? Are consumers keeping up? Are there errors?
// Dashboard data aggregator
interface DashboardData {
timestamp: Date;
overview: SystemOverview;
consumers: ConsumerHealth[];
eventTypes: EventTypeMetrics[];
alerts: ActiveAlert[];
recentErrors: RecentError[];
}
interface SystemOverview {
totalEventsProducedPerSecond: number;
totalEventsConsumedPerSecond: number;
totalConsumers: number;
healthyConsumers: number;
unhealthyConsumers: number;
deadLetterQueueTotal: number;
oldestUnprocessedEventAge: number; // milliseconds
}
interface ConsumerHealth {
name: string;
status: "healthy" | "degraded" | "unhealthy";
eventsPerSecond: number;
lagEvents: number;
lagMs: number;
errorRate: number;
lastProcessedAt: Date;
}
interface EventTypeMetrics {
eventType: string;
producedPerSecond: number;
consumedPerSecond: number;
averageProcessingMs: number;
p99ProcessingMs: number;
errorCount: number;
}
interface ActiveAlert {
ruleName: string;
severity: string;
message: string;
firedAt: Date;
acknowledged: boolean;
}
interface RecentError {
eventId: string;
eventType: string;
consumer: string;
error: string;
timestamp: Date;
retryCount: number;
}
class DashboardAggregator {
constructor(
private metrics: PrometheusEventMetrics,
private lagMonitor: EventStreamLagMonitor,
private alertManager: AlertManager,
private errorLog: ErrorLog
) {}
async aggregate(): Promise<DashboardData> {
const lagMeasurements = await this.lagMonitor.measureAll();
const recentErrors = await this.errorLog.getRecent(50);
const consumers: ConsumerHealth[] = [];
let healthyCount = 0;
let unhealthyCount = 0;
for (const [name, lag] of lagMeasurements) {
const status = lag.isHealthy
? "healthy"
: lag.lagMs > 60000
? "unhealthy"
: "degraded";
if (status === "healthy") healthyCount++;
else unhealthyCount++;
consumers.push({
name,
status,
eventsPerSecond: 0, // Would be populated from metrics
lagEvents: lag.lagEvents,
lagMs: lag.lagMs,
errorRate: 0, // Would be populated from metrics
lastProcessedAt: new Date(), // Would track actual timestamp
});
}
return {
timestamp: new Date(),
overview: {
totalEventsProducedPerSecond: 0, // From metrics
totalEventsConsumedPerSecond: 0, // From metrics
totalConsumers: consumers.length,
healthyConsumers: healthyCount,
unhealthyConsumers: unhealthyCount,
deadLetterQueueTotal: 0, // From DLQ metrics
oldestUnprocessedEventAge: Math.max(
...Array.from(lagMeasurements.values()).map((l) => l.lagMs),
0
),
},
consumers,
eventTypes: [], // Would be populated from metrics
alerts: [], // Would be populated from alert manager
recentErrors: recentErrors.map((e) => ({
eventId: e.eventId,
eventType: e.eventType,
consumer: e.consumer,
error: e.errorMessage,
timestamp: e.timestamp,
retryCount: e.retryCount,
})),
};
}
}
interface ErrorLog {
getRecent(count: number): Promise<Array<{
eventId: string;
eventType: string;
consumer: string;
errorMessage: string;
timestamp: Date;
retryCount: number;
}>>;
}

Practical Tips for Monitoring at Scale
Alert on symptoms, not causes. Alert when consumer lag exceeds a threshold, not when a specific error type occurs. Symptom-based alerts are more robust and generate fewer false positives.
Use anomaly detection for throughput. Rather than setting static thresholds for event production rates, use anomaly detection that accounts for daily, weekly, and seasonal patterns. A 50% throughput drop at 3 AM might be normal; the same drop at 3 PM is probably a problem.
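Short of a full anomaly-detection system, a simple approximation is an hour-of-week baseline: compare the current rate against the median rate observed in the same hour of the week in previous weeks. The following sketch illustrates the idea; the `SeasonalBaseline` class and its default tolerance are illustrative assumptions:

```typescript
// Hour-of-week seasonal baseline (sketch). Instead of a static
// threshold, compare the current rate with the median rate observed
// in the same time bucket in previous weeks.
class SeasonalBaseline {
  // bucket (hour-of-week, 0..167) -> historical rates for that bucket
  private history = new Map<number, number[]>();

  private bucketOf(ts: Date): number {
    // Tuesday 3 PM always maps to the same bucket regardless of week,
    // so daily and weekly patterns are captured automatically.
    return ts.getUTCDay() * 24 + ts.getUTCHours();
  }

  record(ts: Date, rate: number): void {
    const bucket = this.bucketOf(ts);
    const rates = this.history.get(bucket) ?? [];
    rates.push(rate);
    this.history.set(bucket, rates);
  }

  // Anomalous if the current rate is below `tolerance` (e.g. 0.5 = 50%)
  // of the median historical rate for this time bucket.
  isAnomalous(ts: Date, currentRate: number, tolerance = 0.5): boolean {
    const rates = this.history.get(this.bucketOf(ts));
    if (!rates || rates.length === 0) return false; // no baseline yet
    const sorted = [...rates].sort((a, b) => a - b);
    const median = sorted[Math.floor(sorted.length / 2)];
    return currentRate < median * tolerance;
  }
}
```

With this approach, a 50% drop at 3 AM compares against 3 AM history and passes, while the same drop at 3 PM compares against 3 PM history and fires.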
Build runbooks for every alert. When an on-call engineer receives an alert at 2 AM, they should not have to figure out what to do from scratch. Every alert should have a corresponding runbook that describes the likely cause, investigation steps, and remediation actions.
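One lightweight way to make runbooks unavoidable is to attach the link at the rule definition itself and include it in every notification. A minimal sketch; the `runbookUrl` field and `formatNotification` helper are illustrative additions, not part of the `AlertRule` interface shown earlier:

```typescript
// Require a runbook for every alert rule at the type level: a rule
// without a runbookUrl simply does not compile.
interface AlertRuleWithRunbook {
  name: string;
  severity: "info" | "warning" | "critical";
  runbookUrl: string; // required, not optional
}

function formatNotification(
  rule: AlertRuleWithRunbook,
  message: string
): string {
  // Including the runbook link in the alert body means the on-call
  // engineer is one click away from investigation steps.
  return (
    `[${rule.severity.toUpperCase()}] ${rule.name}: ${message}\n` +
    `Runbook: ${rule.runbookUrl}`
  );
}
```

Making the field required, rather than relying on convention, turns "every alert has a runbook" from a policy into an invariant.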
Monitor the monitoring. If your metrics collection or alerting pipeline has an outage, you lose visibility into your entire system. Monitor the health of your observability infrastructure with separate, independent mechanisms.
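A common pattern for this is a dead man's switch: the observability pipeline emits a heartbeat on every successful collection cycle, and an independently deployed checker alarms when the heartbeat goes silent. A minimal sketch, with the `DeadMansSwitch` class as an illustrative shape:

```typescript
// Dead man's switch (sketch): the metrics/alerting pipeline calls
// beat() on every successful cycle; a separate, independently
// deployed checker calls isStale() on its own schedule and pages
// when the heartbeat stops arriving.
class DeadMansSwitch {
  private lastHeartbeat?: Date;

  constructor(private maxSilenceMs: number) {}

  beat(now: Date = new Date()): void {
    this.lastHeartbeat = now;
  }

  isStale(now: Date = new Date()): boolean {
    if (!this.lastHeartbeat) return true; // never seen a heartbeat
    return now.getTime() - this.lastHeartbeat.getTime() > this.maxSilenceMs;
  }
}
```

The essential property is independence: the checker must not share the failure domain of the pipeline it watches, or an outage takes both down together and nobody is paged.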
Keep dashboards simple. A dashboard with fifty graphs is not useful. The primary operational dashboard should answer the four key questions in under five seconds. Detailed investigation dashboards can be linked from the primary dashboard for drill-down.
Track business metrics alongside technical metrics. Display the number of orders processed per minute alongside the event processing rate. When technical issues have business impact, seeing both on the same dashboard makes the urgency immediately clear.
Conclusion
Monitoring event-driven systems at scale requires a deliberate, comprehensive approach that goes beyond traditional application monitoring. The combination of event-specific metrics, distributed tracing, intelligent alerting, and well-designed dashboards gives you the visibility needed to operate event-driven systems with confidence.
The patterns and implementations covered here --- instrumented processors, consumer lag monitoring, symptom-based alerting, and dashboard aggregation --- provide a practical foundation for building observability into your event processing infrastructure. When running platforms like Starburst at scale, investing in monitoring is not optional; it is the difference between operating a system and merely hoping it works.
Related Articles
Migrating to Event-Driven Architecture
A practical guide for planning and executing a migration from traditional request-response systems to event-driven architecture, covering assessment frameworks, migration strategies, risk management, and organizational change.
Real-Time Data Processing: Business Impact and ROI
An exploration of the business value of real-time data processing, covering measurable ROI, competitive advantages, and practical frameworks for justifying investment in event-driven infrastructure.
Event Replay for Debugging and Recovery
A comprehensive guide to using event replay as a powerful debugging and recovery tool in event-driven systems, with TypeScript implementations for selective replay, time-travel debugging, and disaster recovery strategies.