📝 Save and read events
Basic usage
Once you have an EventStore instance (obtained by integrating the library with your application) and some events defined, you can persist them:
invoice_paid = InvoicePaid(invoice_number="1003")
stream_id = StreamId(name="invoices/1003")
event_store.append(invoice_paid, stream_id=stream_id)
Events can later be retrieved with the load_stream method:
events = event_store.load_stream(stream_id)
# [
# WrappedEvent(
# event=InvoicePaid(invoice_number='1003'),
# version=1,
# uuid=UUID('831cd32b-02b9-48d2-a67c-28cf7dbb37fa'),
# created_at=datetime.datetime(2025, 3, 16, 10, 3, 2, 138667),
# context=Context(correlation_id=None, causation_id=None)
# )
# ]
load_stream returns a list of WrappedEvent objects. Each one holds the saved event under its .event attribute and the event's metadata in the remaining attributes.
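As an illustration of working with the returned list, the sketch below unwraps the domain events and reads the version of the last one. InvoicePaid and WrappedEvent are redefined here as simplified stand-ins so the snippet runs on its own, and paid_invoice_numbers is a hypothetical helper, not part of the library; the real WrappedEvent carries more metadata (uuid, created_at, context).

```python
from dataclasses import dataclass
from typing import Any, Optional


# Simplified stand-ins for the library's types (assumption: only the
# attributes used below are modelled).
@dataclass(frozen=True)
class InvoicePaid:
    invoice_number: str


@dataclass(frozen=True)
class WrappedEvent:
    event: Any
    version: Optional[int]


def paid_invoice_numbers(wrapped_events):
    """Hypothetical helper: collect invoice numbers from the wrapped events."""
    return [
        wrapped.event.invoice_number
        for wrapped in wrapped_events
        if isinstance(wrapped.event, InvoicePaid)
    ]


# Shaped like the result of event_store.load_stream(stream_id).
loaded = [
    WrappedEvent(event=InvoicePaid(invoice_number="1003"), version=1),
    WrappedEvent(event=InvoicePaid(invoice_number="1004"), version=2),
]

numbers = paid_invoice_numbers(loaded)
latest_version = loaded[-1].version  # metadata lives on the wrapper, not the event
```

The domain event stays free of storage concerns; anything the store adds is read from the wrapper.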
Version control
By default, every event in a stream is assigned a version number. Version numbers make it possible to detect concurrent writes to the same stream.
You can choose whether or not to check for version conflicts.
Explicit versioning
There is no need to pass an expected version when adding events to a stream for the first time, i.e. when creating the stream:
stream_id = StreamId(name="invoices/1111")
an_event = InvoicePaid(invoice_number="1111")
event_store.append(
    an_event, stream_id=stream_id
)  # no `expected_version` argument given
However, when you add events to the stream for the second and subsequent times, you need to pass the expected version explicitly.
Otherwise, appending will fail with an exception:
another_event = InvoicePaid(invoice_number="1112")
# 👇 this would raise an exception
# event_store.append(another_event, stream_id=stream_id)
In other words, the library assumes by default that you want event versioning and protection against concurrent writes.
Info
This is a deliberate design choice: you must decide explicitly whether you need protection against concurrent writes.
In a typical flow, you first load a stream, perform some logic, and then try to append new events. The expected version is the version of the last event loaded:
present_events = event_store.load_stream(stream_id)
last_version = present_events[-1].version
# ... some logic
another_event = InvoicePaid(invoice_number="1112")
event_store.append(
    another_event, stream_id=stream_id, expected_version=last_version
)
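To make the conflict detection concrete, here is a minimal in-memory sketch of the expected-version check. ToyVersionedStore and VersionConflict are hypothetical names invented for this illustration; they are not the library's classes, and the exception the real EventStore raises may be named and shaped differently. The toy also assumes that an omitted expected_version means "the stream must be empty".

```python
from collections import defaultdict


class VersionConflict(Exception):
    """Hypothetical exception used only in this sketch."""


class ToyVersionedStore:
    """In-memory stand-in that models nothing but the version check."""

    def __init__(self):
        self._streams = defaultdict(list)  # stream_id -> [(version, event), ...]

    def load_stream(self, stream_id):
        return list(self._streams[stream_id])

    def append(self, event, stream_id, expected_version=0):
        stream = self._streams[stream_id]
        current_version = len(stream)
        if expected_version != current_version:
            raise VersionConflict(
                f"expected {expected_version}, stream is at {current_version}"
            )
        stream.append((current_version + 1, event))


store = ToyVersionedStore()
store.append("invoice paid", stream_id="invoices/1111")  # creates the stream

# Two writers load the same state and remember the same version.
seen_by_a = store.load_stream("invoices/1111")[-1][0]
seen_by_b = store.load_stream("invoices/1111")[-1][0]

store.append("change from A", "invoices/1111", expected_version=seen_by_a)

try:
    # Writer B still holds the stale version, so the write is rejected.
    store.append("change from B", "invoices/1111", expected_version=seen_by_b)
    conflict_detected = False
except VersionConflict:
    conflict_detected = True

# A common reaction is to reload, redo the logic, and try again.
fresh_version = store.load_stream("invoices/1111")[-1][0]
store.append("change from B", "invoices/1111", expected_version=fresh_version)
final_versions = [version for version, _ in store.load_stream("invoices/1111")]
```

The losing writer never overwrites anything silently: it learns about the conflict and can decide whether to retry on fresh state or give up.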
No versioning
If you don't need protection against concurrent writes, you can disable versioning. NO_VERSIONING must then be used consistently for every append to such a stream.
stream_id = StreamId(name="invoices/123")
an_event = InvoicePaid(invoice_number="1111")
event_store.append(an_event, stream_id=stream_id, expected_version=NO_VERSIONING)
another_event = InvoicePaid(invoice_number="1112")
event_store.append(
    another_event, stream_id=stream_id, expected_version=NO_VERSIONING
)
Info
Once a stream has been created with versioning disabled, you cannot enable it later. The reverse is also forbidden: a versioned stream cannot be switched to NO_VERSIONING. You can always create a new stream and delete the old one.
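The rule that a stream keeps the versioning mode it was created with can be pictured with another toy model. Everything in this sketch is an assumption made for illustration: ToyModeCheckingStore and VersioningModeMismatch are hypothetical names, and NO_VERSIONING is redefined as a local sentinel so the snippet runs without the library. How the real EventStore reports a mismatch is not shown here.

```python
NO_VERSIONING = object()  # local stand-in for the library's sentinel


class VersioningModeMismatch(Exception):
    """Hypothetical exception used only in this sketch."""


class ToyModeCheckingStore:
    """In-memory stand-in that remembers each stream's versioning mode."""

    def __init__(self):
        self._events = {}     # stream_id -> list of events
        self._versioned = {}  # stream_id -> True if the stream is versioned

    def append(self, event, stream_id, expected_version=0):
        wants_versioning = expected_version is not NO_VERSIONING
        # A new stream adopts the mode of its first append.
        mode = self._versioned.get(stream_id, wants_versioning)
        if mode != wants_versioning:
            raise VersioningModeMismatch(stream_id)
        events = self._events.get(stream_id, [])
        if wants_versioning and expected_version != len(events):
            raise ValueError("version conflict")
        self._versioned[stream_id] = mode
        self._events[stream_id] = events + [event]

    def count(self, stream_id):
        return len(self._events.get(stream_id, []))


store = ToyModeCheckingStore()
store.append("first", "invoices/123", expected_version=NO_VERSIONING)
store.append("second", "invoices/123", expected_version=NO_VERSIONING)

try:
    # Switching an unversioned stream to explicit versions is rejected.
    store.append("third", "invoices/123", expected_version=2)
    mixed_mode_rejected = False
except VersioningModeMismatch:
    mixed_mode_rejected = True

stream_length = store.count("invoices/123")
```

Because the mode is fixed at creation, readers of a stream can rely on its version numbers either always or never being meaningful.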