I first reached for CQRS when a monolithic order service I was working on started buckling under 50K orders per day. Reads and writes were competing for the same database connections, queries were getting slower, and every new feature meant touching the same bloated service layer. Splitting commands from queries wasn’t just an architectural preference — it was a survival move.

In this article I’ll walk through how I implement CQRS and Event Sourcing in NestJS using @nestjs/cqrs, covering the patterns that actually matter in production.

Why CQRS and Event Sourcing?

Quick recap if you need it:

  • CQRS separates your write model (commands) from your read model (queries). Different data shapes, different optimization strategies, different scaling characteristics.
  • Event Sourcing persists every state change as an immutable event instead of overwriting the current state. You get a full audit trail, temporal queries, and the ability to rebuild projections from scratch.

The two work well together but they’re independent concepts. You can do CQRS without Event Sourcing, and vice versa. In my experience, combining them pays off when you have complex domain logic, audit requirements, or multiple read models that need different shapes of the same data.

Setting Up

npm install @nestjs/cqrs

Register the module in your feature module:

import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';

@Module({
  imports: [CqrsModule],
  providers: [
    // handlers, sagas, etc.
  ],
})
export class OrderModule {}

For event storage, I typically use PostgreSQL with JSONB columns. If your event sourcing needs are more advanced (global ordering, subscriptions, projections), consider EventStoreDB — but for most NestJS projects, Postgres is more than enough.

Commands: Expressing Intent

A command represents an intention to change state. It’s a simple class — no logic, just data:

export class CreateOrderCommand {
  constructor(
    public readonly orderId: string,
    public readonly userId: string,
    public readonly items: { productId: string; quantity: number }[],
  ) {}
}

The handler is where the actual work happens:

import { CommandHandler, ICommandHandler, EventPublisher } from '@nestjs/cqrs';
import { CreateOrderCommand } from './create-order.command';
import { Order } from './order.aggregate';
import { OrderRepository } from './order.repository';

@CommandHandler(CreateOrderCommand)
export class CreateOrderHandler implements ICommandHandler<CreateOrderCommand> {
  constructor(
    private readonly repository: OrderRepository,
    private readonly publisher: EventPublisher,
  ) {}

  async execute(command: CreateOrderCommand) {
    const order = this.publisher.mergeObjectContext(
      Order.create(command.orderId, command.userId, command.items),
    );

    await this.repository.save(order);
    order.commit();
  }
}

Notice the EventPublisher — this is how you bridge aggregates and the event bus. Calling order.commit() dispatches all uncommitted events from the aggregate.

One mistake I made early on: putting business logic in the handler instead of the aggregate. The handler should orchestrate (load, call domain methods, save, commit) — the domain model should own the rules.
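
To make that concrete, here's a framework-free sketch of what "the domain model owns the rules" looks like. In the real service, Order extends AggregateRoot from @nestjs/cqrs (which supplies apply() and commit()); the minimum-items rule and method names below are illustrative, not prescribed:

```typescript
type OrderItem = { productId: string; quantity: number };

class OrderCreatedEvent {
  constructor(
    public readonly orderId: string,
    public readonly userId: string,
    public readonly items: OrderItem[],
    public readonly createdAt: Date,
  ) {}
}

class Order {
  private uncommitted: object[] = [];
  orderId = '';
  status = 'new';

  static create(orderId: string, userId: string, items: OrderItem[]): Order {
    // The business rule lives in the aggregate, not the command handler.
    if (items.length === 0) {
      throw new Error('An order needs at least one item');
    }
    const order = new Order();
    order.raise(new OrderCreatedEvent(orderId, userId, items, new Date()));
    return order;
  }

  private raise(event: object): void {
    this.when(event);             // mutate state from the event
    this.uncommitted.push(event); // queue it for commit()
  }

  private when(event: object): void {
    if (event instanceof OrderCreatedEvent) {
      this.orderId = event.orderId;
      this.status = 'created';
    }
  }

  getUncommittedEvents(): object[] {
    return this.uncommitted;
  }
}
```

The handler then only orchestrates: it calls Order.create(), saves, and commits; it never inspects the items itself.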

Queries: Optimized Reads

Queries hit the read model, which can be a completely different database, a materialized view, or a denormalized table optimized for the specific read pattern:

export class GetOrderDetailsQuery {
  constructor(public readonly orderId: string) {}
}

import { NotFoundException } from '@nestjs/common';
import { QueryHandler, IQueryHandler } from '@nestjs/cqrs';
import { OrderReadRepository } from './order-read.repository';

@QueryHandler(GetOrderDetailsQuery)
export class GetOrderDetailsHandler implements IQueryHandler<GetOrderDetailsQuery> {
  constructor(private readonly readRepo: OrderReadRepository) {}

  async execute(query: GetOrderDetailsQuery) {
    const order = await this.readRepo.findById(query.orderId);
    if (!order) {
      throw new NotFoundException(`Order ${query.orderId} not found`);
    }
    return order;
  }
}

In production, I keep the read repository completely separate from the write repository. Different connection pools, different query patterns, sometimes even different databases (Postgres for writes, Elasticsearch for search-heavy reads).

Events: What Actually Happened

Events are past-tense facts. They don’t request anything — they state what happened:

export class OrderCreatedEvent {
  constructor(
    public readonly orderId: string,
    public readonly userId: string,
    public readonly items: { productId: string; quantity: number }[],
    public readonly createdAt: Date,
  ) {}
}

Handlers react to events — sending notifications, updating read models, triggering downstream processes:

import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { OrderReadRepository } from './order-read.repository';
import { MailerService } from './mailer.service'; // your app's mail service

@EventsHandler(OrderCreatedEvent)
export class OrderCreatedProjectionHandler implements IEventHandler<OrderCreatedEvent> {
  constructor(private readonly readRepo: OrderReadRepository) {}

  async handle(event: OrderCreatedEvent) {
    await this.readRepo.upsert({
      orderId: event.orderId,
      userId: event.userId,
      itemCount: event.items.length,
      status: 'created',
      createdAt: event.createdAt,
    });
  }
}

@EventsHandler(OrderCreatedEvent)
export class OrderNotificationHandler implements IEventHandler<OrderCreatedEvent> {
  constructor(private readonly mailer: MailerService) {}

  async handle(event: OrderCreatedEvent) {
    await this.mailer.sendOrderConfirmation(event.userId, event.orderId);
  }
}

Supporting multiple handlers for the same event is one of the biggest wins. Your projection handler, notification handler, and analytics handler all do their thing independently.

Sagas: Coordinating Long-Running Processes

Sagas in @nestjs/cqrs are RxJS-based event listeners that can dispatch new commands in response to events. They’re perfect for workflows that span multiple aggregates.

import { Injectable } from '@nestjs/common';
import { ICommand, Saga, ofType } from '@nestjs/cqrs';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

@Injectable()
export class OrderSaga {
  @Saga()
  orderCreated = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(OrderCreatedEvent),
      map(event => new ReserveInventoryCommand(event.orderId, event.items)),
    );
  };

  @Saga()
  inventoryReservationFailed = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(InventoryReservationFailedEvent),
      map(event => new CancelOrderCommand(event.orderId, 'Insufficient inventory')),
    );
  };
}

The @Saga() decorator is important — without it, NestJS won’t register the method as a saga listener. I’ve seen this cause silent failures where events are published but the saga never fires.

For production, keep sagas simple and idempotent. If a saga dispatches a command that fails, you need a compensation strategy. The second saga above (inventoryReservationFailed) is an example of that — it cancels the order when inventory reservation fails.

Event Store: Persisting Events

A proper event store needs to handle serialization, ordering, and efficient querying by aggregate:

import { ConflictException, Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { MoreThan, Repository } from 'typeorm';
import { StoredEvent } from './stored-event.entity';

@Injectable()
export class EventStore {
  constructor(
    @InjectRepository(StoredEvent)
    private readonly repo: Repository<StoredEvent>,
  ) {}

  async append(aggregateId: string, events: any[], expectedVersion: number): Promise<void> {
    const currentVersion = await this.getAggregateVersion(aggregateId);
    if (currentVersion !== expectedVersion) {
      throw new ConflictException(
        `Concurrency conflict: expected version ${expectedVersion}, got ${currentVersion}`,
      );
    }

    const entities = events.map((event, i) => ({
      aggregateId,
      type: event.constructor.name,
      payload: event,
      version: expectedVersion + i + 1,
      createdAt: new Date(),
    }));

    await this.repo.save(entities);
  }

  async loadEvents(aggregateId: string, fromVersion = 0): Promise<StoredEvent[]> {
    return this.repo.find({
      where: fromVersion > 0
        ? { aggregateId, version: MoreThan(fromVersion) }
        : { aggregateId },
      order: { version: 'ASC' },
    });
  }

  private async getAggregateVersion(aggregateId: string): Promise<number> {
    const result = await this.repo
      .createQueryBuilder('e')
      .select('MAX(e.version)', 'version')
      .where('e.aggregateId = :aggregateId', { aggregateId })
      .getRawOne();
    return result?.version ?? 0;
  }
}

The expectedVersion check is critical — it prevents concurrent writes from corrupting your event stream. This is optimistic concurrency control and it’s one of those things you absolutely need in production but almost never see in tutorials.
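
To see the failure mode in isolation, here's an in-memory sketch of the same check (no database, hypothetical names); two writers that both load the stream at version 0 cannot both append:

```typescript
class ConcurrencyError extends Error {}

class InMemoryEventStore {
  // aggregateId -> ordered list of versioned events
  private streams = new Map<string, { version: number; payload: unknown }[]>();

  append(aggregateId: string, events: unknown[], expectedVersion: number): void {
    const stream = this.streams.get(aggregateId) ?? [];
    const currentVersion = stream.length ? stream[stream.length - 1].version : 0;
    // The optimistic concurrency check: reject stale writers.
    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `expected version ${expectedVersion}, got ${currentVersion}`,
      );
    }
    events.forEach((payload, i) =>
      stream.push({ version: expectedVersion + i + 1, payload }),
    );
    this.streams.set(aggregateId, stream);
  }

  load(aggregateId: string): unknown[] {
    return (this.streams.get(aggregateId) ?? []).map(e => e.payload);
  }
}

const store = new InMemoryEventStore();
store.append('order-1', [{ type: 'OrderCreated' }], 0); // ok, stream now at version 1
// A second writer that also loaded at version 0 would now get a ConcurrencyError.
```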

Snapshots

When an aggregate has hundreds or thousands of events, replaying all of them gets slow. Snapshots capture the aggregate state at a specific version:

async loadAggregate(aggregateId: string): Promise<Order> {
  const snapshot = await this.snapshotRepo.findLatest(aggregateId);
  const fromVersion = snapshot?.version ?? 0;
  const events = await this.eventStore.loadEvents(aggregateId, fromVersion);

  const order = snapshot
    ? Order.fromSnapshot(snapshot.state)
    : new Order();

  for (const event of events) {
    order.apply(this.deserialize(event), true); // isFromHistory: replay without re-publishing
  }

  return order;
}

I typically snapshot every 100 events or on a scheduled basis. Don’t over-optimize this early — profile first, snapshot later.
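
The cadence decision itself can live in a tiny pure function; the 100-event interval below is just the default I use, not anything prescribed by the framework:

```typescript
// Returns true when the aggregate has just crossed a snapshot boundary.
// Pure, so it's trivial to test and to tune later.
function shouldSnapshot(version: number, interval = 100): boolean {
  return version > 0 && version % interval === 0;
}

// In the save path, after appending events:
//   if (shouldSnapshot(order.version)) await this.snapshotRepo.save(order.toSnapshot());
```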

Error Handling in Production

Three patterns that have saved me:

Retry with backoff for transient failures in event handlers:

import retry from 'async-retry'; // or any retry helper with backoff support

@EventsHandler(OrderCreatedEvent)
export class ProjectionHandler implements IEventHandler<OrderCreatedEvent> {
  async handle(event: OrderCreatedEvent) {
    await retry(() => this.updateProjection(event), {
      retries: 3,
      minTimeout: 1000,
      factor: 2,
    });
  }
}
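
If you'd rather not pull in a dependency, a minimal retry-with-backoff helper covers the same ground. The option names mirror the ones above, but this is a sketch, not a drop-in replacement:

```typescript
interface RetryOptions {
  retries: number;    // attempts allowed after the first failure
  minTimeout: number; // initial delay in ms
  factor: number;     // exponential multiplier applied per failure
}

async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  opts: RetryOptions,
): Promise<T> {
  let delayMs = opts.minTimeout;
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt >= opts.retries) throw err; // out of attempts, surface the error
      await new Promise(resolve => setTimeout(resolve, delayMs));
      delayMs *= opts.factor; // back off exponentially
    }
  }
}
```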

Dead letter queue for events that repeatedly fail processing. Log them, alert on them, and fix the root cause before replaying:

async handle(event: OrderCreatedEvent) {
  try {
    await this.process(event);
  } catch (error) {
    await this.deadLetterQueue.push({
      event,
      error: error.message,
      failedAt: new Date(),
    });
    this.logger.error(`Failed to process event`, { event, error });
  }
}

Idempotency — every event handler should be safe to run multiple times with the same event. Use the event ID or a combination of aggregate ID + version as a deduplication key.

Testing

For command handlers, test the domain logic through the aggregate:

import { Test } from '@nestjs/testing';
import { EventPublisher } from '@nestjs/cqrs';

describe('CreateOrderHandler', () => {
  let handler: CreateOrderHandler;
  let repository: jest.Mocked<OrderRepository>;
  let publisher: EventPublisher;

  beforeEach(async () => {
    const module = await Test.createTestingModule({
      providers: [
        CreateOrderHandler,
        { provide: OrderRepository, useValue: { save: jest.fn() } },
        { provide: EventPublisher, useValue: { mergeObjectContext: jest.fn(o => o) } },
      ],
    }).compile();

    handler = module.get(CreateOrderHandler);
    repository = module.get(OrderRepository);
    publisher = module.get(EventPublisher);
  });

  it('should create an order and persist it', async () => {
    const command = new CreateOrderCommand('order-1', 'user-1', [
      { productId: 'prod-1', quantity: 2 },
    ]);

    await handler.execute(command);

    expect(repository.save).toHaveBeenCalledWith(
      expect.objectContaining({ orderId: 'order-1' }),
    );
  });
});

For sagas, test the RxJS stream directly with marble testing or by pushing events through a Subject and asserting the output commands.

For the event store, integration tests against a real Postgres instance are worth the setup cost. In-memory fakes miss too many edge cases around concurrency and ordering.

Key Takeaways

After running CQRS + Event Sourcing in production across multiple services, here’s what I’d tell myself on day one:

  • Start with CQRS alone. Add Event Sourcing only when you need audit trails or multiple projections. The complexity tax is real.
  • Keep aggregates small. If your aggregate has 20+ event types, it’s probably doing too much.
  • Optimistic concurrency is not optional. The expectedVersion check in the event store will save you from data corruption.
  • Make every handler idempotent. Events will be delivered more than once — plan for it.
  • Monitor your event store size. Implement snapshots and archival strategies before your events table hits millions of rows.

Thanks for reading. If you found this useful or have questions, feel free to reach out — I always enjoy talking architecture. See you in the next one.

© 2026 Akin Gundogdu. All Rights Reserved.