Outbox

Outbox pattern implementation for reliable event publishing.

Uses a storage strategy to fetch outbox entries. Ensures that events are only removed from the outbox if they are processed successfully (i.e., no exception is raised during publishing). If an exception occurs, the event remains in the outbox for retry, providing at-least-once delivery semantics.
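
The remove-only-on-success behaviour can be illustrated with a small in-memory sketch. It is not the library's implementation: `InMemoryOutbox` and its `_entry` helper are hypothetical stand-ins for a real `OutboxStorageStrategy`, and the sketch assumes a failed publish is swallowed so the remaining entries are still attempted (a real strategy may log or re-raise instead).

```python
from contextlib import contextmanager
from typing import Callable, Iterator


class InMemoryOutbox:
    """Hypothetical stand-in, not the library's OutboxStorageStrategy."""

    def __init__(self, entries: list[str]) -> None:
        self.pending = list(entries)

    @contextmanager
    def _entry(self, item: str) -> Iterator[str]:
        try:
            yield item
        except Exception:
            # Publishing failed: keep the entry so a later run can retry it.
            # Assumption: the error is swallowed here rather than re-raised.
            return
        # No exception was raised: the entry was published, so remove it.
        self.pending.remove(item)

    def run(self, publisher: Callable[[str], None], limit: int = 100) -> None:
        # Iterate over a copy, because successful entries are removed as we go.
        for item in list(self.pending[:limit]):
            with self._entry(item) as record:
                publisher(record)


sent: list[str] = []


def flaky_publisher(record: str) -> None:
    # Simulate a broker outage for one specific record.
    if record == "b":
        raise RuntimeError("broker unavailable")
    sent.append(record)


outbox = InMemoryOutbox(["a", "b", "c"])
outbox.run(flaky_publisher)
print(outbox.pending)  # ['b']: the failed entry is still waiting for a retry
```

A second call to `run` with a working publisher would deliver `"b"` and leave the outbox empty, which is the retry path that gives at-least-once delivery.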

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| strategy | OutboxStorageStrategy | The backend strategy for outbox storage and entry management. | required |
| serde | Serde | The serializer/deserializer for event records. | required |

Source code in event_sourcery/_event_store/outbox.py
class Outbox:
    """
    Outbox pattern implementation for reliable event publishing.

    Uses a storage strategy to fetch outbox entries.
    Ensures that events are only removed from the outbox if they are processed successfully
    (i.e., no exception is raised during publishing). If an exception occurs, the event remains
    in the outbox for retry, providing at-least-once delivery semantics.

    Args:
        strategy (OutboxStorageStrategy): The backend strategy for outbox storage and entry management.
        serde (Serde): The serializer/deserializer for event records.
    """

    def __init__(self, strategy: OutboxStorageStrategy, serde: Serde) -> None:
        self._strategy = strategy
        self._serde = serde

    def run(
        self,
        publisher: Callable[[Recorded], None],
        limit: int = 100,
    ) -> None:
        """
        Processes and publishes outbox entries using the provided publisher function.

        Fetches entries from the outbox (up to the given limit), and passes each to the
        publisher. If the publisher raises an exception, the event remains in the outbox
        for retry. If processing succeeds, the event is removed from the outbox.

        Args:
            publisher (Callable[[Recorded], None]): Function to publish a single event.
            limit (int, optional): Maximum number of entries to process in one run. Defaults to 100.
        """
        stream = self._strategy.outbox_entries(limit=limit)
        for entry in stream:
            with entry as raw_record:
                event = self._serde.deserialize(raw_record.entry)
                record = Recorded(
                    wrapped_event=event,
                    stream_id=raw_record.entry.stream_id,
                    position=raw_record.position,
                    tenant_id=raw_record.tenant_id,
                )
                publisher(record)

run(publisher, limit=100)

Processes and publishes outbox entries using the provided publisher function.

Fetches up to `limit` entries from the outbox and passes each one to the publisher. If the publisher raises an exception, the event remains in the outbox for retry. If the publisher returns without raising, the event is removed from the outbox.
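
Because a failed entry is retried, the same event can reach the publisher more than once, for example when the publish succeeds but removing the entry from the outbox does not. A publisher (or the downstream consumer) therefore usually needs to tolerate duplicates. The following is a minimal sketch of one way to do that, under stated assumptions: `FakeRecorded` is a hypothetical stand-in that mirrors some of the fields `run` passes to `Recorded`, `deduplicating` is an illustrative helper that is not part of the library, and the pair `(stream_id, position)` is assumed to identify an event uniquely.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class FakeRecorded:
    """Hypothetical stand-in mirroring fields that run() sets on Recorded."""

    stream_id: str
    position: int
    payload: str


def deduplicating(
    handler: Callable[[FakeRecorded], None],
) -> Callable[[FakeRecorded], None]:
    """Wrap a handler so that a redelivered record is handled only once."""
    # In-memory only: a real deployment would persist the seen keys.
    seen: set[tuple[str, int]] = set()

    def publisher(record: FakeRecorded) -> None:
        # Assumption: (stream_id, position) uniquely identifies an event.
        key = (record.stream_id, record.position)
        if key in seen:
            return  # Duplicate delivery: report success without re-handling.
        handler(record)
        # Mark as seen only after the handler succeeded, so that a failure
        # propagates to the caller and the entry stays eligible for retry.
        seen.add(key)

    return publisher


delivered: list[FakeRecorded] = []
publish = deduplicating(delivered.append)

created = FakeRecorded(stream_id="order-1", position=1, payload="created")
publish(created)
publish(created)  # Simulated redelivery of the same event.
publish(FakeRecorded(stream_id="order-1", position=2, payload="paid"))
print(len(delivered))  # 2
```

The wrapped function has the same shape as the `publisher` argument of `run`: it takes one record and signals failure only by raising.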

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| publisher | Callable[[Recorded], None] | Function to publish a single event. | required |
| limit | int | Maximum number of entries to process in one run. Defaults to 100. | 100 |

Source code in event_sourcery/_event_store/outbox.py
def run(
    self,
    publisher: Callable[[Recorded], None],
    limit: int = 100,
) -> None:
    """
    Processes and publishes outbox entries using the provided publisher function.

    Fetches entries from the outbox (up to the given limit), and passes each to the
    publisher. If the publisher raises an exception, the event remains in the outbox
    for retry. If processing succeeds, the event is removed from the outbox.

    Args:
        publisher (Callable[[Recorded], None]): Function to publish a single event.
        limit (int, optional): Maximum number of entries to process in one run. Defaults to 100.
    """
    stream = self._strategy.outbox_entries(limit=limit)
    for entry in stream:
        with entry as raw_record:
            event = self._serde.deserialize(raw_record.entry)
            record = Recorded(
                wrapped_event=event,
                stream_id=raw_record.entry.stream_id,
                position=raw_record.position,
                tenant_id=raw_record.tenant_id,
            )
            publisher(record)