📤 Publish events - outbox
About the pattern
Outbox is a pattern that ensures a message is reliably sent from the system. It provides at-least-once semantics, making sure that every message that CAN be sent to e.g. RabbitMQ, gets there.
Tip
Use Outbox when you want to send events to a broker (e.g. RabbitMQ, Kafka) or external system for analytics purposes (e.g. Segment.io).
Basic usage
Configure Event Sourcery
To use outbox, you have to add with_outbox
method call on factory while setting up Event Sourcery:
factory = (
SQLAlchemyBackendFactory(session).with_outbox() # enable outbox
)
backend = factory.build()
Write publishing function
To publish messages, you need to write a little bit of glue code that will actually send a message.
We need a publishing function that takes Recorded as the only argument and will do the sending.
Take RabbitMQ and pika
for example:
# setting up connection and queue...
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="events")
def publish(recorded: Recorded) -> None:
as_json = recorded.wrapped_event.event.model_dump_json()
channel.basic_publish(exchange="", routing_key="events", body=as_json)
Run outbox
Now you can run outbox:
This will loop over events and will try to call publishing function for each one of them.
Optionally, you can specify a number of events to be processed in a single run:
By default, outbox.run
will try to process 100 events.
Transactional outbox
If you use any of transactional backends (e.g. SQLAlchemy or Django), then every call to outbox.run
should be wrapped with a database transaction.
Warning
Event Sourcery outbox keeps track of messages sent and attempts left. Without commiting a transaction, same messages will be sent over and over again.
Running outbox
Normally you'd be running outbox in an infinite loop, in a separate process - just like you'd do with a Celery worker:
from time import sleep
while True:
with session.begin():
backend.outbox.run(publish=publisher)
session.commit()
sleep(3) # wait between runs to avoid hammering the database
Optional filterer
Warning
This feature and/or its API is provisional and will probably change soon.
Sometimes you want only specific events to be sent. You can pass an optional filterer
argument to with_outbox
.
It should be a callable (e.g. a function) that accepts an event instance and returns True if an event should be published. False otherwise.
factory = SQLAlchemyBackendFactory(session).with_outbox(
filterer=lambda e: "InvoicePaid" in e.name
)
backend = factory.build()
Handling retries
Sending each event will be retried up to 3 times.