Streaming Patterns with NATS JetStream
An exploration of advanced streaming patterns using NATS JetStream, including event sourcing, CQRS, windowed aggregations, and stream processing pipelines with practical TypeScript implementations.
JetStream turns NATS from a real-time message router into a stream processing platform. But having a persistent log is only the beginning. The real value emerges when you apply streaming patterns that transform raw event data into actionable state: event sourcing to reconstruct entities, CQRS to separate reads from writes, windowed aggregations to produce analytics, and stream processing pipelines to compose transformations. Each pattern solves a specific class of problem, and JetStream's architecture makes them straightforward to implement in TypeScript.
This article presents five streaming patterns that we use in production with the node-nats client library. Each pattern includes a complete TypeScript implementation, an explanation of when to use it, and the trade-offs involved.
Event Sourcing with JetStream
Event sourcing stores every state change as an immutable event rather than overwriting the current state. The event log becomes the source of truth, and the current state is a derivation of that log. JetStream streams are a natural fit for event stores because they are append-only, ordered, and persistent.
import { connect, JSONCodec, AckPolicy, DeliverPolicy, RetentionPolicy, StorageType } from "nats";
import type { JetStreamClient, NatsConnection } from "nats";
const jc = JSONCodec();
// Define domain events
type AccountEvent =
| { type: "AccountOpened"; accountId: string; ownerId: string; currency: string; timestamp: number }
| { type: "FundsDeposited"; accountId: string; amount: number; reference: string; timestamp: number }
| { type: "FundsWithdrawn"; accountId: string; amount: number; reference: string; timestamp: number }
| { type: "AccountFrozen"; accountId: string; reason: string; timestamp: number }
| { type: "AccountClosed"; accountId: string; timestamp: number };
// Current state derived from events
interface AccountState {
accountId: string;
ownerId: string;
currency: string;
balance: number;
status: "active" | "frozen" | "closed";
version: number;
}
// Reducer: apply an event to produce new state
function applyEvent(state: AccountState | null, event: AccountEvent): AccountState {
switch (event.type) {
case "AccountOpened":
return {
accountId: event.accountId,
ownerId: event.ownerId,
currency: event.currency,
balance: 0,
status: "active",
version: 1,
};
case "FundsDeposited":
if (!state) throw new Error("Cannot deposit to non-existent account");
return { ...state, balance: state.balance + event.amount, version: state.version + 1 };
case "FundsWithdrawn":
if (!state) throw new Error("Cannot withdraw from non-existent account");
if (state.balance < event.amount) throw new Error("Insufficient funds");
return { ...state, balance: state.balance - event.amount, version: state.version + 1 };
case "AccountFrozen":
if (!state) throw new Error("Cannot freeze non-existent account");
return { ...state, status: "frozen", version: state.version + 1 };
case "AccountClosed":
if (!state) throw new Error("Cannot close non-existent account");
return { ...state, status: "closed", version: state.version + 1 };
}
}
// Publish events to JetStream
async function appendEvent(
js: JetStreamClient,
accountId: string,
event: AccountEvent
): Promise<number> {
const pa = await js.publish(
`events.accounts.${accountId}`,
jc.encode(event),
// msgID drives JetStream deduplication; a timestamp-based ID defeats dedup
// on retries, so a deterministic command ID (e.g. a client-supplied UUID)
// is preferable in production
{ msgID: `${accountId}-${Date.now()}-${event.type}` }
);
return pa.seq;
}
// Rebuild state from event history
async function loadAccount(
js: JetStreamClient,
accountId: string
): Promise<AccountState | null> {
// A durable consumer named for the account is assumed to exist with a
// filter_subject of `events.accounts.${accountId}`
const consumer = await js.consumers.get("ACCOUNTS", `loader-${accountId}`);
let state: AccountState | null = null;
// Fetch the account's event history (1000 is an illustrative cap)
const messages = await consumer.fetch({ max_messages: 1000 });
for await (const msg of messages) {
const event = jc.decode(msg.data) as AccountEvent;
state = applyEvent(state, event);
msg.ack();
}
return state;
}
The stream for this event store should use a subject pattern like events.accounts.> so each account's events are on a unique subject. Consumers can filter by specific account IDs or process all account events:
async function setupEventStore() {
const nc = await connect({ servers: "nats://localhost:4222" });
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
name: "ACCOUNTS",
subjects: ["events.accounts.>"],
retention: RetentionPolicy.Limits,
storage: StorageType.File,
max_age: 0, // Never expire -- event store is permanent
num_replicas: 3,
});
}
The key trade-off with event sourcing on JetStream is replay performance. Rebuilding an account with thousands of events takes time. For frequently accessed entities, you should maintain a snapshot cache that stores the latest state alongside the event version number, rebuilding from the snapshot rather than from the beginning of the stream.
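The snapshot approach can be sketched without any NATS machinery: pair the derived state with the last stream sequence folded into it, and replay only newer events. The Snapshot and rebuildFromSnapshot names below are illustrative, not part of nats.js:

```typescript
// Sketch: snapshot-accelerated rebuild. A snapshot pairs the derived state
// with the stream sequence it was taken at; rebuild folds in only the events
// that arrived after that sequence.
interface Snapshot<S> {
  state: S;
  streamSeq: number; // last stream sequence already folded into `state`
}

function rebuildFromSnapshot<S, E>(
  snapshot: Snapshot<S> | null,
  events: Array<{ seq: number; event: E }>,
  apply: (state: S | null, event: E) => S
): Snapshot<S> | null {
  let state: S | null = snapshot?.state ?? null;
  let seq = snapshot?.streamSeq ?? 0;
  for (const { seq: eventSeq, event } of events) {
    if (eventSeq <= seq) continue; // already reflected in the snapshot
    state = apply(state, event);
    seq = eventSeq;
  }
  return state === null ? null : { state, streamSeq: seq };
}
```

With JetStream, the replay consumer would start at opt_start_seq set to snapshot.streamSeq + 1 (deliver policy "by start sequence"), so only post-snapshot events are fetched at all.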
CQRS: Separating Reads and Writes
Command Query Responsibility Segregation (CQRS) pairs naturally with event sourcing. Write operations (commands) append events to the JetStream event store. Read operations (queries) hit a separate read model that is built and maintained by consuming the event stream. This separation lets you optimize the read model for specific query patterns without affecting the write path.
// Write side: command handler
async function handleDepositCommand(
js: JetStreamClient,
command: { accountId: string; amount: number; reference: string }
): Promise<{ success: boolean; newBalance: number }> {
// Load current state to validate
const account = await loadAccount(js, command.accountId);
if (!account) throw new Error("Account not found");
if (account.status !== "active") throw new Error("Account is not active");
// Append event
const event: AccountEvent = {
type: "FundsDeposited",
accountId: command.accountId,
amount: command.amount,
reference: command.reference,
timestamp: Date.now(),
};
await appendEvent(js, command.accountId, event);
return { success: true, newBalance: account.balance + command.amount };
}
// Read side: projection that builds a queryable read model
// (`db` below is an application-level SQL client, assumed in scope)
async function startAccountProjection(nc: NatsConnection) {
const js = nc.jetstream();
const consumer = await js.consumers.get("ACCOUNTS", "account-projection");
const messages = await consumer.consume();
for await (const msg of messages) {
const event = jc.decode(msg.data) as AccountEvent;
switch (event.type) {
case "AccountOpened":
await db.query(
`INSERT INTO accounts_read (account_id, owner_id, currency, balance, status, updated_at)
VALUES ($1, $2, $3, 0, 'active', NOW())`,
[event.accountId, event.ownerId, event.currency]
);
break;
case "FundsDeposited":
await db.query(
`UPDATE accounts_read SET balance = balance + $1, updated_at = NOW()
WHERE account_id = $2`,
[event.amount, event.accountId]
);
break;
case "FundsWithdrawn":
await db.query(
`UPDATE accounts_read SET balance = balance - $1, updated_at = NOW()
WHERE account_id = $2`,
[event.amount, event.accountId]
);
break;
case "AccountFrozen":
await db.query(
`UPDATE accounts_read SET status = 'frozen', updated_at = NOW()
WHERE account_id = $1`,
[event.accountId]
);
break;
case "AccountClosed":
await db.query(
`UPDATE accounts_read SET status = 'closed', updated_at = NOW()
WHERE account_id = $1`,
[event.accountId]
);
break;
}
msg.ack();
}
}
// Read side: query the optimized read model
async function getAccountBalance(accountId: string): Promise<number> {
const result = await db.query(
"SELECT balance FROM accounts_read WHERE account_id = $1",
[accountId]
);
return result.rows[0]?.balance ?? 0;
}
// Build specialized read models for different query patterns
async function startAccountSummaryProjection(nc: NatsConnection) {
const js = nc.jetstream();
const consumer = await js.consumers.get("ACCOUNTS", "summary-projection");
const messages = await consumer.consume();
for await (const msg of messages) {
const event = jc.decode(msg.data) as AccountEvent;
if (event.type === "FundsDeposited" || event.type === "FundsWithdrawn") {
// Update daily transaction summary
await db.query(
`INSERT INTO daily_summaries (account_id, date, transaction_count, volume)
VALUES ($1, CURRENT_DATE, 1, $2)
ON CONFLICT (account_id, date) DO UPDATE
SET transaction_count = daily_summaries.transaction_count + 1,
volume = daily_summaries.volume + $2`,
[event.accountId, event.amount]
);
}
msg.ack();
}
}
Each projection runs as an independent JetStream consumer with its own durable subscription. If you need to rebuild a read model (for example, after adding a new index or fixing a projection bug), you create a new consumer that starts from the beginning of the stream. The old read model continues serving queries until the new one catches up.
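A rebuild can be sketched as a plain consumer configuration. The wire-format strings are written out so the snippet stands alone; in nats.js you would use DeliverPolicy.All and AckPolicy.Explicit with jsm.consumers.add("ACCOUNTS", config). The v2 naming is an illustrative convention:

```typescript
// Sketch: config for a replacement projection consumer that replays the
// stream from its first message.
interface ProjectionRebuildConfig {
  durable_name: string;
  deliver_policy: "all";   // start from the beginning of the stream
  ack_policy: "explicit";  // ack each event once it has been projected
  filter_subject: string;
}

function projectionRebuildConfig(version: number): ProjectionRebuildConfig {
  return {
    durable_name: `account-projection-v${version}`,
    deliver_policy: "all",
    ack_policy: "explicit",
    filter_subject: "events.accounts.>",
  };
}
```

Once account-projection-v2 catches up (its pending count reaches zero), queries switch to the new read model and the old consumer is deleted.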
Windowed Aggregations
Streaming data often needs to be aggregated over time windows: transactions per minute, average response time over the last five minutes, peak throughput per hour. JetStream provides the raw event stream, and your consumer maintains the windowed state:
interface TimeWindow {
start: number;
end: number;
count: number;
sum: number;
min: number;
max: number;
}
class WindowedAggregator {
private windows: Map<string, TimeWindow> = new Map();
constructor(
private windowSizeMs: number,
private onWindowClose: (key: string, window: TimeWindow) => Promise<void>
) {
// Periodically flush windows whose end has passed. Flushing is driven by
// wall-clock time, so events arriving after their window has been flushed
// are dropped -- fine for dashboards, less so for billing
setInterval(() => void this.flushClosedWindows(), this.windowSizeMs / 2);
}
add(key: string, value: number, timestamp: number): void {
const windowStart =
Math.floor(timestamp / this.windowSizeMs) * this.windowSizeMs;
const windowKey = `${key}:${windowStart}`;
const existing = this.windows.get(windowKey);
if (existing) {
existing.count++;
existing.sum += value;
existing.min = Math.min(existing.min, value);
existing.max = Math.max(existing.max, value);
} else {
this.windows.set(windowKey, {
start: windowStart,
end: windowStart + this.windowSizeMs,
count: 1,
sum: value,
min: value,
max: value,
});
}
}
private async flushClosedWindows(): Promise<void> {
const now = Date.now();
for (const [windowKey, window] of this.windows) {
if (window.end <= now) {
// Recover the original key; lastIndexOf tolerates ":" inside the key itself
const key = windowKey.slice(0, windowKey.lastIndexOf(":"));
await this.onWindowClose(key, window);
this.windows.delete(windowKey);
}
}
}
}
// Usage: aggregate transaction volumes per minute
async function startTransactionAggregator(nc: NatsConnection) {
const js = nc.jetstream();
const aggregator = new WindowedAggregator(
60_000, // 1-minute windows
async (accountId, window) => {
// Publish aggregated results
nc.publish(
`analytics.transactions.minute`,
jc.encode({
accountId,
windowStart: new Date(window.start).toISOString(),
windowEnd: new Date(window.end).toISOString(),
transactionCount: window.count,
totalVolume: window.sum,
averageAmount: window.sum / window.count,
minAmount: window.min,
maxAmount: window.max,
})
);
}
);
const consumer = await js.consumers.get("ACCOUNTS", "transaction-aggregator");
const messages = await consumer.consume();
for await (const msg of messages) {
const event = jc.decode(msg.data) as AccountEvent;
if (event.type === "FundsDeposited" || event.type === "FundsWithdrawn") {
aggregator.add(event.accountId, event.amount, event.timestamp);
}
msg.ack();
}
}
The aggregated results are published to a separate NATS subject, which can feed dashboards, alerting systems, or another JetStream stream for historical storage.
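For the historical-storage route, a small stream capturing the analytics subject is enough. The stream name and the 30-day retention below are illustrative assumptions:

```typescript
// Sketch: a stream that captures the per-minute aggregates published on
// analytics.transactions.minute so they can be queried or replayed later.
// max_age is expressed in nanoseconds, matching the JetStream wire format.
const NANOS_PER_DAY = 24 * 60 * 60 * 1_000_000_000;

const analyticsStreamConfig = {
  name: "ANALYTICS_MINUTE",
  subjects: ["analytics.transactions.minute"],
  max_age: 30 * NANOS_PER_DAY, // keep 30 days of aggregates
};
```

The config object would be passed to jsm.streams.add, just like the ACCOUNTS stream earlier.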
Stream Processing Pipelines
Complex data transformations can be composed as a pipeline of JetStream consumers, where each stage reads from one stream and writes to another:
// Stage 1: Enrich raw transaction events with user data
async function startEnrichmentStage(nc: NatsConnection) {
const js = nc.jetstream();
// ServiceClient is an application-level request/reply helper, assumed in scope
const client = new ServiceClient(nc);
const consumer = await js.consumers.get("RAW_TRANSACTIONS", "enrichment");
const messages = await consumer.consume();
for await (const msg of messages) {
const txn = jc.decode(msg.data) as {
transactionId: string;
accountId: string;
amount: number;
type: string;
};
try {
// Enrich with account details
const account = await client.call<
{ accountId: string },
{ ownerId: string; currency: string; tier: string }
>("services.accounts.get", { accountId: txn.accountId });
// Write enriched event to next stream
await js.publish(
"pipeline.transactions.enriched",
jc.encode({
...txn,
ownerId: account.ownerId,
currency: account.currency,
accountTier: account.tier,
enrichedAt: Date.now(),
})
);
msg.ack();
} catch (error) {
console.error(`Enrichment failed for ${txn.transactionId}:`, error);
msg.nak(); // Retry
}
}
}
// Stage 2: Classify transactions for risk scoring
async function startClassificationStage(nc: NatsConnection) {
const js = nc.jetstream();
const consumer = await js.consumers.get("ENRICHED_TRANSACTIONS", "classifier");
const messages = await consumer.consume();
for await (const msg of messages) {
const txn = jc.decode(msg.data) as {
transactionId: string;
amount: number;
accountTier: string;
type: string;
};
const riskLevel = classifyRisk(txn);
const subject =
riskLevel === "high"
? "pipeline.transactions.high_risk"
: "pipeline.transactions.normal";
await js.publish(
subject,
jc.encode({ ...txn, riskLevel, classifiedAt: Date.now() })
);
msg.ack();
}
}
function classifyRisk(txn: {
amount: number;
accountTier: string;
type: string;
}): "high" | "medium" | "low" {
if (txn.amount > 10_000) return "high";
if (txn.type === "international" && txn.amount > 5_000) return "high";
if (txn.amount > 1_000) return "medium";
return "low";
}
// Stage 3: Alert on high-risk transactions
async function startAlertStage(nc: NatsConnection) {
const js = nc.jetstream();
const consumer = await js.consumers.get("CLASSIFIED_TRANSACTIONS", "alerter");
const messages = await consumer.consume();
for await (const msg of messages) {
const txn = jc.decode(msg.data) as {
transactionId: string;
amount: number;
ownerId: string;
riskLevel: string;
};
if (txn.riskLevel === "high") {
// sendAlert is an application-level notifier, assumed in scope
await sendAlert({
type: "high_risk_transaction",
transactionId: txn.transactionId,
amount: txn.amount,
ownerId: txn.ownerId,
});
}
msg.ack();
}
}
Each pipeline stage is independently deployable and scalable. If the enrichment stage falls behind, you add more instances bound to the same durable consumer, and JetStream distributes messages across them. If you need to add a new classification rule, you deploy a new version of the classification stage without touching the others.
Stream Mirroring and Sourcing
JetStream supports stream mirroring and sourcing, which allow you to compose streams from other streams at the server level without writing consumer code:
async function setupStreamTopology(nc: NatsConnection) {
const jsm = await nc.jetstreamManager();
// Source stream: captures raw events
await jsm.streams.add({
name: "RAW_EVENTS",
subjects: ["events.>"],
storage: StorageType.File,
});
// Aggregate stream: sources from multiple upstream streams
await jsm.streams.add({
name: "ALL_FINANCIAL_EVENTS",
sources: [
{ name: "ACCOUNTS" },
{ name: "PAYMENTS" },
{ name: "TRANSFERS" },
],
storage: StorageType.File,
max_age: 90 * 24 * 60 * 60 * 1_000_000_000, // 90 days
});
// Mirror stream: exact copy of another stream (useful for cross-region replication)
await jsm.streams.add({
name: "ACCOUNTS_MIRROR",
mirror: { name: "ACCOUNTS" },
storage: StorageType.File,
});
}
Stream sourcing pulls messages from multiple source streams into a single destination stream. This is invaluable for creating aggregate views without application-level code. Stream mirroring creates an exact replica of a stream, which is useful for disaster recovery, cross-region replication, or providing a read-only copy for analytics workloads.
Practical Tips for Streaming Patterns
Design your streams around event lifetimes, not around organizational boundaries. A stream of financial events that must be retained for seven years needs a different configuration from a stream of telemetry events that can be discarded after a week. Mixing retention requirements in a single stream forces you to adopt the most conservative setting for everything.
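One way to keep the nanosecond retention arithmetic honest is a guarded helper. A caveat worth knowing: a plain JavaScript number represents a nanosecond duration exactly only up to Number.MAX_SAFE_INTEGER, roughly 104 days, so one common workaround for very long retention is max_age: 0 (keep forever) with archival handled elsewhere. The helper and stream names below are illustrative:

```typescript
// Sketch: convert days to the nanoseconds JetStream expects for max_age,
// refusing values a JS number cannot represent exactly (~104 days and up).
function daysToNanos(days: number): number {
  const ns = days * 24 * 60 * 60 * 1_000_000_000;
  if (!Number.isSafeInteger(ns)) {
    throw new Error(`${days} days of nanoseconds exceeds Number.MAX_SAFE_INTEGER`);
  }
  return ns;
}

// Short-lived telemetry: a precise 7-day expiry
const telemetryStream = { name: "TELEMETRY", subjects: ["telemetry.>"], max_age: daysToNanos(7) };
// Long-lived financial events: keep forever, archive externally
const financialStream = { name: "FINANCIAL", subjects: ["events.financial.>"], max_age: 0 };
```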
Use consumer filter_subject extensively. A single stream can capture events for many entity types, and filtered consumers let each processing pipeline read only the events it cares about. This is more efficient than creating separate streams for each event type.
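As a sketch, each pipeline can derive its consumer from the entity type it handles. The naming scheme below is an illustrative convention, with field names in the JetStream wire format:

```typescript
// Sketch: one stream, many filtered consumers. Each processing pipeline gets
// a durable consumer that is delivered only its entity's subjects, so the
// filtering happens server-side rather than by discarding messages in the client.
function filteredConsumerConfig(entity: string): {
  durable_name: string;
  filter_subject: string;
  ack_policy: "explicit";
} {
  return {
    durable_name: `${entity}-processor`,
    filter_subject: `events.${entity}.>`,
    ack_policy: "explicit",
  };
}
```

filteredConsumerConfig("payments") yields a consumer that receives only events.payments.> messages, even though the stream also carries other entity types.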
Monitor consumer lag as your primary health metric. If a consumer's pending message count grows over time, it is falling behind the stream. Either add more consumer instances or investigate why processing is slow. JetStream exposes consumer info including pending counts, acknowledgment floor, and redelivery counts through the management API.
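A minimal lag check can be sketched from periodic samples of the pending count reported by the consumer info API. The majority-growth heuristic here is an illustrative assumption, not a nats.js feature:

```typescript
// Sketch: flag a consumer as falling behind when its pending count grew in
// a majority of the sampled intervals. `pendingSamples` would be populated
// by polling consumer info (num_pending) on a fixed schedule.
function isFallingBehind(pendingSamples: number[], minGrowth = 0): boolean {
  if (pendingSamples.length < 2) return false;
  let growingIntervals = 0;
  for (let i = 1; i < pendingSamples.length; i++) {
    if (pendingSamples[i] - pendingSamples[i - 1] > minGrowth) growingIntervals++;
  }
  return growingIntervals > (pendingSamples.length - 1) / 2;
}
```

A steadily growing pending count means adding consumer instances or investigating slow processing; a flat but nonzero count usually just means bursty traffic.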
Test replay scenarios regularly. Delete a consumer's durable state and let it rebuild from the beginning of the stream. Verify that your projections converge to the correct state and that the rebuild completes in an acceptable time frame.
Conclusion
JetStream's persistent, ordered, replicated streams provide the foundation for sophisticated streaming patterns. Event sourcing captures the complete history of state changes. CQRS builds optimized read models from event streams. Windowed aggregations transform real-time events into periodic summaries. Stream processing pipelines compose multi-stage transformations with independent scaling. The node-nats client library makes all of these patterns accessible through TypeScript's type system and async iteration, and JetStream's server-level features like mirroring and sourcing reduce the amount of application code needed to build complex streaming topologies.
Related Articles
Operating NATS in Production: Monitoring and Scaling
A practical operations guide for running NATS in production environments, covering monitoring strategies, capacity planning, scaling patterns, upgrade procedures, and incident response for engineering and platform teams.
Messaging Architecture for Fintech Systems
A strategic guide to designing messaging architectures for financial technology systems, covering regulatory requirements, data consistency patterns, auditability, and the role of NATS in building compliant, resilient fintech infrastructure.
Securing NATS: Authentication and Authorization
A comprehensive guide to securing NATS deployments with authentication mechanisms, fine-grained authorization, TLS encryption, and account-based multi-tenancy, with practical TypeScript client configuration examples.