CQRS Patterns for Scalable Applications
An in-depth exploration of Command Query Responsibility Segregation patterns in TypeScript, covering command handling, query optimization, and synchronization strategies for event-driven systems.
Command Query Responsibility Segregation --- CQRS --- is one of those architectural patterns that sounds deceptively simple but has far-reaching implications for how systems are designed, scaled, and maintained. At its heart, CQRS says that the model you use to update data should be separate from the model you use to read data. In a world where read and write workloads have fundamentally different characteristics, this separation unlocks optimization opportunities that are impossible with a single unified model.
For event-processing services like Starburst, where data flows continuously and consumers have diverse query needs, CQRS is not just a nice-to-have --- it is often a necessity. This article explores the patterns, trade-offs, and TypeScript implementations that make CQRS practical in production systems.
Why Separate Reads from Writes
In a traditional application, a single data model serves both reads and writes. A relational table stores orders, and the same table is queried when displaying orders to a user. This works well for simple applications, but it creates tension as complexity grows.
Write operations need strong consistency, validation, and transactional guarantees. They operate on aggregates with business rules that must be enforced. Read operations need speed, flexibility, and denormalization. A dashboard might need to join data from five different aggregates, something that is awkward and slow when done against a normalized write model.
CQRS resolves this tension by giving each side its own model, optimized for its purpose.
// The Command side: Strongly typed commands and handlers
interface Command {
readonly type: string;
readonly payload: Record<string, unknown>;
readonly metadata: CommandMetadata;
}
interface CommandMetadata {
readonly commandId: string;
readonly userId: string;
readonly timestamp: Date;
readonly correlationId: string;
}
// Concrete commands
interface CreateOrderCommand extends Command {
readonly type: "CreateOrder";
readonly payload: {
customerId: string;
items: Array<{
productId: string;
quantity: number;
unitPrice: number;
}>;
shippingAddress: Address;
};
}
interface CancelOrderCommand extends Command {
readonly type: "CancelOrder";
readonly payload: {
orderId: string;
reason: string;
};
}
interface Address {
street: string;
city: string;
state: string;
postalCode: string;
country: string;
}On the query side, you define read models that are shaped exactly for the views that consume them. No impedance mismatch, no complex joins, no compromises.
// The Query side: Read models shaped for specific views
interface OrderSummaryReadModel {
orderId: string;
customerName: string;
orderDate: Date;
status: string;
itemCount: number;
totalAmount: number;
}
interface OrderDetailReadModel {
orderId: string;
customerName: string;
customerEmail: string;
orderDate: Date;
status: string;
items: Array<{
productName: string;
quantity: number;
unitPrice: number;
lineTotal: number;
}>;
subtotal: number;
tax: number;
shippingCost: number;
totalAmount: number;
shippingAddress: Address;
statusHistory: Array<{
status: string;
changedAt: Date;
changedBy: string;
}>;
}
// Query interface
interface OrderQueries {
getOrderSummaries(
customerId: string,
options?: PaginationOptions
): Promise<PaginatedResult<OrderSummaryReadModel>>;
getOrderDetail(orderId: string): Promise<OrderDetailReadModel | null>;
getOrdersByStatus(
status: string,
options?: PaginationOptions
): Promise<PaginatedResult<OrderSummaryReadModel>>;
getRevenueByPeriod(
startDate: Date,
endDate: Date
): Promise<RevenueSummary>;
}
interface PaginationOptions {
page: number;
pageSize: number;
sortBy?: string;
sortDirection?: "asc" | "desc";
}
interface PaginatedResult<T> {
items: T[];
totalCount: number;
page: number;
pageSize: number;
totalPages: number;
}
interface RevenueSummary {
totalRevenue: number;
orderCount: number;
averageOrderValue: number;
periodStart: Date;
periodEnd: Date;
}Implementing the Command Side
The command side is responsible for accepting commands, validating them, executing business logic, and producing events. A command handler receives a command, loads the relevant aggregate, invokes the appropriate method, and persists the resulting events.
// Command handler infrastructure
type CommandHandler<C extends Command> = (command: C) => Promise<CommandResult>;
interface CommandResult {
success: boolean;
aggregateId?: string;
error?: string;
events?: string[];
}
class CommandBus {
private handlers = new Map<string, CommandHandler<any>>();
register<C extends Command>(
commandType: string,
handler: CommandHandler<C>
): void {
if (this.handlers.has(commandType)) {
throw new Error(
`Handler already registered for command type: ${commandType}`
);
}
this.handlers.set(commandType, handler);
}
async dispatch<C extends Command>(command: C): Promise<CommandResult> {
const handler = this.handlers.get(command.type);
if (!handler) {
return {
success: false,
error: `No handler registered for command type: ${command.type}`,
};
}
try {
return await handler(command);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
return { success: false, error: message };
}
}
}
// Command handler with validation middleware
class CreateOrderHandler {
constructor(
private orderRepository: AggregateRepository<Order>,
private inventoryService: InventoryService
) {}
async handle(command: CreateOrderCommand): Promise<CommandResult> {
// Validate command
const validation = this.validate(command);
if (!validation.valid) {
return { success: false, error: validation.error };
}
// Check inventory availability
for (const item of command.payload.items) {
const available = await this.inventoryService.checkAvailability(
item.productId,
item.quantity
);
if (!available) {
return {
success: false,
error: `Insufficient inventory for product ${item.productId}`,
};
}
}
// Create the aggregate and execute the command
const order = Order.create(
crypto.randomUUID(),
command.payload.customerId,
command.payload.items,
command.payload.shippingAddress
);
await this.orderRepository.save(order);
return {
success: true,
aggregateId: order.id,
events: order.getUncommittedEvents().map(e => e.eventType),
};
}
private validate(
command: CreateOrderCommand
): { valid: true } | { valid: false; error: string } {
if (!command.payload.customerId) {
return { valid: false, error: "Customer ID is required." };
}
if (!command.payload.items || command.payload.items.length === 0) {
return { valid: false, error: "At least one item is required." };
}
for (const item of command.payload.items) {
if (item.quantity <= 0) {
return { valid: false, error: "Item quantity must be positive." };
}
if (item.unitPrice <= 0) {
return { valid: false, error: "Unit price must be positive." };
}
}
return { valid: true };
}
}
The command bus pattern provides a clean way to route commands to their handlers. It also creates a natural extension point for cross-cutting concerns like logging, authorization, and validation middleware.
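To make the routing behavior concrete before layering on middleware, here is a condensed, self-contained version of the bus, exercised with an illustrative "Ping" command that is not part of the article's order domain:

```typescript
// Minimal stand-in types so the sketch runs on its own.
interface DemoCommand {
readonly type: string;
readonly payload: Record<string, unknown>;
}
interface DemoResult {
success: boolean;
error?: string;
}
type DemoHandler = (command: DemoCommand) => Promise<DemoResult>;

class DemoCommandBus {
private handlers = new Map<string, DemoHandler>();

register(commandType: string, handler: DemoHandler): void {
this.handlers.set(commandType, handler);
}

async dispatch(command: DemoCommand): Promise<DemoResult> {
const handler = this.handlers.get(command.type);
if (!handler) {
// Unknown commands produce an error result rather than throwing,
// matching the failure mode of the full bus above.
return { success: false, error: `No handler for ${command.type}` };
}
return handler(command);
}
}

const bus = new DemoCommandBus();
bus.register("Ping", async () => ({ success: true }));
```

Dispatching `{ type: "Ping", payload: {} }` resolves to a success result, while an unregistered type falls through to an error result instead of an exception.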
// Middleware pattern for command processing
type CommandMiddleware = (
command: Command,
next: () => Promise<CommandResult>
) => Promise<CommandResult>;
class CommandPipeline {
private middlewares: CommandMiddleware[] = [];
use(middleware: CommandMiddleware): void {
this.middlewares.push(middleware);
}
async execute(
command: Command,
handler: CommandHandler<any>
): Promise<CommandResult> {
let index = 0;
const next = async (): Promise<CommandResult> => {
if (index < this.middlewares.length) {
const middleware = this.middlewares[index++];
return middleware(command, next);
}
return handler(command);
};
return next();
}
}
// Logging middleware
const loggingMiddleware: CommandMiddleware = async (command, next) => {
const startTime = Date.now();
console.log(`Processing command: ${command.type}`, {
commandId: command.metadata.commandId,
userId: command.metadata.userId,
});
const result = await next();
console.log(`Command ${command.type} completed in ${Date.now() - startTime}ms`, {
success: result.success,
error: result.error,
});
return result;
};
// Authorization middleware
const authorizationMiddleware = (
authService: AuthorizationService
): CommandMiddleware => {
return async (command, next) => {
const authorized = await authService.canExecute(
command.metadata.userId,
command.type
);
if (!authorized) {
return {
success: false,
error: `User ${command.metadata.userId} is not authorized ` +
`to execute ${command.type}`,
};
}
return next();
};
};
Building and Synchronizing Read Models
Read models are built by subscribing to the event stream and projecting events into denormalized data structures. The key challenge is keeping read models synchronized with the write side while handling the inherent eventual consistency.
// Read model updater infrastructure
interface ReadModelUpdater {
readonly name: string;
handleEvent(event: StoredEvent): Promise<void>;
}
class OrderSummaryUpdater implements ReadModelUpdater {
readonly name = "OrderSummaryUpdater";
constructor(private readonly db: DatabasePool) {}
async handleEvent(event: StoredEvent): Promise<void> {
switch (event.eventType) {
case "OrderCreated":
await this.onOrderCreated(event);
break;
case "OrderItemAdded":
await this.onItemAdded(event);
break;
case "OrderStatusChanged":
await this.onStatusChanged(event);
break;
case "OrderCancelled":
await this.onOrderCancelled(event);
break;
}
}
private async onOrderCreated(event: StoredEvent): Promise<void> {
await this.db.query(
`INSERT INTO order_summaries
(order_id, customer_id, customer_name, order_date,
status, item_count, total_amount)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (order_id) DO NOTHING`,
[
event.aggregateId,
event.payload.customerId,
event.payload.customerName,
event.timestamp,
"pending",
0,
0,
]
);
}
private async onItemAdded(event: StoredEvent): Promise<void> {
const lineTotal =
(event.payload.quantity as number) *
(event.payload.unitPrice as number);
await this.db.query(
`UPDATE order_summaries
SET item_count = item_count + 1,
total_amount = total_amount + $1,
updated_at = NOW()
WHERE order_id = $2`,
[lineTotal, event.aggregateId]
);
}
private async onStatusChanged(event: StoredEvent): Promise<void> {
await this.db.query(
`UPDATE order_summaries
SET status = $1, updated_at = NOW()
WHERE order_id = $2`,
[event.payload.newStatus, event.aggregateId]
);
}
private async onOrderCancelled(event: StoredEvent): Promise<void> {
await this.db.query(
`UPDATE order_summaries
SET status = 'cancelled', updated_at = NOW()
WHERE order_id = $1`,
[event.aggregateId]
);
}
}
For high-throughput systems, building read models synchronously for each event can become a bottleneck. Batch processing and eventual consistency are your friends here.
// Batch read model updater for high throughput
class BatchReadModelUpdater {
private eventBuffer: StoredEvent[] = [];
private flushTimer: ReturnType<typeof setTimeout> | null = null;
constructor(
private updater: ReadModelUpdater,
private readonly batchSize: number = 100,
private readonly flushIntervalMs: number = 500
) {}
async enqueue(event: StoredEvent): Promise<void> {
this.eventBuffer.push(event);
if (this.eventBuffer.length >= this.batchSize) {
await this.flush();
} else if (!this.flushTimer) {
this.flushTimer = setTimeout(() => void this.flush(), this.flushIntervalMs);
}
}
async flush(): Promise<void> {
if (this.flushTimer) {
clearTimeout(this.flushTimer);
this.flushTimer = null;
}
if (this.eventBuffer.length === 0) return;
const batch = this.eventBuffer.splice(0, this.batchSize);
for (const event of batch) {
try {
await this.updater.handleEvent(event);
} catch (error) {
console.error(
`Failed to process event ${event.eventId} in ` +
`${this.updater.name}:`,
error
);
}
}
// Events may have arrived while this batch was awaited; schedule another
// flush so they are not stranded in the buffer without a timer.
if (this.eventBuffer.length > 0 && !this.flushTimer) {
this.flushTimer = setTimeout(() => void this.flush(), this.flushIntervalMs);
}
}
}
Handling Eventual Consistency in the UI
One of the most practical challenges with CQRS is managing eventual consistency in user-facing applications. A user submits a command, and if they immediately query the read model, the change may not be reflected yet. Several strategies address this problem.
// Strategy 1: Return the command result with enough data to
// optimistically update the UI
interface EnrichedCommandResult extends CommandResult {
updatedFields?: Record<string, unknown>;
expectedReadModelDelay?: number;
}
// Strategy 2: Poll until the read model catches up
class ReadModelPoller<T> {
constructor(
private readonly query: () => Promise<T>,
private readonly condition: (result: T) => boolean,
private readonly options: {
maxAttempts?: number;
intervalMs?: number;
timeoutMs?: number;
} = {}
) {}
async waitForConsistency(): Promise<T | null> {
const maxAttempts = this.options.maxAttempts ?? 10;
const intervalMs = this.options.intervalMs ?? 200;
const timeoutMs = this.options.timeoutMs ?? 5000;
const startTime = Date.now();
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if (Date.now() - startTime > timeoutMs) {
return null;
}
const result = await this.query();
if (this.condition(result)) {
return result;
}
await new Promise(resolve => setTimeout(resolve, intervalMs));
}
return null;
}
}
// Strategy 3: Version-aware queries
interface VersionedQuery {
minimumVersion?: number;
}
class VersionAwareQueryHandler {
constructor(private readonly db: DatabasePool) {}
async getOrder(
orderId: string,
options?: VersionedQuery
): Promise<OrderDetailReadModel | null> {
if (options?.minimumVersion) {
// Wait for the read model to reach the expected version
const poller = new ReadModelPoller(
() => this.fetchOrder(orderId),
(order) =>
order !== null && order.version >= options.minimumVersion!,
{ maxAttempts: 20, intervalMs: 100 }
);
return poller.waitForConsistency();
}
return this.fetchOrder(orderId);
}
private async fetchOrder(
orderId: string
): Promise<(OrderDetailReadModel & { version: number }) | null> {
const result = await this.db.query(
`SELECT * FROM order_details WHERE order_id = $1`,
[orderId]
);
return result.rows[0] ?? null;
}
}
Scaling Read and Write Sides Independently
One of the greatest benefits of CQRS is the ability to scale the read and write sides independently. Write operations typically require strong consistency and are bound by the performance of your event store. Read operations can be scaled horizontally by adding more read replicas, caching layers, or even entirely different storage technologies.
// Read model backed by different storage for different query patterns
class MultiStoreQueryService implements OrderQueries {
constructor(
private postgresDb: DatabasePool, // For complex queries
private redisClient: RedisClient, // For fast lookups
private elasticClient: ElasticClient // For full-text search
) {}
async getOrderSummaries(
customerId: string,
options?: PaginationOptions
): Promise<PaginatedResult<OrderSummaryReadModel>> {
// Use PostgreSQL for paginated, sortable queries
const offset = ((options?.page ?? 1) - 1) * (options?.pageSize ?? 20);
const limit = options?.pageSize ?? 20;
const [countResult, dataResult] = await Promise.all([
this.postgresDb.query(
`SELECT COUNT(*) FROM order_summaries WHERE customer_id = $1`,
[customerId]
),
this.postgresDb.query(
`SELECT * FROM order_summaries
WHERE customer_id = $1
ORDER BY order_date DESC
OFFSET $2 LIMIT $3`,
[customerId, offset, limit]
),
]);
return {
items: dataResult.rows,
totalCount: parseInt(countResult.rows[0].count),
page: options?.page ?? 1,
pageSize: limit,
totalPages: Math.ceil(parseInt(countResult.rows[0].count) / limit),
};
}
async getOrderDetail(
orderId: string
): Promise<OrderDetailReadModel | null> {
// Try Redis cache first for fast single-entity lookups
const cached = await this.redisClient.get(`order:${orderId}`);
if (cached) {
return JSON.parse(cached);
}
// Fall back to PostgreSQL
const result = await this.postgresDb.query(
`SELECT * FROM order_details WHERE order_id = $1`,
[orderId]
);
if (result.rows[0]) {
// Cache for future lookups
await this.redisClient.setex(
`order:${orderId}`,
300,
JSON.stringify(result.rows[0])
);
}
return result.rows[0] ?? null;
}
async getOrdersByStatus(
status: string,
options?: PaginationOptions
): Promise<PaginatedResult<OrderSummaryReadModel>> {
// Use PostgreSQL with appropriate indexes
return this.getFilteredOrders({ status }, options);
}
async getRevenueByPeriod(
startDate: Date,
endDate: Date
): Promise<RevenueSummary> {
const result = await this.postgresDb.query(
`SELECT
SUM(total_amount) as total_revenue,
COUNT(*) as order_count,
AVG(total_amount) as avg_order_value
FROM order_summaries
WHERE order_date BETWEEN $1 AND $2
AND status != 'cancelled'`,
[startDate, endDate]
);
return {
totalRevenue: parseFloat(result.rows[0].total_revenue ?? "0"),
orderCount: parseInt(result.rows[0].order_count ?? "0"),
averageOrderValue: parseFloat(result.rows[0].avg_order_value ?? "0"),
periodStart: startDate,
periodEnd: endDate,
};
}
private async getFilteredOrders(
filters: Record<string, string>,
options?: PaginationOptions
): Promise<PaginatedResult<OrderSummaryReadModel>> {
// Implementation for filtered, paginated queries; filter keys are
// internal column names, never user input
const conditions = Object.entries(filters)
.map(([key], i) => `${key} = $${i + 1}`)
.join(" AND ");
const values = Object.values(filters);
const pageSize = options?.pageSize ?? 20;
const offset = ((options?.page ?? 1) - 1) * pageSize;
// Count the full result set separately: rowCount of the paged query
// would only reflect the current page, not the total
const [countResult, dataResult] = await Promise.all([
this.postgresDb.query(
`SELECT COUNT(*) FROM order_summaries WHERE ${conditions}`,
values
),
this.postgresDb.query(
`SELECT * FROM order_summaries
WHERE ${conditions}
ORDER BY order_date DESC
OFFSET $${values.length + 1} LIMIT $${values.length + 2}`,
[...values, offset, pageSize]
),
]);
const totalCount = parseInt(countResult.rows[0].count);
return {
items: dataResult.rows,
totalCount,
page: options?.page ?? 1,
pageSize,
totalPages: Math.ceil(totalCount / pageSize),
};
}
}
Practical Tips
Start with a simple CQRS setup. You do not need separate databases for reads and writes on day one. Begin by separating the interfaces and models in code. Physical separation can come later when the need arises.
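That logical separation can be as modest as two interfaces implemented by one class against one store. A minimal sketch, with illustrative shapes rather than the article's full read models:

```typescript
// Logical CQRS only: separate command and query interfaces, a single
// implementation, a single in-memory store. Physical separation comes later.
interface OrderCommands {
createOrder(customerId: string): string;
cancelOrder(orderId: string): void;
}
interface OrderStatusQueries {
getOrderStatus(orderId: string): string | undefined;
}

class SingleStoreOrderService implements OrderCommands, OrderStatusQueries {
private orders = new Map<string, { customerId: string; status: string }>();
private nextId = 1;

createOrder(customerId: string): string {
const id = `order-${this.nextId++}`;
this.orders.set(id, { customerId, status: "pending" });
return id;
}

cancelOrder(orderId: string): void {
const order = this.orders.get(orderId);
if (order) order.status = "cancelled";
}

getOrderStatus(orderId: string): string | undefined {
return this.orders.get(orderId)?.status;
}
}
```

Callers depend only on `OrderCommands` or `OrderStatusQueries`, so swapping in a dedicated read store later changes wiring, not call sites.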
Make read model rebuilds a first-class operation. Because read models are derived from events, you should be able to rebuild them from scratch at any time. Automate this process and test it regularly.
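A rebuild can be little more than clearing the projection and replaying the full event history through it. A hypothetical sketch, where the `readAll` and `truncate` methods are assumed shapes rather than an existing library API:

```typescript
// Assumed interfaces for the sketch; not the article's exact types.
interface ReplayEvent {
eventId: string;
eventType: string;
}
interface RebuildableUpdater {
truncate(): Promise<void>; // assumed: wipes the read model's storage
handleEvent(event: ReplayEvent): Promise<void>;
}
interface EventHistory {
readAll(batchSize: number): AsyncIterable<ReplayEvent[]>; // assumed API
}

async function rebuildReadModel(
history: EventHistory,
updater: RebuildableUpdater
): Promise<number> {
await updater.truncate();
let processed = 0;
for await (const batch of history.readAll(500)) {
for (const event of batch) {
// Events must be applied strictly in stream order for the
// projection to converge on the same state as before.
await updater.handleEvent(event);
processed++;
}
}
return processed;
}
```

Running this against a throwaway copy of the read model on a schedule is a cheap way to verify that a real rebuild will work when you need it.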
Use correlation IDs to bridge the consistency gap. When a command produces events that will update a read model, return the correlation ID to the client so it can query for the specific version.
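Concretely, the write side reports the version it produced and the client threads it back into a version-aware query. In this sketch the `dispatch` and `getOrder` signatures are assumptions, standing in for whatever the real command and query APIs look like:

```typescript
// The write side returns the version it produced alongside the aggregate id.
interface WriteResult {
success: boolean;
aggregateId?: string;
version?: number;
}

async function createThenRead<T>(
dispatch: (command: { type: string; payload: unknown }) => Promise<WriteResult>,
getOrder: (orderId: string, options?: { minimumVersion?: number }) => Promise<T | null>
): Promise<T | null> {
const result = await dispatch({ type: "CreateOrder", payload: {} });
if (!result.success || !result.aggregateId) return null;
// Forward the write-side version so the query waits until the read model
// has projected at least that far, instead of racing the projection.
return getOrder(result.aggregateId, { minimumVersion: result.version });
}
```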
Be deliberate about your consistency boundaries. Not every query needs real-time data. Reports, dashboards, and analytics can tolerate minutes or even hours of staleness. Reserve strong consistency for operations that truly need it.
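One lightweight way to make those boundaries explicit is a per-query freshness policy that routing code can consult, so staleness tolerance is a documented decision rather than an accident. The categories and budgets below are illustrative:

```typescript
type Freshness = "strong" | "seconds" | "minutes" | "hours";

// Illustrative policy table keyed by query name.
const queryFreshness: Record<string, Freshness> = {
getOrderDetail: "strong", // the user may have just acted on this order
getOrderSummaries: "seconds", // list views tolerate brief lag
getRevenueByPeriod: "hours", // reporting can run off a lagging replica
};

function maxStalenessMs(freshness: Freshness): number {
switch (freshness) {
case "strong":
return 0;
case "seconds":
return 5_000;
case "minutes":
return 5 * 60_000;
case "hours":
return 60 * 60_000;
}
}
```

A query router could use `maxStalenessMs` to decide between the primary read store, a cache, and an analytics replica.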
Conclusion
CQRS provides a powerful framework for building applications where read and write workloads have different requirements. By separating these concerns, you gain the freedom to optimize each side independently, scale them according to their specific demands, and evolve them at their own pace.
The patterns we have explored --- command buses with middleware, event-driven read model updates, consistency strategies, and multi-store query services --- form a practical toolkit for implementing CQRS in TypeScript. Combined with event sourcing and the real-time stream processing capabilities of a platform like Starburst, CQRS enables you to build systems that are both highly responsive and deeply auditable.