Subscriptions & Event Streaming

React to events in real-time

Event sourcing systems need to do more than store events. They need to react to them. Subscriptions are the mechanism for receiving events as they happen.

When you append an event to a stream, you often need to:

  • Update read models (projections) for fast queries
  • Send notifications (emails, push notifications, Slack)
  • Trigger workflows (payment processing, order fulfillment)
  • Integrate with external services (analytics, CRM, third-party APIs)

Without subscriptions, you’d have to constantly poll the event store: “Any new events? Any new events?” That’s wasteful and introduces latency.

Subscriptions flip this around: the event store pushes events to you as they happen.

Push model: the event store sends events to subscribers when they're appended:

┌─────────────┐   append   ┌─────────────┐
│   Client    │───────────▶│ Event Store │
└─────────────┘            └──────┬──────┘
                                  │ push
                                  ▼
                   ┌─────────────────────────┐
                   │      Subscriber(s)      │
                   │ ┌─────────┐ ┌─────────┐ │
                   │ │Webhook A│ │Webhook B│ │
                   │ └─────────┘ └─────────┘ │
                   └─────────────────────────┘

Pros: Low latency, efficient
Cons: Requires handling retries, ordering guarantees vary

Pull model: subscribers periodically ask for new events:

┌─────────────────┐            ┌─────────────┐
│   Subscriber    │───────────▶│ Event Store │
│                 │    poll    │             │
│ "events since   │◀───────────│             │
│  position 42?"  │   events   │             │
└─────────────────┘            └─────────────┘

Pros: Simple to implement, subscriber controls pace
Cons: Higher latency, wastes resources polling for no changes
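
In code, a pull subscriber is little more than a loop around the store's read API. Here's a minimal sketch, reusing the eventStore.readFrom(position) call that also appears in the catch-up example later on this page (processEvent and the poll interval are illustrative):

// Minimal pull loop: track the last seen position and ask for anything newer.
let position = 0;

async function pollLoop() {
  while (true) {
    const events = await eventStore.readFrom(position);
    for (const event of events) {
      await processEvent(event);
      position = event.position + 1; // resume after the last processed event
    }
    // The poll interval trades latency against load on the event store.
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
}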

Most production systems use push-based delivery with pull as a fallback for catch-up.

Webhooks: the event store sends an HTTP POST request to your endpoint:

// Your webhook endpoint receives events
app.post('/webhooks/events', async (req, res) => {
  const { events } = req.body;

  for (const event of events) {
    switch (event.type) {
      case 'order.placed':
        await updateOrderReadModel(event);
        break;
      case 'payment.received':
        await sendConfirmationEmail(event);
        break;
    }
  }

  res.status(200).json({ ok: true });
});

Best for: Server-side processing, projections, integrations
Latency: Seconds (HTTP round-trip)

WebSockets: maintain an open connection for instant delivery:

const ws = new WebSocket('wss://eventstore.example.com/subscribe');

ws.onopen = () => {
  ws.send(JSON.stringify({
    action: 'subscribe',
    streams: ['orders-*'],
    fromPosition: 'latest',
  }));
};

ws.onmessage = (message) => {
  const event = JSON.parse(message.data);
  console.log('New event:', event);
  // Update UI, show notification, etc.
};

Best for: Browser clients, real-time dashboards, live collaboration
Latency: Milliseconds

Message queues: events are written to a queue (Kafka, SQS, RabbitMQ) for async processing:

Event Store ──▶ Message Queue ──▶ Consumer(s)

Best for: High throughput, guaranteed delivery, complex routing
Latency: Variable (depends on consumer lag)
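
For example, if events are forwarded to a Kafka topic, a consumer group can process them at its own pace. Here's a sketch using the kafkajs client; the broker address, topic name, and handler are assumptions for illustration:

import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'projections', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'orders-projection' });

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'events', fromBeginning: false });

  await consumer.run({
    // Kafka preserves order within a partition, so keying messages by
    // stream ID (at the producer) gives per-stream ordering here.
    eachMessage: async ({ message }) => {
      if (!message.value) return;
      const event = JSON.parse(message.value.toString());
      await updateOrderReadModel(event);
    },
  });
}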

Subscribers usually don’t want every event. Filtering lets you specify which events to receive.

// Only user events
subscribe({ filter: 'user.*' });
// Only creation events
subscribe({ filter: '*.created' });
// Specific events
subscribe({ filter: ['order.placed', 'order.shipped'] });
// All events from a specific stream
subscribe({ streamId: 'order-123' });
// All events from streams matching a pattern
subscribe({ streamPrefix: 'order-' });
// From the beginning (replay all events)
subscribe({ fromPosition: 0 });
// From a specific position (catch up)
subscribe({ fromPosition: 4521 });
// Only new events
subscribe({ fromPosition: 'latest' });

Different systems offer different guarantees:

At-most-once: events may be lost, but never duplicated.

Event Store ──▶ Subscriber
(might not arrive)

Use case: Metrics, analytics where losing some data is acceptable

At-least-once: events always arrive, but may be duplicated.

Event Store ──▶ Subscriber ──▶ (ack)
     │                           │
     └───▶ (retry if no ack) ◀───┘

Use case: Most applications - handle duplicates with idempotency

Exactly-once: events arrive exactly once (usually via deduplication).

Event Store ──▶ Subscriber ──▶ Dedup Store
                               (check if seen)

Use case: Financial transactions, critical workflows

With at-least-once delivery (the most common), you’ll receive duplicates. Design for it:

async function handleEvent(event) {
  // Check if we've already processed this event
  const processed = await db.get(`processed:${event.eventId}`);
  if (processed) {
    return; // Already handled, skip
  }

  // Process the event
  await updateReadModel(event);

  // Mark as processed
  await db.set(`processed:${event.eventId}`, true);
}

Or use a revision/version field in your read model:

async function handleEvent(event) {
  const current = await db.get(event.streamId);

  // Skip if we've already processed this or a later event
  if (current && current._revision >= event.position) {
    return;
  }

  // Update with new revision
  await db.put(event.streamId, {
    ...buildReadModel(event),
    _revision: event.position,
  });
}

Per-stream ordering: events within a stream are delivered in order:

Stream: order-123
  Position 0: order.placed     ──▶ delivered first
  Position 1: payment.received ──▶ delivered second
  Position 2: order.shipped    ──▶ delivered third

This is the most common guarantee and usually sufficient.

Global ordering: events across all streams are delivered in global order:

Global position 100: order-123/order.placed
Global position 101: user-456/user.profile-updated
Global position 102: order-123/payment.received

Harder to achieve at scale; it usually requires a single-writer architecture.

No ordering guarantee: events may arrive in any order. Rarely desirable, but sometimes acceptable for fully independent events.
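
If you consume from such a source but still need order, one option is to re-order on the subscriber side. A sketch, assuming a single stream and that each event carries a contiguous position (processEvent is a placeholder):

// Hold out-of-order events until the expected position arrives.
const pending = new Map(); // position -> event
let expected = 0;

async function handleUnordered(event) {
  pending.set(event.position, event);

  // Drain the buffer in order for as long as the next position is present.
  while (pending.has(expected)) {
    const next = pending.get(expected);
    pending.delete(expected);
    await processEvent(next);
    expected++;
  }
}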

When a subscriber starts (or restarts), it needs to catch up on missed events:

class EventSubscriber {
  private lastProcessedPosition: number;

  async start() {
    // Load last position from storage
    this.lastProcessedPosition = await this.loadCheckpoint();

    // Catch up on missed events
    const missedEvents = await eventStore.readFrom(this.lastProcessedPosition);
    for (const event of missedEvents) {
      await this.handleEvent(event);
    }

    // Now subscribe to live events
    await this.subscribeLive();
  }

  async handleEvent(event) {
    await this.processEvent(event);
    this.lastProcessedPosition = event.position;
    await this.saveCheckpoint(this.lastProcessedPosition);
  }
}

What happens when events arrive faster than you can process them?

Buffering: queue events in memory and process them when ready.

const buffer = [];

ws.onmessage = (msg) => buffer.push(JSON.parse(msg.data));

// Promise-based sleep helper used by the loop below
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function processLoop() {
  while (true) {
    if (buffer.length > 0) {
      const event = buffer.shift();
      await processEvent(event);
    }
    await sleep(10);
  }
}

Risk: Memory exhaustion if buffer grows unbounded

Dropping: discard events when overloaded (acceptable only for non-critical subscriptions).

Flow control: ask the source to slow down (if the protocol supports it).
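
Here's a sketch combining the last two strategies: a bounded buffer that drops the oldest event when full, plus a hypothetical pause message asking the source to slow down. Whether such a message exists, and what it looks like, depends on your event store's protocol:

const MAX_BUFFER = 10_000; // hard cap: drop beyond this
const HIGH_WATER = 8_000;  // ask the source to slow down beyond this
const buffer = [];

ws.onmessage = (msg) => {
  if (buffer.length >= MAX_BUFFER) {
    buffer.shift(); // drop the oldest event (non-critical subscriptions only)
  }
  buffer.push(JSON.parse(msg.data));

  if (buffer.length >= HIGH_WATER) {
    ws.send(JSON.stringify({ action: 'pause' })); // hypothetical flow-control message
  }
};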


DeltaBase provides both webhook and WebSocket delivery out of the box.

Configure in deltabase.config.ts:

const config = {
  eventStores: [{
    name: 'my-service',
    subscriptions: [{
      id: 'orders-projection',
      eventFilter: ['order.placed', 'order.shipped', 'order.cancelled'],
      subscriberType: 'webhook',
      webhook: {
        url: 'https://my-app.com/api/webhooks/orders',
        headers: {
          Authorization: 'Bearer your-secret-token',
        },
        retryPolicy: {
          maxAttempts: 5,
          backoffMinutes: 2,
        },
      },
    }],
  }],
};

DeltaBase handles:

  • Retry on failure (with exponential backoff)
  • Batching events for efficiency
  • Authentication via headers
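
On the receiving side, your endpoint should verify the configured header before processing. A sketch for the subscription above, assuming the batched { events } payload shape from the earlier webhook example (confirm the exact shape against the DeltaBase docs):

app.post('/api/webhooks/orders', async (req, res) => {
  // Reject requests that don't carry the token configured in the subscription.
  if (req.headers.authorization !== 'Bearer your-secret-token') {
    return res.status(401).json({ error: 'unauthorized' });
  }

  const { events } = req.body;
  for (const event of events) {
    await updateOrderReadModel(event);
  }

  // Respond 2xx promptly so DeltaBase doesn't retry the batch.
  res.status(200).json({ ok: true });
});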

Connect from browser or server:

const ws = new WebSocket(
  'wss://api.delta-base.com/event-stores/my-events/events/ws?apiKey=your-key'
);

ws.onopen = () => {
  ws.send(JSON.stringify({
    action: 'subscribe',
    eventFilter: 'order.*',
    position: 'latest',
  }));
};

ws.onmessage = (message) => {
  const data = JSON.parse(message.data);
  if (data.type === 'event') {
    updateDashboard(data.event);
  }
};

Create subscriptions programmatically:

import { DeltaBase } from '@delta-base/server';

const deltabase = new DeltaBase({
  apiKey: 'your-api-key',
  baseUrl: 'https://api.delta-base.com',
});

const eventBus = deltabase.getEventBus('my-events');

await eventBus.subscribeWebhook({
  id: 'my-webhook',
  eventFilter: 'user.*',
  webhook: {
    url: 'https://my-app.com/webhooks/events',
    headers: { Authorization: 'Bearer secret' },
  },
});
});