Real-Time Customer Profiles with Event Streaming

A technical guide to building real-time customer profile systems using event streaming in TypeScript, covering event-driven architecture, stream processing, profile materialization, and consistency guarantees.

technical · 11 min read · By Klivvr Engineering

A customer profile that reflects yesterday's data is a liability in a world where customer interactions happen in real time. When a customer completes a large transaction, changes their address, or contacts support, the systems that serve them — the mobile app, the support dashboard, the personalization engine — need to see the updated profile immediately, not after the next nightly batch run. At Klivvr, CVM Nova's real-time profile system uses event streaming to maintain a continuously updated view of every customer, with sub-second latency from event occurrence to profile update.

This article covers the architecture, implementation, and operational considerations of building real-time customer profiles with event streaming in TypeScript.

Event-Driven Profile Architecture

The core idea is simple: instead of periodically querying databases to build a customer profile, you continuously consume events that describe changes to the customer's state and apply them to a materialized profile in real time. The event stream becomes the source of truth, and the materialized profile is a derived view that is always up to date.

interface ProfileEvent {
  eventId: string;
  customerId: string;
  eventType: string;
  payload: Record<string, unknown>;
  timestamp: Date;
  source: string;
  schemaVersion: number;
}
 
interface MaterializedProfile {
  customerId: string;
  version: number;
  lastUpdatedAt: Date;
 
  // Identity
  firstName: string;
  lastName: string;
  email: string;
  phone: string;
 
  // Financial summary
  totalBalance: number;
  accountCount: number;
  lastTransactionDate: Date | null;
  transactionCount30d: number;
  totalTransactionValue30d: number;
 
  // Engagement
  lastLoginDate: Date | null;
  loginCount7d: number;
  preferredChannel: string;
 
  // Computed
  segment: string;
  churnRiskScore: number;
  lifetimeValue: number;
 
  // Raw event counters
  eventCounts: Record<string, number>;
}
 
class ProfileMaterializer {
  private profiles: Map<string, MaterializedProfile> = new Map();
 
  apply(event: ProfileEvent): MaterializedProfile {
    const profile = this.getOrCreate(event.customerId);
 
    switch (event.eventType) {
      case "customer.profile.updated":
        this.applyProfileUpdate(profile, event);
        break;
      case "transaction.completed":
        this.applyTransaction(profile, event);
        break;
      case "account.opened":
        this.applyAccountOpened(profile, event);
        break;
      case "account.closed":
        this.applyAccountClosed(profile, event);
        break;
      case "session.started":
        this.applySessionStarted(profile, event);
        break;
      case "support.ticket.created":
        this.applySupportTicket(profile, event);
        break;
      default:
        // Unknown event types fall through; the shared counter below still records them
        break;
    }
 
    // Update common fields
    profile.version++;
    profile.lastUpdatedAt = event.timestamp;
    profile.eventCounts[event.eventType] =
      (profile.eventCounts[event.eventType] ?? 0) + 1;
 
    this.profiles.set(event.customerId, profile);
    return profile;
  }
 
  private applyProfileUpdate(
    profile: MaterializedProfile,
    event: ProfileEvent
  ): void {
    const fields = event.payload;
    if (fields.firstName) profile.firstName = fields.firstName as string;
    if (fields.lastName) profile.lastName = fields.lastName as string;
    if (fields.email) profile.email = fields.email as string;
    if (fields.phone) profile.phone = fields.phone as string;
  }
 
  private applyTransaction(
    profile: MaterializedProfile,
    event: ProfileEvent
  ): void {
    const amount = (event.payload.amount as number) ?? 0;
    profile.lastTransactionDate = event.timestamp;
    profile.transactionCount30d++;
    profile.totalTransactionValue30d += amount;
  }
 
  private applyAccountOpened(
    profile: MaterializedProfile,
    event: ProfileEvent
  ): void {
    profile.accountCount++;
    const initialBalance = (event.payload.initialBalance as number) ?? 0;
    profile.totalBalance += initialBalance;
  }
 
  private applyAccountClosed(
    profile: MaterializedProfile,
    event: ProfileEvent
  ): void {
    profile.accountCount = Math.max(0, profile.accountCount - 1);
  }
 
  private applySessionStarted(
    profile: MaterializedProfile,
    event: ProfileEvent
  ): void {
    profile.lastLoginDate = event.timestamp;
    profile.loginCount7d++;
    if (event.payload.channel) {
      profile.preferredChannel = event.payload.channel as string;
    }
  }
 
  private applySupportTicket(
    profile: MaterializedProfile,
    event: ProfileEvent
  ): void {
    // Support interactions can affect engagement and churn risk
  }
 
  private getOrCreate(customerId: string): MaterializedProfile {
    const existing = this.profiles.get(customerId);
    if (existing) return existing;
 
    const newProfile: MaterializedProfile = {
      customerId,
      version: 0,
      lastUpdatedAt: new Date(),
      firstName: "",
      lastName: "",
      email: "",
      phone: "",
      totalBalance: 0,
      accountCount: 0,
      lastTransactionDate: null,
      transactionCount30d: 0,
      totalTransactionValue30d: 0,
      lastLoginDate: null,
      loginCount7d: 0,
      preferredChannel: "app",
      segment: "unknown",
      churnRiskScore: 0,
      lifetimeValue: 0,
      eventCounts: {},
    };
 
    this.profiles.set(customerId, newProfile);
    return newProfile;
  }
}

The ProfileMaterializer is the heart of the system. Each event type has a dedicated handler that knows how to update the relevant fields on the profile. The pattern is deliberately repetitive rather than clever — each handler is easy to understand, test, and modify independently. When a new event type arrives, adding a new handler is a five-minute change.
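To make that five-minute change concrete, here is a sketch of what a handler for a hypothetical "card.activated" event might look like. The event type, the activeCardCount field, and the applyCardEvent helper are all illustrative assumptions, not part of the system described above; only the slice of the profile the handler touches is modeled.

```typescript
// Illustrative sketch: handling a hypothetical "card.activated" event.
// Only the profile fields this handler touches are modeled here.
interface CardSlice {
  activeCardCount: number;
}

interface CardEvent {
  eventType: string;
  payload: Record<string, unknown>;
}

function applyCardEvent(profile: CardSlice, event: CardEvent): CardSlice {
  switch (event.eventType) {
    case "card.activated":
      profile.activeCardCount++;
      break;
    case "card.deactivated":
      // Never go below zero, mirroring the account-closed handler above
      profile.activeCardCount = Math.max(0, profile.activeCardCount - 1);
      break;
  }
  return profile;
}
```

The handler follows the same deliberately repetitive shape as the existing ones: a narrow input, a narrow set of fields, and no hidden coupling to other handlers.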

Stream Processing Pipeline

The materializer needs a steady feed of events. In CVM Nova, events flow from multiple sources — the core banking system, the mobile app, the support platform, the campaign engine — through a message broker (Apache Kafka in production) and into the processing pipeline.

interface StreamProcessor {
  topic: string;
  groupId: string;
  process(event: ProfileEvent): Promise<void>;
}
 
class ProfileStreamProcessor implements StreamProcessor {
  topic = "customer-events";
  groupId = "profile-materializer";
 
  private materializer: ProfileMaterializer;
  private profileStore: ProfileStore;
  private deadLetterQueue: DeadLetterQueue;
 
  constructor(
    materializer: ProfileMaterializer,
    profileStore: ProfileStore,
    deadLetterQueue: DeadLetterQueue
  ) {
    this.materializer = materializer;
    this.profileStore = profileStore;
    this.deadLetterQueue = deadLetterQueue;
  }
 
  async process(event: ProfileEvent): Promise<void> {
    try {
      // Validate event schema
      const validationErrors = this.validateEvent(event);
      if (validationErrors.length > 0) {
        await this.deadLetterQueue.send(event, validationErrors);
        return;
      }
 
      // Apply event to in-memory profile
      const updatedProfile = this.materializer.apply(event);
 
      // Persist to durable store
      await this.profileStore.upsert(updatedProfile);
 
      // Emit profile-updated event for downstream consumers
      await this.emitProfileUpdated(updatedProfile, event);
    } catch (error) {
      await this.deadLetterQueue.send(event, [
        `Processing error: ${(error as Error).message}`,
      ]);
    }
  }
 
  private validateEvent(event: ProfileEvent): string[] {
    const errors: string[] = [];
    if (!event.eventId) errors.push("Missing eventId");
    if (!event.customerId) errors.push("Missing customerId");
    if (!event.eventType) errors.push("Missing eventType");
    if (!event.timestamp) errors.push("Missing timestamp");
    return errors;
  }
 
  private async emitProfileUpdated(
    profile: MaterializedProfile,
    triggeringEvent: ProfileEvent
  ): Promise<void> {
    // Publish to a "profile-updates" topic for downstream consumers
    // like the personalization engine, the support dashboard, etc.
  }
}
 
interface ProfileStore {
  upsert(profile: MaterializedProfile): Promise<void>;
  get(customerId: string): Promise<MaterializedProfile | null>;
}
 
interface DeadLetterQueue {
  send(event: ProfileEvent, errors: string[]): Promise<void>;
}

Error handling is critical in stream processing. A malformed event must not crash the processor or block subsequent events. The dead-letter queue captures events that cannot be processed — either because they fail validation or because the handler throws an error — and routes them for manual investigation. This pattern ensures that the pipeline continues processing even when individual events are problematic.
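The DeadLetterQueue interface above leaves the transport open. For local testing, a minimal in-memory implementation might look like the sketch below; in production the queue would more likely publish to a dedicated dead-letter topic on the broker. The entry shape here is an assumption for illustration, not a production schema.

```typescript
// Minimal in-memory dead-letter queue for tests; production systems would
// typically publish to a dead-letter topic instead.
interface DeadLetterEntry {
  event: { eventId: string };
  errors: string[];
  failedAt: Date;
}

class InMemoryDeadLetterQueue {
  readonly entries: DeadLetterEntry[] = [];

  async send(event: { eventId: string }, errors: string[]): Promise<void> {
    // Record the failed event alongside its errors for inspection and replay
    this.entries.push({ event, errors, failedAt: new Date() });
  }
}
```

An implementation like this also makes the processor easy to unit-test: feed it a malformed event and assert that exactly one entry landed in the queue.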

The emitProfileUpdated step is what makes the real-time profile system composable. After updating the profile, the processor publishes a "profile updated" event that downstream systems can subscribe to. The personalization engine uses it to refresh recommendations. The support dashboard uses it to show the agent the latest customer state. The segment evaluator uses it to check whether the customer's segment membership has changed.
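One plausible shape for that published event is a small envelope carrying the profile version and the id of the triggering event, so consumers can deduplicate and order updates. The field names below are assumptions for illustration, not the production schema.

```typescript
// Sketch of a "profile updated" envelope for downstream consumers.
// Field names are illustrative assumptions.
interface ProfileUpdatedEvent {
  customerId: string;
  profileVersion: number;
  triggeringEventId: string;
  updatedAt: string; // ISO-8601
}

function buildProfileUpdatedEvent(
  profile: { customerId: string; version: number; lastUpdatedAt: Date },
  triggeringEvent: { eventId: string }
): ProfileUpdatedEvent {
  return {
    customerId: profile.customerId,
    profileVersion: profile.version,
    triggeringEventId: triggeringEvent.eventId,
    updatedAt: profile.lastUpdatedAt.toISOString(),
  };
}
```

Carrying the monotonically increasing profileVersion lets a consumer discard any update older than the one it has already applied.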

Handling Time Windows and Decay

Several profile fields are time-windowed: "transaction count in the last 30 days," "login count in the last 7 days." In a batch system, these are simple SQL window functions. In a streaming system, they require explicit management because you need to both increment counters when new events arrive and decrement them when events fall outside the window.

interface TimeWindowCounter {
  customerId: string;
  metricName: string;
  windowDays: number;
  events: Array<{ timestamp: Date; value: number }>;
}
 
class WindowedMetricTracker {
  private counters: Map<string, TimeWindowCounter> = new Map();
 
  addEvent(
    customerId: string,
    metricName: string,
    windowDays: number,
    timestamp: Date,
    value: number = 1
  ): number {
    const key = `${customerId}:${metricName}`;
    const counter = this.counters.get(key) ?? {
      customerId,
      metricName,
      windowDays,
      events: [],
    };
 
    counter.events.push({ timestamp, value });
 
    // Prune events outside the window
    const windowStart = new Date(
      Date.now() - windowDays * 24 * 60 * 60 * 1000
    );
    counter.events = counter.events.filter(
      (e) => e.timestamp >= windowStart
    );
 
    this.counters.set(key, counter);
 
    // Return the current windowed sum
    return counter.events.reduce((sum, e) => sum + e.value, 0);
  }
 
  getCurrentValue(customerId: string, metricName: string): number {
    const key = `${customerId}:${metricName}`;
    const counter = this.counters.get(key);
    if (!counter) return 0;
 
    // Prune expired events before returning
    const windowStart = new Date(
      Date.now() - counter.windowDays * 24 * 60 * 60 * 1000
    );
    counter.events = counter.events.filter(
      (e) => e.timestamp >= windowStart
    );
 
    return counter.events.reduce((sum, e) => sum + e.value, 0);
  }
 
  pruneAll(): number {
    let prunedCount = 0;
    for (const [key, counter] of this.counters) {
      const before = counter.events.length;
      const windowStart = new Date(
        Date.now() - counter.windowDays * 24 * 60 * 60 * 1000
      );
      counter.events = counter.events.filter(
        (e) => e.timestamp >= windowStart
      );
      prunedCount += before - counter.events.length;
 
      if (counter.events.length === 0) {
        this.counters.delete(key);
      }
    }
    return prunedCount;
  }
}

In production, keeping every individual event timestamp in memory is expensive for high-volume metrics. CVM Nova uses a hybrid approach: for the current window, it maintains precise counts; for historical context, it uses pre-aggregated daily buckets. This provides exact values for current metrics and approximate values for trend calculations, which is an acceptable trade-off.
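The daily-bucket side of that hybrid can be sketched as follows, assuming ISO-date strings as bucket keys. This is a simplified illustration of the idea, not the production implementation; note that the windowed sum is day-granular, which is exactly the approximation trade-off described above.

```typescript
// Sketch of pre-aggregated daily buckets: one number per (day), instead of
// one entry per event. Bucket keys are ISO dates, e.g. "2024-05-01".
class DailyBucketCounter {
  private buckets: Map<string, number> = new Map();

  add(timestamp: Date, value: number = 1): void {
    const day = timestamp.toISOString().slice(0, 10);
    this.buckets.set(day, (this.buckets.get(day) ?? 0) + value);
  }

  // Approximate windowed sum: whole days are included or excluded, so the
  // window boundary is day-granular rather than millisecond-granular.
  sumLastDays(windowDays: number, now: Date = new Date()): number {
    const cutoff = new Date(now.getTime() - windowDays * 24 * 60 * 60 * 1000)
      .toISOString()
      .slice(0, 10);
    let sum = 0;
    for (const [day, value] of this.buckets) {
      if (day >= cutoff) sum += value;
    }
    return sum;
  }
}
```

Memory usage is now bounded by the number of days retained rather than the number of events, which is what makes this viable for high-volume metrics.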

Consistency and Ordering Guarantees

Event ordering is a fundamental challenge in distributed streaming systems. Events from different sources can arrive out of order — a "transaction completed" event might arrive before the "account opened" event if the two systems have different latencies.

CVM Nova handles this through a combination of partitioned ordering and event timestamping.

interface EventOrderingConfig {
  partitionKey: "customerId"; // Events for the same customer go to the same partition
  lateTolerance: number; // milliseconds
  reorderBufferSize: number;
}
 
class OrderedEventProcessor {
  private buffer: Map<string, ProfileEvent[]> = new Map();
  private lastProcessedTimestamp: Map<string, number> = new Map();
  private config: EventOrderingConfig;
 
  constructor(config: EventOrderingConfig) {
    this.config = config;
  }
 
  async ingest(event: ProfileEvent): Promise<ProfileEvent[]> {
    const customerId = event.customerId;
    const eventTime = event.timestamp.getTime();
    const lastTime = this.lastProcessedTimestamp.get(customerId) ?? 0;
 
    if (eventTime < lastTime - this.config.lateTolerance) {
      // Event is too old; log and skip
      console.warn(
        `Dropping late event ${event.eventId} for customer ${customerId}: ` +
        `event time ${event.timestamp.toISOString()} is before ` +
        `tolerance window of ${new Date(lastTime - this.config.lateTolerance).toISOString()}`
      );
      return [];
    }
 
    // Add to buffer
    const customerBuffer = this.buffer.get(customerId) ?? [];
    customerBuffer.push(event);
    this.buffer.set(customerId, customerBuffer);
 
    // Check if buffer should be flushed
    if (customerBuffer.length >= this.config.reorderBufferSize) {
      return this.flush(customerId);
    }
 
    return [];
  }
 
  flush(customerId: string): ProfileEvent[] {
    const customerBuffer = this.buffer.get(customerId) ?? [];
    if (customerBuffer.length === 0) return [];
 
    // Sort by timestamp and process in order
    customerBuffer.sort(
      (a, b) => a.timestamp.getTime() - b.timestamp.getTime()
    );
 
    const lastEvent = customerBuffer[customerBuffer.length - 1];
    this.lastProcessedTimestamp.set(
      customerId,
      lastEvent.timestamp.getTime()
    );
    this.buffer.set(customerId, []);
 
    return customerBuffer;
  }
}

Partitioning by customer ID ensures that all events for the same customer are processed by the same consumer instance, which eliminates cross-partition ordering issues. Within a partition, the reorder buffer provides a small window in which out-of-order events can be sorted before processing. Events that arrive after the tolerance window are dropped and logged — a pragmatic trade-off that accepts a tiny amount of data loss in exchange for bounded latency and memory usage.
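One detail the sketch above glosses over is that a size-only flush can strand events for quiet customers whose buffers never fill. A common complement is a time-based flush condition; the policy below is an illustrative sketch with assumed tuning parameters, not the production implementation.

```typescript
// Sketch of a combined flush policy: flush when the buffer is full OR when
// the oldest buffered event has waited too long. Thresholds are assumptions.
class BufferFlushPolicy {
  constructor(
    private maxSize: number,
    private maxAgeMs: number
  ) {}

  shouldFlush(
    bufferSize: number,
    oldestBufferedAt: Date | null,
    now: Date
  ): boolean {
    if (bufferSize >= this.maxSize) return true;
    if (oldestBufferedAt === null) return false; // empty buffer, nothing to flush
    return now.getTime() - oldestBufferedAt.getTime() >= this.maxAgeMs;
  }
}
```

A periodic timer would evaluate this policy per customer and call flush where it returns true, bounding how long any event can sit in the reorder buffer.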

Profile Serving Layer

The final piece is serving the materialized profile to downstream systems with low latency. CVM Nova uses a two-tier serving architecture: a Redis cache for hot profiles (recently accessed or recently updated) and a PostgreSQL table for the full profile store.

class ProfileServingLayer {
  private redis: RedisClient;
  private postgres: PostgresClient;
  private readonly cacheTtlSeconds = 300;
 
  constructor(redis: RedisClient, postgres: PostgresClient) {
    this.redis = redis;
    this.postgres = postgres;
  }
 
  async getProfile(customerId: string): Promise<MaterializedProfile | null> {
    // Try cache first
    const cached = await this.redis.get(`profile:${customerId}`);
    if (cached) {
      return JSON.parse(cached) as MaterializedProfile;
    }
 
    // Fall back to the database; profiles are stored as JSON in the `data` column
    const row = await this.postgres.query<{ data: string }>(
      "SELECT data FROM materialized_profiles WHERE customer_id = $1",
      [customerId]
    );
    if (!row) return null;

    const profile = JSON.parse(row.data) as MaterializedProfile;

    // Populate cache for subsequent reads
    await this.redis.setex(
      `profile:${customerId}`,
      this.cacheTtlSeconds,
      JSON.stringify(profile)
    );

    return profile;
  }
 
  async onProfileUpdated(profile: MaterializedProfile): Promise<void> {
    // Write-through: update both cache and database
    await Promise.all([
      this.redis.setex(
        `profile:${profile.customerId}`,
        this.cacheTtlSeconds,
        JSON.stringify(profile)
      ),
      this.postgres.query(
        `INSERT INTO materialized_profiles (customer_id, data, version, updated_at)
         VALUES ($1, $2, $3, $4)
         ON CONFLICT (customer_id) DO UPDATE
         SET data = $2, version = $3, updated_at = $4`,
        [
          profile.customerId,
          JSON.stringify(profile),
          profile.version,
          profile.lastUpdatedAt,
        ]
      ),
    ]);
  }
}
 
interface RedisClient {
  get(key: string): Promise<string | null>;
  setex(key: string, seconds: number, value: string): Promise<void>;
}
 
interface PostgresClient {
  query<T>(sql: string, params: unknown[]): Promise<T | null>;
}

The write-through cache pattern ensures that profiles updated by the stream processor are immediately visible in the cache, avoiding the stale-read window that read-through caching leaves between a database update and the next cache expiry. The five-minute TTL is a safety net rather than the primary freshness mechanism: profiles are actively refreshed on every update, so the TTL only matters if an update notification is lost.
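One practical wrinkle with caching profiles as JSON is that Date fields come back from JSON.parse as ISO strings. A small reviver keyed to the profile's Date-typed fields restores them on read; this helper is a sketch, not part of the serving layer above, and the field list simply mirrors the Date fields of MaterializedProfile.

```typescript
// JSON round-trips turn Date fields into ISO strings; a reviver restores
// them when reading a cached profile. Field list mirrors the profile's
// Date-typed fields.
const DATE_FIELDS = new Set([
  "lastUpdatedAt",
  "lastTransactionDate",
  "lastLoginDate",
]);

function parseProfile(json: string): Record<string, unknown> {
  return JSON.parse(json, (key, value) =>
    DATE_FIELDS.has(key) && typeof value === "string" ? new Date(value) : value
  );
}
```

Without a reviver, a consumer that calls, say, lastLoginDate.getTime() on a cache hit would fail at runtime even though the same code works on freshly materialized profiles.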

Conclusion

Real-time customer profiles with event streaming transform a CRM from a system that reflects yesterday into a system that reflects right now. The architecture is straightforward — consume events, apply them to a materialized profile, serve the profile with low latency — but the operational details matter enormously. Event validation, error handling, time-windowed metrics, ordering guarantees, and multi-tier serving all require careful implementation to achieve reliability at scale.

The most important lesson from building CVM Nova's real-time profile system is to start with a clear understanding of which fields need to be real-time and which can tolerate batch latency. Not everything needs to be updated in milliseconds. Transaction counts and balances should be real-time. Lifetime value predictions and segment assignments can run in batch. By applying real-time processing selectively, you get the responsiveness where it matters without the operational complexity where it does not.
