JetStream: Persistent Messaging with NATS

A deep dive into NATS JetStream for persistent messaging, covering streams, consumers, delivery guarantees, and practical TypeScript patterns for building reliable event-driven systems.

technical · 10 min read · By Klivvr Engineering

Core NATS is a fire-and-forget system. Messages are delivered to active subscribers and discarded immediately. For many use cases this is perfect, but production systems frequently need stronger guarantees: messages that survive subscriber downtime, exactly-once processing semantics, and the ability to replay historical events. JetStream is the persistence layer built into the NATS server that provides all of these capabilities without sacrificing the simplicity that makes NATS distinctive.

JetStream was introduced as a first-class component of the NATS server, not as a bolt-on addon. It uses the same subject-based routing, the same client connections, and the same wire protocol. From the perspective of the node-nats client library, JetStream is an extension of the familiar API rather than a separate system to learn. This article covers how JetStream works, how to configure it, and how to use it effectively from TypeScript.

Streams: Capturing Messages

A stream is the fundamental storage unit in JetStream. It captures messages published to one or more subjects and stores them according to configurable retention policies. Think of a stream as a persistent, append-only log that the NATS server manages on your behalf.

Creating and managing streams programmatically with node-nats uses the JetStream management API:

import { connect, RetentionPolicy, StorageType } from "nats";
 
async function setupStream() {
  const nc = await connect({ servers: "nats://localhost:4222" });
  const jsm = await nc.jetstreamManager();
 
  // Add a stream that captures all order events
  await jsm.streams.add({
    name: "ORDERS",
    subjects: ["events.orders.>"],
    retention: RetentionPolicy.Limits,
    storage: StorageType.File,
    max_msgs: 1_000_000,
    max_bytes: 1024 * 1024 * 1024, // 1 GB
    max_age: 30 * 24 * 60 * 60 * 1_000_000_000, // 30 days in nanoseconds
    num_replicas: 3,
  });
 
  console.log("ORDERS stream created");
 
  // List existing streams
  const streams = await jsm.streams.list().next();
  for (const si of streams) {
    console.log(`Stream: ${si.config.name}, Messages: ${si.state.messages}`);
  }
 
  await nc.drain();
}

Several configuration choices deserve attention. The subjects array defines which published messages this stream captures. Using wildcards like events.orders.> means the stream automatically captures messages on any subject under that prefix, including events.orders.created, events.orders.shipped, and events.orders.cancelled.

The retention policy controls when messages are deleted. Limits (the default) removes messages when any configured limit (count, size, or age) is exceeded. Interest removes a message once all defined consumers have acknowledged it. WorkQueue removes a message as soon as any single consumer acknowledges it. Choosing the right retention policy is critical for both correctness and storage costs.
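To make the tradeoff concrete, here is a sketch of a work-queue stream for one-shot task distribution. The stream name `TASKS` and subject `tasks.>` are illustrative, not from the examples above; note that the nats.js enum member is spelled `Workqueue`.

```typescript
import { connect, RetentionPolicy, StorageType } from "nats";

async function setupWorkQueue() {
  const nc = await connect({ servers: "nats://localhost:4222" });
  const jsm = await nc.jetstreamManager();

  await jsm.streams.add({
    name: "TASKS",
    subjects: ["tasks.>"],
    // Workqueue retention: a message is deleted as soon as any one
    // consumer acknowledges it, so each task is handed to exactly one
    // worker and storage never accumulates processed tasks
    retention: RetentionPolicy.Workqueue,
    storage: StorageType.File,
  });

  await nc.drain();
}
```

With this policy, the stream doubles as a distributed job queue: storage cost tracks the backlog of unprocessed tasks rather than the full history.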

Storage can be File (persistent to disk) or Memory (faster but volatile). For production workloads where durability matters, file storage with replication across multiple NATS servers provides the strongest guarantees.

Publishing to JetStream

Publishing to a JetStream-enabled subject uses a dedicated API that provides publish acknowledgments, something core NATS does not offer:

import { connect, JSONCodec } from "nats";
 
const jc = JSONCodec();
 
async function publishOrders() {
  const nc = await connect({ servers: "nats://localhost:4222" });
  const js = nc.jetstream();
 
  // Publish with acknowledgment
  const pa = await js.publish(
    "events.orders.created",
    jc.encode({
      orderId: "ord_20250806_001",
      userId: "usr_abc123",
      total: 149.99,
      currency: "USD",
      items: [
        { productId: "prod_widget", quantity: 2, price: 49.99 },
        { productId: "prod_gadget", quantity: 1, price: 50.01 },
      ],
    })
  );
 
  console.log(
    `Published to stream: ${pa.stream}, seq: ${pa.seq}, duplicate: ${pa.duplicate}`
  );
 
  // Publish with deduplication using a message ID
  const pa2 = await js.publish(
    "events.orders.created",
    jc.encode({
      orderId: "ord_20250806_002",
      userId: "usr_def456",
      total: 29.99,
      currency: "USD",
      items: [{ productId: "prod_basic", quantity: 1, price: 29.99 }],
    }),
    {
      msgID: "ord_20250806_002",
      expect: { lastSubjectSequence: pa.seq },
    }
  );
 
  await nc.drain();
}

The js.publish call returns a PubAck that confirms the message was persisted to the stream. This is a significant difference from core NATS nc.publish, which is fire-and-forget. The PubAck includes the stream name, sequence number, and a duplicate flag.

The msgID option enables server-side deduplication. If you publish the same message ID twice within the deduplication window (configurable per stream, default two minutes), the server returns the original sequence number with duplicate: true rather than storing a second copy. This is essential for achieving exactly-once publish semantics in systems where the publisher might retry after a timeout.

Consumers: Reading from Streams

Consumers define how messages are read from a stream. A consumer tracks which messages have been delivered and acknowledged, maintaining its position in the stream independently of other consumers. JetStream supports two types of consumers: durable and ephemeral.

Durable consumers survive client disconnections. Their state (including the last acknowledged message) is persisted by the server, so a reconnecting client resumes exactly where it left off. Ephemeral consumers are automatically cleaned up when their last subscription is closed.

import { connect, JSONCodec, AckPolicy, DeliverPolicy } from "nats";
 
const jc = JSONCodec();
 
async function consumeOrders() {
  const nc = await connect({ servers: "nats://localhost:4222" });
  const js = nc.jetstream();
  const jsm = await nc.jetstreamManager();
 
  // Create a durable consumer
  await jsm.consumers.add("ORDERS", {
    durable_name: "order-processor",
    ack_policy: AckPolicy.Explicit,
    deliver_policy: DeliverPolicy.All,
    filter_subject: "events.orders.created",
    max_deliver: 5,
    ack_wait: 30_000_000_000, // 30 seconds in nanoseconds
  });
 
  // Consume messages
  const consumer = await js.consumers.get("ORDERS", "order-processor");
  const messages = await consumer.consume();
 
  for await (const msg of messages) {
    const order = jc.decode(msg.data) as {
      orderId: string;
      userId: string;
      total: number;
    };
 
    try {
      console.log(`Processing order ${order.orderId}, total: $${order.total}`);
      await processOrder(order);
 
      // Acknowledge successful processing
      msg.ack();
    } catch (error) {
      console.error(`Failed to process order ${order.orderId}:`, error);
 
      // Negative acknowledge -- triggers redelivery
      msg.nak();
    }
  }
}

The acknowledgment policy is the most important consumer setting. Explicit means every message must be individually acknowledged by the client. None means messages are considered acknowledged upon delivery. All means acknowledging message N implicitly acknowledges all messages before N. For most production workloads, Explicit is the correct choice because it gives you precise control over which messages have been successfully processed.
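For contrast, a sketch of when `None` makes sense: high-volume, low-value data such as metrics, where occasional loss is acceptable and per-message acknowledgment overhead is not worth paying. The consumer name `order-metrics` is illustrative.

```typescript
import { connect, AckPolicy, DeliverPolicy } from "nats";

async function setupMetricsConsumer() {
  const nc = await connect({ servers: "nats://localhost:4222" });
  const jsm = await nc.jetstreamManager();

  await jsm.consumers.add("ORDERS", {
    durable_name: "order-metrics",
    // None: messages are considered acknowledged on delivery,
    // trading redelivery guarantees for throughput
    ack_policy: AckPolicy.None,
    // New: skip the backlog and only count orders from now on
    deliver_policy: DeliverPolicy.New,
  });

  await nc.drain();
}
```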

The max_deliver setting limits how many times a message can be redelivered after negative acknowledgments or timeouts. JetStream has no built-in dead-letter queue; once the limit is reached, the server stops redelivering the message and instead publishes an advisory on $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.&lt;stream&gt;.&lt;consumer&gt;, which you can subscribe to in order to build your own dead-letter handling. Either way, poison messages no longer block the consumer indefinitely.

Pull vs Push Consumers

JetStream supports two delivery models: pull and push. Pull consumers require the client to explicitly request batches of messages. Push consumers deliver messages to the client as they arrive, similar to core NATS subscriptions.

Pull consumers are generally recommended for most server-side workloads because they give the consumer explicit flow control:

async function pullConsumer() {
  const nc = await connect({ servers: "nats://localhost:4222" });
  const js = nc.jetstream();
 
  const consumer = await js.consumers.get("ORDERS", "order-processor");
 
  // Fetch a batch of messages
  const batch = await consumer.fetch({ max_messages: 10, expires: 5000 });
 
  for await (const msg of batch) {
    const data = jc.decode(msg.data);
    console.log(`Seq ${msg.seq}: ${JSON.stringify(data)}`);
    msg.ack();
  }
 
  // Or use consume() for continuous processing with auto-refill
  const messages = await consumer.consume({
    max_messages: 100,
    expires: 30_000,
  });
 
  for await (const msg of messages) {
    await handleMessage(msg);
    msg.ack();
  }
 
  await nc.drain();
}

The consume() method on a pull consumer provides a convenient abstraction that automatically requests new batches as messages are processed. The max_messages parameter controls how many messages are in-flight at any time, providing natural backpressure. If your processing is slow, fewer messages are outstanding; if it is fast, the consumer keeps the pipeline full.

Push consumers deliver messages as they arrive without explicit requests, but in the modern consumers API the simplest consumption model is the ordered consumer: an ephemeral consumer the client manages for you, guaranteeing in-order delivery with no acknowledgment bookkeeping:

async function orderedConsumer() {
  const nc = await connect({ servers: "nats://localhost:4222" });
  const js = nc.jetstream();
 
  // Calling get() with only the stream name creates an ordered consumer:
  // an ephemeral consumer that guarantees messages arrive in order and
  // automatically recovers from gaps by recreating itself
  const consumer = await js.consumers.get("ORDERS");
  const messages = await consumer.consume();
 
  for await (const msg of messages) {
    console.log(`Stream seq ${msg.seq}: ${msg.subject}`);
    // Ordered consumers do not require acknowledgments
  }
}

Exactly-Once Processing with Idempotency

JetStream provides at-least-once delivery by default. Combined with publisher deduplication, you can achieve exactly-once semantics, but only if your consumers are idempotent. The node-nats library gives you the metadata needed to implement this:

import { connect, JSONCodec } from "nats";
 
const jc = JSONCodec();
 
// Track processed message IDs to ensure idempotency (in-memory only --
// fine for a demo, but it survives neither restarts nor multiple
// instances; use durable storage as in processOrderIdempotently below)
const processedMessages = new Set<string>();
 
async function idempotentConsumer() {
  const nc = await connect({ servers: "nats://localhost:4222" });
  const js = nc.jetstream();
 
  const consumer = await js.consumers.get("ORDERS", "order-processor");
  const messages = await consumer.consume();
 
  for await (const msg of messages) {
    // Use stream sequence as deduplication key
    const dedupeKey = `${msg.info.stream}:${msg.seq}`;
 
    if (processedMessages.has(dedupeKey)) {
      console.log(`Skipping duplicate: ${dedupeKey}`);
      msg.ack();
      continue;
    }
 
    const order = jc.decode(msg.data) as { orderId: string; total: number };
 
    try {
      // Use a transaction or idempotency key in your database
      await processOrderIdempotently(order, dedupeKey);
      processedMessages.add(dedupeKey);
      msg.ack();
    } catch (error) {
      msg.nak();
    }
  }
}
 
async function processOrderIdempotently(
  order: { orderId: string; total: number },
  dedupeKey: string
) {
  // In production, use a database transaction ("db" here stands in for
  // your application's database client):
  // INSERT INTO processed_messages (key) VALUES ($1)
  // ON CONFLICT DO NOTHING
  // If the insert succeeds, process the order
  // If it conflicts, the order was already processed
  await db.transaction(async (tx) => {
    const existing = await tx.query(
      "INSERT INTO processed_events (dedup_key) VALUES ($1) ON CONFLICT DO NOTHING RETURNING id",
      [dedupeKey]
    );
 
    if (existing.rowCount === 0) {
      return; // Already processed
    }
 
    await tx.query(
      "INSERT INTO orders (order_id, total) VALUES ($1, $2)",
      [order.orderId, order.total]
    );
  });
}

The pattern of combining JetStream's message sequence numbers with a database-level idempotency check provides genuine exactly-once processing semantics. The stream sequence is monotonically increasing and unique within a stream, making it an ideal deduplication key.

Practical Tips for JetStream in Production

Stream configuration is not something you set once and forget. Plan your streams around access patterns, not around organizational boundaries. A single stream capturing events.orders.> with filtered consumers is usually better than separate streams for events.orders.created, events.orders.updated, and events.orders.cancelled. Fewer streams mean less operational overhead, and filtered consumers give you the same logical separation.

Set realistic retention limits from day one. A stream with no limits will grow until it exhausts disk space. Calculate your expected message rate, average message size, and required retention period, then configure max_bytes and max_age accordingly. Monitor stream size as a core operational metric.
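A back-of-envelope sizing sketch makes the point. The rate, payload size, and retention period below are illustrative assumptions, not figures from any real workload:

```typescript
// Back-of-envelope stream sizing under assumed workload numbers
const msgsPerSecond = 500;   // expected publish rate
const avgMsgBytes = 1024;    // average payload size (1 KiB)
const retentionDays = 30;    // required retention period

const secondsRetained = retentionDays * 24 * 60 * 60;
const estimatedBytes = msgsPerSecond * avgMsgBytes * secondsRetained;

// Roughly 1.2 TiB -- far beyond most default disk budgets, which is
// why max_bytes and max_age must be set deliberately
console.log(`Estimated stream size: ${(estimatedBytes / 1024 ** 4).toFixed(2)} TiB`);
```

Run the numbers for your own workload before choosing limits; even modest rates accumulate surprising volumes over a month.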

Use the ack_wait setting thoughtfully. It should be longer than your worst-case processing time but short enough to detect genuinely failed consumers. For most workloads, 30 seconds is a reasonable starting point. If your processing involves external API calls, consider 60 seconds or more.

Test consumer recovery by intentionally killing consumer processes during load testing. Verify that messages are redelivered correctly, that no messages are lost, and that your idempotency logic handles duplicates gracefully. JetStream's guarantees are only as strong as your consumer implementation.

Conclusion

JetStream transforms NATS from a real-time messaging bus into a complete event streaming platform. Streams provide durable, replicated storage for messages. Consumers provide flexible, trackable read positions with explicit acknowledgment. Publisher deduplication and consumer idempotency together enable exactly-once processing semantics. The node-nats client library exposes all of this through a consistent, type-safe TypeScript API that builds naturally on the core NATS primitives. For teams already using NATS for real-time messaging, JetStream is the path to adding persistence, replay, and delivery guarantees without introducing a separate technology into the stack.
