Outbox pattern implementation for reliable event publishing.
Uses a storage strategy to fetch outbox entries.
Ensures that events are only removed from the outbox if they are processed successfully
(i.e., no exception is raised during publishing). If an exception occurs, the event remains
in the outbox for retry, providing at-least-once delivery semantics.
Parameters:
| Name |
Type |
Description |
Default |
strategy |
OutboxStorageStrategy
|
The backend strategy for outbox storage and entry management.
|
required
|
serde |
Serde
|
The serializer/deserializer for event records.
|
required
|
Source code in event_sourcery/_event_store/outbox.py
| class Outbox:
"""
Outbox pattern implementation for reliable event publishing.
Uses a storage strategy to fetch outbox entries.
Ensures that events are only removed from the outbox if they are processed successfully
(i.e., no exception is raised during publishing). If an exception occurs, the event remains
in the outbox for retry, providing at-least-once delivery semantics.
Args:
strategy (OutboxStorageStrategy): The backend strategy for outbox storage and entry management.
serde (Serde): The serializer/deserializer for event records.
"""
def __init__(self, strategy: OutboxStorageStrategy, serde: Serde) -> None:
self._strategy = strategy
self._serde = serde
def run(
self,
publisher: Callable[[Recorded], None],
limit: int = 100,
) -> None:
"""
Processes and publishes outbox entries using the provided publisher function.
Fetches entries from the outbox (up to the given limit), and passes each to the
publisher. If the publisher raises an exception, the event remains in the outbox
for retry. If processing succeeds, the event is removed from the outbox.
Args:
publisher (Callable[[Recorded], None]): Function to publish a single event.
limit (int, optional): Maximum number of entries to process in one run. Defaults to 100.
"""
stream = self._strategy.outbox_entries(limit=limit)
for entry in stream:
with entry as raw_record:
event = self._serde.deserialize(raw_record.entry)
record = Recorded(
wrapped_event=event,
stream_id=raw_record.entry.stream_id,
position=raw_record.position,
tenant_id=raw_record.tenant_id,
)
publisher(record)
|
run(publisher, limit=100)
Processes and publishes outbox entries using the provided publisher function.
Fetches entries from the outbox (up to the given limit), and passes each to the
publisher. If the publisher raises an exception, the event remains in the outbox
for retry. If processing succeeds, the event is removed from the outbox.
Parameters:
| Name |
Type |
Description |
Default |
publisher |
Callable[[Recorded], None]
|
Function to publish a single event.
|
required
|
limit |
int
|
Maximum number of entries to process in one run. Defaults to 100.
|
100
|
Source code in event_sourcery/_event_store/outbox.py
| def run(
self,
publisher: Callable[[Recorded], None],
limit: int = 100,
) -> None:
"""
Processes and publishes outbox entries using the provided publisher function.
Fetches entries from the outbox (up to the given limit), and passes each to the
publisher. If the publisher raises an exception, the event remains in the outbox
for retry. If processing succeeds, the event is removed from the outbox.
Args:
publisher (Callable[[Recorded], None]): Function to publish a single event.
limit (int, optional): Maximum number of entries to process in one run. Defaults to 100.
"""
stream = self._strategy.outbox_entries(limit=limit)
for entry in stream:
with entry as raw_record:
event = self._serde.deserialize(raw_record.entry)
record = Recorded(
wrapped_event=event,
stream_id=raw_record.entry.stream_id,
position=raw_record.position,
tenant_id=raw_record.tenant_id,
)
publisher(record)
|