Skip to content

KurrentDBBackend

Bases: Backend

KurrentDB integration backend for Event Sourcery.

Source code in event_sourcery_kurrentdb/__init__.py
class KurrentDBBackend(Backend):
    """
    KurrentDB integration backend for Event Sourcery.

    On construction the backend registers factories for the KurrentDB storage
    and subscription strategies. The `KurrentDBClient` and `KurrentDBConfig`
    entries are registered through `not_configured(...)` until `.configure(...)`
    replaces them with real values.
    """

    def __init__(self) -> None:
        super().__init__()
        # Both entries carry the same hint pointing the user at `.configure(...)`.
        self[KurrentDBClient] = not_configured(
            "Configure backend with `.configure(kurrentdb_client, config)`",
        )
        self[KurrentDBConfig] = not_configured(
            "Configure backend with `.configure(kurrentdb_client, config)`",
        )
        # The storage strategy is built from the client and the configured
        # timeout, then scoped to the tenant id found in the container.
        self[StorageStrategy] = lambda c: KurrentDBStorageStrategy(
            c[KurrentDBClient],
            c[KurrentDBConfig].timeout,
        ).scoped_for_tenant(c[TenantId])
        self[SubscriptionStrategy] = lambda c: KurrentDBSubscriptionStrategy(
            c[KurrentDBClient],
        )

    def configure(
        self, client: KurrentDBClient, config: KurrentDBConfig | None = None
    ) -> Self:
        """
        Sets the KurrentDB client and the backend configuration.

        Allows you to provide a KurrentDBClient instance and an optional
        KurrentDBConfig. If no config is provided, a default `KurrentDBConfig()`
        is used. Call this before using the backend: until then the client and
        config entries only hold the `not_configured(...)` placeholders.

        Args:
            client (KurrentDBClient):
                The KurrentDB client instance to use for backend operations.
            config (KurrentDBConfig | None):
                Optional custom configuration. If None, uses default
                KurrentDBConfig().

        Returns:
            Self: The configured backend instance (for chaining).
        """
        self[KurrentDBClient] = client
        # Explicit None check: only a missing config falls back to the defaults.
        self[KurrentDBConfig] = config if config is not None else KurrentDBConfig()
        return self

    def with_outbox(self, filterer: OutboxFiltererStrategy = no_filter) -> Self:
        """
        Enables the outbox by registering its filterer and storage strategy.

        Registers `filterer` under `OutboxFiltererStrategy` and a factory for
        `KurrentDBOutboxStorageStrategy` built from the client, the filterer and
        the config values `outbox_name`, `outbox_attempts` and `timeout`.
        The storage strategy is then resolved right away and its
        `create_subscription()` is called.

        NOTE(review): because the strategy is resolved inside this method, it
        presumably has to be called after `.configure(...)` - confirm.

        Args:
            filterer (OutboxFiltererStrategy):
                Strategy registered as the outbox filterer. Defaults to
                `no_filter`.

        Returns:
            Self: The backend instance (for chaining).
        """
        self[OutboxFiltererStrategy] = filterer  # type: ignore[type-abstract]
        self[KurrentDBOutboxStorageStrategy] = lambda c: KurrentDBOutboxStorageStrategy(
            c[KurrentDBClient],
            c[OutboxFiltererStrategy],  # type: ignore[type-abstract]
            c[KurrentDBConfig].outbox_name,
            c[KurrentDBConfig].outbox_attempts,
            c[KurrentDBConfig].timeout,
        )
        # The generic outbox strategy resolves to the KurrentDB-specific one.
        self[OutboxStorageStrategy] = lambda c: c[KurrentDBOutboxStorageStrategy]
        # Resolves the strategy now (not lazily) and creates its subscription.
        self[KurrentDBOutboxStorageStrategy].create_subscription()
        return self

configure(client, config=None)

Sets the backend configuration for KurrentDB client and outbox behavior.

Allows you to provide a KurrentDBClient instance and an optional KurrentDBConfig. If no config is provided, the default configuration is used. This method must be called before using the backend in production to ensure correct event publishing and subscription reliability.

Parameters:

Name Type Description Default
client KurrentDBClient

The KurrentDB client instance to use for backend operations.

required
config KurrentDBConfig | None

Optional custom configuration. If None, uses default KurrentDBConfig().

None

Returns:

Name Type Description
Self Self

The configured backend instance (for chaining).

Source code in event_sourcery_kurrentdb/__init__.py
def configure(
    self, client: KurrentDBClient, config: KurrentDBConfig | None = None
) -> Self:
    """
    Sets the KurrentDB client and the backend configuration.

    Allows you to provide a KurrentDBClient instance and an optional
    KurrentDBConfig. If no config is provided, a default `KurrentDBConfig()`
    is used. This method must be called before using the backend.

    Args:
        client (KurrentDBClient):
            The KurrentDB client instance to use for backend operations.
        config (KurrentDBConfig | None):
            Optional custom configuration. If None, uses default
            KurrentDBConfig().

    Returns:
        Self: The configured backend instance (for chaining).
    """
    self[KurrentDBClient] = client
    # Explicit None check: only a missing config falls back to the defaults.
    self[KurrentDBConfig] = config if config is not None else KurrentDBConfig()
    return self

KurrentDBConfig

Bases: BaseModel

Configuration for KurrentDBBackend event store integration.

Attributes:

Name Type Description
timeout Seconds | None

Optional timeout (in seconds) for KurrentDB operations. Controls the maximum time allowed for backend requests. If None, the default client timeout is used.

outbox_name str

Name of the outbox stream used for reliable event publishing.

outbox_attempts PositiveInt

Maximum number of outbox delivery attempts per event before giving up.

Source code in event_sourcery_kurrentdb/__init__.py
class KurrentDBConfig(BaseModel):
    """
    Settings consumed by KurrentDBBackend when talking to the event store.

    The model is frozen and forbids extra fields (see `model_config`).

    Attributes:
        timeout (Seconds | None):
            Upper bound, in seconds, on how long a KurrentDB operation may take.
            Leave as None to fall back to the client's own default timeout.
        outbox_name (str):
            Name of the outbox stream used for reliable event publishing.
        outbox_attempts (PositiveInt):
            How many times delivery of a single outbox event is attempted
            before giving up.
    """

    model_config = ConfigDict(frozen=True, extra="forbid")

    timeout: Seconds | None = None
    outbox_name: str = "pyes-outbox"
    outbox_attempts: PositiveInt = 3