EventStore
API reference for EventStore
Class: EventStore
Implementation of the EventStore interface for DeltaBase.
Implements
EventStore
Constructors
Constructor
new EventStore(http, eventStoreName, options?): EventStore
Creates a new EventStore client instance.
Parameters
http
The HTTP client to use for API requests
eventStoreName
string
The name of the event store to interact with
options?
Optional configuration including inline projections
Returns
EventStore
Methods
aggregateStream()
aggregateStream<State, EventType>(streamId, options): Promise<AggregateStreamResult<State>>
Aggregate events from a stream and compute a state
Type Parameters
State
EventType
EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; tags?: string[]; type: string; }>
Parameters
streamId
string
The ID of the stream to aggregate events from
options
AggregateStreamOptions<State, EventType>
Configuration options for the aggregation process
Returns
Promise<AggregateStreamResult<State>>
Promise resolving to the aggregation result with the computed state and stream metadata
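Conceptually, aggregation is a left fold: start from an initial state and apply an evolve function once per event, in stream order. The sketch below illustrates only that idea. It is not DeltaBase's implementation, and foldEvents and the Sketch* types are hypothetical names introduced here; only the initialState and evolve option names are taken from this page.

```typescript
// Hypothetical, simplified shapes for illustration; not the library's own types.
type SketchEvent = { type: string; data: unknown };

type SketchAggregateOptions<S, E extends SketchEvent> = {
  initialState: () => S;
  evolve: (state: S, event: E) => S;
};

// Fold events, in order, into a state: start from initialState(),
// then apply evolve once per event.
function foldEvents<S, E extends SketchEvent>(
  events: readonly E[],
  options: SketchAggregateOptions<S, E>,
): S {
  return events.reduce<S>(
    (state, event) => options.evolve(state, event),
    options.initialState(),
  );
}

type SketchUserState = { email: string; isVerified: boolean };
type SketchUserEvent =
  | { type: 'user.created'; data: { email: string } }
  | { type: 'user.verified'; data: { verifiedAt: string } };

const sketchUserState = foldEvents<SketchUserState, SketchUserEvent>(
  [
    { type: 'user.created', data: { email: 'a@example.com' } },
    { type: 'user.verified', data: { verifiedAt: '2023-01-01T00:00:00Z' } },
  ],
  {
    initialState: () => ({ email: '', isVerified: false }),
    evolve: (state, event) => {
      switch (event.type) {
        case 'user.created':
          return { ...state, email: event.data.email };
        case 'user.verified':
          return { ...state, isVerified: true };
        default:
          return state;
      }
    },
  },
);
```

Because evolve is a pure function of the previous state and one event, it can be unit-tested this way without a running event store.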
Example
    // Define your state type and event types
    type UserState = { email: string, isVerified: boolean };
    type UserEvent =
      | { type: 'user.created', data: { email: string } }
      | { type: 'user.verified', data: { verifiedAt: string } };

    // Aggregate the stream into a state
    const result = await eventStore.aggregateStream<UserState, UserEvent>(
      'user-123',
      {
        initialState: () => ({ email: '', isVerified: false }),
        evolve: (state, event) => {
          switch (event.type) {
            case 'user.created':
              return { ...state, email: event.data.email };
            case 'user.verified':
              return { ...state, isVerified: true };
            default:
              return state;
          }
        },
        read: { from: 0 }
      }
    );
Implementation of
EventStoreInterface.aggregateStream
append()
append<EventType>(events, condition?): Promise<AppendResult<EventType>>
Append events without targeting a stream, guarded by a DCB (Dynamic Consistency Boundary) append condition
Type Parameters
EventType
EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; tags?: string[]; type: string; }>
Parameters
events
EventType[]
condition?
AppendCondition
Returns
Promise<AppendResult<EventType>>
Implementation of
EventStoreInterface.append
appendToStream()
appendToStream<EventType>(streamId, events, options?): Promise<AppendToStreamResult>
Append events to a stream
Type Parameters
EventType
EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; tags?: string[]; type: string; }>
Parameters
streamId
string
The ID of the stream to append events to
events
EventType[]
Array of events to append to the stream
options?
Optional parameters for the append operation
Returns
Promise<AppendToStreamResult>
Promise resolving to the append result with the next expected version
Throws
When expectedStreamVersion doesn't match current stream version
Throws
When request validation fails
Throws
When the event store doesn't exist
Throws
When authentication fails
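A version conflict usually means another writer appended first, so a common response is to re-read the stream and try again a bounded number of times. The helper below is a generic sketch of that pattern and is not part of the DeltaBase client. retryOnConflict and its parameters are hypothetical names, and the conflict check is passed in as a predicate so the sketch does not depend on any library type.

```typescript
// Hypothetical helper: run `attempt` again while it fails with a conflict,
// up to `maxAttempts` times. Any other error is re-thrown immediately.
async function retryOnConflict<T>(
  attempt: () => Promise<T>,
  isConflict: (error: unknown) => boolean,
  maxAttempts = 3,
): Promise<T> {
  let lastError: unknown;
  for (let tries = 0; tries < maxAttempts; tries++) {
    try {
      return await attempt();
    } catch (error) {
      if (!isConflict(error)) throw error;
      lastError = error; // remember the conflict and loop for another try
    }
  }
  throw lastError;
}
```

In practice, attempt would re-read the stream, recompute the events, and call appendToStream with a fresh expectedStreamVersion, and a guard such as isVersionConflictError could serve as the isConflict predicate.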
Example
    import { isVersionConflictError, isValidationError } from '@delta-base/server';

    try {
      // Append with optimistic concurrency control
      await eventStore.appendToStream(
        'user-123',
        [{ type: 'user.updated', data: { email: 'updated@example.com' } }],
        { expectedStreamVersion: 0n }
      );
    } catch (error) {
      if (isVersionConflictError(error)) {
        console.log(`Version conflict: expected ${error.expectedVersion}, got ${error.currentVersion}`);
        // Handle concurrency conflict
      } else if (isValidationError(error)) {
        console.log('Validation errors:', error.validationErrors);
        // Handle validation failures
      } else {
        throw error; // Re-throw unknown errors
      }
    }
Implementation of
EventStoreInterface.appendToStream
catchUpProjections()
catchUpProjections(): Promise<void>
Explicitly catch up all registered inline projections. Call it during Worker startup or in a warm-up request so that the first real request is not slowed down by the catch-up.
Returns
Promise<void>
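One way to follow the advice above is to share a single catch-up promise across concurrent requests, so the work starts at most once per process. This is a sketch under that assumption, not a documented DeltaBase pattern. createWarmUp and ProjectionCatchUp are hypothetical names, and only the catchUpProjections() signature comes from this page.

```typescript
// The one method this sketch needs, as documented above.
type ProjectionCatchUp = { catchUpProjections(): Promise<void> };

// Hypothetical helper: returns a function that triggers the catch-up on first
// use and hands every later caller the same promise.
function createWarmUp(store: ProjectionCatchUp): () => Promise<void> {
  let pending: Promise<void> | undefined;
  return () => {
    if (pending === undefined) {
      pending = store.catchUpProjections().catch((error: unknown) => {
        pending = undefined; // forget the failed run so a later call can retry
        throw error;
      });
    }
    return pending;
  };
}
```

A request handler can then await the returned function before touching projections. Only the first call pays the catch-up cost, and a failed catch-up is retried on the next call.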
listStreams()
listStreams(options?): Promise<{ streams: string[]; total: number; }>
Get a list of stream IDs in an event store
Parameters
options?
Optional parameters for listing streams
limit?
number
Maximum number of stream IDs to return
offset?
number
Number of stream IDs to skip
pattern?
string
Pattern to match stream IDs (e.g., 'user-*')
Returns
Promise<{ streams: string[]; total: number; }>
Promise resolving to an object containing stream IDs and total count
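The limit and offset options can be combined to walk the full list page by page. The sketch below assumes that total is the number of streams matching the pattern across all pages, which this page does not state explicitly. listAllStreamIds, StreamLister, and StreamPage are hypothetical names, and only the listStreams() shape is taken from the signature above.

```typescript
// The listStreams() shape documented above, reduced to what this sketch needs.
type StreamPage = { streams: string[]; total: number };
type StreamListOptions = { limit?: number; offset?: number; pattern?: string };
type StreamLister = {
  listStreams(options?: StreamListOptions): Promise<StreamPage>;
};

// Hypothetical helper: collect every stream ID by requesting pages of `pageSize`.
async function listAllStreamIds(
  store: StreamLister,
  pattern?: string,
  pageSize = 50,
): Promise<string[]> {
  const collected: string[] = [];
  let offset = 0;
  for (;;) {
    const options: StreamListOptions = { limit: pageSize, offset };
    if (pattern !== undefined) options.pattern = pattern;
    const { streams, total } = await store.listStreams(options);
    collected.push(...streams);
    offset += streams.length;
    // Stop once the reported total is reached, or if a page comes back empty.
    if (streams.length === 0 || offset >= total) break;
  }
  return collected;
}
```

The empty-page check guards against looping forever if streams are deleted while paging.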
Example
    // List all streams
    const { streams, total } = await eventStore.listStreams();

    // List streams with pagination
    const { streams, total } = await eventStore.listStreams({
      limit: 50,
      offset: 100
    });

    // List streams matching a pattern
    const { streams, total } = await eventStore.listStreams({
      pattern: 'user-*'
    });
multiStreamAppend()
multiStreamAppend<EventType>(requests): Promise<MultiStreamAppendResult<EventType>>
Append events to multiple streams atomically.
Every request supplies a stream ID, events for that stream, and an optional per-stream expected version. DeltaBase commits the whole operation in one event-store transaction: all version checks pass and all events are written, or no events are written.
Ordering rules:
- global order follows request order, then event order within each request
- streams in the result preserves request order
- events is always returned and sorted by global position
Duplicate stream IDs are rejected. The reserved __dcb__ stream cannot be used here; use append() for DCB append conditions instead.
Inline projections, when configured, run once per touched stream after the event-store transaction commits. Projection writes are not part of the event-store transaction.
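The atomicity and ordering rules above can be modelled with a small in-memory store. This is a sketch for reasoning about the rules only, not how DeltaBase is implemented. Every name in it (InMemoryModelStore, ModelRequest, ModelWritten, globalPosition) is hypothetical, and it assumes a stream's version is simply the number of events in it, which may differ from the real versioning scheme.

```typescript
// Hypothetical in-memory model of the rules above; shapes are illustrative only.
type ModelEvent = { type: string; data: unknown };
type ModelRequest = { streamId: string; events: ModelEvent[]; expectedStreamVersion?: number };
type ModelWritten = ModelEvent & { streamId: string; globalPosition: number };

class InMemoryModelStore {
  private readonly log: ModelWritten[] = [];
  private readonly versions = new Map<string, number>();

  multiStreamAppend(requests: ModelRequest[]): ModelWritten[] {
    // Validate every request first, so a failure leaves the log untouched.
    const seen = new Set<string>();
    for (const request of requests) {
      if (seen.has(request.streamId)) {
        throw new Error(`Duplicate stream ID: ${request.streamId}`);
      }
      seen.add(request.streamId);
      const current = this.versions.get(request.streamId) ?? 0;
      if (request.expectedStreamVersion !== undefined && request.expectedStreamVersion !== current) {
        throw new Error(`Version conflict on ${request.streamId}`);
      }
    }
    // Assign global positions in request order, then event order within each request.
    const written: ModelWritten[] = [];
    for (const request of requests) {
      for (const event of request.events) {
        const globalPosition = this.log.length + written.length + 1;
        written.push({ ...event, streamId: request.streamId, globalPosition });
      }
      const current = this.versions.get(request.streamId) ?? 0;
      this.versions.set(request.streamId, current + request.events.length);
    }
    this.log.push(...written);
    return written;
  }

  get size(): number {
    return this.log.length;
  }
}
```

Checking all preconditions before the first write is what gives the all-or-nothing behavior in this model; a real store would get the same guarantee from its transaction.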
Type Parameters
EventType
EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; tags?: string[]; type: string; }>
Parameters
requests
MultiStreamAppendRequest<EventType>[]
One append request per stream.
Returns
Promise<MultiStreamAppendResult<EventType>>
The global append result, per-stream results, and all written events.
Throws
When any stream's expected version does not match its current version.
Throws
When the request is invalid, too large, or contains duplicate streams.
Example
    const result = await eventStore.multiStreamAppend([
      {
        streamId: 'order-123',
        events: [{ type: 'order.created', data: { orderId: '123' } }],
        expectedStreamVersion: 'no_stream',
      },
      {
        streamId: 'customer-456',
        events: [{ type: 'customer.orderAdded', data: { orderId: '123' } }],
        expectedStreamVersion: 7,
      },
    ]);

    console.log(result.lastPosition);
    console.log(result.streams[0]?.nextExpectedStreamVersion);
Implementation of
EventStoreInterface.multiStreamAppend
queryEvents()
queryEvents<EventType>(options): Promise<QueryEventsResult<EventType>>
Query events with flexible filtering options
Type Parameters
EventType
EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; tags?: string[]; type: string; }> = Readonly<{ data: EventData; metadata?: PlatformEventMetadata; tags?: string[]; type: string; }>
Parameters
options
QueryEventsOptions = {}
Query parameters for filtering events
Returns
Promise<QueryEventsResult<EventType>>
Promise resolving to the query result with events and pagination info
Example
    // Query all events of a specific type
    const result = await eventStore.queryEvents({
      type: 'user.created'
    });

    // Query events with pagination
    const result = await eventStore.queryEvents({
      limit: 20,
      offset: 40,
      includeCount: true
    });

    // Query events within a time range
    const result = await eventStore.queryEvents({
      fromDate: '2023-01-01T00:00:00Z',
      toDate: '2023-01-31T23:59:59Z'
    });
Implementation of
EventStoreInterface.queryEvents
queryStreamEvents()
queryStreamEvents<EventType>(streamId, options): Promise<QueryEventsResult<EventType>>
Query events for a specific stream with filtering options
Type Parameters
EventType
EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; tags?: string[]; type: string; }> = Readonly<{ data: EventData; metadata?: PlatformEventMetadata; tags?: string[]; type: string; }>
Parameters
streamId
string
The ID of the stream to query events from
options
Omit<QueryEventsOptions, "streamId"> = {}
Query parameters for filtering events
Returns
Promise<QueryEventsResult<EventType>>
Promise resolving to the query result with events and pagination info
Example
    // Query events for a specific stream
    const result = await eventStore.queryStreamEvents('user-123', {
      type: 'user.updated'
    });
queryStreams()
queryStreams(options): Promise<QueryStreamsResult>
Query streams with filtering options
Parameters
options
QueryStreamsOptions = {}
Query parameters for filtering streams
Returns
Promise<QueryStreamsResult>
Promise resolving to the query result with streams and pagination info
Example
    // Query all streams
    const result = await eventStore.queryStreams();

    // Query streams by type
    const result = await eventStore.queryStreams({
      streamType: 'user'
    });

    // Query streams with a pattern match
    const result = await eventStore.queryStreams({
      streamIdPattern: 'user-%'
    });
queryStreamsByType()
queryStreamsByType(streamType, options): Promise<QueryStreamsResult>
Query streams of a specific type with filtering options
Parameters
streamType
string
The stream type to filter by
options
Omit<QueryStreamsOptions, "streamType"> = {}
Query parameters for filtering streams
Returns
Promise<QueryStreamsResult>
Promise resolving to the query result with streams and pagination info
Example
    // Query all user streams
    const result = await eventStore.queryStreamsByType('user');

    // Query user streams with pagination
    const result = await eventStore.queryStreamsByType('user', {
      limit: 20,
      offset: 0
    });
readByQuery()
readByQuery<EventType>(options): Promise<ReadByQueryResult<EventType>>
Read events by DCB query
Type Parameters
EventType
EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; tags?: string[]; type: string; }>
Parameters
options
ReadByQueryOptions
Returns
Promise<ReadByQueryResult<EventType>>
Implementation of
EventStoreInterface.readByQuery
readStream()
readStream<EventType>(streamId, options?): Promise<ReadStreamResult<EventType>>
Read events from a stream
Type Parameters
EventType
EventType extends Readonly<{ data: EventData; metadata?: PlatformEventMetadata; tags?: string[]; type: string; }>
Parameters
streamId
string
The ID of the stream to read events from
options?
The options for reading events
Returns
Promise<ReadStreamResult<EventType>>
Promise resolving to the read result containing events and stream metadata
Example
    // Read all events from a stream
    const result = await eventStore.readStream('user-123');

    // Read events with a specific starting position
    const result = await eventStore.readStream('user-123', { from: 5 });

    // Read a specific range of events
    const result = await eventStore.readStream('user-123', { from: 5, to: 10 });

    // Read a limited number of events
    const result = await eventStore.readStream('user-123', { maxCount: 100 });
Implementation of
EventStoreInterface.readStream