Skip to content

SQLAlchemyBackend

Bases: TransactionalBackend

SQLAlchemy integration backend for Event Sourcery.

Source code in event_sourcery_sqlalchemy/__init__.py
class SQLAlchemyBackend(TransactionalBackend):
    """
    SQLAlchemy integration backend for Event Sourcery.
    """

    UNCONFIGURED_MESSAGE = "Configure backend with `.configure(session, config)`"

    def __init__(self) -> None:
        super().__init__()
        self[Models] = not_configured(self.UNCONFIGURED_MESSAGE)
        self[Session] = not_configured(self.UNCONFIGURED_MESSAGE)
        self[SQLAlchemyConfig] = not_configured(self.UNCONFIGURED_MESSAGE)
        self[StorageStrategy] = lambda c: SqlAlchemyStorageStrategy(
            c[Session],
            c[Dispatcher],
            c.get(SqlAlchemyOutboxStorageStrategy, None),
            c[Models].event_model,
            c[Models].snapshot_model,
            c[Models].stream_model,
        ).scoped_for_tenant(c[TenantId])
        self[SubscriptionStrategy] = lambda c: SqlAlchemySubscriptionStrategy(
            c[Session],
            c[SQLAlchemyConfig].gap_retry_interval,
            c[Models].event_model,
            c[Models].stream_model,
        )

    def configure(
        self,
        session: Session,
        config: SQLAlchemyConfig | None = None,
        custom_models: Models | None = None,
    ) -> Self:
        """
        Sets the backend configuration for SQLAlchemy session, outbox, and models.

        Allows you to provide a SQLAlchemy Session instance, an optional Config
        instance, and optional custom ORM models.
        If no config or models are provided, the default configuration and models are
        used.
        This method must be called before using the backend in production to ensure
        correct event publishing and subscription reliability.

        Args:
            session (Session): The SQLAlchemy session instance to use for backend operations.
            config (SQLAlchemyConfig | None): Optional custom configuration. If None, uses default Config().
            custom_models (Models | None): Optional custom ORM models. If None, uses default models.

        Returns:
            Self: The configured backend instance (for chaining).
        """

        if custom_models is None:
            custom_models = Models(
                event_model=DefaultEvent,
                stream_model=DefaultStream,
                snapshot_model=DefaultSnapshot,
                outbox_entry_model=DefaultOutboxEntry,
            )

        self[Session] = session
        self[SQLAlchemyConfig] = config or SQLAlchemyConfig()
        self[Models] = custom_models
        return self

    def with_outbox(self, filterer: OutboxFiltererStrategy = no_filter) -> Self:
        self[OutboxFiltererStrategy] = filterer  # type: ignore[type-abstract]
        self[SqlAlchemyOutboxStorageStrategy] = (
            lambda c: SqlAlchemyOutboxStorageStrategy(
                c[Session],
                c[OutboxFiltererStrategy],  # type: ignore[type-abstract]
                c[SQLAlchemyConfig].outbox_attempts,
                c[Models].outbox_entry_model,
            )
        )
        self[OutboxStorageStrategy] = lambda c: c[SqlAlchemyOutboxStorageStrategy]
        return self

configure(session, config=None, custom_models=None)

Sets the backend configuration for SQLAlchemy session, outbox, and models.

Allows you to provide a SQLAlchemy Session instance, an optional Config instance, and optional custom ORM models. If no config or models are provided, the default configuration and models are used. This method must be called before using the backend in production to ensure correct event publishing and subscription reliability.

Parameters:

Name Type Description Default
session Session

The SQLAlchemy session instance to use for backend operations.

required
config SQLAlchemyConfig | None

Optional custom configuration. If None, uses default Config().

None
custom_models Models | None

Optional custom ORM models. If None, uses default models.

None

Returns:

Name Type Description
Self Self

The configured backend instance (for chaining).

Source code in event_sourcery_sqlalchemy/__init__.py
def configure(
    self,
    session: Session,
    config: SQLAlchemyConfig | None = None,
    custom_models: Models | None = None,
) -> Self:
    """
    Sets the backend configuration for SQLAlchemy session, outbox, and models.

    Allows you to provide a SQLAlchemy Session instance, an optional Config
    instance, and optional custom ORM models.
    If no config or models are provided, the default configuration and models are
    used.
    This method must be called before using the backend in production to ensure
    correct event publishing and subscription reliability.

    Args:
        session (Session): The SQLAlchemy session instance to use for backend operations.
        config (SQLAlchemyConfig | None): Optional custom configuration. If None, uses default Config().
        custom_models (Models | None): Optional custom ORM models. If None, uses default models.

    Returns:
        Self: The configured backend instance (for chaining).
    """

    if custom_models is None:
        custom_models = Models(
            event_model=DefaultEvent,
            stream_model=DefaultStream,
            snapshot_model=DefaultSnapshot,
            outbox_entry_model=DefaultOutboxEntry,
        )

    self[Session] = session
    self[SQLAlchemyConfig] = config or SQLAlchemyConfig()
    self[Models] = custom_models
    return self

SQLAlchemyConfig

Bases: BaseModel

Configuration for SQLAlchemyBackend event store integration.

Attributes:

Name Type Description
outbox_attempts PositiveInt

Maximum number of outbox delivery attempts per event before giving up.

gap_retry_interval timedelta

Time to wait before retrying a subscription gap. If the subscription detects a gap in event identifiers (e.g., missing event IDs), it assumes there may be an open transaction and the database has already assigned IDs for new events that are not yet committed. This interval determines how long the subscription waits before retrying to fetch events, preventing loss of events that are in the process of being written to the database.

Source code in event_sourcery_sqlalchemy/__init__.py
class SQLAlchemyConfig(BaseModel):
    """
    Configuration for SQLAlchemyBackend event store integration.

    Attributes:
        outbox_attempts (PositiveInt):
            Maximum number of outbox delivery attempts per event before giving up.
        gap_retry_interval (timedelta):
            Time to wait before retrying a subscription gap. If the subscription detects a gap in event identifiers (e.g., missing event IDs),
            it assumes there may be an open transaction and the database has already assigned IDs for new events that are not yet committed.
            This interval determines how long the subscription waits before retrying to fetch events, preventing loss of events that are in the process of being written to the database.
    """

    model_config = ConfigDict(extra="forbid", frozen=True)

    outbox_attempts: PositiveInt = 3
    gap_retry_interval: timedelta = timedelta(seconds=0.5)

Models

Container for SQLAlchemy ORM models used by the backend.

Allows customization of the event, stream, snapshot, and outbox entry models used by the backend. It allows to have different event store models for different modules in modular monolith applications still having in transactional event dispatching between them.

By default, uses the standard models provided by Event Sourcery.

Attributes:

Name Type Description
event_model type[BaseEvent]

SQLAlchemy model for events.

stream_model type[BaseStream]

SQLAlchemy model for streams.

snapshot_model type[BaseSnapshot]

SQLAlchemy model for snapshots.

outbox_entry_model type[BaseOutboxEntry]

SQLAlchemy model for outbox entries (default: DefaultOutboxEntry).

Source code in event_sourcery_sqlalchemy/__init__.py
@dataclass(frozen=True)
class Models:
    """
    Container for SQLAlchemy ORM models used by the backend.

    Allows customization of the event, stream, snapshot, and outbox entry models used by
    the backend. It allows to have different event store models for different modules in
    modular monolith applications still having in transactional event dispatching
    between them.

    By default, uses the standard models provided by Event Sourcery.

    Attributes:
        event_model (type[BaseEvent]): SQLAlchemy model for events.
        stream_model (type[BaseStream]): SQLAlchemy model for streams.
        snapshot_model (type[BaseSnapshot]): SQLAlchemy model for snapshots.
        outbox_entry_model (type[BaseOutboxEntry]):
            SQLAlchemy model for outbox entries (default: DefaultOutboxEntry).
    """

    event_model: type[BaseEvent]
    stream_model: type[BaseStream]
    snapshot_model: type[BaseSnapshot]
    outbox_entry_model: type[BaseOutboxEntry] = DefaultOutboxEntry

configure_models

Configures SQLAlchemy ORM models for Event Sourcery backend.

Sets up mapping information and registers the provided (or default) models with SQLAlchemy's registry. This function allows customization of event, stream, snapshot, outbox entry, and projector cursor models for advanced scenarios, or uses the default models for standard usage. Ensures all models are mapped declaratively and share the same metadata and class registry, enabling flexible schema management and migrations.

Parameters:

Name Type Description Default
base type[BaseProto]

Base class providing SQLAlchemy MetaData for model registration.

required
event_model type[BaseEvent]

Event model class to use. Defaults to DefaultEvent.

DefaultEvent
stream_model type[BaseStream]

Stream model class to use. Defaults to DefaultStream.

DefaultStream
snapshot_model type[BaseSnapshot]

Snapshot model class to use. Defaults to DefaultSnapshot.

DefaultSnapshot
outbox_entry_model type[BaseOutboxEntry]

Outbox entry model class to use. Defaults to DefaultOutboxEntry.

DefaultOutboxEntry
projector_cursor_model type[BaseProjectorCursor]

Projector cursor model class to use. Defaults to DefaultProjectorCursor.

DefaultProjectorCursor
Source code in event_sourcery_sqlalchemy/models/__init__.py
def configure_models(
    base: type[BaseProto],
    event_model: type[BaseEvent] = DefaultEvent,
    stream_model: type[BaseStream] = DefaultStream,
    snapshot_model: type[BaseSnapshot] = DefaultSnapshot,
    outbox_entry_model: type[BaseOutboxEntry] = DefaultOutboxEntry,
    projector_cursor_model: type[BaseProjectorCursor] = DefaultProjectorCursor,
) -> None:
    """
    Configures SQLAlchemy ORM models for Event Sourcery backend.

    Sets up mapping information and registers the provided (or default) models with
    SQLAlchemy's registry.
    This function allows customization of event, stream, snapshot, outbox entry, and
    projector cursor models for advanced scenarios, or uses the default models for
    standard usage.
    Ensures all models are mapped declaratively and share the same metadata and class
    registry, enabling flexible schema management and migrations.

    Args:
        base (type[BaseProto]):
            Base class providing SQLAlchemy MetaData for model registration.
        event_model (type[BaseEvent], optional):
            Event model class to use. Defaults to DefaultEvent.
        stream_model (type[BaseStream], optional):
            Stream model class to use. Defaults to DefaultStream.
        snapshot_model (type[BaseSnapshot], optional):
            Snapshot model class to use. Defaults to DefaultSnapshot.
        outbox_entry_model (type[BaseOutboxEntry], optional):
            Outbox entry model class to use. Defaults to DefaultOutboxEntry.
        projector_cursor_model (type[BaseProjectorCursor], optional):
            Projector cursor model class to use. Defaults to DefaultProjectorCursor.
    """
    event_model.__set_mapping_information__(stream_model)
    snapshot_model.__set_mapping_information__(stream_model)
    stream_model.__set_mapping_information__(event_model, snapshot_model)

    mapping_registry = registry(metadata=base.metadata, class_registry=_class_registry)
    for model_cls in (
        stream_model,
        event_model,
        snapshot_model,
        outbox_entry_model,
        projector_cursor_model,
    ):
        if model_cls in _class_registry.values():
            continue

        mapping_registry.map_declaratively(model_cls)

BaseEvent

Source code in event_sourcery_sqlalchemy/models/base.py
class BaseEvent:
    __stream_model__: type["BaseStream"]
    __tablename__: str

    @declared_attr  # type: ignore[arg-type]
    @classmethod
    def __table_args__(cls) -> tuple[Index | UniqueConstraint, ...]:
        return (
            Index(
                f"ix_events_stream_id_version_{cls.__tablename__}",
                "db_stream_id",
                "version",
                unique=True,
            ),
        )

    def __init__(
        self,
        uuid: UUID,
        created_at: datetime,
        name: str,
        data: dict,
        event_context: dict,
        version: int | None,
    ) -> None:
        self.uuid = uuid
        self.created_at = created_at
        self.name = name
        self.data = data
        self.event_context = event_context
        self.version = version

    @classmethod
    def __set_mapping_information__(cls, stream_model: type[BaseStream]) -> None:
        cls.__stream_model__ = stream_model

    @declared_attr
    @classmethod
    def _db_stream_id(cls) -> MappedColumn[Any]:
        return mapped_column(
            "db_stream_id",
            BigInteger().with_variant(Integer(), "sqlite"),
            ForeignKey(cls.__stream_model__.id),
            nullable=False,
            index=True,
        )

    @declared_attr
    @classmethod
    def stream(cls) -> Mapped[BaseStream]:
        return relationship(cls.__stream_model__, back_populates="events")

    id = mapped_column(BigInteger().with_variant(Integer(), "sqlite"), primary_key=True)
    version = mapped_column(Integer(), nullable=True)
    uuid = mapped_column(GUID(), index=True, unique=True)
    stream_id: AssociationProxy[StreamId] = association_proxy("stream", "stream_id")
    tenant_id: AssociationProxy[TenantId] = association_proxy("stream", "tenant_id")
    name = mapped_column(String(200), nullable=False)
    data = mapped_column(JSONB(), nullable=False)
    event_context = mapped_column(JSONB(), nullable=False)
    created_at = mapped_column(DateTime(), nullable=False, index=True)

BaseStream

Source code in event_sourcery_sqlalchemy/models/base.py
class BaseStream:
    __event_model__: type["BaseEvent"]
    __snapshot_model__: type["BaseSnapshot"]

    @declared_attr  # type: ignore[arg-type]
    @classmethod
    def __table_args__(cls) -> tuple[Index | UniqueConstraint, ...]:
        return (
            UniqueConstraint("uuid", "category", "tenant_id"),
            UniqueConstraint("name", "category", "tenant_id"),
        )

    id = mapped_column(BigInteger().with_variant(Integer(), "sqlite"), primary_key=True)
    uuid = mapped_column(GUID(), nullable=False)
    name = mapped_column(String(255), nullable=True, default=None)
    category = mapped_column(String(255), nullable=False, default="")
    tenant_id = mapped_column(String(255), nullable=False)
    version = mapped_column(BigInteger(), nullable=True)

    @classmethod
    def __set_mapping_information__(
        cls, event_model: type["BaseEvent"], snapshot_model: type["BaseSnapshot"]
    ) -> None:
        cls.__event_model__ = event_model
        cls.__snapshot_model__ = snapshot_model

    @declared_attr
    @classmethod
    def events(cls) -> Mapped[list["BaseEvent"]]:
        return relationship(cls.__event_model__, back_populates="stream")

    @declared_attr
    @classmethod
    def snapshots(cls) -> Mapped[list["BaseSnapshot"]]:
        return relationship(cls.__snapshot_model__, back_populates="stream")

    @hybrid_property
    def stream_id(self) -> StreamId:
        return StreamId(self.uuid, self.name, category=self.category or None)

    @stream_id.inplace.comparator
    @classmethod
    def _stream_id_comparator(cls) -> StreamIdComparator:
        return StreamIdComparator(cls.uuid, cls.name, cls.category)

BaseOutboxEntry

Source code in event_sourcery_sqlalchemy/models/base.py
class BaseOutboxEntry:
    id = mapped_column(BigInteger().with_variant(Integer(), "sqlite"), primary_key=True)
    created_at = mapped_column(DateTime(), nullable=False, index=True)
    data = mapped_column(JSONB(), nullable=False)
    stream_name = mapped_column(String(255), nullable=True)
    position = mapped_column(BigInteger().with_variant(Integer(), "sqlite"))
    tries_left = mapped_column(Integer(), nullable=False)

BaseProjectorCursor

Source code in event_sourcery_sqlalchemy/models/base.py
class BaseProjectorCursor:
    __tablename__: str

    @declared_attr  # type: ignore[arg-type]
    @classmethod
    def __table_args__(cls) -> tuple[Index | UniqueConstraint, ...]:
        return (
            Index(
                f"ix_name_stream_id_{cls.__tablename__}",
                "name",
                "stream_id",
                "category",
                unique=True,
            ),
        )

    id = mapped_column(BigInteger().with_variant(Integer(), "sqlite"), primary_key=True)
    name = mapped_column(String(255), nullable=False)
    stream_id = mapped_column(GUID(), nullable=False, index=True)
    category = mapped_column(String(255), nullable=True)
    version = mapped_column(BigInteger(), nullable=False)

BaseSnapshot

Source code in event_sourcery_sqlalchemy/models/base.py
class BaseSnapshot:
    __stream_model__: type["BaseStream"]

    def __init__(
        self,
        uuid: UUID,
        created_at: datetime,
        name: str,
        data: dict,
        event_context: dict,
        version: int | None,
    ) -> None:
        self.uuid = uuid
        self.created_at = created_at
        self.name = name
        self.data = data
        self.event_context = event_context
        self.version = version

    @classmethod
    def __set_mapping_information__(cls, stream_model: type[BaseStream]) -> None:
        cls.__stream_model__ = stream_model

    @declared_attr
    @classmethod
    def _db_stream_id(cls) -> MappedColumn[Any]:
        return mapped_column(
            "db_stream_id",
            BigInteger().with_variant(Integer(), "sqlite"),
            ForeignKey(cls.__stream_model__.id),
            nullable=False,
            index=True,
        )

    @declared_attr
    @classmethod
    def stream(cls) -> Mapped[BaseStream]:
        return relationship(cls.__stream_model__, back_populates="snapshots")

    uuid = mapped_column(GUID, primary_key=True)
    version = mapped_column(Integer(), nullable=False)
    stream_id: AssociationProxy[StreamId] = association_proxy("stream", "stream_id")
    name = mapped_column(String(50), nullable=False)
    data = mapped_column(JSONB(), nullable=False)
    event_context = mapped_column(JSONB(), nullable=False)
    created_at = mapped_column(DateTime(), nullable=False)