📤 Publish events - outbox

About the pattern

Outbox is a pattern that ensures messages are reliably sent out of the system. It provides at-least-once delivery semantics: every message that can be sent to, e.g., RabbitMQ eventually gets there, though possibly more than once.

Tip

Use Outbox when you want to send events to a broker (e.g. RabbitMQ, Kafka) or external system for analytics purposes (e.g. Segment.io).

Basic usage

Configure Event Sourcery

To use the outbox, call the with_outbox method on the factory while setting up Event Sourcery:

factory = (
    SQLAlchemyBackendFactory(session).with_outbox()  # enable outbox
)
backend = factory.build()

Write publishing function

To publish messages, you need to write a little glue code that actually sends them.

The outbox expects a publishing function that takes a Recorded as its only argument and does the sending.

Take RabbitMQ and pika for example:

import pika

# setting up connection and queue...
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="events")


def publish(recorded: Recorded) -> None:
    as_json = recorded.wrapped_event.event.model_dump_json()
    channel.basic_publish(exchange="", routing_key="events", body=as_json)
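
The same glue code can be built by a small factory, so the channel and routing key are passed in instead of being read from module globals. That makes it easy to exercise the function without a running broker. This is a minimal sketch that relies only on what the example above uses (`wrapped_event.event.model_dump_json()` and `basic_publish`); `make_publisher` and `FakeChannel` are hypothetical names, not part of Event Sourcery or pika.

```python
from typing import Any, Callable


def make_publisher(channel: Any, routing_key: str = "events") -> Callable[[Any], None]:
    """Build a publishing function bound to the given channel (hypothetical helper)."""

    def publish(recorded: Any) -> None:
        # Same serialization as in the example above.
        as_json = recorded.wrapped_event.event.model_dump_json()
        channel.basic_publish(exchange="", routing_key=routing_key, body=as_json)

    return publish


class FakeChannel:
    """Test double that records calls instead of talking to RabbitMQ."""

    def __init__(self) -> None:
        self.published: list[dict[str, str]] = []

    def basic_publish(self, exchange: str, routing_key: str, body: str) -> None:
        self.published.append(
            {"exchange": exchange, "routing_key": routing_key, "body": body}
        )
```

In production you would pass the real pika channel, e.g. `publish = make_publisher(channel)`, and hand `publish` to the outbox as shown in the next section.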

Run outbox

Now you can run outbox:

backend.outbox.run(publisher=publish)

This loops over events and tries to call the publishing function for each of them.

Optionally, you can specify a number of events to be processed in a single run:

backend.outbox.run(publisher=publish, limit=50)

By default, outbox.run processes up to 100 events per run.

Transactional outbox

If you use any of the transactional backends (e.g. SQLAlchemy or Django), every call to outbox.run should be wrapped in a database transaction.

with session.begin():
    backend.outbox.run(publisher=publish)
    session.commit()

Warning

The Event Sourcery outbox keeps track of messages sent and attempts left. Without committing the transaction, the same messages will be sent over and over again.

Running outbox

Normally you'd be running outbox in an infinite loop, in a separate process - just like you'd do with a Celery worker:

from time import sleep


while True:
    with session.begin():
        backend.outbox.run(publisher=publish)
        session.commit()
        sleep(3)  # wait between runs to avoid hammering the database
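
A long-running worker usually also needs a way to stop cleanly, for example when the process receives SIGTERM during a deployment. The sketch below shows one possible shape for such a loop using only the standard library. `run_forever` and `install_signal_handlers` are hypothetical helpers, not part of Event Sourcery.

```python
import signal
import threading
from typing import Callable


def run_forever(
    run_once: Callable[[], None],
    stop: threading.Event,
    interval: float = 3.0,
) -> int:
    """Call run_once repeatedly until stop is set; return the number of runs."""
    runs = 0
    while not stop.is_set():
        run_once()
        runs += 1
        # Wait between runs, but wake up immediately once stop is set.
        stop.wait(interval)
    return runs


def install_signal_handlers(stop: threading.Event) -> None:
    """Set the stop flag on SIGINT/SIGTERM (must be called from the main thread)."""
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, lambda *_: stop.set())
```

Here `run_once` would be a function that opens the transaction, calls `backend.outbox.run(publisher=publish)` and commits, exactly as in the loop above. Using `stop.wait(interval)` instead of `sleep` lets the worker exit without waiting out the full pause.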

Optional filterer

Warning

This feature and/or its API is provisional and will probably change soon.

Sometimes you want only specific events to be sent. You can pass an optional filterer argument to with_outbox.

It should be a callable (e.g. a function) that accepts an event instance and returns True if the event should be published and False otherwise.

factory = SQLAlchemyBackendFactory(session).with_outbox(
    filterer=lambda e: "InvoicePaid" in e.name
)
backend = factory.build()
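
When more than one event type should be published, a named function is easier to read and test than a growing lambda. The following is a sketch only: it assumes, as the lambda above does, that the object passed to the filterer exposes a `name` attribute, and it keeps the same substring matching. `publish_only` is a hypothetical helper, and since this API is provisional the details may change.

```python
from typing import Any, Callable


def publish_only(*fragments: str) -> Callable[[Any], bool]:
    """Build a filterer accepting events whose name contains any of the fragments."""

    def filterer(event: Any) -> bool:
        # Assumes the event exposes `.name`, as the lambda in the example above does.
        return any(fragment in event.name for fragment in fragments)

    return filterer
```

It would be passed the same way as the lambda, e.g. `with_outbox(filterer=publish_only("InvoicePaid", "InvoiceIssued"))`.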

Handling retries

Sending each event will be retried up to 3 times.
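
To make the bookkeeping concrete, here is a toy in-memory model of an outbox that tracks attempts left. It is an illustration only and not Event Sourcery's implementation: `ToyOutbox` and `Entry` are hypothetical, and it assumes that an exception raised by the publishing function counts as a failed attempt and that an entry gets 3 attempts in total.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(eq=False)  # compare entries by identity, not by field values
class Entry:
    payload: Any
    tries_left: int = 3  # assumption: 3 attempts in total


class ToyOutbox:
    """In-memory illustration only; not Event Sourcery's implementation."""

    def __init__(self) -> None:
        self.entries: list[Entry] = []

    def put(self, payload: Any) -> None:
        self.entries.append(Entry(payload))

    def run(self, publisher: Callable[[Any], None], limit: int = 100) -> None:
        # Only entries that still have attempts left are eligible.
        pending = [e for e in self.entries if e.tries_left > 0][:limit]
        for entry in pending:
            try:
                publisher(entry.payload)
            except Exception:
                # Assumption: a raised exception counts as a failed attempt.
                entry.tries_left -= 1
            else:
                # Sent successfully: forget the entry so it is not sent again.
                self.entries.remove(entry)
```

In this model an entry that keeps failing stays in the outbox with no attempts left and is skipped by later runs, so one broken message cannot block the others.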