Projections & Read Models
Transform events into query-optimized views
Projections transform events into read models - denormalized, query-optimized views of your data. They bridge the gap between how you store data (events) and how you query it (fast lookups).
The Problem Projections Solve
Section titled “The Problem Projections Solve”Event stores are optimized for appending events. They’re not designed for queries like:
- “Find a user by email”
- “List all orders for customer X”
- “Get the top 10 products by revenue”
To answer these queries by replaying events every time would be slow. Projections solve this by maintaining pre-computed views that are optimized for your specific queries.
Events (append-only) Read Model (query-optimized)┌──────────────────┐ ┌────────────────────────┐│ user.created │ │ users_by_email ││ user.updated │──Projection─▶│ ┌────────────────────┐ ││ user.deleted │ │ │ alice@... → {...} │ │└──────────────────┘ │ │ bob@... → {...} │ │ │ └────────────────────┘ │ └────────────────────────┘ Fast O(1) lookupsHow Projections Work
Section titled “How Projections Work”A projection is just a function that processes events and updates a read model:
// Event typestype UserCreated = { type: 'user.created'; data: { userId: string; email: string; name: string; createdAt: string };};
type UserUpdated = { type: 'user.updated'; data: { userId: string; email?: string; name?: string; updatedAt: string };};
type UserEvent = UserCreated | UserUpdated;
// Read model typetype UserReadModel = { id: string; email: string; name: string; createdAt: string; updatedAt: string;};
// The projection functionfunction projectUserEvent( event: UserEvent, store: Map<string, UserReadModel>): void { switch (event.type) { case 'user.created': { store.set(event.data.userId, { id: event.data.userId, email: event.data.email, name: event.data.name, createdAt: event.data.createdAt, updatedAt: event.data.createdAt, }); break; }
case 'user.updated': { const existing = store.get(event.data.userId); if (existing) { store.set(event.data.userId, { ...existing, email: event.data.email ?? existing.email, name: event.data.name ?? existing.name, updatedAt: event.data.updatedAt, }); } break; } }}That’s it. A projection takes events and updates a read model.
Building a Complete Projection
Section titled “Building a Complete Projection”Let’s build a projection that processes a batch of events:
class UsersProjection { constructor(private store: Map<string, UserReadModel>) {}
// Process a batch of events processEvents(events: UserEvent[]): void { for (const event of events) { this.project(event); } }
private project(event: UserEvent): void { switch (event.type) { case 'user.created': this.handleUserCreated(event); break; case 'user.updated': this.handleUserUpdated(event); break; } }
private handleUserCreated(event: UserCreated): void { const readModel: UserReadModel = { id: event.data.userId, email: event.data.email, name: event.data.name, createdAt: event.data.createdAt, updatedAt: event.data.createdAt, };
this.store.set(event.data.userId, readModel); }
private handleUserUpdated(event: UserUpdated): void { const existing = this.store.get(event.data.userId); if (!existing) { console.warn(`User ${event.data.userId} not found for update`); return; }
// If email changed, update the email index if (event.data.email && event.data.email !== existing.email) { this.store.delete(`email:${existing.email}`); }
const updated: UserReadModel = { ...existing, email: event.data.email ?? existing.email, name: event.data.name ?? existing.name, updatedAt: event.data.updatedAt, };
this.store.set(event.data.userId, updated); }}Keeping Projections Up to Date
Section titled “Keeping Projections Up to Date”There are three main approaches:
1. Synchronous (Same Transaction)
Section titled “1. Synchronous (Same Transaction)”Update the read model immediately when events are appended:
async function handleCommand(command: Command) { // Append events const events = await eventStore.append(streamId, newEvents);
// Update projection in same request projection.processEvents(events);}Pros: Read model always consistent with events
Cons: Command handler is slower, coupling between write and read
2. Asynchronous (Event Handlers)
Section titled “2. Asynchronous (Event Handlers)”Subscribe to events and update projections in the background:
// Some event subscription mechanismeventBus.subscribe('user.*', async (events) => { projection.processEvents(events);});Pros: Fast command handling, decoupled
Cons: Eventual consistency (read model may lag)
3. On-Demand (Rebuild)
Section titled “3. On-Demand (Rebuild)”Rebuild the read model from scratch when needed:
async function rebuildUsersProjection() { const allEvents = await eventStore.readAll({ type: 'user.*' });
const store = new Map<string, UserReadModel>(); const projection = new UsersProjection(store);
projection.processEvents(allEvents);
return store;}Pros: Can always recreate from events, good for fixing bugs
Cons: Slow for large event sets
Handling Idempotency
Section titled “Handling Idempotency”Events may be delivered more than once. Your projection should handle duplicates gracefully.
The common pattern: track which events you’ve processed.
type UserReadModel = { id: string; email: string; name: string; _revision: number; // Track the last processed event position};
class UsersProjection { private project(event: UserEvent & { position: number }): void { const existing = this.store.get(event.data.userId);
// Skip if we've already processed this or a later event if (existing && existing._revision >= event.position) { return; }
// Process the event... }}Multiple Read Models from Same Events
Section titled “Multiple Read Models from Same Events”The power of projections: create different views for different use cases.
// Same events, different read modelstype UsersByIdReadModel = Map<string, UserReadModel>;type UsersByEmailReadModel = Map<string, UserReadModel>;type UserStatsReadModel = { totalUsers: number; activeUsers: number };
// Different projectionsclass UsersByIdProjection { /* index by ID */ }class UsersByEmailProjection { /* index by email */ }class UserStatsProjection { /* compute statistics */ }Each projection is independent and can use different storage:
- Users by ID → Key-value store for fast lookups
- Users by email → Key-value store with different index
- User stats → Simple counter, maybe in Redis
Cross-Stream Projections
Section titled “Cross-Stream Projections”Sometimes read models need data from multiple streams:
type OrderWithCustomerReadModel = { orderId: string; customerName: string; // From customer stream customerEmail: string; // From customer stream items: OrderItem[]; // From order stream total: number; status: string;};
class OrderWithCustomerProjection { processEvent(event: OrderEvent | CustomerEvent): void { if (event.type === 'order.placed') { // Create order, fetch customer data const customer = this.customerStore.get(event.data.customerId); this.store.set(event.data.orderId, { orderId: event.data.orderId, customerName: customer?.name ?? 'Unknown', customerEmail: customer?.email ?? 'Unknown', items: event.data.items, total: event.data.total, status: 'placed', }); }
if (event.type === 'customer.updated') { // Update all orders for this customer for (const order of this.getOrdersByCustomer(event.data.customerId)) { order.customerName = event.data.name ?? order.customerName; order.customerEmail = event.data.email ?? order.customerEmail; } } }}This is denormalization - storing redundant data for faster reads.
Best Practices
Section titled “Best Practices”1. Design for Rebuild
Section titled “1. Design for Rebuild”Projections should be rebuildable from scratch. This means:
- Don’t rely on external state that might change
- Make handlers idempotent
- Keep projection logic pure (same events → same read model)
2. One Projection Per Read Model
Section titled “2. One Projection Per Read Model”Keep projections focused. Don’t mix concerns:
// Good: Focused projectionsclass UsersByIdProjection { /* ... */ }class UsersByEmailProjection { /* ... */ }class UserStatsProjection { /* ... */ }
// Avoid: One projection doing everythingclass EverythingProjection { /* ... */ }3. Handle Missing State Gracefully
Section titled “3. Handle Missing State Gracefully”Events might arrive out of order or for entities that don’t exist:
private handleUserUpdated(event: UserUpdated): void { const existing = this.store.get(event.data.userId);
if (!existing) { // Log and skip, don't crash console.warn(`User ${event.data.userId} not found for update`); return; }
// Continue processing...}4. Consider Storage Requirements
Section titled “4. Consider Storage Requirements”Different storage for different needs:
| Read Model | Good Storage Options |
|---|---|
| Fast key-value lookup | Redis, DynamoDB, KV stores |
| Full-text search | Elasticsearch, Algolia |
| Analytics/aggregations | ClickHouse, BigQuery |
| Relational queries | PostgreSQL read replica |
Projections with DeltaBase
Section titled “Projections with DeltaBase”DeltaBase provides infrastructure for running projections: event subscriptions deliver events to your webhook endpoints.
Event Delivery via Webhooks
Section titled “Event Delivery via Webhooks”Configure a subscription that delivers events to your projection endpoint:
const config = { eventStores: [{ name: 'my-service', subscriptions: [{ id: 'users-projection', eventFilter: ['user.created', 'user.updated', 'user.deleted'], subscriberType: 'webhook', webhook: { url: 'https://my-service.com/api/projections/users', headers: { Authorization: 'Bearer secret' }, retryPolicy: { maxAttempts: 5, backoffMinutes: 2 }, }, }], }],};IReadModelStore Interface
Section titled “IReadModelStore Interface”DeltaBase provides a IReadModelStore abstraction for read model storage:
import type { IReadModelStore } from '@delta-base/toolkit';
interface IReadModelStore { get<T>(key: string): Promise<T | null>; put<T>(key: string, value: T): Promise<void>; delete(key: string): Promise<void>; list<T>(prefix: string): Promise<T[]>;}Implementations available:
KVReadModelStore- Cloudflare Workers KVInMemoryReadModelStore- For testing
Projection Class Pattern
Section titled “Projection Class Pattern”DeltaBase projections implement a standard interface:
import type { Projection, ReadEvent, EventTypeOf, IReadModelStore } from '@delta-base/toolkit';
export class UsersProjection implements Projection<UserEvents> { readonly supportedEventTypes: EventTypeOf<UserEvents>[] = [ 'user.created', 'user.updated', 'user.deleted', ];
constructor(private store: IReadModelStore) {}
async processEvents(events: ReadEvent<UserEvents>[]): Promise<void> { for (const event of events) { // Check idempotency const existing = await this.store.get<UserReadModel>(`user:${event.streamId}`); if (existing && existing._revision >= event.streamPosition) { continue; }
// Process event await this.project(event); } }
private async project(event: ReadEvent<UserEvents>): Promise<void> { // Handle each event type... }}Webhook Handler
Section titled “Webhook Handler”Receive events in your HTTP endpoint:
app.post('/api/projections/users', async (c) => { // Verify request const authHeader = c.req.header('Authorization'); if (authHeader !== `Bearer ${process.env.PROJECTION_TOKEN}`) { return c.json({ error: 'Unauthorized' }, 401); }
// Process events const { events } = await c.req.json(); const projection = new UsersProjection(c.get('readModelStore')); await projection.processEvents(events);
return c.json({ success: true });});What’s Next?
Section titled “What’s Next?”- Subscriptions & Streaming - Configure event delivery
- CQRS Implementation - End-to-end walkthrough
- Infrastructure as Code - Deploy projections with CLI