Event Replay for Debugging and Recovery

A comprehensive guide to using event replay as a powerful debugging and recovery tool in event-driven systems, with TypeScript implementations for selective replay, time-travel debugging, and disaster recovery strategies.

By Klivvr Engineering · 12 min read

One of the most underappreciated benefits of event-driven architecture is the ability to replay events. Because events are immutable records of everything that has happened in your system, you can reprocess them at any time --- to rebuild state after a failure, to debug a production issue by reproducing the exact sequence of events that caused it, or to populate a new service with historical data. Event replay transforms debugging from guesswork into science and recovery from prayer into procedure.

This article covers the principles, patterns, and TypeScript implementations for building robust event replay capabilities into your system, drawing on practices used in real-time processing platforms like Starburst.

The Power of Deterministic Replay

At its core, event replay relies on a simple property: if you process the same events in the same order, you should get the same result. This property --- deterministic processing --- is what makes event replay useful. It means that a bug that occurred in production at 3 AM can be reproduced on a developer's laptop by replaying the same sequence of events.

However, determinism is not free. Your event handlers must avoid side effects that depend on external state: the current time, random number generators, network calls, and database queries all introduce non-determinism. The solution is to make these dependencies explicit and injectable.

// A clock abstraction for deterministic replay
interface Clock {
  now(): Date;
}
 
class SystemClock implements Clock {
  now(): Date {
    return new Date();
  }
}
 
class ReplayClock implements Clock {
  constructor(private currentTime: Date) {}
 
  now(): Date {
    return this.currentTime;
  }
 
  advanceTo(time: Date): void {
    this.currentTime = time;
  }
}
 
// An event handler that is replay-safe
class OrderProcessor {
  constructor(
    private clock: Clock,
    private idGenerator: IdGenerator,
    private notificationService: NotificationService // swapped for a no-op during replay
  ) {}
 
  async processOrderPlaced(event: StoredEvent): Promise<ProcessingResult> {
    const processingTime = this.clock.now();
    const confirmationId = this.idGenerator.generate();
 
    // Business logic that depends on injected dependencies,
    // not global state
    const deadline = new Date(
      processingTime.getTime() + 7 * 24 * 60 * 60 * 1000
    );
 
    return {
      confirmationId,
      orderId: event.aggregateId,
      processedAt: processingTime,
      fulfillmentDeadline: deadline,
      status: "confirmed",
    };
  }
}
 
interface IdGenerator {
  generate(): string;
}
 
class UUIDGenerator implements IdGenerator {
  generate(): string {
    // Requires the global crypto API (browsers, Node 19+);
    // on older Node, import randomUUID from "node:crypto"
    return crypto.randomUUID();
  }
}
 
class SequentialGenerator implements IdGenerator {
  private counter = 0;
 
  generate(): string {
    return `replay-${++this.counter}`;
  }
}
 
interface ProcessingResult {
  confirmationId: string;
  orderId: string;
  processedAt: Date;
  fulfillmentDeadline: Date;
  status: string;
}

By injecting a ReplayClock and a SequentialGenerator during replay, you ensure that the processor produces the same output regardless of when the replay runs. This is the foundation of deterministic replay.
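To check that a handler really is replay-safe, run it twice with freshly constructed replay dependencies and compare the outputs. The sketch below inlines minimal versions of the abstractions above so it runs standalone; the simplified processOrder function is a stand-in for OrderProcessor.processOrderPlaced, not the article's full class.

```typescript
// Minimal re-declarations of the abstractions above, so this sketch runs
// standalone. processOrder is a simplified stand-in for
// OrderProcessor.processOrderPlaced, not the full class.
interface Clock {
  now(): Date;
}

class ReplayClock implements Clock {
  constructor(private currentTime: Date) {}
  now(): Date {
    return this.currentTime;
  }
}

interface IdGenerator {
  generate(): string;
}

class SequentialGenerator implements IdGenerator {
  private counter = 0;
  generate(): string {
    return `replay-${++this.counter}`;
  }
}

function processOrder(clock: Clock, ids: IdGenerator, orderId: string) {
  const processingTime = clock.now();
  return {
    confirmationId: ids.generate(),
    orderId,
    processedAt: processingTime,
    fulfillmentDeadline: new Date(
      processingTime.getTime() + 7 * 24 * 60 * 60 * 1000
    ),
  };
}

// Two runs with identical injected dependencies must produce identical output.
const fixedTime = new Date("2024-01-15T03:00:00Z");
const run1 = processOrder(new ReplayClock(fixedTime), new SequentialGenerator(), "order-42");
const run2 = processOrder(new ReplayClock(fixedTime), new SequentialGenerator(), "order-42");

console.log(JSON.stringify(run1) === JSON.stringify(run2)); // → true
```

A check like this makes a good regression test: run it in CI against every handler you intend to replay, so non-determinism is caught before it matters in production.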

Building a Replay Engine

A replay engine reads events from the event store and feeds them to processors in the correct order. It needs to handle filtering, rate limiting, and progress tracking.

// Replay engine with filtering and progress tracking
interface ReplayOptions {
  fromTimestamp?: Date;
  toTimestamp?: Date;
  eventTypes?: string[];
  aggregateIds?: string[];
  batchSize?: number;
  onProgress?: (progress: ReplayProgress) => void;
  rateLimit?: number; // Events per second
  dryRun?: boolean;
}
 
interface ReplayProgress {
  totalEvents: number;
  processedEvents: number;
  skippedEvents: number;
  failedEvents: number;
  currentTimestamp: Date;
  eventsPerSecond: number;
  estimatedRemainingSeconds: number;
}
 
interface ReplayResult {
  totalProcessed: number;
  totalSkipped: number;
  totalFailed: number;
  failures: Array<{
    eventId: string;
    error: string;
  }>;
  durationMs: number;
}
 
class ReplayEngine {
  constructor(
    private eventStore: EventStore,
    private processors: Map<string, EventProcessor>
  ) {}
 
  async replay(options: ReplayOptions): Promise<ReplayResult> {
    const startTime = Date.now();
    let processedCount = 0;
    let skippedCount = 0;
    let failedCount = 0;
    const failures: Array<{ eventId: string; error: string }> = [];
 
    // Count total events for progress reporting
    const totalEvents = await this.countEvents(options);
 
    const progressInterval = setInterval(() => {
      if (options.onProgress) {
        const elapsed = (Date.now() - startTime) / 1000;
        const rate = processedCount / elapsed;
        const remaining = totalEvents - processedCount - skippedCount;
 
        options.onProgress({
          totalEvents,
          processedEvents: processedCount,
          skippedEvents: skippedCount,
          failedEvents: failedCount,
          currentTimestamp: new Date(),
          eventsPerSecond: Math.round(rate),
          estimatedRemainingSeconds: rate > 0
            ? Math.round(remaining / rate)
            : 0,
        });
      }
    }, 1000);
 
    try {
      for await (const event of this.streamEvents(options)) {
        // Check if we have a processor for this event type
        const processor = this.processors.get(event.eventType);
 
        if (!processor) {
          skippedCount++;
          continue;
        }
 
        // Apply rate limiting
        if (options.rateLimit) {
          await this.applyRateLimit(options.rateLimit, processedCount, startTime);
        }
 
        if (options.dryRun) {
          processedCount++;
          continue;
        }
 
        try {
          await processor.process(event);
          processedCount++;
        } catch (error) {
          failedCount++;
          failures.push({
            eventId: event.eventId,
            error: error instanceof Error ? error.message : String(error),
          });
        }
      }
    } finally {
      clearInterval(progressInterval);
    }
 
    return {
      totalProcessed: processedCount,
      totalSkipped: skippedCount,
      totalFailed: failedCount,
      failures,
      durationMs: Date.now() - startTime,
    };
  }
 
  private async *streamEvents(
    options: ReplayOptions
  ): AsyncIterable<StoredEvent> {
    const batchSize = options.batchSize ?? 1000;
 
    for await (const event of this.eventStore.loadAll(0, batchSize)) {
      // Apply filters
      if (
        options.fromTimestamp &&
        event.timestamp < options.fromTimestamp
      ) {
        continue;
      }
 
      if (
        options.toTimestamp &&
        event.timestamp > options.toTimestamp
      ) {
        break; // Events are ordered, so we can stop early
      }
 
      if (
        options.eventTypes &&
        !options.eventTypes.includes(event.eventType)
      ) {
        continue;
      }
 
      if (
        options.aggregateIds &&
        !options.aggregateIds.includes(event.aggregateId)
      ) {
        continue;
      }
 
      yield event;
    }
  }
 
  private async applyRateLimit(
    maxPerSecond: number,
    processed: number,
    startTime: number
  ): Promise<void> {
    const elapsed = (Date.now() - startTime) / 1000;
    const expectedTime = processed / maxPerSecond;
 
    if (expectedTime > elapsed) {
      const delayMs = (expectedTime - elapsed) * 1000;
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
 
  private async countEvents(options: ReplayOptions): Promise<number> {
    // Note: this makes a full pre-pass over the filtered stream, doubling
    // read cost. Worth it for progress reporting; skip it for very large replays.
    let count = 0;
    for await (const _ of this.streamEvents(options)) {
      count++;
    }
    return count;
  }
}
 
interface EventProcessor {
  process(event: StoredEvent): Promise<void>;
}
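The engine above depends on StoredEvent and EventStore types that the article never defines. The shapes below are one plausible minimal version, sufficient for the code shown; the field names and the InMemoryEventStore helper are assumptions for illustration, not a prescribed schema.

```typescript
// Assumed event and store shapes -- field names are illustrative, not canonical.
interface StoredEvent {
  eventId: string;
  aggregateId: string;
  eventType: string;
  version: number; // global position in the event log
  timestamp: Date;
  payload: Record<string, unknown>;
}

interface EventStore {
  // Events for a single aggregate, in order.
  load(aggregateId: string): Promise<StoredEvent[]>;
  // All events from a global position onward, batched internally.
  loadAll(fromPosition: number, batchSize: number): AsyncIterable<StoredEvent>;
}

// A tiny in-memory store, useful for testing replay logic without a real log.
class InMemoryEventStore implements EventStore {
  constructor(private events: StoredEvent[]) {}

  async load(aggregateId: string): Promise<StoredEvent[]> {
    return this.events.filter((e) => e.aggregateId === aggregateId);
  }

  async *loadAll(
    fromPosition: number,
    _batchSize: number
  ): AsyncIterable<StoredEvent> {
    for (const event of this.events) {
      if (event.version >= fromPosition) yield event;
    }
  }
}
```

An in-memory store like this is also handy for unit-testing replay filters and rate limiting against a handful of fabricated events.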

Selective Replay: Targeting Specific Issues

Full replay is sometimes necessary, but for most debugging scenarios, you want to replay a subset of events. Selective replay lets you narrow down to a specific aggregate, time range, or event type.

// Selective replay builder with a fluent API
class ReplayBuilder {
  private options: ReplayOptions = {};
 
  from(timestamp: Date): this {
    this.options.fromTimestamp = timestamp;
    return this;
  }
 
  to(timestamp: Date): this {
    this.options.toTimestamp = timestamp;
    return this;
  }
 
  forAggregate(aggregateId: string): this {
    if (!this.options.aggregateIds) {
      this.options.aggregateIds = [];
    }
    this.options.aggregateIds.push(aggregateId);
    return this;
  }
 
  forEventTypes(...types: string[]): this {
    this.options.eventTypes = types;
    return this;
  }
 
  withBatchSize(size: number): this {
    this.options.batchSize = size;
    return this;
  }
 
  withRateLimit(eventsPerSecond: number): this {
    this.options.rateLimit = eventsPerSecond;
    return this;
  }
 
  withProgress(callback: (progress: ReplayProgress) => void): this {
    this.options.onProgress = callback;
    return this;
  }
 
  asDryRun(): this {
    this.options.dryRun = true;
    return this;
  }
 
  build(): ReplayOptions {
    return { ...this.options };
  }
}
 
// Usage: Debug a specific order that had issues
async function debugOrder(
  orderId: string,
  replayEngine: ReplayEngine
): Promise<void> {
  const options = new ReplayBuilder()
    .forAggregate(orderId)
    .withProgress((progress) => {
      console.log(
        `Replay progress: ${progress.processedEvents}/${progress.totalEvents} ` +
        `(${progress.eventsPerSecond} events/sec)`
      );
    })
    .build();
 
  console.log(`Starting replay for order ${orderId}...`);
  const result = await replayEngine.replay(options);
 
  console.log(
    `Replay complete. Processed: ${result.totalProcessed}, ` +
    `Failed: ${result.totalFailed}, Duration: ${result.durationMs}ms`
  );
 
  if (result.failures.length > 0) {
    console.log("Failures:");
    for (const failure of result.failures) {
      console.log(`  Event ${failure.eventId}: ${failure.error}`);
    }
  }
}

Time-Travel Debugging

One of the most powerful capabilities of event replay is time-travel debugging --- the ability to reconstruct the state of the system at any point in the past. This is invaluable when investigating production issues.

// Time-travel debugger
class TimeTravelDebugger {
  constructor(
    private eventStore: EventStore,
    private stateReconstructor: StateReconstructor
  ) {}
 
  async getStateAt(
    aggregateId: string,
    targetTimestamp: Date
  ): Promise<AggregateStateSnapshot> {
    const events = await this.eventStore.load(aggregateId);
 
    // Filter events up to the target timestamp
    const relevantEvents = events.filter(
      (e) => e.timestamp <= targetTimestamp
    );
 
    // Reconstruct state by replaying events
    let state: Record<string, unknown> = {};
    const appliedEvents: StoredEvent[] = [];
 
    for (const event of relevantEvents) {
      state = this.stateReconstructor.apply(state, event);
      appliedEvents.push(event);
    }
 
    return {
      aggregateId,
      timestamp: targetTimestamp,
      state,
      eventCount: appliedEvents.length,
      lastEventId: appliedEvents.length > 0
        ? appliedEvents[appliedEvents.length - 1].eventId
        : null,
      lastEventType: appliedEvents.length > 0
        ? appliedEvents[appliedEvents.length - 1].eventType
        : null,
    };
  }
 
  async getStateTimeline(
    aggregateId: string,
    fromTimestamp?: Date,
    toTimestamp?: Date
  ): Promise<StateTimelineEntry[]> {
    const events = await this.eventStore.load(aggregateId);
    const timeline: StateTimelineEntry[] = [];
    let state: Record<string, unknown> = {};
 
    for (const event of events) {
      if (fromTimestamp && event.timestamp < fromTimestamp) {
        state = this.stateReconstructor.apply(state, event);
        continue;
      }
 
      if (toTimestamp && event.timestamp > toTimestamp) {
        break;
      }
 
      const previousState = { ...state };
      state = this.stateReconstructor.apply(state, event);
 
      timeline.push({
        timestamp: event.timestamp,
        eventType: event.eventType,
        eventId: event.eventId,
        previousState,
        newState: { ...state },
        changes: this.diffStates(previousState, state),
      });
    }
 
    return timeline;
  }
 
  async findEventCausingCondition(
    aggregateId: string,
    condition: (state: Record<string, unknown>) => boolean
  ): Promise<StoredEvent | null> {
    const events = await this.eventStore.load(aggregateId);
    let state: Record<string, unknown> = {};
 
    for (const event of events) {
      const previouslyMet = condition(state);
      state = this.stateReconstructor.apply(state, event);
      const nowMet = condition(state);
 
      if (!previouslyMet && nowMet) {
        return event;
      }
    }
 
    return null;
  }
 
  private diffStates(
    before: Record<string, unknown>,
    after: Record<string, unknown>
  ): StateDiff[] {
    const diffs: StateDiff[] = [];
    const allKeys = new Set([
      ...Object.keys(before),
      ...Object.keys(after),
    ]);
 
    for (const key of allKeys) {
      if (!(key in before)) {
        diffs.push({ field: key, type: "added", newValue: after[key] });
      } else if (!(key in after)) {
        diffs.push({ field: key, type: "removed", oldValue: before[key] });
      } else if (
        JSON.stringify(before[key]) !== JSON.stringify(after[key])
      ) {
        diffs.push({
          field: key,
          type: "changed",
          oldValue: before[key],
          newValue: after[key],
        });
      }
    }
 
    return diffs;
  }
}
 
interface AggregateStateSnapshot {
  aggregateId: string;
  timestamp: Date;
  state: Record<string, unknown>;
  eventCount: number;
  lastEventId: string | null;
  lastEventType: string | null;
}
 
interface StateTimelineEntry {
  timestamp: Date;
  eventType: string;
  eventId: string;
  previousState: Record<string, unknown>;
  newState: Record<string, unknown>;
  changes: StateDiff[];
}
 
interface StateDiff {
  field: string;
  type: "added" | "removed" | "changed";
  oldValue?: unknown;
  newValue?: unknown;
}
 
interface StateReconstructor {
  apply(
    state: Record<string, unknown>,
    event: StoredEvent
  ): Record<string, unknown>;
}

The findEventCausingCondition method is especially useful for debugging. Given a predicate like "the order total exceeded $10,000," it finds the exact event that caused the condition to become true.
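To make this concrete, here is a condensed, standalone version of that search: apply each event in turn and return the first one after which the predicate flips from false to true. The Evt shape and the amounts are hypothetical, chosen to mirror the "$10,000" example.

```typescript
// Condensed, standalone version of the findEventCausingCondition search.
interface Evt {
  eventId: string;
  eventType: string;
  amountCents: number;
}
type State = { totalCents: number };

function findCausingEvent(
  events: Evt[],
  apply: (s: State, e: Evt) => State,
  condition: (s: State) => boolean
): Evt | null {
  let state: State = { totalCents: 0 };
  for (const event of events) {
    const previouslyMet = condition(state);
    state = apply(state, event);
    // Return the first event after which the predicate flips to true
    if (!previouslyMet && condition(state)) return event;
  }
  return null;
}

const events: Evt[] = [
  { eventId: "e1", eventType: "ItemAdded", amountCents: 400_000 },
  { eventId: "e2", eventType: "ItemAdded", amountCents: 500_000 },
  { eventId: "e3", eventType: "ItemAdded", amountCents: 200_000 },
];

const culprit = findCausingEvent(
  events,
  (s, e) => ({ totalCents: s.totalCents + e.amountCents }),
  (s) => s.totalCents > 1_000_000 // "the order total exceeded $10,000"
);

console.log(culprit?.eventId); // → e3
```

The running total crosses $10,000 only when the third event is applied, so the search pinpoints e3 as the cause.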

Disaster Recovery with Event Replay

Event replay is not just for debugging --- it is a critical component of disaster recovery. If a read model becomes corrupted, a projection falls behind, or an entire database is lost, you can rebuild everything from the event store.

// Disaster recovery coordinator
class RecoveryCoordinator {
  constructor(
    private eventStore: EventStore,
    private projections: Map<string, RecoverableProjection>
  ) {}
 
  async recoverProjection(
    projectionName: string,
    options: RecoveryOptions = {}
  ): Promise<RecoveryResult> {
    const projection = this.projections.get(projectionName);
 
    if (!projection) {
      throw new Error(`Unknown projection: ${projectionName}`);
    }
 
    console.log(`Starting recovery of projection: ${projectionName}`);
 
    // Step 1: Reset the projection
    if (options.fullRebuild) {
      await projection.reset();
      console.log("Projection reset complete.");
    }
 
    // Step 2: Determine the starting position
    const startPosition = options.fullRebuild
      ? 0
      : await projection.getLastProcessedPosition();
 
    console.log(`Replaying from position: ${startPosition}`);
 
    // Step 3: Replay events
    let processedCount = 0;
    let lastPosition = startPosition;
    const startTime = Date.now();
 
    for await (const event of this.eventStore.loadAll(
      startPosition,
      options.batchSize ?? 5000
    )) {
      try {
        await projection.handleEvent(event);
        processedCount++;
        lastPosition = event.version;
 
        // Periodic checkpointing
        if (processedCount % 10000 === 0) {
          await projection.checkpoint(lastPosition);
          const elapsed = (Date.now() - startTime) / 1000;
          console.log(
            `Recovery progress: ${processedCount} events ` +
            `(${Math.round(processedCount / elapsed)} events/sec)`
          );
        }
      } catch (error) {
        if (options.stopOnError) {
          throw error;
        }
 
        console.error(
          `Error processing event ${event.eventId} ` +
          `during recovery:`,
          error
        );
      }
    }
 
    // Final checkpoint
    await projection.checkpoint(lastPosition);
 
    const totalDuration = Date.now() - startTime;
 
    return {
      projectionName,
      eventsProcessed: processedCount,
      finalPosition: lastPosition,
      durationMs: totalDuration,
      eventsPerSecond: Math.round(
        processedCount / (totalDuration / 1000)
      ),
    };
  }
 
  async recoverAll(
    options: RecoveryOptions = {}
  ): Promise<Map<string, RecoveryResult>> {
    const results = new Map<string, RecoveryResult>();
 
    for (const [name] of this.projections) {
      try {
        const result = await this.recoverProjection(name, options);
        results.set(name, result);
      } catch (error) {
        console.error(`Failed to recover projection ${name}:`, error);
      }
    }
 
    return results;
  }
}
 
interface RecoverableProjection {
  handleEvent(event: StoredEvent): Promise<void>;
  reset(): Promise<void>;
  getLastProcessedPosition(): Promise<number>;
  checkpoint(position: number): Promise<void>;
}
 
interface RecoveryOptions {
  fullRebuild?: boolean;
  batchSize?: number;
  stopOnError?: boolean;
}
 
interface RecoveryResult {
  projectionName: string;
  eventsProcessed: number;
  finalPosition: number;
  durationMs: number;
  eventsPerSecond: number;
}

Practical Tips

Make replay a first-class operation. Do not treat replay as an afterthought. Build it into your architecture from the start, and test it regularly. A replay mechanism that has never been exercised will fail when you need it most.

Version your event handlers. When you change how events are processed, keep the old processing logic available for replay. If you delete old handler code, you lose the ability to reproduce historical behavior.
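One simple way to keep historical logic around is to key handlers by event type and schema version. The dispatch table below is an illustrative sketch, not a library feature; the shipping-fee logic is invented for the example.

```typescript
// Sketch: keep every historical handler version, keyed by event type and
// schema version, so old events still replay through the logic that
// originally processed them.
type Handler = (payload: Record<string, unknown>) => Record<string, unknown>;

const handlers = new Map<string, Handler>([
  // v1 charged a flat 500-cent shipping fee.
  ["OrderPlaced@1", (p) => ({ total: (p.subtotal as number) + 500 })],
  // v2 waives shipping above a 10,000-cent subtotal.
  [
    "OrderPlaced@2",
    (p) => ({
      total:
        (p.subtotal as number) + ((p.subtotal as number) > 10_000 ? 0 : 500),
    }),
  ],
]);

function dispatch(
  eventType: string,
  schemaVersion: number,
  payload: Record<string, unknown>
): Record<string, unknown> {
  const handler = handlers.get(`${eventType}@${schemaVersion}`);
  if (!handler) throw new Error(`No handler for ${eventType}@${schemaVersion}`);
  return handler(payload);
}

// An old v1 event replays through v1 logic; a v2 event through v2 logic.
console.log(dispatch("OrderPlaced", 1, { subtotal: 20_000 }).total); // → 20500
console.log(dispatch("OrderPlaced", 2, { subtotal: 20_000 }).total); // → 20000
```

Deleting the v1 entry would silently change what a replay of old events produces, which is exactly the hazard this tip warns about.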

Monitor replay lag. If your projection replay takes hours, you have a problem. Track the time it takes to rebuild each projection from scratch, and set alerts if it exceeds acceptable thresholds.

Be careful with side effects during replay. Sending emails, charging credit cards, and calling external APIs should all be suppressed during replay. Use the same dependency injection pattern shown earlier to swap real implementations for no-ops during replay.

// Side-effect suppression during replay
interface NotificationService {
  send(to: string, message: string): Promise<void>;
}
 
class RealNotificationService implements NotificationService {
  async send(to: string, message: string): Promise<void> {
    // Actually send the notification
    console.log(`Sending notification to ${to}: ${message}`);
  }
}
 
class NoOpNotificationService implements NotificationService {
  async send(to: string, message: string): Promise<void> {
    // Intentionally does nothing during replay
  }
}

Use checksums to verify replay correctness. After a replay, compare the resulting state against the expected state using checksums or hash functions. This catches subtle bugs in your replay logic.
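A minimal checksum helper might canonicalize the state and hash it. The sketch below assumes Node's node:crypto module and flat state objects; it relies on JSON.stringify's replacer array, which serializes only the listed keys in the listed order, so sorting the keys gives a stable representation.

```typescript
import { createHash } from "node:crypto";

// Hash a flat state object into a stable checksum. Passing the sorted key
// list as the replacer makes key order irrelevant to the digest.
function stateChecksum(state: Record<string, unknown>): string {
  const canonical = JSON.stringify(state, Object.keys(state).sort());
  return createHash("sha256").update(canonical).digest("hex");
}

// After a replay, compare the rebuilt state against a checksum captured
// from the live projection before the rebuild.
const liveState = { status: "confirmed", totalCents: 1250 };
const replayedState = { totalCents: 1250, status: "confirmed" }; // same data, different key order

console.log(stateChecksum(liveState) === stateChecksum(replayedState)); // → true
```

For nested state you would need a recursive canonicalization instead, since the replacer array filters keys at every level of the object.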

Conclusion

Event replay is one of the most powerful capabilities in an event-driven system's toolkit. It transforms debugging from a process of guessing and log-searching into a precise, reproducible science. It turns disaster recovery from a stressful scramble into a well-practiced procedure.

The patterns covered here --- deterministic processing through dependency injection, replay engines with filtering and progress tracking, time-travel debugging, and disaster recovery coordination --- provide everything you need to make event replay a reliable part of your operational toolkit. When building systems like Starburst that process high volumes of real-time events, investing in robust replay capabilities pays dividends in reduced debugging time, faster recovery, and greater confidence in your system's correctness.
