Skip to content

CQRS Implementation Walkthrough

Connect commands, events, projections, and queries into a working system

This guide shows how to implement CQRS (Command Query Responsibility Segregation) with DeltaBase. You’ll connect all the pieces into a complete, production-ready architecture.


┌─────────────────────────────────────────────────────────────────────────┐
│ CQRS Architecture │
│ │
│ WRITE SIDE READ SIDE │
│ ┌──────────┐ ┌─────────┐ ┌───────────────┐ │
│ │ Command │───▶│ Decider │ │ Read Model │ │
│ │ (Intent) │ │ (Logic) │ │ (Optimized) │ │
│ └──────────┘ └────┬────┘ └───────▲───────┘ │
│ │ │ │
│ ▼ │ │
│ ┌─────────────────┐ │ │
│ │ Event Store │────Webhook────▶│ │
│ │ (DeltaBase) │ │ │
│ └─────────────────┘ │ │
│ │ │
│ ┌──────────┐ ┌───────┴───────┐ │
│ │ Query │◀─────────────────────────│ Projection │ │
│ │ (Fast) │ │ (Transform) │ │
│ └──────────┘ └───────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘

Key insight: Commands and queries have different requirements. CQRS separates them so each can be optimized independently.


Start with events, commands, and state. This is your domain model.

Events describe things that happened. They’re immutable and in past tense.

src/core/orders.ts
import type { Event, Command, Decider, ReadEvent } from '@delta-base/toolkit';
import { IllegalStateError, ValidationError } from '@delta-base/toolkit';
export namespace Orders {
// Events - What happened
export type OrderPlaced = Event<
'order.placed',
{
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
totalAmount: number;
placedAt: string;
}
>;
export type OrderShipped = Event<
'order.shipped',
{
orderId: string;
trackingNumber: string;
carrier: string;
shippedAt: string;
}
>;
export type OrderDelivered = Event<
'order.delivered',
{
orderId: string;
deliveredAt: string;
signedBy?: string;
}
>;
export type OrderCancelled = Event<
'order.cancelled',
{
orderId: string;
reason: string;
cancelledAt: string;
}
>;
export type OrderEvents =
| OrderPlaced
| OrderShipped
| OrderDelivered
| OrderCancelled;

Commands express what someone wants to do. They can be rejected.

// Commands - What someone wants to do
export type PlaceOrder = Command<
'order.place',
{
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
}
>;
export type ShipOrder = Command<
'order.ship',
{
orderId: string;
trackingNumber: string;
carrier: string;
}
>;
export type DeliverOrder = Command<
'order.deliver',
{
orderId: string;
signedBy?: string;
}
>;
export type CancelOrder = Command<
'order.cancel',
{
orderId: string;
reason: string;
}
>;
export type OrderCommands =
| PlaceOrder
| ShipOrder
| DeliverOrder
| CancelOrder;

State is derived from events. It’s what you need to make decisions.

// State - Current state of an order
export type OrderStatus = 'pending' | 'shipped' | 'delivered' | 'cancelled';
export interface OrderState {
orderId?: string;
customerId?: string;
items: Array<{ productId: string; quantity: number; price: number }>;
totalAmount: number;
status: OrderStatus | null;
trackingNumber?: string;
carrier?: string;
placedAt?: string;
shippedAt?: string;
deliveredAt?: string;
cancelledAt?: string;
}

The Decider pattern separates business logic into pure functions.

// Decider - Business logic as pure functions
export const decide = (
command: OrderCommands,
state: OrderState
): OrderEvents => {
switch (command.type) {
case 'order.place':
return handlePlaceOrder(command, state);
case 'order.ship':
return handleShipOrder(command, state);
case 'order.deliver':
return handleDeliverOrder(command, state);
case 'order.cancel':
return handleCancelOrder(command, state);
}
};
const handlePlaceOrder = (
command: PlaceOrder,
state: OrderState
): OrderPlaced => {
if (state.status !== null) {
throw new IllegalStateError(
`Order ${command.data.orderId} already exists`,
{ orderId: command.data.orderId },
state.status,
'place order'
);
}
if (command.data.items.length === 0) {
throw new ValidationError('Order must have at least one item', {});
}
const totalAmount = command.data.items.reduce(
(sum, item) => sum + item.price * item.quantity,
0
);
return {
type: 'order.placed',
data: {
orderId: command.data.orderId,
customerId: command.data.customerId,
items: command.data.items,
totalAmount,
placedAt: new Date().toISOString(),
},
};
};
const handleShipOrder = (
command: ShipOrder,
state: OrderState
): OrderShipped => {
if (state.status !== 'pending') {
throw new IllegalStateError(
`Cannot ship order in status: ${state.status}`,
{ orderId: command.data.orderId, status: state.status },
state.status ?? 'null',
'ship order'
);
}
return {
type: 'order.shipped',
data: {
orderId: command.data.orderId,
trackingNumber: command.data.trackingNumber,
carrier: command.data.carrier,
shippedAt: new Date().toISOString(),
},
};
};
const handleDeliverOrder = (
command: DeliverOrder,
state: OrderState
): OrderDelivered => {
if (state.status !== 'shipped') {
throw new IllegalStateError(
`Cannot deliver order in status: ${state.status}`,
{ orderId: command.data.orderId, status: state.status },
state.status ?? 'null',
'deliver order'
);
}
return {
type: 'order.delivered',
data: {
orderId: command.data.orderId,
deliveredAt: new Date().toISOString(),
signedBy: command.data.signedBy,
},
};
};
const handleCancelOrder = (
command: CancelOrder,
state: OrderState
): OrderCancelled => {
if (state.status === 'delivered') {
throw new IllegalStateError(
'Cannot cancel a delivered order',
{ orderId: command.data.orderId },
'delivered',
'cancel order'
);
}
if (state.status === 'cancelled') {
throw new IllegalStateError(
'Order is already cancelled',
{ orderId: command.data.orderId },
'cancelled',
'cancel order'
);
}
if (state.status === null) {
throw new IllegalStateError(
'Order does not exist',
{ orderId: command.data.orderId },
'null',
'cancel order'
);
}
return {
type: 'order.cancelled',
data: {
orderId: command.data.orderId,
reason: command.data.reason,
cancelledAt: new Date().toISOString(),
},
};
};
export const evolve = (
state: OrderState,
event: ReadEvent<OrderEvents>
): OrderState => {
switch (event.type) {
case 'order.placed':
return {
orderId: event.data.orderId,
customerId: event.data.customerId,
items: event.data.items,
totalAmount: event.data.totalAmount,
status: 'pending',
placedAt: event.data.placedAt,
};
case 'order.shipped':
return {
...state,
status: 'shipped',
trackingNumber: event.data.trackingNumber,
carrier: event.data.carrier,
shippedAt: event.data.shippedAt,
};
case 'order.delivered':
return {
...state,
status: 'delivered',
deliveredAt: event.data.deliveredAt,
};
case 'order.cancelled':
return {
...state,
status: 'cancelled',
cancelledAt: event.data.cancelledAt,
};
}
};
export const initialState = (): OrderState => ({
items: [],
totalAmount: 0,
status: null,
});
export const decider: Decider<OrderState, OrderCommands, OrderEvents> = {
decide,
evolve,
initialState,
};
}

Command handlers connect HTTP requests to the Decider.

src/functions/place-order.ts
import { handleCommandWithDecider } from '@delta-base/toolkit';
import { Orders } from '../core/orders';
import { eventStore } from '../shared/event-store';
export interface PlaceOrderRequest {
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
}
export async function placeOrderHandler(request: PlaceOrderRequest) {
const orderId = `order-${Date.now()}-${Math.random().toString(36).slice(2)}`;
const command: Orders.PlaceOrder = {
type: 'order.place',
data: {
orderId,
customerId: request.customerId,
items: request.items,
},
};
const result = await handleCommandWithDecider(
eventStore,
orderId,
command,
Orders.decider
);
return {
orderId,
totalAmount: result.newState.totalAmount,
status: result.newState.status,
};
}
src/functions/ship-order.ts
import { handleCommandWithDecider } from '@delta-base/toolkit';
import { Orders } from '../core/orders';
import { eventStore } from '../shared/event-store';
export interface ShipOrderRequest {
trackingNumber: string;
carrier: string;
}
export async function shipOrderHandler(
orderId: string,
request: ShipOrderRequest
) {
const command: Orders.ShipOrder = {
type: 'order.ship',
data: {
orderId,
trackingNumber: request.trackingNumber,
carrier: request.carrier,
},
};
const result = await handleCommandWithDecider(
eventStore,
orderId,
command,
Orders.decider
);
return {
orderId,
status: result.newState.status,
trackingNumber: result.newState.trackingNumber,
};
}

Define your event store and webhook subscription.

deltabase.config.ts
import type { InfrastructureConfig as DeltaBaseConfig } from '@delta-base/server';
const SERVICE_URL = process.env.SERVICE_URL || 'http://localhost:3001';
const PROJECTION_TOKEN = process.env.PROJECTION_AUTH_TOKEN || 'dev-token';
const config: DeltaBaseConfig = {
eventStores: [
{
name: 'orders',
description: 'Order management system',
subscriptions: [
{
id: 'order-summary-projection',
eventFilter: [
'order.placed',
'order.shipped',
'order.delivered',
'order.cancelled',
],
subscriberType: 'webhook',
webhook: {
url: `${SERVICE_URL}/api/projections/orders/events`,
headers: {
Authorization: `Bearer ${PROJECTION_TOKEN}`,
},
retryPolicy: {
maxAttempts: 5,
backoffMinutes: 2,
},
},
},
],
},
],
};
export default config;

Deploy:

Terminal window
pnpx @delta-base/cli deploy

Projections transform events into read models.

src/projections/orders.projection.ts
import type {
IReadModelStore,
Projection,
ReadEvent,
EventTypeOf,
} from '@delta-base/toolkit';
import { Orders } from '../core/orders';
export interface OrderSummaryReadModel {
orderId: string;
customerId: string;
totalAmount: number;
itemCount: number;
status: string;
trackingNumber?: string;
carrier?: string;
placedAt: string;
shippedAt?: string;
deliveredAt?: string;
cancelledAt?: string;
_revision: number;
}
export class OrderSummaryProjection implements Projection<Orders.OrderEvents> {
private readonly prefix = 'order-summary';
readonly supportedEventTypes: EventTypeOf<Orders.OrderEvents>[] = [
'order.placed',
'order.shipped',
'order.delivered',
'order.cancelled',
];
constructor(private store: IReadModelStore) {}
async processEvents(events: ReadEvent<Orders.OrderEvents>[]): Promise<void> {
for (const event of events) {
await this.project(event);
}
}
private async project(event: ReadEvent<Orders.OrderEvents>): Promise<void> {
const key = `${this.prefix}:${event.streamId}`;
const existing = await this.store.get<OrderSummaryReadModel>(key);
// Idempotency
if (existing && existing._revision >= event.streamPosition) {
return;
}
switch (event.type) {
case 'order.placed': {
const readModel: OrderSummaryReadModel = {
orderId: event.data.orderId,
customerId: event.data.customerId,
totalAmount: event.data.totalAmount,
itemCount: event.data.items.length,
status: 'pending',
placedAt: event.data.placedAt,
_revision: event.streamPosition,
};
await this.store.put(key, readModel);
break;
}
case 'order.shipped': {
if (!existing) return;
await this.store.put(key, {
...existing,
status: 'shipped',
trackingNumber: event.data.trackingNumber,
carrier: event.data.carrier,
shippedAt: event.data.shippedAt,
_revision: event.streamPosition,
});
break;
}
case 'order.delivered': {
if (!existing) return;
await this.store.put(key, {
...existing,
status: 'delivered',
deliveredAt: event.data.deliveredAt,
_revision: event.streamPosition,
});
break;
}
case 'order.cancelled': {
if (!existing) return;
await this.store.put(key, {
...existing,
status: 'cancelled',
cancelledAt: event.data.cancelledAt,
_revision: event.streamPosition,
});
break;
}
}
}
}

Receive events from DeltaBase and process them.

src/projections/orders.route.ts
import { Hono } from 'hono';
import type { IReadModelStore } from '@delta-base/toolkit';
import { OrderSummaryProjection } from './orders.projection';
export function createOrdersProjectionRoute(store: IReadModelStore) {
const app = new Hono();
app.post('/events', async (c) => {
// Authenticate
const auth = c.req.header('Authorization');
if (auth !== `Bearer ${process.env.PROJECTION_AUTH_TOKEN || 'dev-token'}`) {
return c.json({ error: 'Unauthorized' }, 401);
}
// Process events
const { events } = await c.req.json();
const projection = new OrderSummaryProjection(store);
await projection.processEvents(events);
return c.json({ success: true, processed: events.length });
});
return app;
}

Fast queries from the projected read model.

src/routes/orders.ts
import { Hono } from 'hono';
import type { IReadModelStore } from '@delta-base/toolkit';
import type { OrderSummaryReadModel } from '../projections/orders.projection';
export function createOrdersQueryRoute(store: IReadModelStore) {
const app = new Hono();
// Get single order
app.get('/:orderId', async (c) => {
const orderId = c.req.param('orderId');
const order = await store.get<OrderSummaryReadModel>(
`order-summary:${orderId}`
);
if (!order) {
return c.json({ error: 'Order not found' }, 404);
}
return c.json(order);
});
// List orders (with optional status filter)
app.get('/', async (c) => {
const status = c.req.query('status');
const orders = await store.list<OrderSummaryReadModel>('order-summary:');
const filtered = status
? orders.filter((o) => o.status === status)
: orders;
return c.json(filtered);
});
// Get orders by customer
app.get('/customer/:customerId', async (c) => {
const customerId = c.req.param('customerId');
const allOrders = await store.list<OrderSummaryReadModel>('order-summary:');
const customerOrders = allOrders.filter((o) => o.customerId === customerId);
return c.json(customerOrders);
});
return app;
}

src/index.ts
import { Hono } from 'hono';
import { serve } from '@hono/node-server';
import { InMemoryReadModelStore } from '@delta-base/toolkit';
import { placeOrderHandler } from './functions/place-order';
import { shipOrderHandler } from './functions/ship-order';
import { createOrdersProjectionRoute } from './projections/orders.route';
import { createOrdersQueryRoute } from './routes/orders';
const app = new Hono();
const store = new InMemoryReadModelStore();
// Command endpoints (write side)
app.post('/orders', async (c) => {
const body = await c.req.json();
const result = await placeOrderHandler(body);
return c.json(result, 201);
});
app.post('/orders/:orderId/ship', async (c) => {
const orderId = c.req.param('orderId');
const body = await c.req.json();
const result = await shipOrderHandler(orderId, body);
return c.json(result);
});
// Query endpoints (read side)
app.route('/orders', createOrdersQueryRoute(store));
// Projection webhook
app.route('/api/projections/orders', createOrdersProjectionRoute(store));
serve({ fetch: app.fetch, port: 3001 });

1. Client sends command
POST /orders { customerId, items }
2. Command handler validates and processes
placeOrderHandler() → handleCommandWithDecider()
3. Decider applies business rules
decide(command, state) → event
4. Event stored in DeltaBase
appendToStream('order-123', [event])
5. Webhook triggered
POST /api/projections/orders/events { events }
6. Projection updates read model
OrderSummaryProjection.processEvents()
7. Client queries read model
GET /orders/order-123 → { status: 'pending', ... }

Because decide and evolve are pure functions, they’re easy to test:

import { describe, it, expect } from 'vitest';
import { Orders } from '../core/orders';
describe('Orders.decide', () => {
it('places an order', () => {
const command: Orders.PlaceOrder = {
type: 'order.place',
data: {
orderId: 'order-1',
customerId: 'cust-1',
items: [{ productId: 'prod-1', quantity: 2, price: 10 }],
},
};
const event = Orders.decide(command, Orders.initialState());
expect(event.type).toBe('order.placed');
expect(event.data.totalAmount).toBe(20);
});
it('rejects shipping a non-existent order', () => {
const command: Orders.ShipOrder = {
type: 'order.ship',
data: {
orderId: 'order-1',
trackingNumber: 'TRK123',
carrier: 'UPS',
},
};
expect(() => {
Orders.decide(command, Orders.initialState());
}).toThrow('Cannot ship order');
});
});

One command = one intent. Don’t combine operations.

// Good: Separate commands
type ShipOrder = Command<'order.ship', { ... }>;
type AddTrackingNumber = Command<'order.add-tracking', { ... }>;
// Avoid: Combined commands
type ShipAndTrack = Command<'order.ship-and-track', { ... }>;

Include all data needed to reconstruct state:

// Good: Complete event
type OrderPlaced = Event<'order.placed', {
orderId: string;
customerId: string;
items: Item[];
totalAmount: number; // Calculated and stored
placedAt: string;
}>;
// Avoid: Incomplete event (requires lookup)
type OrderPlaced = Event<'order.placed', {
orderId: string;
customerId: string;
itemIds: string[]; // Requires separate lookup
}>;

Design projections so you can replay all events and get the same result:

// Delete all read models and replay
await store.delete('order-summary:*');
const allEvents = await eventStore.readAll();
await projection.processEvents(allEvents);

4. Use Separate Projections for Different Queries

Section titled “4. Use Separate Projections for Different Queries”

Don’t overload a single read model:

// Good: Purpose-built projections
class OrderSummaryProjection { ... } // For order details
class CustomerOrdersProjection { ... } // For customer order history
class OrderAnalyticsProjection { ... } // For reporting
// Avoid: One projection trying to do everything
class EverythingProjection { ... }