Skip to content

Real-Time Events with WebSockets

Implement live event streaming in your application

Every DeltaBase event store includes a WebSocket server. Connect to it and receive events as they happen - perfect for live dashboards, notifications, and collaborative features.


┌─────────────────┬───────────────────────┬───────────────────────┐
│ │ WebSocket │ Webhook │
├─────────────────┼───────────────────────┼───────────────────────┤
│ Latency │ Milliseconds │ Seconds │
│ Best for │ Browser clients │ Server-side │
│ Connection │ Persistent │ Per-request │
│ Retry on fail │ Manual reconnection │ Automatic │
│ Authentication │ Query param or header │ Header │
│ Use cases │ Live UI, dashboards │ Projections, integs │
└─────────────────┴───────────────────────┴───────────────────────┘

Rule of thumb:

  • WebSockets for browser clients that need instant updates
  • Webhooks for server-side projections and integrations

wss://api.delta-base.com/event-stores/{eventStoreName}/events/ws?apiKey={apiKey}

For local development:

ws://localhost:8787/event-stores/{eventStoreName}/events/ws
const eventStoreName = 'my-events';
const apiKey = 'your-api-key';
const ws = new WebSocket(
`wss://api.delta-base.com/event-stores/${eventStoreName}/events/ws?apiKey=${apiKey}`
);
ws.onopen = () => {
console.log('Connected to DeltaBase');
};
ws.onmessage = (message) => {
const data = JSON.parse(message.data);
console.log('Received:', data);
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
ws.onclose = () => {
console.log('Disconnected from DeltaBase');
};

After connecting, send a subscribe message to start receiving events.

ws.onopen = () => {
ws.send(JSON.stringify({
action: 'subscribe',
eventFilter: 'order.*', // Pattern to match events
position: 'latest', // 'latest' or 'earliest'
}));
};
PatternMatches
*All events
order.*order.placed, order.shipped, etc.
*.createduser.created, order.created, etc.
order.placedOnly order.placed
ws.send(JSON.stringify({
action: 'subscribe',
eventFilter: '*',
streamId: 'order-123', // Only events from this stream
position: 'latest',
}));

ws.onmessage = (message) => {
const data = JSON.parse(message.data);
switch (data.type) {
case 'event':
handleEvent(data.event);
break;
case 'subscribed':
console.log('Subscription confirmed');
break;
case 'error':
console.error('Error:', data.message);
break;
}
};
function handleEvent(event: {
streamId: string;
streamPosition: number;
globalPosition: number;
eventId: string;
type: string;
data: Record<string, unknown>;
metadata?: Record<string, unknown>;
createdAt: string;
}) {
console.log(`Event ${event.type} on stream ${event.streamId}`);
console.log('Data:', event.data);
}

WebSocket connections can drop. Implement reconnection logic for production use.

class DeltaBaseWebSocket {
private ws: WebSocket | null = null;
private reconnectAttempts = 0;
private maxReconnectAttempts = 10;
private reconnectDelay = 1000;
private subscriptions: Array<{
eventFilter: string;
streamId?: string;
}> = [];
constructor(
private eventStoreName: string,
private apiKey: string,
private onEvent: (event: any) => void
) {}
connect(): void {
const url = `wss://api.delta-base.com/event-stores/${this.eventStoreName}/events/ws?apiKey=${this.apiKey}`;
this.ws = new WebSocket(url);
this.ws.onopen = () => {
console.log('Connected');
this.reconnectAttempts = 0;
this.reconnectDelay = 1000;
// Re-subscribe to all subscriptions
this.subscriptions.forEach((sub) => {
this.subscribe(sub.eventFilter, sub.streamId);
});
};
this.ws.onmessage = (message) => {
const data = JSON.parse(message.data);
if (data.type === 'event') {
this.onEvent(data.event);
}
};
this.ws.onclose = () => {
console.log('Disconnected');
this.attemptReconnect();
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
}
subscribe(eventFilter: string, streamId?: string): void {
// Store subscription for reconnection
const existing = this.subscriptions.find(
(s) => s.eventFilter === eventFilter && s.streamId === streamId
);
if (!existing) {
this.subscriptions.push({ eventFilter, streamId });
}
// Send if connected
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(
JSON.stringify({
action: 'subscribe',
eventFilter,
streamId,
position: 'latest',
})
);
}
}
private attemptReconnect(): void {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('Max reconnection attempts reached');
return;
}
this.reconnectAttempts++;
console.log(
`Reconnecting in ${this.reconnectDelay}ms (attempt ${this.reconnectAttempts})`
);
setTimeout(() => {
this.connect();
}, this.reconnectDelay);
// Exponential backoff with max of 30 seconds
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
}
disconnect(): void {
if (this.ws) {
this.ws.close();
this.ws = null;
}
}
}
// Usage
const wsClient = new DeltaBaseWebSocket(
'my-events',
'your-api-key',
(event) => {
console.log('Received event:', event);
}
);
wsClient.connect();
wsClient.subscribe('order.*');

Build a real-time order dashboard that updates as orders are placed and shipped.

First, ensure you have a webhook subscription for projections and a WebSocket for live updates:

deltabase.config.ts
const config: DeltaBaseConfig = {
eventStores: [
{
name: 'orders',
subscriptions: [
{
id: 'orders-projection',
eventFilter: ['order.placed', 'order.shipped', 'order.delivered'],
subscriberType: 'webhook',
webhook: {
url: `${process.env.SERVICE_URL}/api/projections/orders/events`,
// ...
},
},
],
},
],
};
order-dashboard.ts
interface Order {
orderId: string;
customerId: string;
totalAmount: number;
status: string;
placedAt: string;
}
class OrderDashboard {
private orders: Map<string, Order> = new Map();
private wsClient: DeltaBaseWebSocket;
private onUpdate: (orders: Order[]) => void;
constructor(
apiKey: string,
onUpdate: (orders: Order[]) => void
) {
this.onUpdate = onUpdate;
this.wsClient = new DeltaBaseWebSocket(
'orders',
apiKey,
(event) => this.handleEvent(event)
);
}
async initialize(): Promise<void> {
// Load existing orders from API
const response = await fetch('/api/orders');
const orders = await response.json();
for (const order of orders) {
this.orders.set(order.orderId, order);
}
this.notifyUpdate();
// Connect WebSocket for live updates
this.wsClient.connect();
this.wsClient.subscribe('order.*');
}
private handleEvent(event: any): void {
switch (event.type) {
case 'order.placed':
this.orders.set(event.data.orderId, {
orderId: event.data.orderId,
customerId: event.data.customerId,
totalAmount: event.data.totalAmount,
status: 'pending',
placedAt: event.data.placedAt,
});
break;
case 'order.shipped': {
const order = this.orders.get(event.data.orderId);
if (order) {
order.status = 'shipped';
}
break;
}
case 'order.delivered': {
const order = this.orders.get(event.data.orderId);
if (order) {
order.status = 'delivered';
}
break;
}
}
this.notifyUpdate();
}
private notifyUpdate(): void {
const orderList = Array.from(this.orders.values());
orderList.sort((a, b) =>
new Date(b.placedAt).getTime() - new Date(a.placedAt).getTime()
);
this.onUpdate(orderList);
}
disconnect(): void {
this.wsClient.disconnect();
}
}
// Usage
const dashboard = new OrderDashboard(
'your-api-key',
(orders) => {
// Update your UI
console.log('Orders updated:', orders.length);
renderOrders(orders);
}
);
dashboard.initialize();
function renderOrders(orders: Order[]): void {
const container = document.getElementById('orders');
if (!container) return;
container.innerHTML = orders
.map(
(order) => `
<div class="order ${order.status}">
<div class="order-id">${order.orderId}</div>
<div class="order-status">${order.status}</div>
<div class="order-amount">$${order.totalAmount.toFixed(2)}</div>
<div class="order-time">${new Date(order.placedAt).toLocaleString()}</div>
</div>
`
)
.join('');
}

Extend the banking tutorial with real-time balance updates.

class AccountBalanceMonitor {
private balance: number = 0;
private wsClient: DeltaBaseWebSocket;
constructor(
private accountId: string,
apiKey: string,
private onBalanceChange: (balance: number) => void
) {
this.wsClient = new DeltaBaseWebSocket(
'banking',
apiKey,
(event) => this.handleEvent(event)
);
}
async initialize(): Promise<void> {
// Get initial balance
const response = await fetch(`/api/accounts/${this.accountId}`);
const account = await response.json();
this.balance = account.balance;
this.onBalanceChange(this.balance);
// Subscribe to this account's events
this.wsClient.connect();
this.wsClient.subscribe('money.*', this.accountId);
}
private handleEvent(event: any): void {
// Only handle events for our account
if (event.streamId !== this.accountId) return;
switch (event.type) {
case 'money.deposited':
this.balance += event.data.amount;
break;
case 'money.withdrawn':
this.balance -= event.data.amount;
break;
}
this.onBalanceChange(this.balance);
}
disconnect(): void {
this.wsClient.disconnect();
}
}
// Usage
const monitor = new AccountBalanceMonitor(
'account-123',
'your-api-key',
(balance) => {
document.getElementById('balance').textContent = `$${balance.toFixed(2)}`;
}
);
monitor.initialize();

WebSocket connections will drop. Plan for it.

// Don't just connect once
ws.onclose = () => {
// Reconnect with exponential backoff
setTimeout(() => connect(), reconnectDelay);
reconnectDelay = Math.min(reconnectDelay * 2, 30000);
};

Store your subscriptions and re-apply them after reconnecting.

If the connection drops, you might miss events. For critical data, also fetch from the API:

ws.onopen = async () => {
// Re-fetch current state to catch missed events
const currentState = await fetch('/api/orders').then((r) => r.json());
updateUI(currentState);
// Then subscribe for live updates
ws.send(JSON.stringify({ action: 'subscribe', eventFilter: 'order.*' }));
};

If events come in rapidly, batch UI updates:

let pendingUpdate = false;
let pendingOrders: Order[] = [];
function scheduleUpdate(orders: Order[]): void {
pendingOrders = orders;
if (!pendingUpdate) {
pendingUpdate = true;
requestAnimationFrame(() => {
renderOrders(pendingOrders);
pendingUpdate = false;
});
}
}

Close connections when your component unmounts:

// React example
useEffect(() => {
const dashboard = new OrderDashboard(apiKey, setOrders);
dashboard.initialize();
return () => {
dashboard.disconnect();
};
}, []);

For browser clients, use a backend proxy or short-lived tokens:

// Backend: Generate short-lived WebSocket token
app.get('/api/ws-token', authMiddleware, (c) => {
const token = generateShortLivedToken(c.get('user'));
return c.json({ token });
});
// Frontend: Use token instead of API key
const { token } = await fetch('/api/ws-token').then((r) => r.json());
const ws = new WebSocket(`wss://api.delta-base.com/...?token=${token}`);

Don’t trust event data blindly:

function handleEvent(event: any): void {
// Validate event structure
if (!event.type || !event.data || !event.streamId) {
console.warn('Invalid event:', event);
return;
}
// Process valid event
// ...
}