Real-Time Stream Processing with TypeScript
A hands-on guide to building real-time stream processing pipelines in TypeScript, covering windowing, aggregation, joins, and back-pressure management for high-throughput event-driven systems.
Stream processing is the practice of continuously ingesting, transforming, and analyzing data as it arrives, rather than waiting for it to accumulate in a batch. In a world where business decisions depend on up-to-the-second information, the ability to process data in motion is a competitive advantage. From fraud detection and IoT monitoring to live recommendation engines, stream processing powers the real-time experiences that modern users expect.
TypeScript, with its strong type system and asynchronous capabilities, is an excellent choice for building stream processing pipelines. This article walks through the core concepts and patterns, with practical implementations you can adapt for your own real-time data workflows using Starburst.
Stream Abstractions in TypeScript
At its core, a stream is an unbounded sequence of data elements. Unlike arrays or lists, streams have no defined end --- new elements can arrive at any time. TypeScript's AsyncIterable and AsyncGenerator interfaces provide a natural foundation for working with streams.
// Core stream types
type StreamProcessor<TInput, TOutput> = (
  source: AsyncIterable<TInput>
) => AsyncIterable<TOutput>;

type StreamSource<T> = () => AsyncIterable<T>;

type StreamSink<T> = (source: AsyncIterable<T>) => Promise<void>;
// A composable stream pipeline that carries its source with it
class Pipeline<TInput, TOutput> {
  private constructor(
    private readonly source: AsyncIterable<TInput>,
    private readonly transform: (
      source: AsyncIterable<TInput>
    ) => AsyncIterable<TOutput>
  ) {}

  static from<T>(source: AsyncIterable<T>): Pipeline<T, T> {
    return new Pipeline(source, (input) => input);
  }

  pipe<TNext>(
    processor: StreamProcessor<TOutput, TNext>
  ): Pipeline<TInput, TNext> {
    const currentTransform = this.transform;
    return new Pipeline(this.source, (source) =>
      processor(currentTransform(source))
    );
  }

  async run(sink: StreamSink<TOutput>): Promise<void> {
    await sink(this.transform(this.source));
  }
}
// Basic stream operators
function map<T, U>(
  fn: (item: T) => U | Promise<U>
): StreamProcessor<T, U> {
  return async function* (source) {
    for await (const item of source) {
      yield await fn(item);
    }
  };
}

function filter<T>(
  predicate: (item: T) => boolean | Promise<boolean>
): StreamProcessor<T, T> {
  return async function* (source) {
    for await (const item of source) {
      if (await predicate(item)) {
        yield item;
      }
    }
  };
}

function flatMap<T, U>(
  fn: (item: T) => AsyncIterable<U> | Iterable<U>
): StreamProcessor<T, U> {
  return async function* (source) {
    for await (const item of source) {
      // yield* in an async generator accepts both sync and async iterables
      yield* fn(item);
    }
  };
}

function take<T>(count: number): StreamProcessor<T, T> {
  return async function* (source) {
    if (count <= 0) return;
    let taken = 0;
    for await (const item of source) {
      yield item;
      taken++;
      // Return immediately so we never pull an extra item from the source
      if (taken >= count) return;
    }
  };
}

These building blocks let you compose processing pipelines in a declarative, type-safe way. Each operator is a pure function that transforms one stream into another, making pipelines easy to test, reason about, and extend.
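To see the composition in action, here is a small, self-contained run of the operators over a toy source of hypothetical sensor values (the operators are repeated in condensed form so the snippet runs on its own):

```typescript
// Condensed repeats of the operators above, so this snippet is standalone
type StreamProcessor<T, U> = (source: AsyncIterable<T>) => AsyncIterable<U>;

const map = <T, U>(fn: (item: T) => U): StreamProcessor<T, U> =>
  async function* (source) {
    for await (const item of source) yield fn(item);
  };

const filter = <T>(predicate: (item: T) => boolean): StreamProcessor<T, T> =>
  async function* (source) {
    for await (const item of source) if (predicate(item)) yield item;
  };

const take = <T>(count: number): StreamProcessor<T, T> =>
  async function* (source) {
    let taken = 0;
    for await (const item of source) {
      if (taken++ >= count) return;
      yield item;
    }
  };

// A toy finite source standing in for a live event feed
async function* readings(): AsyncIterable<number> {
  for (const v of [3, 18, 7, 42, 25, 11]) yield v;
}

(async () => {
  // Keep readings above 5, format them, and stop after three
  const stream = take<string>(3)(
    map((n: number) => `reading=${n}`)(filter((n: number) => n > 5)(readings()))
  );
  for await (const line of stream) console.log(line);
  // prints reading=18, reading=7, reading=42
})();
```

Because each operator is just a function from stream to stream, composition is ordinary function application; no framework machinery is required.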
Windowing: Making Sense of Unbounded Data
Raw streams are unbounded, but most analyses require finite boundaries. Windowing divides a stream into manageable chunks based on time or count, allowing you to compute aggregates over meaningful intervals.
The three most common windowing strategies are tumbling windows (fixed-size, non-overlapping), sliding windows (fixed-size, overlapping), and session windows (variable-size, based on activity gaps).
interface WindowedItem<T> {
  item: T;
  windowStart: Date;
  windowEnd: Date;
}

// Tumbling window: Fixed-size, non-overlapping
function tumblingWindow<T>(
  durationMs: number,
  timestampExtractor: (item: T) => Date
): StreamProcessor<T, T[]> {
  return async function* (source) {
    let windowStart: number | null = null;
    let currentWindow: T[] = [];
    for await (const item of source) {
      const timestamp = timestampExtractor(item).getTime();
      if (windowStart === null) {
        windowStart = timestamp - (timestamp % durationMs);
      }
      const windowEnd = windowStart + durationMs;
      if (timestamp >= windowEnd) {
        // Emit the completed window
        if (currentWindow.length > 0) {
          yield currentWindow;
        }
        // Start a new window
        windowStart = timestamp - (timestamp % durationMs);
        currentWindow = [item];
      } else {
        currentWindow.push(item);
      }
    }
    // Emit any remaining items
    if (currentWindow.length > 0) {
      yield currentWindow;
    }
  };
}

// Sliding window: Fixed-size, overlapping
function slidingWindow<T>(
  windowSizeMs: number,
  slideDurationMs: number,
  timestampExtractor: (item: T) => Date
): StreamProcessor<T, T[]> {
  return async function* (source) {
    const buffer: Array<{ item: T; timestamp: number }> = [];
    let nextEmitTime: number | null = null;
    for await (const item of source) {
      const timestamp = timestampExtractor(item).getTime();
      buffer.push({ item, timestamp });
      if (nextEmitTime === null) {
        nextEmitTime = timestamp + slideDurationMs;
      }
      // Clean expired items from buffer
      const cutoff = timestamp - windowSizeMs;
      while (buffer.length > 0 && buffer[0].timestamp < cutoff) {
        buffer.shift();
      }
      if (timestamp >= nextEmitTime) {
        yield buffer.map((entry) => entry.item);
        nextEmitTime = timestamp + slideDurationMs;
      }
    }
    if (buffer.length > 0) {
      yield buffer.map((entry) => entry.item);
    }
  };
}

// Session window: Variable-size based on activity gap
function sessionWindow<T>(
  gapMs: number,
  timestampExtractor: (item: T) => Date
): StreamProcessor<T, T[]> {
  return async function* (source) {
    let session: T[] = [];
    let lastTimestamp: number | null = null;
    for await (const item of source) {
      const timestamp = timestampExtractor(item).getTime();
      if (lastTimestamp !== null && timestamp - lastTimestamp > gapMs) {
        // Gap exceeded, emit the session
        yield session;
        session = [];
      }
      session.push(item);
      lastTimestamp = timestamp;
    }
    if (session.length > 0) {
      yield session;
    }
  };
}

Windowing is fundamental to answering questions like "how many orders were placed in the last five minutes" or "what is the average sensor reading over the past hour." Without windowing, you are limited to per-event processing, which is too fine-grained for most analytical use cases.
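To make the mechanics concrete, here is a self-contained run of a tumbling window over hypothetical click events (the operator is repeated in condensed form so the snippet runs on its own):

```typescript
type StreamProcessor<T, U> = (source: AsyncIterable<T>) => AsyncIterable<U>;

// Condensed repeat of tumblingWindow so this snippet is self-contained
function tumblingWindow<T>(
  durationMs: number,
  ts: (item: T) => Date
): StreamProcessor<T, T[]> {
  return async function* (source) {
    let windowStart: number | null = null;
    let current: T[] = [];
    for await (const item of source) {
      const t = ts(item).getTime();
      if (windowStart === null) windowStart = t - (t % durationMs);
      if (t >= windowStart + durationMs) {
        if (current.length > 0) yield current;
        windowStart = t - (t % durationMs);
        current = [item];
      } else {
        current.push(item);
      }
    }
    if (current.length > 0) yield current;
  };
}

interface Click { user: string; at: Date; }

// Hypothetical click events: three in the first second, two in the next
const clicks: Click[] = [
  { user: "a", at: new Date(1_000) },
  { user: "b", at: new Date(1_200) },
  { user: "a", at: new Date(1_900) },
  { user: "c", at: new Date(2_100) },
  { user: "b", at: new Date(2_800) },
];

async function* source(): AsyncIterable<Click> {
  for (const c of clicks) yield c;
}

(async () => {
  const sizes: number[] = [];
  for await (const w of tumblingWindow<Click>(1000, (c) => c.at)(source())) {
    sizes.push(w.length);
  }
  console.log(sizes); // [ 3, 2 ]
})();
```

With a one-second window, the five events split cleanly into a window of three and a window of two, keyed entirely by event time rather than arrival time.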
Stream Aggregation and Stateful Processing
Many stream processing tasks require maintaining state across events. Computing running averages, counting unique visitors, or detecting patterns all need some form of stateful processing.
// Stateful stream aggregator, generic over input, state, and output
interface AggregateState<TInput, TState, TOutput> {
  initialState(): TState;
  update(state: TState, item: TInput): TState;
  output(state: TState): TOutput;
}

function aggregate<T, TState, TOutput>(
  aggregator: AggregateState<T, TState, TOutput>
): StreamProcessor<T[], TOutput> {
  return async function* (source) {
    for await (const window of source) {
      let state = aggregator.initialState();
      for (const item of window) {
        state = aggregator.update(state, item);
      }
      yield aggregator.output(state);
    }
  };
}

// Example: Real-time metrics aggregation
interface MetricEvent {
  metricName: string;
  value: number;
  timestamp: Date;
  tags: Record<string, string>;
}

interface MetricSummary {
  metricName: string;
  count: number;
  sum: number;
  min: number;
  max: number;
  average: number;
  p95: number;
  windowStart: Date;
  windowEnd: Date;
}

const metricAggregator: AggregateState<
  MetricEvent,
  {
    values: number[];
    metricName: string;
    windowStart: Date;
    windowEnd: Date;
  },
  MetricSummary
> = {
  initialState() {
    return {
      values: [],
      metricName: "",
      windowStart: new Date(),
      windowEnd: new Date(),
    };
  },
  update(state, item) {
    state.values.push(item.value);
    state.metricName = item.metricName;
    if (
      state.values.length === 1 ||
      item.timestamp < state.windowStart
    ) {
      state.windowStart = item.timestamp;
    }
    if (item.timestamp > state.windowEnd) {
      state.windowEnd = item.timestamp;
    }
    return state;
  },
  output(state): MetricSummary {
    const sorted = [...state.values].sort((a, b) => a - b);
    const sum = sorted.reduce((acc, v) => acc + v, 0);
    const p95Index = Math.ceil(sorted.length * 0.95) - 1;
    return {
      metricName: state.metricName,
      count: sorted.length,
      sum,
      min: sorted[0] ?? 0,
      max: sorted[sorted.length - 1] ?? 0,
      average: sorted.length > 0 ? sum / sorted.length : 0,
      p95: sorted[p95Index] ?? 0,
      windowStart: state.windowStart,
      windowEnd: state.windowEnd,
    };
  },
};

For more complex stateful processing, you might need to maintain state across windows or across multiple streams. This is where a dedicated state store becomes important.
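Before introducing external state, here is a quick, self-contained check of the aggregator pattern, using a version of AggregateState made generic over its input type and a minimal average aggregator; the sample latency values are made up:

```typescript
interface AggregateState<TInput, TState, TOutput> {
  initialState(): TState;
  update(state: TState, item: TInput): TState;
  output(state: TState): TOutput;
}

// A minimal average-latency aggregator in the same shape as metricAggregator
const avgLatency: AggregateState<number, { sum: number; count: number }, number> = {
  initialState: () => ({ sum: 0, count: 0 }),
  update: (s, v) => ({ sum: s.sum + v, count: s.count + 1 }),
  output: (s) => (s.count > 0 ? s.sum / s.count : 0),
};

// Fold one window of hypothetical latency samples through the aggregator
let state = avgLatency.initialState();
for (const v of [120, 80, 100]) state = avgLatency.update(state, v);
console.log(avgLatency.output(state)); // 100
```

The same three-method shape (initialize, update, output) scales from this toy average up to the full percentile summary above.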
// Key-value state store for stateful processing
interface StateStore<K, V> {
  get(key: K): Promise<V | undefined>;
  put(key: K, value: V): Promise<void>;
  delete(key: K): Promise<void>;
  entries(): AsyncIterable<[K, V]>;
}

// Stateful processor with external state. Note that groupByKey drains the
// entire source before emitting, so it is only suitable for bounded streams,
// such as the output of a windowing operator.
function groupByKey<T>(
  keyExtractor: (item: T) => string,
  stateStore: StateStore<string, T[]>
): StreamProcessor<T, { key: string; items: T[] }> {
  return async function* (source) {
    for await (const item of source) {
      const key = keyExtractor(item);
      const existing = (await stateStore.get(key)) ?? [];
      existing.push(item);
      await stateStore.put(key, existing);
    }
    for await (const [key, items] of stateStore.entries()) {
      yield { key, items };
    }
  };
}

Stream Joins: Combining Multiple Sources
Real-world stream processing often requires combining data from multiple sources. Stream joins are significantly more complex than database joins because both sides are unbounded and events may arrive out of order.
// Temporal join: Match events from two streams within a time window
async function* temporalJoin<TLeft, TRight, TOutput>(
  leftStream: AsyncIterable<TLeft>,
  rightStream: AsyncIterable<TRight>,
  joinCondition: (left: TLeft, right: TRight) => boolean,
  combiner: (left: TLeft, right: TRight) => TOutput,
  windowMs: number,
  leftTimestamp: (item: TLeft) => number,
  rightTimestamp: (item: TRight) => number
): AsyncIterable<TOutput> {
  const leftBuffer: Array<{ item: TLeft; ts: number }> = [];
  const rightBuffer: Array<{ item: TRight; ts: number }> = [];

  // Read both streams concurrently, keeping exactly one pending next() per
  // side. Re-arming next() on every race would discard the result of
  // whichever promise lost the race, silently dropping events.
  const leftIterator = leftStream[Symbol.asyncIterator]();
  const rightIterator = rightStream[Symbol.asyncIterator]();
  type Arm =
    | { side: "left"; result: IteratorResult<TLeft> }
    | { side: "right"; result: IteratorResult<TRight> };
  const armLeft = (): Promise<Arm> =>
    leftIterator.next().then((result) => ({ side: "left" as const, result }));
  const armRight = (): Promise<Arm> =>
    rightIterator.next().then((result) => ({ side: "right" as const, result }));
  let leftPending: Promise<Arm> | null = armLeft();
  let rightPending: Promise<Arm> | null = armRight();

  // Watermark: the largest event timestamp seen so far on either side
  let watermark = Number.NEGATIVE_INFINITY;

  while (leftPending !== null || rightPending !== null) {
    const pending = [leftPending, rightPending].filter(
      (p): p is Promise<Arm> => p !== null
    );
    const next = await Promise.race(pending);

    if (next.side === "left") {
      if (next.result.done) {
        leftPending = null;
        continue;
      }
      leftPending = armLeft();
      const leftItem = next.result.value;
      const ts = leftTimestamp(leftItem);
      watermark = Math.max(watermark, ts);
      leftBuffer.push({ item: leftItem, ts });
      // Try to match with right buffer
      for (const right of rightBuffer) {
        if (
          Math.abs(ts - right.ts) <= windowMs &&
          joinCondition(leftItem, right.item)
        ) {
          yield combiner(leftItem, right.item);
        }
      }
    } else {
      if (next.result.done) {
        rightPending = null;
        continue;
      }
      rightPending = armRight();
      const rightItem = next.result.value;
      const ts = rightTimestamp(rightItem);
      watermark = Math.max(watermark, ts);
      rightBuffer.push({ item: rightItem, ts });
      // Try to match with left buffer
      for (const left of leftBuffer) {
        if (
          Math.abs(ts - left.ts) <= windowMs &&
          joinCondition(left.item, rightItem)
        ) {
          yield combiner(left.item, rightItem);
        }
      }
    }

    // Evict expired items from both buffers, judged by event time rather
    // than wall-clock time, since the two are not generally aligned
    const cutoff = watermark - windowMs;
    while (leftBuffer.length > 0 && leftBuffer[0].ts < cutoff) {
      leftBuffer.shift();
    }
    while (rightBuffer.length > 0 && rightBuffer[0].ts < cutoff) {
      rightBuffer.shift();
    }
  }
}
// Usage example: Join orders with payments
interface OrderEvent {
orderId: string;
amount: number;
timestamp: number;
}
interface PaymentEvent {
orderId: string;
transactionId: string;
amount: number;
timestamp: number;
}
interface OrderWithPayment {
orderId: string;
orderAmount: number;
transactionId: string;
paymentAmount: number;
timeDeltaMs: number;
}
// Join orders with their corresponding payments within a 5-minute window
const enrichedOrders = temporalJoin<
OrderEvent,
PaymentEvent,
OrderWithPayment
>(
orderStream,
paymentStream,
(order, payment) => order.orderId === payment.orderId,
(order, payment) => ({
orderId: order.orderId,
orderAmount: order.amount,
transactionId: payment.transactionId,
paymentAmount: payment.amount,
timeDeltaMs: Math.abs(order.timestamp - payment.timestamp),
}),
5 * 60 * 1000, // 5-minute window
(order) => order.timestamp,
(payment) => payment.timestamp
);

Back-Pressure and Flow Control
When a stream produces data faster than downstream processors can consume it, the system needs a strategy to avoid unbounded memory growth. This is the problem of back-pressure.
// Bounded async queue with back-pressure
class BoundedQueue<T> {
  private queue: T[] = [];
  private waitingConsumers: Array<(value: T | null) => void> = [];
  private waitingProducers: Array<() => void> = [];
  private closed = false;

  constructor(private readonly capacity: number) {}

  async enqueue(item: T): Promise<void> {
    if (this.closed) {
      throw new Error("Queue is closed.");
    }
    if (this.waitingConsumers.length > 0) {
      const consumer = this.waitingConsumers.shift()!;
      consumer(item);
      return;
    }
    // Back-pressure: wait until there is space. A while loop (not an if)
    // guards against another producer claiming the freed slot first.
    while (this.queue.length >= this.capacity) {
      await new Promise<void>((resolve) => {
        this.waitingProducers.push(resolve);
      });
    }
    this.queue.push(item);
  }

  async dequeue(): Promise<T | null> {
    if (this.queue.length > 0) {
      const item = this.queue.shift()!;
      // Signal a waiting producer that space is available
      if (this.waitingProducers.length > 0) {
        const producer = this.waitingProducers.shift()!;
        producer();
      }
      return item;
    }
    if (this.closed) return null;
    return new Promise<T | null>((resolve) => {
      this.waitingConsumers.push(resolve);
    });
  }

  close(): void {
    this.closed = true;
    // Wake any blocked consumers with the end-of-stream marker
    for (const consumer of this.waitingConsumers) {
      consumer(null);
    }
    this.waitingConsumers = [];
  }
}
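Here is the queue applying back-pressure in miniature: a fast producer paired with a slow consumer, where the producer is forced to run at the consumer's pace once the two-slot buffer fills. The class is repeated in condensed form so the snippet runs standalone:

```typescript
// Condensed BoundedQueue (as above) so this demo is self-contained
class BoundedQueue<T> {
  private queue: T[] = [];
  private consumers: Array<(v: T | null) => void> = [];
  private producers: Array<() => void> = [];
  private closed = false;
  constructor(private readonly capacity: number) {}
  async enqueue(item: T): Promise<void> {
    if (this.closed) throw new Error("Queue is closed.");
    if (this.consumers.length > 0) {
      this.consumers.shift()!(item);
      return;
    }
    while (this.queue.length >= this.capacity) {
      await new Promise<void>((res) => this.producers.push(res));
    }
    this.queue.push(item);
  }
  async dequeue(): Promise<T | null> {
    if (this.queue.length > 0) {
      const item = this.queue.shift()!;
      this.producers.shift()?.();
      return item;
    }
    if (this.closed) return null;
    return new Promise<T | null>((res) => this.consumers.push(res));
  }
  close(): void {
    this.closed = true;
    for (const c of this.consumers) c(null);
    this.consumers = [];
  }
}

(async () => {
  const q = new BoundedQueue<number>(2);
  const consumed: number[] = [];

  // Producer: enqueues five items as fast as awaits allow, then closes
  const producer = (async () => {
    for (let i = 1; i <= 5; i++) await q.enqueue(i);
    q.close();
  })();

  // Consumer: drains the queue with an artificial delay per item
  const consumer = (async () => {
    for (;;) {
      const v = await q.dequeue();
      if (v === null) break;
      consumed.push(v);
      await new Promise((r) => setTimeout(r, 10));
    }
  })();

  await Promise.all([producer, consumer]);
  console.log(consumed); // [ 1, 2, 3, 4, 5 ]
})();
```

Every item arrives in order and nothing is lost, but the producer's enqueue calls block whenever the buffer is full, which is exactly the memory bound back-pressure is meant to provide.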
// Rate-limited stream processor
function rateLimit<T>(
  maxPerSecond: number
): StreamProcessor<T, T> {
  return async function* (source) {
    const intervalMs = 1000 / maxPerSecond;
    let lastEmitTime = 0;
    for await (const item of source) {
      const now = Date.now();
      const elapsed = now - lastEmitTime;
      if (elapsed < intervalMs) {
        await new Promise((resolve) =>
          setTimeout(resolve, intervalMs - elapsed)
        );
      }
      lastEmitTime = Date.now();
      yield item;
    }
  };
}

Practical Tips for Production Stream Processing
Handle late-arriving data gracefully. In distributed systems, events can arrive out of order. Your windowing logic should account for events that arrive after their window has closed. A common strategy is to allow a configurable "grace period" before finalizing a window.
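A minimal sketch of the grace-period idea, assuming an event-time watermark (the maximum event timestamp seen so far); all names and values here are illustrative:

```typescript
// Classify an event relative to its window, given the current watermark
// and an allowed-lateness grace period.
type Lateness = "on-time" | "late-but-admissible" | "too-late";

function classify(
  eventTimeMs: number,
  windowEndMs: number,
  watermarkMs: number,
  graceMs: number
): Lateness {
  if (eventTimeMs < windowEndMs && watermarkMs < windowEndMs) return "on-time";
  if (eventTimeMs < windowEndMs && watermarkMs < windowEndMs + graceMs)
    return "late-but-admissible"; // window closed but still within grace
  if (eventTimeMs < windowEndMs) return "too-late"; // grace expired: drop or dead-letter
  return "on-time"; // belongs to a newer window
}

console.log(classify(940, 1000, 990, 100)); // on-time
console.log(classify(940, 1000, 1050, 100)); // late-but-admissible
console.log(classify(940, 1000, 1200, 100)); // too-late
```

A window is only finalized once the watermark passes its end plus the grace period; events classified as admissible trigger a corrected emission, while too-late events go to a dead-letter path for auditing.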
Monitor lag obsessively. The time between when an event is produced and when it is processed is the most important metric in a stream processing system. If lag grows unbounded, something is wrong.
Design for exactly-once semantics, implement at-least-once. True exactly-once processing is extremely difficult in distributed systems. Instead, implement at-least-once delivery with idempotent processors. The result is the same, and the implementation is far simpler.
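A sketch of the idempotency half of that bargain: at-least-once delivery plus a processed-ID check yields effectively-once results. The event shape and IDs here are hypothetical, and a production system would persist the seen-ID set rather than hold it in memory:

```typescript
interface TransferEvent { eventId: string; amount: number; }

class IdempotentSink {
  private processed = new Set<string>();
  total = 0;

  handle(event: TransferEvent): void {
    if (this.processed.has(event.eventId)) return; // duplicate redelivery: ignore
    this.processed.add(event.eventId);
    this.total += event.amount;
  }
}

const sink = new IdempotentSink();
// "e2" is delivered twice, as at-least-once systems are allowed to do
for (const e of [
  { eventId: "e1", amount: 10 },
  { eventId: "e2", amount: 20 },
  { eventId: "e2", amount: 20 },
]) sink.handle(e);
console.log(sink.total); // 30
```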
Test with time. Stream processing logic is inherently temporal. Your tests should be able to manipulate time to verify that windowing, timeouts, and late-arrival handling work correctly.
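For example, session-window logic can be tested deterministically by constructing events with synthetic timestamps instead of reading a live clock (the operator is repeated in condensed form so the test runs on its own):

```typescript
type StreamProcessor<T, U> = (source: AsyncIterable<T>) => AsyncIterable<U>;

// Condensed sessionWindow (as defined earlier) for a standalone test
function sessionWindow<T>(
  gapMs: number,
  ts: (item: T) => Date
): StreamProcessor<T, T[]> {
  return async function* (source) {
    let session: T[] = [];
    let last: number | null = null;
    for await (const item of source) {
      const t = ts(item).getTime();
      if (last !== null && t - last > gapMs) {
        yield session;
        session = [];
      }
      session.push(item);
      last = t;
    }
    if (session.length > 0) yield session;
  };
}

// Synthetic timestamps: two bursts separated by a 5-second gap
const times = [0, 1000, 2000, 7000, 7500].map((ms) => new Date(ms));

async function* events(): AsyncIterable<Date> {
  for (const t of times) yield t;
}

(async () => {
  const sessions: number[] = [];
  for await (const s of sessionWindow<Date>(3000, (d) => d)(events())) {
    sessions.push(s.length);
  }
  console.log(sessions); // [ 3, 2 ]
})();
```

The test never sleeps and never reads Date.now(), so it is fast and fully deterministic; the same approach applies to tumbling and sliding windows.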
Use checkpointing for fault tolerance. Periodically save the state of your stream processors so that they can resume from the last checkpoint rather than reprocessing the entire stream after a failure.
Conclusion
Real-time stream processing in TypeScript is not only practical but elegant. The language's async iteration primitives, strong type system, and functional programming capabilities make it well-suited for building composable, testable processing pipelines. The patterns covered here --- stream operators, windowing, stateful aggregation, joins, and back-pressure management --- form the foundation for any stream processing workload.
Whether you are building a real-time analytics dashboard, a fraud detection engine, or a complex event-processing platform like Starburst, these patterns will help you process data as it moves, delivering insights and actions at the speed your business demands.