Event Sourcing in TypeScript: A Practical Guide

Learn how to implement event sourcing in TypeScript from scratch, including aggregate design, event stores, projections, and snapshot strategies for high-performance event-driven systems.

technical · 10 min read · By Klivvr Engineering

Event sourcing is a pattern where the state of an application is derived entirely from a sequence of events. Rather than storing the current state in a database and mutating it with each change, you store every state change as an immutable event. The current state is reconstructed by replaying these events from the beginning. This approach offers remarkable benefits for auditability, debugging, and temporal queries, but it also introduces complexity that must be managed carefully.
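The core mechanic is easy to state precisely: current state is a left fold over the event log. A minimal, self-contained sketch (using a hypothetical account domain, not the shopping-cart example developed later in this guide):

```typescript
// State as a left fold over an immutable event log.
// Hypothetical account domain, for illustration only.
type AccountEvent =
  | { type: "Deposited"; amount: number }
  | { type: "Withdrawn"; amount: number };

interface AccountState {
  balance: number;
}

// Pure reducer: current state + one event -> next state
function apply(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case "Deposited":
      return { balance: state.balance + event.amount };
    case "Withdrawn":
      return { balance: state.balance - event.amount };
  }
}

// Replaying the full log reconstructs the current state
const log: AccountEvent[] = [
  { type: "Deposited", amount: 100 },
  { type: "Withdrawn", amount: 30 },
  { type: "Deposited", amount: 5 },
];

const current = log.reduce(apply, { balance: 0 });
// current.balance === 75
```

Everything else in this guide (stores, aggregates, projections, snapshots) is machinery for doing this fold reliably at scale.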

In this guide, we walk through a complete implementation of event sourcing in TypeScript, covering the core building blocks, practical considerations, and performance optimization strategies that are essential for production systems like Starburst.

The Event Store: Foundation of Event Sourcing

The event store is the persistent backbone of an event-sourced system. It is an append-only log of events, ordered by the sequence in which they occurred. Unlike a traditional database where rows are updated in place, the event store only ever grows. Every state change is captured as a new event appended to the log.

// Core event store types
interface StoredEvent {
  readonly eventId: string;
  readonly aggregateId: string;
  readonly aggregateType: string;
  readonly eventType: string;
  readonly version: number;
  readonly timestamp: Date;
  readonly payload: Record<string, unknown>;
  readonly metadata: Record<string, unknown>;
  // Assigned by the store on write; present on events read back,
  // and used by subscribers to checkpoint their progress
  readonly globalPosition?: number;
}
 
interface EventStore {
  append(
    aggregateId: string,
    events: StoredEvent[],
    expectedVersion: number
  ): Promise<void>;
 
  load(
    aggregateId: string,
    fromVersion?: number
  ): Promise<StoredEvent[]>;
 
  loadAll(
    fromPosition?: number,
    batchSize?: number
  ): AsyncIterable<StoredEvent>;
}
 
// PostgreSQL-backed event store implementation
class PostgresEventStore implements EventStore {
  constructor(private readonly pool: DatabasePool) {}
 
  async append(
    aggregateId: string,
    events: StoredEvent[],
    expectedVersion: number
  ): Promise<void> {
    const client = await this.pool.connect();
 
    try {
      await client.query("BEGIN");
 
      // Optimistic concurrency check
      const result = await client.query(
        `SELECT MAX(version) as current_version
         FROM events
         WHERE aggregate_id = $1`,
        [aggregateId]
      );
 
      // pg drivers often return MAX() as a string, so normalize to a number
      const currentVersion = Number(result.rows[0]?.current_version ?? 0);
 
      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, ` +
          `but current version is ${currentVersion}`
        );
      }
 
      for (const event of events) {
        await client.query(
          `INSERT INTO events
           (event_id, aggregate_id, aggregate_type, event_type,
            version, timestamp, payload, metadata)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
          [
            event.eventId,
            event.aggregateId,
            event.aggregateType,
            event.eventType,
            event.version,
            event.timestamp,
            JSON.stringify(event.payload),
            JSON.stringify(event.metadata),
          ]
        );
      }
 
      await client.query("COMMIT");
    } catch (error) {
      await client.query("ROLLBACK");
      throw error;
    } finally {
      client.release();
    }
  }
 
  async load(
    aggregateId: string,
    fromVersion: number = 0
  ): Promise<StoredEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE aggregate_id = $1 AND version > $2
       ORDER BY version ASC`,
      [aggregateId, fromVersion]
    );
 
    return result.rows.map(this.mapRow);
  }
 
  async *loadAll(
    fromPosition: number = 0,
    batchSize: number = 1000
  ): AsyncIterable<StoredEvent> {
    let currentPosition = fromPosition;
    let hasMore = true;
 
    while (hasMore) {
      const result = await this.pool.query(
        `SELECT * FROM events
         WHERE global_position > $1
         ORDER BY global_position ASC
         LIMIT $2`,
        [currentPosition, batchSize]
      );
 
      for (const row of result.rows) {
        yield this.mapRow(row);
        currentPosition = row.global_position;
      }
 
      hasMore = result.rows.length === batchSize;
    }
  }
 
  private mapRow(row: any): StoredEvent {
    return {
      eventId: row.event_id,
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      eventType: row.event_type,
      version: row.version,
      timestamp: row.timestamp,
      payload: row.payload,
      metadata: row.metadata,
      globalPosition: row.global_position,
    };
  }
}
 
class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "ConcurrencyError";
  }
}

The optimistic concurrency check in the append method is crucial. Without it, two concurrent operations on the same aggregate could produce conflicting event sequences. By checking that the current version matches the expected version, we ensure that no events have been written between when the aggregate was loaded and when new events are being appended.
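The SELECT MAX(version) check alone still has a narrow race: two transactions can both read the same maximum and both try to insert version N+1. A unique constraint on (aggregate_id, version) closes that window at the database level, turning the race into a constraint violation that one writer can catch and retry. A possible schema, matching the column names in the queries above (the exact DDL is an assumption, not prescribed by this guide):

```typescript
// Possible DDL for the events table used by PostgresEventStore.
// The UNIQUE constraint is the hard guarantee behind the
// optimistic-concurrency check; the SELECT MAX(version) path is
// just the fast, friendly failure mode.
const CREATE_EVENTS_TABLE = `
  CREATE TABLE IF NOT EXISTS events (
    global_position BIGSERIAL PRIMARY KEY,
    event_id        UUID        NOT NULL UNIQUE,
    aggregate_id    TEXT        NOT NULL,
    aggregate_type  TEXT        NOT NULL,
    event_type      TEXT        NOT NULL,
    version         INTEGER     NOT NULL,
    timestamp       TIMESTAMPTZ NOT NULL,
    payload         JSONB       NOT NULL,
    metadata        JSONB       NOT NULL,
    UNIQUE (aggregate_id, version)
  )
`;
```

The BIGSERIAL global_position column also gives loadAll a total order across all aggregates, which is what projections page through.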

Aggregates: The Domain Model

In event sourcing, aggregates are the domain objects that produce and consume events. An aggregate encapsulates business logic and enforces invariants. When a command is issued, the aggregate validates it against its current state and produces zero or more events. The state itself is built by applying events sequentially.

// Base aggregate class
abstract class Aggregate {
  private uncommittedEvents: StoredEvent[] = [];
  private _version: number = 0;
 
  get version(): number {
    return this._version;
  }
 
  protected abstract applyEvent(event: StoredEvent): void;
 
  loadFromHistory(events: StoredEvent[]): void {
    for (const event of events) {
      this.applyEvent(event);
      this._version = event.version;
    }
  }
 
  getUncommittedEvents(): StoredEvent[] {
    return [...this.uncommittedEvents];
  }
 
  clearUncommittedEvents(): void {
    this.uncommittedEvents = [];
  }
 
  protected addEvent(
    eventType: string,
    payload: Record<string, unknown>,
    metadata: Record<string, unknown> = {}
  ): void {
    const event: StoredEvent = {
      eventId: crypto.randomUUID(),
      aggregateId: this.getId(),
      aggregateType: this.getType(),
      eventType,
      version: this._version + 1,
      timestamp: new Date(),
      payload,
      metadata,
    };

    this.applyEvent(event);
    // Advance the version with every new event so that repository
    // math (version minus uncommitted count) yields the persisted
    // version to use as expectedVersion on append
    this._version = event.version;
    this.uncommittedEvents.push(event);
  }
 
  // Public (not protected) so repositories and snapshot stores can
  // read the aggregate's identity, as the Snapshotable interface expects
  abstract getId(): string;
  abstract getType(): string;
}
 
// Concrete aggregate: ShoppingCart
class ShoppingCart extends Aggregate {
  private id: string = "";
  private customerId: string = "";
  private items: Map<string, CartItem> = new Map();
  private status: "active" | "checked_out" | "abandoned" = "active";
 
  getId(): string {
    return this.id;
  }
 
  getType(): string {
    return "ShoppingCart";
  }
 
  // Command: Create a new cart
  static create(cartId: string, customerId: string): ShoppingCart {
    const cart = new ShoppingCart();
    // Set the id before the first event: addEvent builds the event
    // (stamping aggregateId via getId) before applyEvent runs
    cart.id = cartId;
    cart.addEvent("CartCreated", { cartId, customerId });
    return cart;
  }
 
  // Command: Add an item to the cart
  addItem(productId: string, name: string, price: number, quantity: number): void {
    if (this.status !== "active") {
      throw new Error("Cannot add items to a non-active cart.");
    }
 
    if (price <= 0) {
      throw new Error("Price must be positive.");
    }
 
    if (quantity <= 0) {
      throw new Error("Quantity must be positive.");
    }
 
    this.addEvent("ItemAddedToCart", {
      productId,
      name,
      price,
      quantity,
    });
  }
 
  // Command: Remove an item from the cart
  removeItem(productId: string): void {
    if (this.status !== "active") {
      throw new Error("Cannot remove items from a non-active cart.");
    }
 
    if (!this.items.has(productId)) {
      throw new Error(`Product ${productId} is not in the cart.`);
    }
 
    this.addEvent("ItemRemovedFromCart", { productId });
  }
 
  // Command: Check out
  checkout(): void {
    if (this.status !== "active") {
      throw new Error("Cart is not active.");
    }
 
    if (this.items.size === 0) {
      throw new Error("Cannot checkout an empty cart.");
    }
 
    const total = Array.from(this.items.values()).reduce(
      (sum, item) => sum + item.price * item.quantity,
      0
    );
 
    this.addEvent("CartCheckedOut", {
      totalAmount: total,
      itemCount: this.items.size,
    });
  }
 
  // Apply events to rebuild state
  protected applyEvent(event: StoredEvent): void {
    switch (event.eventType) {
      case "CartCreated":
        this.id = event.payload.cartId as string;
        this.customerId = event.payload.customerId as string;
        this.status = "active";
        break;
 
      case "ItemAddedToCart": {
        const productId = event.payload.productId as string;
        const existing = this.items.get(productId);
 
        if (existing) {
          this.items.set(productId, {
            ...existing,
            quantity: existing.quantity + (event.payload.quantity as number),
          });
        } else {
          this.items.set(productId, {
            productId,
            name: event.payload.name as string,
            price: event.payload.price as number,
            quantity: event.payload.quantity as number,
          });
        }
        break;
      }
 
      case "ItemRemovedFromCart":
        this.items.delete(event.payload.productId as string);
        break;
 
      case "CartCheckedOut":
        this.status = "checked_out";
        break;
    }
  }
}
 
interface CartItem {
  productId: string;
  name: string;
  price: number;
  quantity: number;
}

Notice how the aggregate separates command handling (the public methods that validate business rules) from event application (the applyEvent method that mutates state). This separation is the heart of event sourcing: commands represent intent, events represent facts.
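To make the round trip concrete, here is a condensed, self-contained cart (a deliberately trimmed stand-in for the ShoppingCart class above, with just two event types) showing history replay followed by a new command:

```typescript
// Condensed cart illustrating the command/apply split:
// commands validate intent, apply records facts.
interface CartEvent {
  eventType: string;
  payload: Record<string, unknown>;
}

class MiniCart {
  readonly uncommitted: CartEvent[] = [];
  private items = new Map<string, number>(); // productId -> quantity
  private status: "active" | "checked_out" = "active";

  // Command: validate, then record a fact
  addItem(productId: string, quantity: number): void {
    if (this.status !== "active") throw new Error("Cart is not active.");
    if (quantity <= 0) throw new Error("Quantity must be positive.");
    this.record({ eventType: "ItemAddedToCart", payload: { productId, quantity } });
  }

  // Command: validate, then record a fact
  checkout(): void {
    if (this.status !== "active") throw new Error("Cart is not active.");
    if (this.items.size === 0) throw new Error("Cannot checkout an empty cart.");
    this.record({ eventType: "CartCheckedOut", payload: {} });
  }

  // Replay past events without re-recording them
  loadFromHistory(events: CartEvent[]): void {
    for (const e of events) this.apply(e);
  }

  private record(event: CartEvent): void {
    this.apply(event);
    this.uncommitted.push(event);
  }

  private apply(event: CartEvent): void {
    switch (event.eventType) {
      case "ItemAddedToCart": {
        const id = event.payload.productId as string;
        const qty = event.payload.quantity as number;
        this.items.set(id, (this.items.get(id) ?? 0) + qty);
        break;
      }
      case "CartCheckedOut":
        this.status = "checked_out";
        break;
    }
  }
}

// Rebuild from history, then issue a new command
const cart = new MiniCart();
cart.loadFromHistory([
  { eventType: "ItemAddedToCart", payload: { productId: "p-1", quantity: 2 } },
]);
cart.checkout();
// Replayed events rebuilt state without being re-recorded;
// only the new CartCheckedOut fact is pending persistence.
```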

Projections: Read Models from Event Streams

While aggregates provide the write side of your application, projections provide the read side. A projection subscribes to the event stream and builds a read-optimized representation of the data. This is where event sourcing intersects with CQRS (Command Query Responsibility Segregation).

// Projection infrastructure
interface Projection {
  readonly projectionName: string;
  handle(event: StoredEvent): Promise<void>;
  getPosition(): Promise<number>;
  setPosition(position: number): Promise<void>;
}
 
class ProjectionEngine {
  private projections: Projection[] = [];
  private running = false;
 
  constructor(private eventStore: EventStore) {}
 
  register(projection: Projection): void {
    this.projections.push(projection);
  }
 
  async start(): Promise<void> {
    this.running = true;
 
    while (this.running) {
      for (const projection of this.projections) {
        const position = await projection.getPosition();
 
        for await (const event of this.eventStore.loadAll(position, 500)) {
          await projection.handle(event);
          // Checkpoint the store-wide position, not the per-aggregate
          // version, so it matches loadAll's global_position cursor
          await projection.setPosition(event.globalPosition!);
        }
      }
 
      // Poll interval when caught up
      await new Promise(resolve => setTimeout(resolve, 100));
    }
  }
 
  stop(): void {
    this.running = false;
  }
}
 
// Concrete projection: Cart summary for display
class CartSummaryProjection implements Projection {
  readonly projectionName = "CartSummary";
  private position = 0;
 
  constructor(private readonly db: DatabasePool) {}
 
  async handle(event: StoredEvent): Promise<void> {
    switch (event.eventType) {
      case "CartCreated":
        await this.db.query(
          `INSERT INTO cart_summaries (cart_id, customer_id, item_count, total, status)
           VALUES ($1, $2, 0, 0, 'active')`,
          [event.payload.cartId, event.payload.customerId]
        );
        break;
 
      case "ItemAddedToCart":
        await this.db.query(
          `UPDATE cart_summaries
           SET item_count = item_count + $1,
               total = total + ($2 * $1),
               updated_at = NOW()
           WHERE cart_id = $3`,
          [
            event.payload.quantity,
            event.payload.price,
            event.aggregateId,
          ]
        );
        break;
 
      case "CartCheckedOut":
        await this.db.query(
          `UPDATE cart_summaries
           SET status = 'checked_out', updated_at = NOW()
           WHERE cart_id = $1`,
          [event.aggregateId]
        );
        break;
    }
  }
 
  async getPosition(): Promise<number> {
    return this.position;
  }
 
  async setPosition(position: number): Promise<void> {
    this.position = position;
  }
}

Projections are inherently eventual --- they are always slightly behind the event stream. This is a trade-off you accept when adopting event sourcing. However, the benefit is that you can create multiple projections from the same event stream, each optimized for a different query pattern.
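Note that CartSummaryProjection keeps its position in memory, so a restart would replay the stream from zero. In production the checkpoint is usually persisted, ideally in the same database as the read model so the two cannot drift apart. A sketch of a durable checkpoint store (the projection_checkpoints table and the minimal DatabasePool shape here are assumptions; the guide never defines DatabasePool):

```typescript
// Minimal shape of the pool used throughout this guide (assumption)
interface DatabasePool {
  query(sql: string, params?: unknown[]): Promise<{ rows: any[] }>;
}

// Durable checkpoints stored next to the read model, so position and
// data can be updated in the same transaction if desired
class DurableCheckpointStore {
  constructor(private readonly db: DatabasePool) {}

  async getPosition(projectionName: string): Promise<number> {
    const result = await this.db.query(
      `SELECT position FROM projection_checkpoints
       WHERE projection_name = $1`,
      [projectionName]
    );
    return result.rows[0]?.position ?? 0;
  }

  async setPosition(projectionName: string, position: number): Promise<void> {
    // Upsert so the first checkpoint write needs no special case
    await this.db.query(
      `INSERT INTO projection_checkpoints (projection_name, position)
       VALUES ($1, $2)
       ON CONFLICT (projection_name) DO UPDATE SET position = EXCLUDED.position`,
      [projectionName, position]
    );
  }
}

// In-memory fake pool, just enough to exercise the store in isolation
const checkpointTable = new Map<string, number>();
const fakePool: DatabasePool = {
  async query(sql, params = []) {
    const name = params[0] as string;
    if (sql.trimStart().startsWith("SELECT")) {
      return {
        rows: checkpointTable.has(name)
          ? [{ position: checkpointTable.get(name) }]
          : [],
      };
    }
    checkpointTable.set(name, params[1] as number); // simulate the upsert
    return { rows: [] };
  },
};

const checkpoints = new DurableCheckpointStore(fakePool);
```

A ProjectionEngine would then read getPosition before calling loadAll and write setPosition after each handled event, so restarts resume where they left off.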

Snapshots: Optimizing Replay Performance

As aggregates accumulate thousands of events, replaying from the beginning becomes slow. Snapshots solve this by periodically saving the aggregate's state so that only events since the last snapshot need to be replayed.

interface Snapshot {
  aggregateId: string;
  aggregateType: string;
  version: number;
  state: Record<string, unknown>;
  createdAt: Date;
}
 
interface SnapshotStore {
  save(snapshot: Snapshot): Promise<void>;
  load(aggregateId: string): Promise<Snapshot | null>;
}
 
class SnapshotAwareRepository<T extends Aggregate & Snapshotable> {
  private readonly snapshotInterval: number;
 
  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore,
    private factory: () => T,
    options: { snapshotInterval?: number } = {}
  ) {
    this.snapshotInterval = options.snapshotInterval ?? 100;
  }
 
  async load(aggregateId: string): Promise<T> {
    const aggregate = this.factory();
    const snapshot = await this.snapshotStore.load(aggregateId);
 
    if (snapshot) {
      aggregate.restoreFromSnapshot(snapshot.state);
      const events = await this.eventStore.load(
        aggregateId,
        snapshot.version
      );
      aggregate.loadFromHistory(events);
    } else {
      const events = await this.eventStore.load(aggregateId);
      aggregate.loadFromHistory(events);
    }
 
    return aggregate;
  }
 
  async save(aggregate: T): Promise<void> {
    const uncommitted = aggregate.getUncommittedEvents();
 
    if (uncommitted.length === 0) return;
 
    await this.eventStore.append(
      aggregate.getId(),
      uncommitted,
      aggregate.version - uncommitted.length
    );
 
    // Take snapshot if needed
    if (aggregate.version % this.snapshotInterval === 0) {
      await this.snapshotStore.save({
        aggregateId: aggregate.getId(),
        aggregateType: aggregate.getType(),
        version: aggregate.version,
        state: aggregate.toSnapshot(),
        createdAt: new Date(),
      });
    }
 
    aggregate.clearUncommittedEvents();
  }
}
 
interface Snapshotable {
  getId(): string;
  getType(): string;
  toSnapshot(): Record<string, unknown>;
  restoreFromSnapshot(state: Record<string, unknown>): void;
}

A good rule of thumb is to take snapshots every 100 to 500 events, depending on the complexity of your aggregate. Too frequent and you waste storage; too infrequent and replay becomes slow. One subtlety to watch: after restoring from a snapshot, the aggregate's version must be set to the snapshot's version rather than left at zero, otherwise the next append will use a stale expected version whenever no events have arrived since the snapshot.

Practical Tips for Production Event Sourcing

Schema evolution is inevitable. Events are immutable, but your understanding of the domain will change. Use upcasters --- functions that transform old event formats into new ones during replay --- rather than trying to migrate historical events.

type Upcaster = (event: StoredEvent) => StoredEvent;
 
const upcasters: Map<string, Upcaster[]> = new Map([
  [
    "ItemAddedToCart",
    [
      // v1 -> v2: Add currency field
      (event) => ({
        ...event,
        payload: {
          ...event.payload,
          currency: event.payload.currency ?? "USD",
        },
      }),
    ],
  ],
]);
 
function upcast(event: StoredEvent): StoredEvent {
  const eventUpcasters = upcasters.get(event.eventType) ?? [];
  return eventUpcasters.reduce((e, upcaster) => upcaster(e), event);
}
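A quick standalone check of the upcasting pipeline (the event and registry types are redefined minimally here so the snippet runs on its own): a v1 ItemAddedToCart written before currency existed comes out carrying the default, while events that already have the field pass through untouched.

```typescript
// Minimal stand-alone version of the upcasting pipeline above
interface Evt {
  eventType: string;
  payload: Record<string, unknown>;
}

type Upcast = (event: Evt) => Evt;

const registry = new Map<string, Upcast[]>([
  [
    "ItemAddedToCart",
    [
      // v1 -> v2: default the missing currency field
      (event) => ({
        ...event,
        payload: { ...event.payload, currency: event.payload.currency ?? "USD" },
      }),
    ],
  ],
]);

// Run every registered upcaster for the event type, oldest first
function upcastEvent(event: Evt): Evt {
  return (registry.get(event.eventType) ?? []).reduce((e, up) => up(e), event);
}

// A v1 event, written before currency existed
const v1: Evt = {
  eventType: "ItemAddedToCart",
  payload: { productId: "p-1", price: 9.99, quantity: 1 },
};

const upgraded = upcastEvent(v1);
// upgraded.payload.currency === "USD"; already-current events pass through
```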

Keep events small and focused. Each event should represent a single meaningful change. Avoid "god events" that carry the entire state of an aggregate.

Test your aggregates thoroughly. Because aggregates are pure functions of their event history, they are remarkably testable. Given a sequence of past events and a command, assert that the correct new events are produced.
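The given/when/then shape is easiest to see against a pure decision function: past events in, command in, new events out. The decide function below is a hypothetical stand-in for an aggregate command handler, not code from this guide:

```typescript
// Given past events and a command, assert the produced events.
// "decide" is a pure, hypothetical stand-in for a command handler.
interface Ev {
  type: string;
  payload: Record<string, unknown>;
}

interface Command {
  type: string;
  payload: Record<string, unknown>;
}

// Rebuild just enough state from history to validate the command
function decide(history: Ev[], command: Command): Ev[] {
  const itemCount = history.filter((e) => e.type === "ItemAddedToCart").length;
  switch (command.type) {
    case "Checkout":
      if (itemCount === 0) throw new Error("Cannot checkout an empty cart.");
      return [{ type: "CartCheckedOut", payload: { itemCount } }];
    default:
      return [];
  }
}

// given: one item already in the cart
const given: Ev[] = [
  { type: "ItemAddedToCart", payload: { productId: "p-1", quantity: 2 } },
];
// when: the Checkout command arrives
const produced = decide(given, { type: "Checkout", payload: {} });
// then: exactly one CartCheckedOut event is produced
```

No database, no mocks: the entire test surface is plain values, which is why event-sourced domain logic tends to be so cheap to cover exhaustively.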

Monitor your event store size. Event stores grow indefinitely. Plan for archival strategies, compression, and retention policies from the start.

Conclusion

Event sourcing in TypeScript provides a powerful foundation for building systems where every state change is captured, traceable, and replayable. The combination of TypeScript's type system with event sourcing patterns creates a development experience where many errors are caught at compile time rather than in production.

The implementation patterns we have covered --- event stores with optimistic concurrency, aggregates that separate commands from events, projections for read models, and snapshots for performance --- form the core toolkit for any event-sourced system. While the initial investment is higher than traditional CRUD approaches, the long-term benefits in auditability, debugging capability, and system evolution make event sourcing an excellent choice for complex domains where data integrity and traceability are paramount.
