Skip to content

📝 Save and read events

Basic usage

Once you have an EventStore instance after integrating with your application and some events defined, you can persist them:

invoice_paid = InvoicePaid(invoice_number="1003")
stream_id = StreamId(name="invoices/1003")
event_store.append(invoice_paid, stream_id=stream_id)

Events can be later retrieved by using load_stream method:

events = event_store.load_stream(stream_id)
# [
#   WrappedEvent(
#       event=InvoicePaid(invoice_number='1003'),
#       version=1,
#       uuid=UUID('831cd32b-02b9-48d2-a67c-28cf7dbb37fa'),
#       created_at=datetime.datetime(2025, 3, 16, 10, 3, 2, 138667),
#       context=Context(correlation_id=None, causation_id=None)
#   )
# ]

load_stream returns a list of WrappedEvent objects. They contain a saved event under .event attribute with its metadata in other attributes.

Version control

All events within a stream by default have assigned version number. This can be used to detect a situation of concurrent writes to the same stream.

You have a choice whether you want to check for versions conflict or not.

Explicit versioning

There is no need to add an expected version when adding some events to the stream for the first time, i.e. creating the stream:

stream_id = StreamId(name="invoices/1111")
an_event = InvoicePaid(invoice_number="1111")
event_store.append(
    an_event, stream_id=stream_id
)  # no `expected_version` argument given

However, when you add events to the stream for the second and subsequent times, you need to pass the expected version explicitly.

Otherwise, appending will fail with an exception:

another_event = InvoicePaid(invoice_number="1112")
# 👇 this would raise an exception
# event_store.append(another_event, stream_id=stream_id)

Hence, it is assumed that you will use the events versioning and protection against concurrent writes.

Info

This is a deliberate design choice, so you must decide explicitly if you need a concurrent writes protection or not.

In a typical flow, you'll first load a stream, perform some logic, then try to append new events. You'll then get the latest version from the last event loaded:

present_events = event_store.load_stream(stream_id)
last_version = present_events[-1].version

# ... some logic

another_event = InvoicePaid(invoice_number="1112")
event_store.append(
    another_event, stream_id=stream_id, expected_version=last_version
)

No versioning

In case when you don't need protection against concurrent writes, you can disable versioning. NO_VERSIONING must be used consistently for every append to such a stream.

stream_id = StreamId(name="invoices/123")

an_event = InvoicePaid(invoice_number="1111")
event_store.append(an_event, stream_id=stream_id, expected_version=NO_VERSIONING)

another_event = InvoicePaid(invoice_number="1112")
event_store.append(
    another_event, stream_id=stream_id, expected_version=NO_VERSIONING
)

Info

Once a stream has been created with disabled versioning, you cannot enable it. It is also forbidden the other way around. You can always create a new stream and delete the old one.