Subscriptions & Event Streaming
React to events in real-time
Event sourcing systems need to do more than store events. They need to react to them. Subscriptions are the mechanism for receiving events as they happen.
The Problem
When you append an event to a stream, you often need to:
- Update read models (projections) for fast queries
- Send notifications (emails, push notifications, Slack)
- Trigger workflows (payment processing, order fulfillment)
- Integrate with external services (analytics, CRM, third-party APIs)
Without subscriptions, you’d have to constantly poll the event store: “Any new events? Any new events?” That’s wasteful and introduces latency.
Subscriptions flip this around: the event store pushes events to you as they happen.
Two Delivery Models
Push (Recommended)
The event store sends events to subscribers when they’re appended:

    ┌─────────────┐     append      ┌─────────────┐
    │   Client    │────────────────▶│ Event Store │
    └─────────────┘                 └──────┬──────┘
                                           │
                                           │ push
                                           ▼
                              ┌─────────────────────────┐
                              │      Subscriber(s)      │
                              │ ┌─────────┐ ┌─────────┐ │
                              │ │Webhook A│ │Webhook B│ │
                              │ └─────────┘ └─────────┘ │
                              └─────────────────────────┘

Pros: Low latency, efficient
Cons: Requires handling retries, ordering guarantees vary
Pull (Polling)
Subscribers periodically ask for new events:

    ┌─────────────────┐         ┌─────────────┐
    │   Subscriber    │────────▶│ Event Store │
    │                 │  poll   │             │
    │ "events since   │◀────────│             │
    │  position 42?"  │ events  │             │
    └─────────────────┘         └─────────────┘

Pros: Simple to implement, subscriber controls pace
Cons: Higher latency, wastes resources polling for no changes
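The pull model can be sketched as a subscriber that remembers the last position it handled and asks only for events after it. This is a minimal illustration rather than any particular event store's API: `InMemoryEventStore`, `readAfter`, and `PollingSubscriber` are hypothetical names, and a real subscriber would call `pollOnce` on a timer and persist its position.

```typescript
// Hypothetical in-memory store, used only to illustrate polling.
interface StoredEvent {
  position: number;
  type: string;
}

class InMemoryEventStore {
  private events: StoredEvent[] = [];

  append(type: string): StoredEvent {
    const event: StoredEvent = { position: this.events.length, type };
    this.events.push(event);
    return event;
  }

  // Return up to `limit` events whose position is strictly greater than `after`.
  readAfter(after: number, limit: number): StoredEvent[] {
    return this.events.filter((e) => e.position > after).slice(0, limit);
  }
}

class PollingSubscriber {
  // -1 means "nothing handled yet".
  private lastPosition = -1;
  private readonly store: InMemoryEventStore;
  private readonly batchSize: number;
  readonly handled: string[] = [];

  constructor(store: InMemoryEventStore, batchSize: number) {
    this.store = store;
    this.batchSize = batchSize;
  }

  // One poll cycle: fetch the next batch and advance the position.
  // Returns the number of events received (0 means the poll was wasted).
  pollOnce(): number {
    const batch = this.store.readAfter(this.lastPosition, this.batchSize);
    for (const event of batch) {
      this.handled.push(event.type);
      this.lastPosition = event.position;
    }
    return batch.length;
  }
}
```

The batch size is how the subscriber controls its pace, and a poll that returns 0 is the wasted work listed under "Cons".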
Many production systems combine the two: push for live delivery, with pull as a fallback when a subscriber needs to catch up.
Delivery Mechanisms
Webhooks (HTTP Push)
The event store sends a POST request to your endpoint:

    // Your webhook endpoint receives events
    app.post('/webhooks/events', async (req, res) => {
      const { events } = req.body;

      for (const event of events) {
        switch (event.type) {
          case 'order.placed':
            await updateOrderReadModel(event);
            break;
          case 'payment.received':
            await sendConfirmationEmail(event);
            break;
        }
      }

      res.status(200).json({ ok: true });
    });

Best for: Server-side processing, projections, integrations
Latency: Milliseconds to seconds (HTTP round-trip, plus any batching or retries)
WebSockets (Persistent Connection)
Maintain an open connection for instant delivery:

    const ws = new WebSocket('wss://eventstore.example.com/subscribe');

    ws.onopen = () => {
      ws.send(JSON.stringify({
        action: 'subscribe',
        streams: ['orders-*'],
        fromPosition: 'latest',
      }));
    };

    ws.onmessage = (message) => {
      const event = JSON.parse(message.data);
      console.log('New event:', event);
      // Update UI, show notification, etc.
    };

Best for: Browser clients, real-time dashboards, live collaboration
Latency: Milliseconds
Message Queues
Events are written to a queue (Kafka, SQS, RabbitMQ) for async processing:

    Event Store ──▶ Message Queue ──▶ Consumer(s)

Best for: High throughput, guaranteed delivery, complex routing
Latency: Variable (depends on consumer lag)
Event Filtering
Subscribers usually don’t want every event. Filtering lets you specify which events to receive.
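As a rough picture of what a type filter does on the store side, the sketch below matches event types against patterns such as `user.*` or `*.created`, the syntax used in the examples in this section. It assumes event types are dot-separated and that `*` matches exactly one segment. Real event stores may define wildcards differently, and `matchesPattern` and `matchesFilter` are hypothetical helper names.

```typescript
// Assumption: types are dot-separated and `*` matches exactly one segment.
function matchesPattern(pattern: string, eventType: string): boolean {
  const patternParts = pattern.split('.');
  const typeParts = eventType.split('.');
  if (patternParts.length !== typeParts.length) {
    return false;
  }
  return patternParts.every((part, i) => part === '*' || part === typeParts[i]);
}

// A filter is either a single pattern or a list of patterns (any may match).
function matchesFilter(filter: string | string[], eventType: string): boolean {
  const patterns = Array.isArray(filter) ? filter : [filter];
  return patterns.some((pattern) => matchesPattern(pattern, eventType));
}
```

Under these assumptions, `matchesFilter('user.*', 'user.created')` is true and `matchesFilter('user.*', 'order.placed')` is false.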
By Event Type
    // Only user events
    subscribe({ filter: 'user.*' });

    // Only creation events
    subscribe({ filter: '*.created' });

    // Specific events
    subscribe({ filter: ['order.placed', 'order.shipped'] });

By Stream
    // All events from a specific stream
    subscribe({ streamId: 'order-123' });

    // All events from streams matching a pattern
    subscribe({ streamPrefix: 'order-' });

By Position
    // From the beginning (replay all events)
    subscribe({ fromPosition: 0 });

    // From a specific position (catch up)
    subscribe({ fromPosition: 4521 });

    // Only new events
    subscribe({ fromPosition: 'latest' });

Delivery Guarantees
Different systems offer different guarantees:
At-Most-Once
Events may be lost, but never duplicated.

    Event Store ──▶ Subscriber
                 │
        (might not arrive)

Use case: Metrics, analytics where losing some data is acceptable
At-Least-Once
Events always arrive, but may be duplicated.

    Event Store ──▶ Subscriber ──▶ (ack)
         │                           │
         └────▶ (retry) ◀────────────┘
              (if no ack)

Use case: Most applications - handle duplicates with idempotency
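The retry loop in the diagram can be sketched as follows. This is a simplified, synchronous illustration, not a real delivery pipeline: `deliverAtLeastOnce`, `DeliveryEvent`, and the boolean "ack" return value are hypothetical, and a production sender would wait between attempts.

```typescript
interface DeliveryEvent {
  eventId: string;
  type: string;
}

// Hypothetical contract: the subscriber returns true to acknowledge.
// Returning false or throwing counts as "no ack".
type Subscriber = (event: DeliveryEvent) => boolean;

// Redeliver until acknowledged or until maxAttempts is exhausted.
// Returns the number of attempts that were needed.
function deliverAtLeastOnce(
  event: DeliveryEvent,
  subscriber: Subscriber,
  maxAttempts: number,
): number {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      if (subscriber(event)) {
        return attempt;
      }
    } catch {
      // An exception is treated the same as a missing ack.
    }
  }
  throw new Error(`No ack for ${event.eventId} after ${maxAttempts} attempts`);
}
```

Duplicates arise when the subscriber handles the event but the ack is lost. The sender cannot tell that apart from a failed delivery, so it sends the event again.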
Exactly-Once
Events arrive exactly once (usually via deduplication).

    Event Store ──▶ Subscriber ──▶ Dedup Store
                                        │
                                 (check if seen)

Use case: Financial transactions, critical workflows
Handling Idempotency
With at-least-once delivery (the most common), you’ll occasionally receive duplicates. Design for it:

    async function handleEvent(event) {
      // Check if we've already processed this event
      const processed = await db.get(`processed:${event.eventId}`);
      if (processed) {
        return; // Already handled, skip
      }

      // Process the event
      await updateReadModel(event);

      // Mark as processed. Ideally this happens in the same transaction as
      // the update, so a crash between the two steps cannot cause a reprocess.
      await db.set(`processed:${event.eventId}`, true);
    }

Or use a revision/version field in your read model:
    async function handleEvent(event) {
      const current = await db.get(event.streamId);

      // Skip if we've already processed this or a later event
      if (current && current._revision >= event.position) {
        return;
      }

      // Update with new revision
      await db.put(event.streamId, {
        ...buildReadModel(event),
        _revision: event.position,
      });
    }

Ordering Guarantees
Per-Stream Ordering
Events within a stream are delivered in order:

    Stream: order-123
      Position 0: order.placed      ──▶ delivered first
      Position 1: payment.received  ──▶ delivered second
      Position 2: order.shipped     ──▶ delivered third

This is the most common guarantee and usually sufficient.
Global Ordering
Events across all streams are delivered in global order:

    Global position 100: order-123/order.placed
    Global position 101: user-456/user.profile-updated
    Global position 102: order-123/payment.received

This is harder to achieve at scale and usually requires a single-writer architecture.
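To see why a single writer makes global ordering straightforward, consider a log where one component assigns both positions on every append. The sketch below is an in-memory illustration only. `SingleWriterLog` and its field names are hypothetical, and positions start at 0 rather than at 100 as in the listing above.

```typescript
interface RecordedEvent {
  streamId: string;
  type: string;
  streamPosition: number; // order within its own stream
  globalPosition: number; // order across all streams
}

// All appends go through this one object, so assigning the next global
// position is just "current length of the log". With several concurrent
// writers, agreeing on that number would need coordination.
class SingleWriterLog {
  private log: RecordedEvent[] = [];
  private streamLengths: Record<string, number> = {};

  append(streamId: string, type: string): RecordedEvent {
    const streamPosition = this.streamLengths[streamId] ?? 0;
    const event: RecordedEvent = {
      streamId,
      type,
      streamPosition,
      globalPosition: this.log.length,
    };
    this.log.push(event);
    this.streamLengths[streamId] = streamPosition + 1;
    return event;
  }

  // Per-stream order: events of one stream, in append order.
  readStream(streamId: string): RecordedEvent[] {
    return this.log.filter((e) => e.streamId === streamId);
  }

  // Global order: every event, in append order.
  readAll(): RecordedEvent[] {
    return this.log.slice();
  }
}
```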
No Ordering
Events may arrive in any order. Rarely useful but sometimes acceptable for independent events.
Catch-Up Subscriptions
When a subscriber starts (or restarts), it needs to catch up on missed events:

    class EventSubscriber {
      private lastProcessedPosition: number;

      async start() {
        // Load last position from storage
        this.lastProcessedPosition = await this.loadCheckpoint();

        // Catch up on missed events (readFrom is assumed to return the
        // events after the given position, in order)
        const missedEvents = await eventStore.readFrom(this.lastProcessedPosition);
        for (const event of missedEvents) {
          await this.handleEvent(event);
        }

        // Now subscribe to live events. Events appended between the read
        // above and this call can be missed unless the live subscription
        // also resumes from lastProcessedPosition.
        await this.subscribeLive();
      }

      async handleEvent(event) {
        await this.processEvent(event);
        this.lastProcessedPosition = event.position;
        await this.saveCheckpoint(this.lastProcessedPosition);
      }
    }

Backpressure
What happens when events arrive faster than you can process them?
Buffering
Queue events in memory, process when ready.

    const buffer = [];

    ws.onmessage = (msg) => buffer.push(JSON.parse(msg.data));

    async function processLoop() {
      while (true) {
        if (buffer.length > 0) {
          const event = buffer.shift();
          await processEvent(event);
        } else {
          // Only pause when there is nothing to do, so a full buffer
          // is drained without a delay between events
          await sleep(10);
        }
      }
    }

Risk: Memory exhaustion if buffer grows unbounded
Dropping
Discard events when overloaded (for non-critical subscriptions).
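One way to implement dropping is a buffer with a fixed capacity that discards the oldest entry when a new one arrives and the buffer is full. The sketch below assumes a drop-oldest policy. Drop-newest is equally valid, and `BoundedBuffer` is a hypothetical name.

```typescript
// Fixed-capacity FIFO buffer that drops the oldest item when full.
class BoundedBuffer<T> {
  private items: T[] = [];
  private droppedCount = 0;
  private readonly capacity: number;

  constructor(capacity: number) {
    if (capacity < 1) {
      throw new Error('capacity must be at least 1');
    }
    this.capacity = capacity;
  }

  push(item: T): void {
    if (this.items.length >= this.capacity) {
      this.items.shift(); // make room by discarding the oldest item
      this.droppedCount++;
    }
    this.items.push(item);
  }

  // Remove and return the oldest remaining item, if any.
  shift(): T | undefined {
    return this.items.shift();
  }

  get size(): number {
    return this.items.length;
  }

  // Number of items discarded so far; worth exposing as a metric.
  get dropped(): number {
    return this.droppedCount;
  }
}
```

Memory use stays bounded by `capacity`, at the cost of losing events, which is why this only suits subscriptions that can tolerate gaps.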
Throttling
Ask the source to slow down (if supported).
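A common way to let a subscriber slow the source down is credit-based flow control: the subscriber grants a number of credits, and the source sends at most that many events before waiting for more. The sketch below illustrates the idea in memory. `CreditGate`, `grant`, and `offer` are hypothetical names and not part of any specific protocol.

```typescript
// Sits on the sending side: events are only delivered while the
// subscriber has outstanding credits.
class CreditGate<T> {
  private credits = 0;
  private readonly pending: T[] = [];
  private readonly deliver: (event: T) => void;

  constructor(deliver: (event: T) => void) {
    this.deliver = deliver;
  }

  // Called by the subscriber: "I can handle `count` more events."
  grant(count: number): void {
    this.credits += count;
    this.flush();
  }

  // Called by the source when a new event is available.
  offer(event: T): void {
    this.pending.push(event);
    this.flush();
  }

  // Events currently held back because no credit is available.
  get queued(): number {
    return this.pending.length;
  }

  private flush(): void {
    while (this.credits > 0 && this.pending.length > 0) {
      const next = this.pending.shift() as T;
      this.credits--;
      this.deliver(next);
    }
  }
}
```

With a persistent event store the source does not need to hold `pending` in memory. It can simply stop reading ahead until more credits arrive.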
Subscriptions with DeltaBase
DeltaBase provides both webhook and WebSocket delivery out of the box.
Webhook Subscriptions
Configure in deltabase.config.ts:

    const config = {
      eventStores: [{
        name: 'my-service',
        subscriptions: [{
          id: 'orders-projection',
          eventFilter: ['order.placed', 'order.shipped', 'order.cancelled'],
          subscriberType: 'webhook',
          webhook: {
            url: 'https://my-app.com/api/webhooks/orders',
            headers: {
              Authorization: 'Bearer your-secret-token',
            },
            retryPolicy: {
              maxAttempts: 5,
              backoffMinutes: 2,
            },
          },
        }],
      }],
    };

DeltaBase handles:
- Retry on failure (with exponential backoff)
- Batching events for efficiency
- Authentication via headers
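How DeltaBase combines `maxAttempts` and `backoffMinutes` into a retry schedule is not specified here. As a generic illustration of exponential backoff, the sketch below assumes the first attempt is immediate and each later retry waits twice as long as the previous one. `backoffDelaysMinutes` is a hypothetical helper, not part of the DeltaBase SDK.

```typescript
// Generic exponential backoff: base, 2x base, 4x base, ...
// Assumption: the first attempt is immediate, so `maxAttempts` attempts
// involve `maxAttempts - 1` waits.
function backoffDelaysMinutes(maxAttempts: number, baseMinutes: number): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < maxAttempts - 1; retry++) {
    delays.push(baseMinutes * 2 ** retry);
  }
  return delays;
}
```

Under these assumptions, `backoffDelaysMinutes(5, 2)` returns `[2, 4, 8, 16]`, so a subscriber that stays down would see its last retry about 30 minutes after the first attempt.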
WebSocket Subscriptions
Connect from browser or server:

    const ws = new WebSocket(
      'wss://api.delta-base.com/event-stores/my-events/events/ws?apiKey=your-key'
    );

    ws.onopen = () => {
      ws.send(JSON.stringify({
        action: 'subscribe',
        eventFilter: 'order.*',
        position: 'latest',
      }));
    };

    ws.onmessage = (message) => {
      const data = JSON.parse(message.data);
      if (data.type === 'event') {
        updateDashboard(data.event);
      }
    };

SDK Subscription
Create subscriptions programmatically:

    import { DeltaBase } from '@delta-base/server';

    const deltabase = new DeltaBase({
      apiKey: 'your-api-key',
      baseUrl: 'https://api.delta-base.com',
    });

    const eventBus = deltabase.getEventBus('my-events');

    await eventBus.subscribeWebhook({
      id: 'my-webhook',
      eventFilter: 'user.*',
      webhook: {
        url: 'https://my-app.com/webhooks/events',
        headers: { Authorization: 'Bearer secret' },
      },
    });

What’s Next?
- Projections - Build read models from events
- Real-Time Events - Implement WebSocket clients
- Infrastructure as Code - Deploy subscriptions with CLI