Leadline Architecture Design

Event-driven patterns

Event-driven architecture, CQRS with NestJS, event sourcing concepts, and saga orchestration.

Event-driven architecture (EDA)

EDA couples components primarily through events: producers emit facts after successful state changes; consumers react asynchronously; a bus or broker delivers messages. In NestJS, @nestjs/cqrs provides an in-process EventBus; distributed deployments usually add Kafka, RabbitMQ, NATS, or similar behind the same conceptual boundaries.

When it fits: fan-out workflows (notifications, projections, analytics), temporal decoupling, or independent scaling of producers and consumers.

Practices:

  • Design consumers to be idempotent; at-least-once delivery implies duplicates.
  • Version event payloads that cross service boundaries and preserve backward compatibility.
  • Move from in-process buses to durable brokers when you need survivability across processes or instances.
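The first practice can be sketched without any framework. The example below is a minimal illustration, assuming each delivered event carries a unique eventId (the OrderPlacedEvent on this page does not) and that an in-memory Set stands in for a durable deduplication store. DeliveredEvent and IdempotentReceiptConsumer are hypothetical names.

```typescript
// Hypothetical envelope: assumes the producer attaches a unique eventId.
interface DeliveredEvent {
  eventId: string;
  orderId: string;
}

// Minimal idempotent consumer. The Set stands in for a durable store
// (for example a table with a unique constraint on eventId).
class IdempotentReceiptConsumer {
  private readonly processed = new Set<string>();
  readonly receiptsSent: string[] = [];

  // Returns true if the event caused a side effect, false if it was a duplicate.
  handle(event: DeliveredEvent): boolean {
    if (this.processed.has(event.eventId)) {
      return false;
    }
    this.receiptsSent.push(event.orderId); // the side effect
    this.processed.add(event.eventId);
    return true;
  }
}

const consumer = new IdempotentReceiptConsumer();
const delivery: DeliveredEvent = { eventId: "evt-1", orderId: "order-42" };
const first = consumer.handle(delivery);
const redelivered = consumer.handle(delivery); // simulated at-least-once duplicate
```

In a real consumer, recording the id and performing the side effect would need to happen atomically, or the side effect itself would need to be safe to repeat.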

In-process publish and handle

// order-placed.event.ts
export class OrderPlacedEvent {
  constructor(public readonly orderId: string) {}
}

// checkout.service.ts
import { Injectable } from "@nestjs/common";
import { EventBus } from "@nestjs/cqrs";
import { OrderPlacedEvent } from "./order-placed.event";

@Injectable()
export class CheckoutService {
  constructor(private readonly events: EventBus) {}

  placeOrder(orderId: string) {
    this.events.publish(new OrderPlacedEvent(orderId));
    return { ok: true };
  }
}

// send-receipt.listener.ts
import { EventsHandler, IEventHandler } from "@nestjs/cqrs";
import { OrderPlacedEvent } from "./order-placed.event";

@EventsHandler(OrderPlacedEvent)
export class SendReceiptListener implements IEventHandler<OrderPlacedEvent> {
  handle(event: OrderPlacedEvent) {
    // Placeholder: send the receipt for event.orderId here.
    void event.orderId;
  }
}

Trade-offs: modularity and scalability versus harder debugging, ordering concerns, and explicit schema evolution.
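Explicit schema evolution is often handled by upcasting older payloads to the newest shape before handlers see them. The sketch below assumes two hypothetical payload versions of an order-placed integration event; the version field, the currency field, and its "USD" default are illustrative assumptions, not part of the events defined on this page.

```typescript
// Hypothetical payload versions for an order-placed integration event.
interface OrderPlacedV1 {
  version: 1;
  orderId: string;
}

interface OrderPlacedV2 {
  version: 2;
  orderId: string;
  currency: string; // field added in v2
}

type AnyOrderPlaced = OrderPlacedV1 | OrderPlacedV2;

// Upcast older payloads to the latest shape so handlers only deal with v2.
// The "USD" default is an assumption made for this sketch.
function upcastOrderPlaced(payload: AnyOrderPlaced): OrderPlacedV2 {
  if (payload.version === 1) {
    return { version: 2, orderId: payload.orderId, currency: "USD" };
  }
  return payload;
}

const legacy = upcastOrderPlaced({ version: 1, orderId: "order-1" });
const current = upcastOrderPlaced({ version: 2, orderId: "order-2", currency: "EUR" });
```

Keeping the upcaster at the consumer boundary lets producers and consumers upgrade independently, as long as every new field has a sensible default for older payloads.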

Command Query Responsibility Segregation (CQRS)

CQRS separates commands (writes) and queries (reads) so each path can use different models, stores, and scaling policies.

When it fits: read-heavy workloads benefiting from denormalized views, different deployment cadence for read vs write, or pipelines that already publish domain events.

NestJS building blocks (@nestjs/cqrs)

Register CqrsModule once per process (commonly in the root or feature module). Then provide:

| Concept | Role |
| --- | --- |
| Commands + @CommandHandler | Execute write-side rules; persist; publish events. |
| CommandBus | Dispatch commands from controllers or facades. |
| Queries + @QueryHandler | Serve read models or projections. |
| QueryBus | Dispatch queries. |
| EventBus + @EventsHandler | Publish and subscribe to domain or integration events. |

Module wiring (sketch)

import { Module } from "@nestjs/common";
import { CqrsModule } from "@nestjs/cqrs";
import { CreateItemHandler } from "./create-item.handler";
import { GetItemsHandler } from "./get-items.handler";
import { ItemCreatedHandler } from "./item-created.handler";
import { ItemsFacade } from "./items.facade";

@Module({
  imports: [CqrsModule],
  providers: [CreateItemHandler, GetItemsHandler, ItemCreatedHandler, ItemsFacade],
})
export class ItemsModule {}

Command, handler, query, facade

// item-created.event.ts
export class ItemCreatedEvent {
  constructor(
    public readonly id: string,
    public readonly name: string,
  ) {}
}

// create-item.command.ts
export class CreateItemCommand {
  constructor(public readonly name: string) {}
}

// create-item.handler.ts
import { randomUUID } from "node:crypto";
import { CommandHandler, EventBus, ICommandHandler } from "@nestjs/cqrs";
import { CreateItemCommand } from "./create-item.command";
import { ItemCreatedEvent } from "./item-created.event";

@CommandHandler(CreateItemCommand)
export class CreateItemHandler implements ICommandHandler<CreateItemCommand> {
  constructor(private readonly eventBus: EventBus) {}

  async execute(command: CreateItemCommand) {
    const id = randomUUID();
    // Persist the new item here (omitted in this sketch), then publish the event.
    this.eventBus.publish(new ItemCreatedEvent(id, command.name));
    return { id };
  }
}

// get-items.query.ts
export class GetItemsQuery {
  constructor(public readonly limit: number) {}
}

// get-items.handler.ts
import { IQueryHandler, QueryHandler } from "@nestjs/cqrs";
import { GetItemsQuery } from "./get-items.query";

@QueryHandler(GetItemsQuery)
export class GetItemsHandler implements IQueryHandler<GetItemsQuery> {
  async execute(query: GetItemsQuery) {
    // Static demo data stands in for a real read model.
    return [{ id: "1", name: "Demo" }].slice(0, query.limit);
  }
}

// items.facade.ts
import { Injectable } from "@nestjs/common";
import { CommandBus, QueryBus } from "@nestjs/cqrs";
import { CreateItemCommand } from "./create-item.command";
import { GetItemsQuery } from "./get-items.query";

@Injectable()
export class ItemsFacade {
  constructor(
    private readonly commands: CommandBus,
    private readonly queries: QueryBus,
  ) {}

  create(name: string) {
    return this.commands.execute(new CreateItemCommand(name));
  }

  list() {
    return this.queries.execute(new GetItemsQuery(10));
  }
}

Eventual consistency

Separate write and read models mean reads are typically eventually consistent. Design APIs and UX for lag; consider read-your-writes strategies, retries, or outbox patterns instead of assuming the query side updates instantly after every command.
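One possible read-your-writes strategy is to have the write side return a position (version) and let the query side report whether it has caught up. The sketch below is framework-free and makes several assumptions: ItemsReadModel, ReadResult, listAtLeast, and the 50 ms retry hint are hypothetical, and the projection is triggered by hand to simulate lag.

```typescript
// Hypothetical read model that records the last write position it has applied.
class ItemsReadModel {
  private version = 0;
  private readonly names: string[] = [];

  apply(version: number, name: string): void {
    if (version <= this.version) return; // ignore duplicates or stale updates
    this.names.push(name);
    this.version = version;
  }

  get lastAppliedVersion(): number {
    return this.version;
  }

  list(): string[] {
    return [...this.names];
  }
}

type ReadResult =
  | { status: "ready"; items: string[] }
  | { status: "stale"; retryAfterMs: number };

// Serve the read only if the projection has reached the caller's write position;
// otherwise tell the caller to retry instead of returning stale data.
function listAtLeast(readModel: ItemsReadModel, minVersion: number): ReadResult {
  if (readModel.lastAppliedVersion < minVersion) {
    return { status: "stale", retryAfterMs: 50 };
  }
  return { status: "ready", items: readModel.list() };
}

const readModel = new ItemsReadModel();
const writeVersion = 1; // position the write side returned for the command
const beforeProjection = listAtLeast(readModel, writeVersion);
readModel.apply(writeVersion, "Demo"); // the projection catches up
const afterProjection = listAtLeast(readModel, writeVersion);
```

A caller that receives "stale" can retry after the suggested delay, which keeps the lag visible instead of hiding it behind an outdated response.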

Event sourcing

Event sourcing stores state as an append-only log of domain events instead of overwriting current rows in place. The log is the authoritative history, which supports audit, replay, and temporal reasoning; it is often paired with CQRS.

When it fits: strict audit requirements, “what did we know at time T?” questions, or systems already committed to messaging and CQRS where the added complexity is justified.

Aggregate sketch (AggregateRoot)

// cart.events.ts
export class ItemAddedEvent {
  constructor(
    public readonly cartId: string,
    public readonly sku: string,
  ) {}
}

// ShoppingCart aggregate
import { AggregateRoot } from "@nestjs/cqrs";
import { ItemAddedEvent } from "./cart.events";

export class ShoppingCart extends AggregateRoot {
  constructor(private readonly cartId: string) {
    super();
  }

  get id() {
    return this.cartId;
  }

  addItem(sku: string) {
    // Record the change as an uncommitted event instead of mutating state directly.
    this.apply(new ItemAddedEvent(this.cartId, sku));
  }
}

apply records uncommitted events; commit() publishes them through the configured publisher so an event store can append them. Real aggregates add event handlers on the class (in @nestjs/cqrs, by default methods named on<EventClassName>, such as onItemAddedEvent) to rehydrate private state when replaying from storage.

Rehydration: load the events for an aggregate id, attach the publisher with EventPublisher (mergeObjectContext for an instance, mergeClassContext for a class) as your stack requires, call loadFromHistory, execute the new behavior, then commit.
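The rehydration cycle can be illustrated with a dependency-free sketch. The Cart class below is a hand-rolled stand-in, not the @nestjs/cqrs AggregateRoot: its loadFromHistory and commit methods only mirror the names used above, commit returns the new events instead of publishing them, and the Map-based store is a hypothetical in-memory event store.

```typescript
// Hand-rolled stand-in for an event-sourced aggregate; NOT the @nestjs/cqrs class.
interface ItemAdded {
  type: "ItemAdded";
  cartId: string;
  sku: string;
}

class Cart {
  private readonly skus: string[] = [];
  private uncommitted: ItemAdded[] = [];

  constructor(readonly cartId: string) {}

  // Rebuild private state from stored events without recording them as new.
  loadFromHistory(history: ItemAdded[]): void {
    for (const event of history) this.when(event);
  }

  // New behavior: update state and record the event as uncommitted.
  addItem(sku: string): void {
    const event: ItemAdded = { type: "ItemAdded", cartId: this.cartId, sku };
    this.when(event);
    this.uncommitted.push(event);
  }

  // Hand uncommitted events to the caller (for example to append to a store).
  commit(): ItemAdded[] {
    const events = this.uncommitted;
    this.uncommitted = [];
    return events;
  }

  get items(): readonly string[] {
    return this.skus;
  }

  // State transition for a single event; used for both replay and new events.
  private when(event: ItemAdded): void {
    this.skus.push(event.sku);
  }
}

// Hypothetical in-memory event store keyed by aggregate id.
const store = new Map<string, ItemAdded[]>();
store.set("cart-1", [{ type: "ItemAdded", cartId: "cart-1", sku: "sku-a" }]);

const cart = new Cart("cart-1");
cart.loadFromHistory(store.get("cart-1") ?? []); // 1. load and replay
cart.addItem("sku-b"); // 2. execute new behavior
const newEvents = cart.commit(); // 3. collect uncommitted events
store.set("cart-1", [...(store.get("cart-1") ?? []), ...newEvents]); // 4. append
```

Routing both replayed and new events through the same state-transition method is what keeps rehydrated state identical to the state the aggregate had when the events were first recorded.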

Snapshots: materialize aggregate state at a version to cap replay cost; add them when profiling shows replay dominates.
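A minimal sketch of snapshot-based replay follows, assuming events carry a monotonically increasing per-aggregate version. CartEvent, CartState, and replay are hypothetical names, and the snapshot here is simply the folded state at a given version.

```typescript
interface CartEvent {
  version: number;
  sku: string;
}

interface CartState {
  version: number;
  skus: string[];
}

// Fold events into state, optionally starting from a snapshot.
function replay(events: CartEvent[], from?: CartState): CartState {
  const state: CartState = from
    ? { version: from.version, skus: [...from.skus] } // copy so the snapshot stays immutable
    : { version: 0, skus: [] };
  for (const event of events) {
    if (event.version <= state.version) continue; // already covered by the snapshot
    state.skus.push(event.sku);
    state.version = event.version;
  }
  return state;
}

const log: CartEvent[] = [
  { version: 1, sku: "sku-a" },
  { version: 2, sku: "sku-b" },
  { version: 3, sku: "sku-c" },
  { version: 4, sku: "sku-d" },
];

const fullReplay = replay(log); // replays all four events
const snapshot = replay(log.slice(0, 3)); // state materialized at version 3
const tail = log.filter((event) => event.version > snapshot.version);
const fromSnapshot = replay(tail, snapshot); // replays only the tail
```

Both paths should produce the same state; the snapshot only changes how many events have to be read and applied.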

Practices: version aggregates and events; never mutate historical records; keep projections idempotent.

Sagas

A saga coordinates a long-running or distributed process by reacting to events and issuing commands across aggregates or services. In Nest CQRS, sagas are often RxJS pipelines over the event stream, built with ofType and operators such as mergeMap, race, or time-based logic. Use these operators carefully to avoid subscription leaks and unbounded buffering.

// order-placed.event.ts
export class OrderPlacedEvent {
  constructor(public readonly orderId: string) {}
}

// reserve-inventory.command.ts
import { ICommand } from "@nestjs/cqrs";

export class ReserveInventoryCommand implements ICommand {
  constructor(public readonly orderId: string) {}
}

// OrdersSagas: maps each OrderPlacedEvent to a ReserveInventoryCommand
import { Injectable } from "@nestjs/common";
import { ICommand, IEvent, Saga, ofType } from "@nestjs/cqrs";
import { Observable } from "rxjs";
import { map } from "rxjs/operators";
import { OrderPlacedEvent } from "./order-placed.event";
import { ReserveInventoryCommand } from "./reserve-inventory.command";

@Injectable()
export class OrdersSagas {
  @Saga()
  orderPlaced = (events$: Observable<IEvent>): Observable<ICommand> =>
    events$.pipe(
      // Keep only OrderPlacedEvent instances from the shared event stream.
      ofType(OrderPlacedEvent),
      map((event) => new ReserveInventoryCommand(event.orderId)),
    );
}

Register saga classes in module providers. In-process sagas do not survive crashes unless backed by durable messaging and idempotent progression.
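Idempotent progression can be sketched as a small state machine whose state lives outside the process. The example below is framework-free and hypothetical throughout: OrderSagaManager, SagaState, and the InventoryReserved event are illustrative names, and the Map stands in for a durable saga-state table that survives restarts.

```typescript
type SagaStep = "inventory-requested" | "completed";

interface SagaState {
  orderId: string;
  step: SagaStep;
}

type SagaEvent =
  | { type: "OrderPlaced"; orderId: string }
  | { type: "InventoryReserved"; orderId: string }; // hypothetical follow-up event

type SagaCommand = { type: "ReserveInventory"; orderId: string };

class OrderSagaManager {
  // The Map stands in for durable storage shared across restarts.
  constructor(private readonly states: Map<string, SagaState>) {}

  // Returns the commands to dispatch; a redelivered event yields no commands.
  handle(event: SagaEvent): SagaCommand[] {
    const state = this.states.get(event.orderId);
    switch (event.type) {
      case "OrderPlaced":
        if (state) return []; // saga already started: duplicate delivery
        this.states.set(event.orderId, { orderId: event.orderId, step: "inventory-requested" });
        return [{ type: "ReserveInventory", orderId: event.orderId }];
      case "InventoryReserved":
        if (!state || state.step !== "inventory-requested") return [];
        this.states.set(event.orderId, { ...state, step: "completed" });
        return [];
    }
  }
}

const durableStates = new Map<string, SagaState>();
const firstRun = new OrderSagaManager(durableStates).handle({ type: "OrderPlaced", orderId: "order-1" });

// Simulate a crash and restart: a new manager instance over the same stored state.
const afterRestart = new OrderSagaManager(durableStates);
const duplicate = afterRestart.handle({ type: "OrderPlaced", orderId: "order-1" });
afterRestart.handle({ type: "InventoryReserved", orderId: "order-1" });
const finalStep = durableStates.get("order-1")?.step;
```

Because every transition checks the stored step before acting, redelivered events after a restart do not issue the same command twice.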
