Skip to content

InMemoryBackendFactory

Bases: BackendFactory

Lightweight in-memory backend factory for testing and development.

Source code in event_sourcery/event_store/in_memory.py
@dataclass(repr=False)
class InMemoryBackendFactory(BackendFactory):
    """Lightweight in-memory backend factory for testing and development."""

    serde = Serde(Event.__registry__)

    _config: Config = field(default_factory=Config)
    _storage: Storage = field(default_factory=Storage)
    _outbox_strategy: InMemoryOutboxStorageStrategy | None = None
    _subscription_strategy: InMemorySubscriptionStrategy = field(init=False)

    def __post_init__(self) -> None:
        self._subscription_strategy = InMemorySubscriptionStrategy(self._storage)

    def build(self) -> TransactionalBackend:
        backend = TransactionalBackend()
        backend.serde = self.serde
        backend.in_transaction = Dispatcher(backend.serde)
        backend.event_store = EventStore(
            InMemoryStorageStrategy(
                self._storage,
                backend.in_transaction,
                self._outbox_strategy,
            ),
            backend.serde,
        )
        backend.outbox = Outbox(
            self._outbox_strategy or NoOutboxStorageStrategy(),
            backend.serde,
        )
        backend.subscriber = subscription.SubscriptionBuilder(
            _serde=backend.serde,
            _strategy=self._subscription_strategy,
        )
        return backend

    def with_event_registry(self, event_registry: EventRegistry) -> Self:
        self.serde = Serde(event_registry)
        return self

    def with_outbox(self, filterer: OutboxFiltererStrategy = no_filter) -> Self:
        self._outbox_strategy = InMemoryOutboxStorageStrategy(
            filterer,
            self._config.outbox_attempts,
        )
        return self

    def without_outbox(self, filterer: OutboxFiltererStrategy = no_filter) -> Self:
        self._outbox_strategy = None
        return self