Bases: BackendFactory
Lightweight in-memory backend factory for testing and development.
Source code in event_sourcery/event_store/in_memory.py
@dataclass(repr=False)
class InMemoryBackendFactory(BackendFactory):
    """Lightweight in-memory backend factory for testing and development.

    The factory is configured fluently: ``with_event_registry``,
    ``with_outbox`` and ``without_outbox`` each mutate the factory and
    return it, and ``build`` assembles a ``TransactionalBackend`` from the
    current configuration.
    """

    # Deliberately un-annotated: this is a plain class attribute, not a
    # dataclass field, so it is not part of the generated ``__init__``.
    # ``with_event_registry`` overrides it per instance.
    serde = Serde(Event.__registry__)

    _config: Config = field(default_factory=Config)
    _storage: Storage = field(default_factory=Storage)
    # ``None`` means no outbox strategy is configured (see ``build``).
    _outbox_strategy: InMemoryOutboxStorageStrategy | None = None
    # Not an ``__init__`` argument; assigned in ``__post_init__``.
    _subscription_strategy: InMemorySubscriptionStrategy = field(init=False)

    def __post_init__(self) -> None:
        """Create the subscription strategy on top of this factory's storage."""
        self._subscription_strategy = InMemorySubscriptionStrategy(self._storage)

    def build(self) -> TransactionalBackend:
        """Assemble a new ``TransactionalBackend`` from the current settings.

        Returns:
            A freshly created backend whose ``serde``, ``in_transaction``,
            ``event_store``, ``outbox`` and ``subscriber`` attributes have
            been populated. The factory's storage and subscription strategy
            instances are passed to every backend built by this factory.
        """
        built = TransactionalBackend()
        built.serde = self.serde
        built.in_transaction = Dispatcher(built.serde)

        storage_strategy = InMemoryStorageStrategy(
            self._storage,
            built.in_transaction,
            self._outbox_strategy,
        )
        built.event_store = EventStore(storage_strategy, built.serde)

        # Fall back to the no-op strategy when no outbox has been configured.
        outbox_strategy = self._outbox_strategy or NoOutboxStorageStrategy()
        built.outbox = Outbox(outbox_strategy, built.serde)

        built.subscriber = subscription.SubscriptionBuilder(
            _serde=built.serde,
            _strategy=self._subscription_strategy,
        )
        return built

    def with_event_registry(self, event_registry: EventRegistry) -> Self:
        """Use a serde built from ``event_registry`` for this factory.

        Args:
            event_registry: Registry handed to the new ``Serde``.

        Returns:
            This factory, to allow call chaining.
        """
        self.serde = Serde(event_registry)
        return self

    def with_outbox(self, filterer: OutboxFiltererStrategy = no_filter) -> Self:
        """Enable the in-memory outbox for backends built afterwards.

        Args:
            filterer: Filtering strategy passed to the outbox storage
                strategy; defaults to ``no_filter``.

        Returns:
            This factory, to allow call chaining.
        """
        attempts = self._config.outbox_attempts
        self._outbox_strategy = InMemoryOutboxStorageStrategy(filterer, attempts)
        return self

    def without_outbox(self, filterer: OutboxFiltererStrategy = no_filter) -> Self:
        """Drop any configured outbox strategy.

        Args:
            filterer: Accepted for signature parity with ``with_outbox``
                but not used by this method.

        Returns:
            This factory, to allow call chaining.
        """
        self._outbox_strategy = None
        return self