EventBus

API reference for EventBus

@delta-base/server


EventBus client for managing subscriptions to an event store

new EventBus(http, eventStoreName): EventBus

Create a new EventBus client for a specific event store

http: HttpClient

The HTTP client used for API requests

eventStoreName: string

The name of the event store to manage subscriptions for

EventBus

getSubscription(subscriberId): Promise<Subscription>

Get details about a specific subscription

subscriberId: string

ID of the subscription to retrieve

Promise<Subscription>

Subscription details

const subscription = await eventBus.getSubscription('sub_123456');
console.log(subscription.status); // 'ACTIVE'

listSubscriptions(options): Promise<ListSubscriptionsResponse>

List all subscriptions for this event store

options: ListSubscriptionsOptions = {}

Optional filtering and pagination parameters

Promise<ListSubscriptionsResponse>

List of subscriptions and total count

// List all webhook subscriptions
const { subscriptions, totalCount } = await eventBus.listSubscriptions({
  subscriberType: SubscriberType.Webhook,
  limit: 20,
  offset: 0
});
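The `limit`, `offset`, and `totalCount` fields shown above suggest offset-based pagination. The following is a minimal sketch of collecting every page, not part of the library: `listAllSubscriptions`, `SubscriptionLister`, and `makeFakeBus` are hypothetical names, and the interfaces are structural stand-ins that assume only the field names used in the example above. It also assumes `totalCount` reports the number of subscriptions matching the query across all pages.

```typescript
// Structural stand-ins for the parts of the API used here. Only the names
// subscriptions, totalCount, limit, and offset come from this reference;
// everything else is hypothetical.
interface PageOptions {
  limit?: number;
  offset?: number;
}

interface SubscriptionPage<T> {
  subscriptions: T[];
  totalCount: number;
}

interface SubscriptionLister<T> {
  listSubscriptions(options?: PageOptions): Promise<SubscriptionPage<T>>;
}

// Hypothetical helper: request pages until totalCount items are collected.
async function listAllSubscriptions<T>(
  bus: SubscriptionLister<T>,
  pageSize = 20
): Promise<T[]> {
  const all: T[] = [];
  for (;;) {
    const { subscriptions, totalCount } = await bus.listSubscriptions({
      limit: pageSize,
      offset: all.length,
    });
    all.push(...subscriptions);
    // Stop when everything is collected, or when a page comes back empty
    // (guards against looping forever if totalCount is stale).
    if (all.length >= totalCount || subscriptions.length === 0) {
      return all;
    }
  }
}

// In-memory fake, used only to exercise the helper without a network call.
function makeFakeBus(ids: string[]): SubscriptionLister<string> {
  return {
    async listSubscriptions({ limit = 20, offset = 0 }: PageOptions = {}) {
      return {
        subscriptions: ids.slice(offset, offset + limit),
        totalCount: ids.length,
      };
    },
  };
}
```

If the real `EventBus` instance matches this structural shape, it could be passed directly as `bus`. The empty-page check is a defensive choice for the case where subscriptions are deleted while paging.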

subscribe(options): Promise<Subscription>

Subscribe to events from this event store

options: SubscribeOptions

Configuration for the subscription

Promise<Subscription>

The created subscription information

Throws when the subscription configuration is invalid

Throws when the event store doesn’t exist

Throws when request validation fails

Throws when authentication fails

import {
  SubscriberType,
  isInvalidSubscriptionConfigError,
  isEventStoreNotFoundError
} from '@delta-base/server';

try {
  const subscription = await eventBus.subscribe({
    eventFilter: 'user.*',
    subscriber: {
      type: SubscriberType.Webhook,
      config: {
        url: 'https://example.com/webhook',
        headers: { 'X-API-Key': 'secret' },
        retryPolicy: {
          maxAttempts: 3,
          backoffMinutes: 5
        }
      }
    }
  });
  // Check if this is an existing subscription
  if (subscription.isExistingSubscription) {
    console.log('Found existing subscription:', subscription.message);
  } else {
    console.log('Created new subscription:', subscription.message);
  }
} catch (error) {
  if (isInvalidSubscriptionConfigError(error)) {
    // Handle invalid subscription config
    console.log(`Invalid configuration: ${error.configError}`);
  } else if (isEventStoreNotFoundError(error)) {
    // Handle missing event store
    console.log(`Event store '${error.eventStoreId}' not found`);
  } else {
    throw error; // Re-throw unknown errors
  }
}

subscribeWebhook(eventFilter, url, options): Promise<Subscription>

Create a webhook subscription

eventFilter: EventFilterPattern

Pattern determining which events to receive

url: string

The URL that will receive HTTP POST requests with events

options: Omit<WebhookConfig, "url"> & object = {}

Additional configuration options

Promise<Subscription>

The created subscription information

// Subscribe to all user events
const subscription = await eventBus.subscribeWebhook(
  'user.*',
  'https://example.com/webhook',
  {
    headers: { 'X-API-Key': 'secret' },
    retryPolicy: {
      maxAttempts: 3,
      backoffMinutes: 5
    }
  }
);
// Check if this is an existing subscription with the same configuration
if (subscription.isExistingSubscription) {
  console.log('Reusing existing subscription:', subscription.message);
} else {
  console.log('Created new subscription:', subscription.message);
}

unsubscribe(subscriberId): Promise<{ message: string; success: boolean; }>

Unsubscribe from events (delete a subscription)

subscriberId: string

ID of the subscription to delete

Promise<{ message: string; success: boolean; }>

Success message

const result = await eventBus.unsubscribe('sub_123456');
console.log(result.success); // true
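Because `unsubscribe` resolves to an object with a `success` flag, callers may want to treat `success: false` as a failure instead of silently ignoring it. The sketch below shows one way to do that. `unsubscribeOrThrow`, `Unsubscriber`, and `makeFakeUnsubscriber` are hypothetical names, and it is an assumption that the real client can resolve with `success: false` at all; it may throw in that case.

```typescript
// Result shape taken from the unsubscribe signature in this reference.
interface UnsubscribeResult {
  message: string;
  success: boolean;
}

// Hypothetical structural stand-in for the one method this helper needs.
interface Unsubscriber {
  unsubscribe(subscriberId: string): Promise<UnsubscribeResult>;
}

// Hypothetical helper: turn an unsuccessful result into a thrown error.
async function unsubscribeOrThrow(
  bus: Unsubscriber,
  subscriberId: string
): Promise<string> {
  const result = await bus.unsubscribe(subscriberId);
  if (!result.success) {
    throw new Error(`Failed to unsubscribe ${subscriberId}: ${result.message}`);
  }
  return result.message;
}

// In-memory fake, used only to exercise the helper without a network call.
function makeFakeUnsubscriber(knownIds: string[]): Unsubscriber {
  const remaining = new Set(knownIds);
  return {
    async unsubscribe(subscriberId) {
      // Set.delete returns true only if the ID was present.
      const success = remaining.delete(subscriberId);
      return {
        success,
        message: success ? "Subscription removed" : "Subscription not found",
      };
    },
  };
}
```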

updateSubscription(subscriberId, options): Promise<Subscription>

Update an existing subscription (partial update)

Only the fields you provide will be changed. You must provide at least one field.

subscriberId: string

ID of the subscription to update

options: UpdateSubscriptionOptions

Fields to update

Promise<Subscription>

The updated subscription

Throws when the subscription doesn’t exist

Throws when the update payload is invalid

Throws when request validation fails

// Pause a subscription
const paused = await eventBus.updateSubscription('sub_123456', {
  status: 'SUSPENDED',
});

// Change the event filter and retry policy
const reconfigured = await eventBus.updateSubscription('sub_123456', {
  eventFilter: 'order.*',
  retryPolicy: {
    maxAttempts: 5,
    strategy: 'exponential',
    initialDelayMs: 1000,
    maxDelayMs: 60000,
  },
});
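Since an update must include at least one field, an empty options object can be rejected on the client before any request is sent. The following is a minimal sketch of such a guard. `safeUpdate`, `UpdateFields`, `SubscriptionUpdater`, and `makeFakeUpdater` are hypothetical names; `UpdateFields` is only a stand-in for `UpdateSubscriptionOptions` that includes the three fields used in the examples above.

```typescript
// Hypothetical stand-in for UpdateSubscriptionOptions. Only these three
// field names appear in the examples in this reference.
interface UpdateFields {
  status?: string;
  eventFilter?: string;
  retryPolicy?: object;
}

// Hypothetical structural stand-in for the one method this helper needs.
interface SubscriptionUpdater<S> {
  updateSubscription(subscriberId: string, options: UpdateFields): Promise<S>;
}

// True if at least one key is present with a value other than undefined.
function hasAtLeastOneField(options: object): boolean {
  const record = options as Record<string, unknown>;
  return Object.keys(record).some((key) => record[key] !== undefined);
}

// Hypothetical helper: reject empty updates before calling the client.
async function safeUpdate<S>(
  bus: SubscriptionUpdater<S>,
  subscriberId: string,
  options: UpdateFields
): Promise<S> {
  if (!hasAtLeastOneField(options)) {
    throw new Error("updateSubscription requires at least one field");
  }
  return bus.updateSubscription(subscriberId, options);
}

// In-memory fake that records calls, used only to exercise the helper.
function makeFakeUpdater() {
  const calls: Array<{ id: string; options: UpdateFields }> = [];
  const bus: SubscriptionUpdater<UpdateFields> = {
    async updateSubscription(id, options) {
      calls.push({ id, options });
      return options; // echo the payload back as the "updated" record
    },
  };
  return { bus, calls };
}
```

Treating a key set to `undefined` as absent is a design choice in this sketch. It avoids sending a payload such as `{ status: undefined }`, which would serialize to an empty JSON object.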