CQRS Implementation Walkthrough
Connect commands, events, projections, and queries into a working system
This guide shows how to implement CQRS (Command Query Responsibility Segregation) with DeltaBase. You’ll connect all the pieces into a complete, production-ready architecture.
The Big Picture
┌─────────────────────────────────────────────────────────────────────────┐
│                            CQRS Architecture                            │
│                                                                         │
│   WRITE SIDE                                         READ SIDE          │
│   ┌──────────┐    ┌─────────┐                    ┌───────────────┐      │
│   │ Command  │───▶│ Decider │                    │  Read Model   │      │
│   │ (Intent) │    │ (Logic) │                    │  (Optimized)  │      │
│   └──────────┘    └────┬────┘                    └───────▲───────┘      │
│                        │                                 │              │
│                        ▼                                 │              │
│               ┌─────────────────┐                        │              │
│               │   Event Store   │────────Webhook────────▶│              │
│               │   (DeltaBase)   │                        │              │
│               └─────────────────┘                        │              │
│                                                          │              │
│   ┌──────────┐                                   ┌───────┴───────┐      │
│   │  Query   │◀──────────────────────────────────│  Projection   │      │
│   │  (Fast)  │                                   │  (Transform)  │      │
│   └──────────┘                                   └───────────────┘      │
└─────────────────────────────────────────────────────────────────────────┘

Key insight: Commands and queries have different requirements. CQRS separates them so each can be optimized independently.
Step 1: Define Your Domain
Start with events, commands, and state. This is your domain model.
Events (Facts)
Events describe things that happened. They’re immutable and named in the past tense.

import type { Event, Command, Decider, ReadEvent } from '@delta-base/toolkit';
import { IllegalStateError, ValidationError } from '@delta-base/toolkit';

export namespace Orders {
  // Events - What happened
  export type OrderPlaced = Event<
    'order.placed',
    {
      orderId: string;
      customerId: string;
      items: Array<{ productId: string; quantity: number; price: number }>;
      totalAmount: number;
      placedAt: string;
    }
  >;

  export type OrderShipped = Event<
    'order.shipped',
    {
      orderId: string;
      trackingNumber: string;
      carrier: string;
      shippedAt: string;
    }
  >;

  export type OrderDelivered = Event<
    'order.delivered',
    {
      orderId: string;
      deliveredAt: string;
      signedBy?: string;
    }
  >;

  export type OrderCancelled = Event<
    'order.cancelled',
    {
      orderId: string;
      reason: string;
      cancelledAt: string;
    }
  >;

  export type OrderEvents =
    | OrderPlaced
    | OrderShipped
    | OrderDelivered
    | OrderCancelled;

Commands (Intent)
Commands express what someone wants to do. They can be rejected.

  // Commands - What someone wants to do
  export type PlaceOrder = Command<
    'order.place',
    {
      orderId: string;
      customerId: string;
      items: Array<{ productId: string; quantity: number; price: number }>;
    }
  >;

  export type ShipOrder = Command<
    'order.ship',
    {
      orderId: string;
      trackingNumber: string;
      carrier: string;
    }
  >;

  export type DeliverOrder = Command<
    'order.deliver',
    {
      orderId: string;
      signedBy?: string;
    }
  >;

  export type CancelOrder = Command<
    'order.cancel',
    {
      orderId: string;
      reason: string;
    }
  >;

  export type OrderCommands =
    | PlaceOrder
    | ShipOrder
    | DeliverOrder
    | CancelOrder;

State (Aggregate)
State is derived from events. It’s what you need to make decisions.

  // State - Current state of an order
  export type OrderStatus = 'pending' | 'shipped' | 'delivered' | 'cancelled';

  export interface OrderState {
    orderId?: string;
    customerId?: string;
    items: Array<{ productId: string; quantity: number; price: number }>;
    totalAmount: number;
    status: OrderStatus | null;
    trackingNumber?: string;
    carrier?: string;
    placedAt?: string;
    shippedAt?: string;
    deliveredAt?: string;
    cancelledAt?: string;
  }

Step 2: Implement the Decider
The Decider pattern expresses business logic as pure functions: decide turns a command and the current state into an event (or throws when a rule is violated), and evolve applies an event to the state.
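Before the full Orders implementation, here is the pattern in miniature. This is a minimal sketch with hypothetical names (MiniState, MiniCommand, MiniEvent, replay) that do not come from the toolkit; it only illustrates that current state is a left fold of evolve over the event history, and that decide consults that state before producing a new event.

```typescript
// Hypothetical two-step order lifecycle, used only for illustration.
type MiniState = { status: 'pending' | 'shipped' | null };
type MiniCommand = { type: 'place' } | { type: 'ship' };
type MiniEvent = { type: 'placed' } | { type: 'shipped' };

const miniInitialState = (): MiniState => ({ status: null });

// decide: (command, state) -> event, or throw if a business rule is violated.
const miniDecide = (command: MiniCommand, state: MiniState): MiniEvent => {
  switch (command.type) {
    case 'place':
      if (state.status !== null) throw new Error('Order already exists');
      return { type: 'placed' };
    case 'ship':
      if (state.status !== 'pending') {
        throw new Error(`Cannot ship order in status: ${state.status}`);
      }
      return { type: 'shipped' };
  }
};

// evolve: (state, event) -> next state. No validation here; events are facts.
const miniEvolve = (state: MiniState, event: MiniEvent): MiniState => {
  switch (event.type) {
    case 'placed':
      return { status: 'pending' };
    case 'shipped':
      return { ...state, status: 'shipped' };
  }
};

// Current state is rebuilt by folding evolve over every past event.
const replay = (events: MiniEvent[]): MiniState =>
  events.reduce(miniEvolve, miniInitialState());

// Usage: each decision is made against the state replayed so far.
const orderHistory: MiniEvent[] = [];
orderHistory.push(miniDecide({ type: 'place' }, replay(orderHistory)));
orderHistory.push(miniDecide({ type: 'ship' }, replay(orderHistory)));
const replayedState = replay(orderHistory);
```

After the two commands, replayedState.status is 'shipped', and issuing 'ship' against the initial state throws. The Orders decider follows the same shape with richer events and state.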
  // Decider - Business logic as pure functions

  export const decide = (
    command: OrderCommands,
    state: OrderState
  ): OrderEvents => {
    switch (command.type) {
      case 'order.place':
        return handlePlaceOrder(command, state);
      case 'order.ship':
        return handleShipOrder(command, state);
      case 'order.deliver':
        return handleDeliverOrder(command, state);
      case 'order.cancel':
        return handleCancelOrder(command, state);
    }
  };

  const handlePlaceOrder = (
    command: PlaceOrder,
    state: OrderState
  ): OrderPlaced => {
    if (state.status !== null) {
      throw new IllegalStateError(
        `Order ${command.data.orderId} already exists`,
        { orderId: command.data.orderId },
        state.status,
        'place order'
      );
    }

    if (command.data.items.length === 0) {
      throw new ValidationError('Order must have at least one item', {});
    }

    const totalAmount = command.data.items.reduce(
      (sum, item) => sum + item.price * item.quantity,
      0
    );

    return {
      type: 'order.placed',
      data: {
        orderId: command.data.orderId,
        customerId: command.data.customerId,
        items: command.data.items,
        totalAmount,
        placedAt: new Date().toISOString(),
      },
    };
  };
  const handleShipOrder = (
    command: ShipOrder,
    state: OrderState
  ): OrderShipped => {
    if (state.status !== 'pending') {
      throw new IllegalStateError(
        `Cannot ship order in status: ${state.status}`,
        { orderId: command.data.orderId, status: state.status },
        state.status ?? 'null',
        'ship order'
      );
    }

    return {
      type: 'order.shipped',
      data: {
        orderId: command.data.orderId,
        trackingNumber: command.data.trackingNumber,
        carrier: command.data.carrier,
        shippedAt: new Date().toISOString(),
      },
    };
  };

  const handleDeliverOrder = (
    command: DeliverOrder,
    state: OrderState
  ): OrderDelivered => {
    if (state.status !== 'shipped') {
      throw new IllegalStateError(
        `Cannot deliver order in status: ${state.status}`,
        { orderId: command.data.orderId, status: state.status },
        state.status ?? 'null',
        'deliver order'
      );
    }

    return {
      type: 'order.delivered',
      data: {
        orderId: command.data.orderId,
        deliveredAt: new Date().toISOString(),
        signedBy: command.data.signedBy,
      },
    };
  };
  const handleCancelOrder = (
    command: CancelOrder,
    state: OrderState
  ): OrderCancelled => {
    if (state.status === 'delivered') {
      throw new IllegalStateError(
        'Cannot cancel a delivered order',
        { orderId: command.data.orderId },
        'delivered',
        'cancel order'
      );
    }

    if (state.status === 'cancelled') {
      throw new IllegalStateError(
        'Order is already cancelled',
        { orderId: command.data.orderId },
        'cancelled',
        'cancel order'
      );
    }

    if (state.status === null) {
      throw new IllegalStateError(
        'Order does not exist',
        { orderId: command.data.orderId },
        'null',
        'cancel order'
      );
    }

    return {
      type: 'order.cancelled',
      data: {
        orderId: command.data.orderId,
        reason: command.data.reason,
        cancelledAt: new Date().toISOString(),
      },
    };
  };
  export const evolve = (
    state: OrderState,
    event: ReadEvent<OrderEvents>
  ): OrderState => {
    switch (event.type) {
      case 'order.placed':
        return {
          orderId: event.data.orderId,
          customerId: event.data.customerId,
          items: event.data.items,
          totalAmount: event.data.totalAmount,
          status: 'pending',
          placedAt: event.data.placedAt,
        };

      case 'order.shipped':
        return {
          ...state,
          status: 'shipped',
          trackingNumber: event.data.trackingNumber,
          carrier: event.data.carrier,
          shippedAt: event.data.shippedAt,
        };

      case 'order.delivered':
        return {
          ...state,
          status: 'delivered',
          deliveredAt: event.data.deliveredAt,
        };

      case 'order.cancelled':
        return {
          ...state,
          status: 'cancelled',
          cancelledAt: event.data.cancelledAt,
        };
    }
  };

  export const initialState = (): OrderState => ({
    items: [],
    totalAmount: 0,
    status: null,
  });

  export const decider: Decider<OrderState, OrderCommands, OrderEvents> = {
    decide,
    evolve,
    initialState,
  };
}

Step 3: Handle Commands
Command handlers connect HTTP requests to the Decider.
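Conceptually, a decider-based command handler loads the stream's past events, folds them into state, asks decide for a new event, appends it, and returns the new state. The sketch below shows that loop against a hypothetical in-memory store. InMemoryStreams, SketchDecider, and handleWithDecider are illustrative names, not toolkit APIs, and the toolkit's handleCommandWithDecider may work differently internally.

```typescript
// Hypothetical in-memory stand-ins; none of these names come from @delta-base/toolkit.
type StoredEvent = { type: string; data: unknown };

class InMemoryStreams {
  private streams = new Map<string, StoredEvent[]>();

  // Return a copy of the stream so callers cannot mutate stored history.
  read(streamId: string): StoredEvent[] {
    return [...(this.streams.get(streamId) ?? [])];
  }

  append(streamId: string, events: StoredEvent[]): void {
    this.streams.set(streamId, [...this.read(streamId), ...events]);
  }
}

interface SketchDecider<S, C, E> {
  decide: (command: C, state: S) => E;
  evolve: (state: S, event: E) => S;
  initialState: () => S;
}

function handleWithDecider<S, C, E extends StoredEvent>(
  store: InMemoryStreams,
  streamId: string,
  command: C,
  decider: SketchDecider<S, C, E>
): { newState: S; event: E } {
  // 1. Load history and rebuild the current state.
  const past = store.read(streamId) as E[];
  const state = past.reduce(decider.evolve, decider.initialState());
  // 2. Decide: throws if the command is not allowed in this state.
  const event = decider.decide(command, state);
  // 3. Persist the new event and report the resulting state.
  store.append(streamId, [event]);
  return { newState: decider.evolve(state, event), event };
}

// Usage with a toy counter decider (also hypothetical).
type Increment = { type: 'counter.increment'; data: { by: number } };
type Incremented = { type: 'counter.incremented'; data: { by: number } };

const counterDecider: SketchDecider<number, Increment, Incremented> = {
  decide: (command) => ({
    type: 'counter.incremented',
    data: { by: command.data.by },
  }),
  evolve: (state, event) => state + event.data.by,
  initialState: () => 0,
};

const sketchStore = new InMemoryStreams();
const addTwo: Increment = { type: 'counter.increment', data: { by: 2 } };
const addThree: Increment = { type: 'counter.increment', data: { by: 3 } };
const firstResult = handleWithDecider(sketchStore, 'counter-1', addTwo, counterDecider);
const secondResult = handleWithDecider(sketchStore, 'counter-1', addThree, counterDecider);
```

Here firstResult.newState is 2 and secondResult.newState is 5, because the second call replays the first event before deciding. The handlers in this step only build the command and delegate the loop to the toolkit.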
// functions/place-order.ts
import { handleCommandWithDecider } from '@delta-base/toolkit';
import { Orders } from '../core/orders';
import { eventStore } from '../shared/event-store';

export interface PlaceOrderRequest {
  customerId: string;
  items: Array<{ productId: string; quantity: number; price: number }>;
}

export async function placeOrderHandler(request: PlaceOrderRequest) {
  const orderId = `order-${Date.now()}-${Math.random().toString(36).slice(2)}`;

  const command: Orders.PlaceOrder = {
    type: 'order.place',
    data: {
      orderId,
      customerId: request.customerId,
      items: request.items,
    },
  };

  const result = await handleCommandWithDecider(
    eventStore,
    orderId,
    command,
    Orders.decider
  );

  return {
    orderId,
    totalAmount: result.newState.totalAmount,
    status: result.newState.status,
  };
}

// functions/ship-order.ts
import { handleCommandWithDecider } from '@delta-base/toolkit';
import { Orders } from '../core/orders';
import { eventStore } from '../shared/event-store';

export interface ShipOrderRequest {
  trackingNumber: string;
  carrier: string;
}

export async function shipOrderHandler(
  orderId: string,
  request: ShipOrderRequest
) {
  const command: Orders.ShipOrder = {
    type: 'order.ship',
    data: {
      orderId,
      trackingNumber: request.trackingNumber,
      carrier: request.carrier,
    },
  };

  const result = await handleCommandWithDecider(
    eventStore,
    orderId,
    command,
    Orders.decider
  );

  return {
    orderId,
    status: result.newState.status,
    trackingNumber: result.newState.trackingNumber,
  };
}

Step 4: Configure Infrastructure
Define your event store and webhook subscription.

import type { InfrastructureConfig as DeltaBaseConfig } from '@delta-base/server';

const SERVICE_URL = process.env.SERVICE_URL || 'http://localhost:3001';
const PROJECTION_TOKEN = process.env.PROJECTION_AUTH_TOKEN || 'dev-token';

const config: DeltaBaseConfig = {
  eventStores: [
    {
      name: 'orders',
      description: 'Order management system',
      subscriptions: [
        {
          id: 'order-summary-projection',
          eventFilter: [
            'order.placed',
            'order.shipped',
            'order.delivered',
            'order.cancelled',
          ],
          subscriberType: 'webhook',
          webhook: {
            url: `${SERVICE_URL}/api/projections/orders/events`,
            headers: {
              Authorization: `Bearer ${PROJECTION_TOKEN}`,
            },
            retryPolicy: {
              maxAttempts: 5,
              backoffMinutes: 2,
            },
          },
        },
      ],
    },
  ],
};

export default config;

Deploy:

pnpx @delta-base/cli deploy

Step 5: Build the Projection
Projections transform events into read models.
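Because the webhook subscription retries failed deliveries, a projection may see the same event more than once, so it helps to make each update idempotent. The sketch below isolates the revision check in a self-contained form. The names IncomingEvent, SummaryRow, summaryRows, and applyOnce are hypothetical, the Map stands in for a read model store, and the stream positions 1 and 2 are arbitrary illustrative values.

```typescript
// Hypothetical minimal shapes; the field names mirror those used by the projection in this guide.
type IncomingEvent = { streamId: string; streamPosition: number; type: string };
type SummaryRow = { lastEventType: string; _revision: number };

// A plain Map stands in for the read model store.
const summaryRows = new Map<string, SummaryRow>();

// Returns true if the event was applied, false if it was skipped as already seen.
function applyOnce(event: IncomingEvent): boolean {
  const existing = summaryRows.get(event.streamId);
  // Skip anything at or below the revision already recorded for this stream.
  if (existing && existing._revision >= event.streamPosition) {
    return false;
  }
  summaryRows.set(event.streamId, {
    lastEventType: event.type,
    _revision: event.streamPosition,
  });
  return true;
}

// Usage: a retried delivery and a late duplicate are both ignored.
const deliveries: IncomingEvent[] = [
  { streamId: 'order-1', streamPosition: 1, type: 'order.placed' },
  { streamId: 'order-1', streamPosition: 2, type: 'order.shipped' },
  { streamId: 'order-1', streamPosition: 2, type: 'order.shipped' }, // retry
  { streamId: 'order-1', streamPosition: 1, type: 'order.placed' }, // late duplicate
];
const appliedFlags = deliveries.map(applyOnce);
```

The first two deliveries are applied and the last two are skipped, leaving the row at revision 2. Storing the revision inside the read model itself keeps the check and the update in the same record.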
import type {
  IReadModelStore,
  Projection,
  ReadEvent,
  EventTypeOf,
} from '@delta-base/toolkit';
import { Orders } from '../core/orders';

export interface OrderSummaryReadModel {
  orderId: string;
  customerId: string;
  totalAmount: number;
  itemCount: number;
  status: string;
  trackingNumber?: string;
  carrier?: string;
  placedAt: string;
  shippedAt?: string;
  deliveredAt?: string;
  cancelledAt?: string;
  _revision: number;
}

export class OrderSummaryProjection implements Projection<Orders.OrderEvents> {
  private readonly prefix = 'order-summary';

  readonly supportedEventTypes: EventTypeOf<Orders.OrderEvents>[] = [
    'order.placed',
    'order.shipped',
    'order.delivered',
    'order.cancelled',
  ];

  constructor(private store: IReadModelStore) {}

  async processEvents(events: ReadEvent<Orders.OrderEvents>[]): Promise<void> {
    for (const event of events) {
      await this.project(event);
    }
  }

  private async project(event: ReadEvent<Orders.OrderEvents>): Promise<void> {
    const key = `${this.prefix}:${event.streamId}`;
    const existing = await this.store.get<OrderSummaryReadModel>(key);

    // Idempotency: skip events already reflected in the read model
    if (existing && existing._revision >= event.streamPosition) {
      return;
    }

    switch (event.type) {
      case 'order.placed': {
        const readModel: OrderSummaryReadModel = {
          orderId: event.data.orderId,
          customerId: event.data.customerId,
          totalAmount: event.data.totalAmount,
          itemCount: event.data.items.length,
          status: 'pending',
          placedAt: event.data.placedAt,
          _revision: event.streamPosition,
        };
        await this.store.put(key, readModel);
        break;
      }

      case 'order.shipped': {
        if (!existing) return;
        await this.store.put(key, {
          ...existing,
          status: 'shipped',
          trackingNumber: event.data.trackingNumber,
          carrier: event.data.carrier,
          shippedAt: event.data.shippedAt,
          _revision: event.streamPosition,
        });
        break;
      }

      case 'order.delivered': {
        if (!existing) return;
        await this.store.put(key, {
          ...existing,
          status: 'delivered',
          deliveredAt: event.data.deliveredAt,
          _revision: event.streamPosition,
        });
        break;
      }

      case 'order.cancelled': {
        if (!existing) return;
        await this.store.put(key, {
          ...existing,
          status: 'cancelled',
          cancelledAt: event.data.cancelledAt,
          _revision: event.streamPosition,
        });
        break;
      }
    }
  }
}

Step 6: Create Webhook Handler
Receive events from DeltaBase and process them.
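A webhook endpoint receives JSON from the network, so it can be worth checking the body's shape before handing it to a projection. The sketch below is one way to do that with plain type guards. It assumes the payload is an object with an events array whose items carry the type, streamId, streamPosition, and data fields that the projection reads; the names WebhookEvent, isWebhookEvent, and parseEventBatch are hypothetical and the real payload may contain more fields.

```typescript
// Assumed payload item shape, based only on the fields the projection in this guide reads.
type WebhookEvent = {
  type: string;
  streamId: string;
  streamPosition: number;
  data: Record<string, unknown>;
};

// True for plain (non-null, non-array) objects.
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

function isWebhookEvent(value: unknown): value is WebhookEvent {
  return (
    isRecord(value) &&
    typeof value.type === 'string' &&
    typeof value.streamId === 'string' &&
    typeof value.streamPosition === 'number' &&
    isRecord(value.data)
  );
}

// Returns the validated events, or null if the body does not match the assumed shape.
function parseEventBatch(body: unknown): WebhookEvent[] | null {
  if (!isRecord(body) || !Array.isArray(body.events)) return null;
  const candidates: unknown[] = body.events;
  return candidates.every(isWebhookEvent) ? (candidates as WebhookEvent[]) : null;
}

// Usage: a well-formed batch passes, malformed input is rejected.
const validBatch = parseEventBatch({
  events: [
    { type: 'order.placed', streamId: 'order-1', streamPosition: 1, data: {} },
  ],
});
const missingFields = parseEventBatch({ events: [{ type: 'order.placed' }] });
const notAnObject = parseEventBatch('not json');
```

In a route handler, a null result could be answered with a 400 response before any projection code runs, which keeps malformed deliveries out of the read model.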
import { Hono } from 'hono';
import type { IReadModelStore } from '@delta-base/toolkit';
import { OrderSummaryProjection } from './orders.projection';

export function createOrdersProjectionRoute(store: IReadModelStore) {
  const app = new Hono();

  app.post('/events', async (c) => {
    // Authenticate
    const auth = c.req.header('Authorization');
    if (auth !== `Bearer ${process.env.PROJECTION_AUTH_TOKEN || 'dev-token'}`) {
      return c.json({ error: 'Unauthorized' }, 401);
    }

    // Process events
    const { events } = await c.req.json();
    const projection = new OrderSummaryProjection(store);
    await projection.processEvents(events);

    return c.json({ success: true, processed: events.length });
  });

  return app;
}

Step 7: Query Read Models
Queries read directly from the projected read model, so they stay fast.

import { Hono } from 'hono';
import type { IReadModelStore } from '@delta-base/toolkit';
import type { OrderSummaryReadModel } from '../projections/orders.projection';

export function createOrdersQueryRoute(store: IReadModelStore) {
  const app = new Hono();

  // Get single order
  app.get('/:orderId', async (c) => {
    const orderId = c.req.param('orderId');
    const order = await store.get<OrderSummaryReadModel>(
      `order-summary:${orderId}`
    );

    if (!order) {
      return c.json({ error: 'Order not found' }, 404);
    }

    return c.json(order);
  });

  // List orders (with optional status filter)
  app.get('/', async (c) => {
    const status = c.req.query('status');
    const orders = await store.list<OrderSummaryReadModel>('order-summary:');

    const filtered = status
      ? orders.filter((o) => o.status === status)
      : orders;

    return c.json(filtered);
  });

  // Get orders by customer
  app.get('/customer/:customerId', async (c) => {
    const customerId = c.req.param('customerId');
    const allOrders = await store.list<OrderSummaryReadModel>('order-summary:');
    const customerOrders = allOrders.filter((o) => o.customerId === customerId);

    return c.json(customerOrders);
  });

  return app;
}

Putting It Together
import { Hono } from 'hono';
import { serve } from '@hono/node-server';
import { InMemoryReadModelStore } from '@delta-base/toolkit';

import { placeOrderHandler } from './functions/place-order';
import { shipOrderHandler } from './functions/ship-order';
import { createOrdersProjectionRoute } from './projections/orders.route';
import { createOrdersQueryRoute } from './routes/orders';

const app = new Hono();
const store = new InMemoryReadModelStore();

// Command endpoints (write side)
app.post('/orders', async (c) => {
  const body = await c.req.json();
  const result = await placeOrderHandler(body);
  return c.json(result, 201);
});

app.post('/orders/:orderId/ship', async (c) => {
  const orderId = c.req.param('orderId');
  const body = await c.req.json();
  const result = await shipOrderHandler(orderId, body);
  return c.json(result);
});

// Query endpoints (read side)
app.route('/orders', createOrdersQueryRoute(store));

// Projection webhook
app.route('/api/projections/orders', createOrdersProjectionRoute(store));

serve({ fetch: app.fetch, port: 3001 });

The Complete Flow
1. Client sends command
   POST /orders { customerId, items }
        │
        ▼
2. Command handler validates and processes
   placeOrderHandler() → handleCommandWithDecider()
        │
        ▼
3. Decider applies business rules
   decide(command, state) → event
        │
        ▼
4. Event stored in DeltaBase
   appendToStream('order-123', [event])
        │
        ▼
5. Webhook triggered
   POST /api/projections/orders/events { events }
        │
        ▼
6. Projection updates read model
   OrderSummaryProjection.processEvents()
        │
        ▼
7. Client queries read model
   GET /orders/order-123 → { status: 'pending', ... }

Testing the Decider
Because decide and evolve are pure functions (apart from the timestamps that decide reads from the clock), they’re easy to test without an event store or HTTP server:
import { describe, it, expect } from 'vitest';
import { Orders } from '../core/orders';

describe('Orders.decide', () => {
  it('places an order', () => {
    const command: Orders.PlaceOrder = {
      type: 'order.place',
      data: {
        orderId: 'order-1',
        customerId: 'cust-1',
        items: [{ productId: 'prod-1', quantity: 2, price: 10 }],
      },
    };

    const event = Orders.decide(command, Orders.initialState());

    expect(event.type).toBe('order.placed');
    expect(event.data.totalAmount).toBe(20);
  });

  it('rejects shipping a non-existent order', () => {
    const command: Orders.ShipOrder = {
      type: 'order.ship',
      data: {
        orderId: 'order-1',
        trackingNumber: 'TRK123',
        carrier: 'UPS',
      },
    };

    expect(() => {
      Orders.decide(command, Orders.initialState());
    }).toThrow('Cannot ship order');
  });
});

Best Practices
1. Keep Commands Focused
One command = one intent. Don’t combine operations.

// Good: Separate commands
type ShipOrder = Command<'order.ship', { ... }>;
type AddTrackingNumber = Command<'order.add-tracking', { ... }>;

// Avoid: Combined commands
type ShipAndTrack = Command<'order.ship-and-track', { ... }>;

2. Events Should Be Self-Contained
Include all data needed to reconstruct state:

// Good: Complete event
type OrderPlaced = Event<'order.placed', {
  orderId: string;
  customerId: string;
  items: Item[];
  totalAmount: number; // Calculated and stored
  placedAt: string;
}>;

// Avoid: Incomplete event (requires lookup)
type OrderPlaced = Event<'order.placed', {
  orderId: string;
  customerId: string;
  itemIds: string[]; // Requires separate lookup
}>;

3. Projections Should Be Rebuildable
Design projections so you can replay all events and get the same result:

// Delete all read models and replay
await store.delete('order-summary:*');
const allEvents = await eventStore.readAll();
await projection.processEvents(allEvents);

4. Use Separate Projections for Different Queries
Don’t overload a single read model:

// Good: Purpose-built projections
class OrderSummaryProjection { ... } // For order details
class CustomerOrdersProjection { ... } // For customer order history
class OrderAnalyticsProjection { ... } // For reporting

// Avoid: One projection trying to do everything
class EverythingProjection { ... }

What’s Next?
- Projections - Deep dive into read models
- Real-Time Events - Add WebSocket updates
- Banking System Tutorial - Complete working example