Real-Time Events with WebSockets
Implement live event streaming in your application
Every DeltaBase event store includes a WebSocket server. Connect to it and receive events as they happen - perfect for live dashboards, notifications, and collaborative features.
When to Use WebSockets vs Webhooks
Section titled “When to Use WebSockets vs Webhooks”┌─────────────────┬───────────────────────┬───────────────────────┐│ │ WebSocket │ Webhook │├─────────────────┼───────────────────────┼───────────────────────┤│ Latency │ Milliseconds │ Seconds ││ Best for │ Browser clients │ Server-side ││ Connection │ Persistent │ Per-request ││ Retry on fail │ Manual reconnection │ Automatic ││ Authentication │ Query param or header │ Header ││ Use cases │ Live UI, dashboards │ Projections, integs │└─────────────────┴───────────────────────┴───────────────────────┘Rule of thumb:
- WebSockets for browser clients that need instant updates
- Webhooks for server-side projections and integrations
WebSocket Connection
Section titled “WebSocket Connection”Connection URL
Section titled “Connection URL”wss://api.delta-base.com/event-stores/{eventStoreName}/events/ws?apiKey={apiKey}For local development:
ws://localhost:8787/event-stores/{eventStoreName}/events/wsBasic Connection
Section titled “Basic Connection”const eventStoreName = 'my-events';const apiKey = 'your-api-key';
const ws = new WebSocket( `wss://api.delta-base.com/event-stores/${eventStoreName}/events/ws?apiKey=${apiKey}`);
ws.onopen = () => { console.log('Connected to DeltaBase');};
ws.onmessage = (message) => { const data = JSON.parse(message.data); console.log('Received:', data);};
ws.onerror = (error) => { console.error('WebSocket error:', error);};
ws.onclose = () => { console.log('Disconnected from DeltaBase');};Subscribing to Events
Section titled “Subscribing to Events”After connecting, send a subscribe message to start receiving events.
Subscribe Message
Section titled “Subscribe Message”ws.onopen = () => { ws.send(JSON.stringify({ action: 'subscribe', eventFilter: 'order.*', // Pattern to match events position: 'latest', // 'latest' or 'earliest' }));};Event Filter Patterns
Section titled “Event Filter Patterns”| Pattern | Matches |
|---|---|
* | All events |
order.* | order.placed, order.shipped, etc. |
*.created | user.created, order.created, etc. |
order.placed | Only order.placed |
Subscribe to Specific Stream
Section titled “Subscribe to Specific Stream”ws.send(JSON.stringify({ action: 'subscribe', eventFilter: '*', streamId: 'order-123', // Only events from this stream position: 'latest',}));Receiving Events
Section titled “Receiving Events”Event Message Format
Section titled “Event Message Format”ws.onmessage = (message) => { const data = JSON.parse(message.data);
switch (data.type) { case 'event': handleEvent(data.event); break; case 'subscribed': console.log('Subscription confirmed'); break; case 'error': console.error('Error:', data.message); break; }};
function handleEvent(event: { streamId: string; streamPosition: number; globalPosition: number; eventId: string; type: string; data: Record<string, unknown>; metadata?: Record<string, unknown>; createdAt: string;}) { console.log(`Event ${event.type} on stream ${event.streamId}`); console.log('Data:', event.data);}Handling Reconnection
Section titled “Handling Reconnection”WebSocket connections can drop. Implement reconnection logic for production use.
class DeltaBaseWebSocket { private ws: WebSocket | null = null; private reconnectAttempts = 0; private maxReconnectAttempts = 10; private reconnectDelay = 1000; private subscriptions: Array<{ eventFilter: string; streamId?: string; }> = [];
constructor( private eventStoreName: string, private apiKey: string, private onEvent: (event: any) => void ) {}
connect(): void { const url = `wss://api.delta-base.com/event-stores/${this.eventStoreName}/events/ws?apiKey=${this.apiKey}`;
this.ws = new WebSocket(url);
this.ws.onopen = () => { console.log('Connected'); this.reconnectAttempts = 0; this.reconnectDelay = 1000;
// Re-subscribe to all subscriptions this.subscriptions.forEach((sub) => { this.subscribe(sub.eventFilter, sub.streamId); }); };
this.ws.onmessage = (message) => { const data = JSON.parse(message.data); if (data.type === 'event') { this.onEvent(data.event); } };
this.ws.onclose = () => { console.log('Disconnected'); this.attemptReconnect(); };
this.ws.onerror = (error) => { console.error('WebSocket error:', error); }; }
subscribe(eventFilter: string, streamId?: string): void { // Store subscription for reconnection const existing = this.subscriptions.find( (s) => s.eventFilter === eventFilter && s.streamId === streamId ); if (!existing) { this.subscriptions.push({ eventFilter, streamId }); }
// Send if connected if (this.ws?.readyState === WebSocket.OPEN) { this.ws.send( JSON.stringify({ action: 'subscribe', eventFilter, streamId, position: 'latest', }) ); } }
private attemptReconnect(): void { if (this.reconnectAttempts >= this.maxReconnectAttempts) { console.error('Max reconnection attempts reached'); return; }
this.reconnectAttempts++; console.log( `Reconnecting in ${this.reconnectDelay}ms (attempt ${this.reconnectAttempts})` );
setTimeout(() => { this.connect(); }, this.reconnectDelay);
// Exponential backoff with max of 30 seconds this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000); }
disconnect(): void { if (this.ws) { this.ws.close(); this.ws = null; } }}
// Usageconst wsClient = new DeltaBaseWebSocket( 'my-events', 'your-api-key', (event) => { console.log('Received event:', event); });
wsClient.connect();wsClient.subscribe('order.*');Example: Live Order Dashboard
Section titled “Example: Live Order Dashboard”Build a real-time order dashboard that updates as orders are placed and shipped.
Backend Setup
Section titled “Backend Setup”First, ensure you have a webhook subscription for projections and a WebSocket for live updates:
const config: DeltaBaseConfig = { eventStores: [ { name: 'orders', subscriptions: [ { id: 'orders-projection', eventFilter: ['order.placed', 'order.shipped', 'order.delivered'], subscriberType: 'webhook', webhook: { url: `${process.env.SERVICE_URL}/api/projections/orders/events`, // ... }, }, ], }, ],};Frontend Implementation
Section titled “Frontend Implementation”interface Order { orderId: string; customerId: string; totalAmount: number; status: string; placedAt: string;}
class OrderDashboard { private orders: Map<string, Order> = new Map(); private wsClient: DeltaBaseWebSocket; private onUpdate: (orders: Order[]) => void;
constructor( apiKey: string, onUpdate: (orders: Order[]) => void ) { this.onUpdate = onUpdate; this.wsClient = new DeltaBaseWebSocket( 'orders', apiKey, (event) => this.handleEvent(event) ); }
async initialize(): Promise<void> { // Load existing orders from API const response = await fetch('/api/orders'); const orders = await response.json();
for (const order of orders) { this.orders.set(order.orderId, order); }
this.notifyUpdate();
// Connect WebSocket for live updates this.wsClient.connect(); this.wsClient.subscribe('order.*'); }
private handleEvent(event: any): void { switch (event.type) { case 'order.placed': this.orders.set(event.data.orderId, { orderId: event.data.orderId, customerId: event.data.customerId, totalAmount: event.data.totalAmount, status: 'pending', placedAt: event.data.placedAt, }); break;
case 'order.shipped': { const order = this.orders.get(event.data.orderId); if (order) { order.status = 'shipped'; } break; }
case 'order.delivered': { const order = this.orders.get(event.data.orderId); if (order) { order.status = 'delivered'; } break; } }
this.notifyUpdate(); }
private notifyUpdate(): void { const orderList = Array.from(this.orders.values()); orderList.sort((a, b) => new Date(b.placedAt).getTime() - new Date(a.placedAt).getTime() ); this.onUpdate(orderList); }
disconnect(): void { this.wsClient.disconnect(); }}
// Usageconst dashboard = new OrderDashboard( 'your-api-key', (orders) => { // Update your UI console.log('Orders updated:', orders.length); renderOrders(orders); });
dashboard.initialize();UI Rendering (Vanilla JS Example)
Section titled “UI Rendering (Vanilla JS Example)”function renderOrders(orders: Order[]): void { const container = document.getElementById('orders'); if (!container) return;
container.innerHTML = orders .map( (order) => ` <div class="order ${order.status}"> <div class="order-id">${order.orderId}</div> <div class="order-status">${order.status}</div> <div class="order-amount">$${order.totalAmount.toFixed(2)}</div> <div class="order-time">${new Date(order.placedAt).toLocaleString()}</div> </div> ` ) .join('');}Example: Live Account Balance
Section titled “Example: Live Account Balance”Extend the banking tutorial with real-time balance updates.
class AccountBalanceMonitor { private balance: number = 0; private wsClient: DeltaBaseWebSocket;
constructor( private accountId: string, apiKey: string, private onBalanceChange: (balance: number) => void ) { this.wsClient = new DeltaBaseWebSocket( 'banking', apiKey, (event) => this.handleEvent(event) ); }
async initialize(): Promise<void> { // Get initial balance const response = await fetch(`/api/accounts/${this.accountId}`); const account = await response.json(); this.balance = account.balance; this.onBalanceChange(this.balance);
// Subscribe to this account's events this.wsClient.connect(); this.wsClient.subscribe('money.*', this.accountId); }
private handleEvent(event: any): void { // Only handle events for our account if (event.streamId !== this.accountId) return;
switch (event.type) { case 'money.deposited': this.balance += event.data.amount; break; case 'money.withdrawn': this.balance -= event.data.amount; break; }
this.onBalanceChange(this.balance); }
disconnect(): void { this.wsClient.disconnect(); }}
// Usageconst monitor = new AccountBalanceMonitor( 'account-123', 'your-api-key', (balance) => { document.getElementById('balance').textContent = `$${balance.toFixed(2)}`; });
monitor.initialize();Best Practices
Section titled “Best Practices”1. Always Implement Reconnection
Section titled “1. Always Implement Reconnection”WebSocket connections will drop. Plan for it.
// Don't just connect oncews.onclose = () => { // Reconnect with exponential backoff setTimeout(() => connect(), reconnectDelay); reconnectDelay = Math.min(reconnectDelay * 2, 30000);};2. Re-subscribe After Reconnection
Section titled “2. Re-subscribe After Reconnection”Store your subscriptions and re-apply them after reconnecting.
3. Handle Missed Events
Section titled “3. Handle Missed Events”If the connection drops, you might miss events. For critical data, also fetch from the API:
ws.onopen = async () => { // Re-fetch current state to catch missed events const currentState = await fetch('/api/orders').then((r) => r.json()); updateUI(currentState);
// Then subscribe for live updates ws.send(JSON.stringify({ action: 'subscribe', eventFilter: 'order.*' }));};4. Throttle UI Updates
Section titled “4. Throttle UI Updates”If events come in rapidly, batch UI updates:
let pendingUpdate = false;let pendingOrders: Order[] = [];
function scheduleUpdate(orders: Order[]): void { pendingOrders = orders;
if (!pendingUpdate) { pendingUpdate = true; requestAnimationFrame(() => { renderOrders(pendingOrders); pendingUpdate = false; }); }}5. Clean Up on Unmount
Section titled “5. Clean Up on Unmount”Close connections when your component unmounts:
// React exampleuseEffect(() => { const dashboard = new OrderDashboard(apiKey, setOrders); dashboard.initialize();
return () => { dashboard.disconnect(); };}, []);Security Considerations
Section titled “Security Considerations”Don’t Expose API Keys in Browser
Section titled “Don’t Expose API Keys in Browser”For browser clients, use a backend proxy or short-lived tokens:
// Backend: Generate short-lived WebSocket tokenapp.get('/api/ws-token', authMiddleware, (c) => { const token = generateShortLivedToken(c.get('user')); return c.json({ token });});
// Frontend: Use token instead of API keyconst { token } = await fetch('/api/ws-token').then((r) => r.json());const ws = new WebSocket(`wss://api.delta-base.com/...?token=${token}`);Validate Events Client-Side
Section titled “Validate Events Client-Side”Don’t trust event data blindly:
function handleEvent(event: any): void { // Validate event structure if (!event.type || !event.data || !event.streamId) { console.warn('Invalid event:', event); return; }
// Process valid event // ...}What’s Next?
Section titled “What’s Next?”- Subscriptions & Streaming - Webhook subscriptions for server-side
- Banking System Tutorial - Complete example with real-time updates
- API Reference - WebSocket API details