Skip to content

StorageStrategy

Interface for event store backends.

Defines the contract for low-level operations on event streams, such as fetching, inserting, and deleting events. Concrete implementations provide storage logic for specific backends.

Source code in event_sourcery/_event_store/event_store.py
class StorageStrategy:
    """
    Contract that every event store backend has to fulfil.

    Groups the low-level operations on event streams (fetching, inserting and
    deleting events) behind a single interface. Every member declared here
    only raises NotImplementedError; concrete implementations supply the
    storage logic for a specific backend.
    """

    def fetch_events(
        self,
        stream_id: StreamId,
        start: int | None = None,
        stop: int | None = None,
    ) -> list[RawEvent]:
        """
        Reads the events of a stream that fall within a version range.

        Args:
            stream_id (StreamId): Identifier of the stream to read from.
            start (int | None): First version to include (inclusive);
                None means from the beginning.
            stop (int | None): Version to stop before (exclusive);
                None means up to the end.

        Returns:
            list[RawEvent]: Raw events within the requested range.

        Raises:
            NotImplementedError: Always, in this base interface.
        """
        raise NotImplementedError

    def insert_events(
        self,
        stream_id: StreamId,
        versioning: Versioning,
        events: list[RawEvent],
    ) -> None:
        """
        Inserts events into a stream using the given versioning strategy.

        Args:
            stream_id (StreamId): Identifier of the stream receiving the events.
            versioning (Versioning): Versioning strategy used for optimistic
                locking.
            events (list[RawEvent]): Raw events to insert.

        Raises:
            NotImplementedError: Always, in this base interface.
        """
        raise NotImplementedError

    def save_snapshot(self, snapshot: RawEvent) -> None:
        """
        Stores a snapshot of the stream.

        The stream will be fetched starting from the newest snapshot.

        Args:
            snapshot (RawEvent): Snapshot event to store.

        Raises:
            NotImplementedError: Always, in this base interface.
        """
        raise NotImplementedError

    def delete_stream(self, stream_id: StreamId) -> None:
        """
        Removes a stream together with all of its events.

        Args:
            stream_id (StreamId): Identifier of the stream to remove.

        Raises:
            NotImplementedError: Always, in this base interface.
        """
        raise NotImplementedError

    @property
    def current_position(self) -> Position | None:
        """
        Current position (offset) in the event store, if supported.

        Raises:
            NotImplementedError: Always, in this base interface.
        """
        raise NotImplementedError

    def scoped_for_tenant(self, tenant_id: str) -> Self:
        """
        Provides a backend instance scoped for a single tenant.

        Args:
            tenant_id (str): Identifier of the tenant.

        Returns:
            Self: Backend instance for that tenant.

        Raises:
            NotImplementedError: Always, in this base interface.
        """
        raise NotImplementedError

current_position: Position | None property

Returns the current position (offset) in the event store, if supported.

delete_stream(stream_id)

Deletes a stream and all its events.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| stream_id | StreamId | The stream identifier to delete. | required |

Source code in event_sourcery/_event_store/event_store.py
def delete_stream(self, stream_id: StreamId) -> None:
    """
    Removes a stream together with all of its events.

    Args:
        stream_id (StreamId): Identifier of the stream to remove.

    Raises:
        NotImplementedError: Always, in this base interface.
    """
    raise NotImplementedError

fetch_events(stream_id, start=None, stop=None)

Fetches events from a stream in the given range.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| stream_id | StreamId | The stream identifier to fetch events from. | required |
| start | int \| None | From version (inclusive), or None for the beginning. | None |
| stop | int \| None | Stop before version (exclusive), or None for the end. | None |

Returns:

| Type | Description |
| --- | --- |
| list[RawEvent] | List of raw events in the specified range. |

Source code in event_sourcery/_event_store/event_store.py
def fetch_events(
    self,
    stream_id: StreamId,
    start: int | None = None,
    stop: int | None = None,
) -> list[RawEvent]:
    """
    Reads the events of a stream that fall within a version range.

    Args:
        stream_id (StreamId): Identifier of the stream to read from.
        start (int | None): First version to include (inclusive);
            None means from the beginning.
        stop (int | None): Version to stop before (exclusive);
            None means up to the end.

    Returns:
        list[RawEvent]: Raw events within the requested range.

    Raises:
        NotImplementedError: Always, in this base interface.
    """
    raise NotImplementedError

insert_events(stream_id, versioning, events)

Inserts events into a stream using the given versioning strategy.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| stream_id | StreamId | The stream identifier to insert events into. | required |
| versioning | Versioning | Versioning strategy for optimistic locking. | required |
| events | list[RawEvent] | List of raw events to insert. | required |

Source code in event_sourcery/_event_store/event_store.py
def insert_events(
    self,
    stream_id: StreamId,
    versioning: Versioning,
    events: list[RawEvent],
) -> None:
    """
    Inserts events into a stream using the given versioning strategy.

    Args:
        stream_id (StreamId): Identifier of the stream receiving the events.
        versioning (Versioning): Versioning strategy used for optimistic
            locking.
        events (list[RawEvent]): Raw events to insert.

    Raises:
        NotImplementedError: Always, in this base interface.
    """
    raise NotImplementedError

save_snapshot(snapshot)

Saves a snapshot of the stream. The stream will then be fetched starting from its newest snapshot.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| snapshot | RawEvent | The snapshot event to save. | required |

Source code in event_sourcery/_event_store/event_store.py
def save_snapshot(self, snapshot: RawEvent) -> None:
    """
    Stores a snapshot of the stream.

    The stream will be fetched starting from the newest snapshot.

    Args:
        snapshot (RawEvent): Snapshot event to store.

    Raises:
        NotImplementedError: Always, in this base interface.
    """
    raise NotImplementedError

scoped_for_tenant(tenant_id)

Returns a backend instance scoped for the given tenant.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tenant_id | str | The tenant identifier. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Self | Self | The backend instance for the tenant. |

Source code in event_sourcery/_event_store/event_store.py
def scoped_for_tenant(self, tenant_id: str) -> Self:
    """
    Provides a backend instance scoped for a single tenant.

    Args:
        tenant_id (str): Identifier of the tenant.

    Returns:
        Self: Backend instance for that tenant.

    Raises:
        NotImplementedError: Always, in this base interface.
    """
    raise NotImplementedError