🗞️ React to events - subscriptions
Subscriptions allow for asynchronous processing of events in another process.
Once you have a backend object from integrating the library with your application, use its .subscriber to start building a subscription.
Iterating over events one by one
Let's say we want to know about all paid invoices, and we have an event for that: InvoicePaid.
subscription = (
    backend.subscriber.start_from(0)  # read from position 0...
    .to_events([InvoicePaid])  # ...InvoicePaid events...
    .build_iter(timelimit=1)  # ...iterate events one by one...
    # ...and wait up to 1 second for a new event
)
subscription is an iterable, so it can be used in a for loop:
for recorded_event in subscription:
    if recorded_event is None:  # no more events for now
        break
    # process the event
    print(recorded_event.wrapped_event)
    print(recorded_event.position)
    print(recorded_event.tenant_id)
With every iteration we get either an instance of Recorded, or None if no new events are available.
Note
subscription is an infinite iterator. Getting None means that you are up to date with the Event Store, but as soon as a new event is appended, it will be returned in a later iteration of the loop.
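Because None only means "nothing new right now", the same subscription can be read again later. The sketch below illustrates that pattern with plain Python. The helper until_caught_up and the generator fake_subscription are hypothetical names invented for this example and are not part of the library; the fake generator merely stands in for a real subscription.

```python
from typing import Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def until_caught_up(subscription: Iterable[Optional[T]]) -> Iterator[T]:
    """Yield events until the subscription returns None, then stop."""
    for recorded_event in subscription:
        if recorded_event is None:  # caught up with the event store for now
            return
        yield recorded_event


def fake_subscription() -> Iterator[Optional[str]]:
    """Hypothetical stand-in for a real subscription (illustration only)."""
    yield "event-1"
    yield "event-2"
    yield None  # nothing new at the moment
    yield "event-3"  # appended later; a second pass picks it up


source = fake_subscription()
first_pass = list(until_caught_up(source))   # stops at the first None
second_pass = list(until_caught_up(source))  # resumes where the first pass stopped
```

With a real subscription, the second pass would typically run after some delay or on a schedule, since the iterator itself never ends.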
Iterating over events in batches
When we have to process a larger number of events, it makes sense to do it in batches instead of one by one.
To do that, we need to slightly alter the code that builds our subscription. Instead of build_iter, we call build_batch:
batch_subscription = (
    backend.subscriber.start_from(0)
    .to_events([InvoicePaid])
    .build_batch(size=10, timelimit=1)  # try getting 10 events at a time
)
Then we can use batch_subscription in a for loop:
for batch in batch_subscription:
    # process the batch
    print(batch)
    if len(batch) == 0:  # no more events at the moment
        break
In this example, batch is a list of Recorded.
Just like in the previous case, batch_subscription is an infinite iterator. It returns batches of the given size as long as there are enough events.
If fewer new events are available, the batch subscription returns whatever it can. For example, if you specify a batch size of 10 but get a list of 7 events, it means there were only 7 new events when the time limit passed.
When the batch subscription catches up with the Event Store, it returns empty lists until new events are saved.
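The behaviour described above (full batches, then a shorter one, then empty lists) can be modelled with a small stand-in, shown here as a rough sketch rather than the library's implementation. FakeRecorded, fake_batches and consume_until_empty are hypothetical names made up for this example; only the position attribute mirrors what Recorded exposes in the earlier loop, and the sketch assumes events within a batch arrive in position order.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, List, Optional


@dataclass
class FakeRecorded:
    """Hypothetical stand-in for Recorded; only `position` is modelled."""

    position: int


def fake_batches(positions: List[int], size: int) -> Iterator[List[FakeRecorded]]:
    """Stand-in batch subscription: full batches, a shorter last one if needed, then empty lists."""
    for start in range(0, len(positions), size):
        yield [FakeRecorded(p) for p in positions[start:start + size]]
    while True:
        yield []  # caught up: keeps returning empty batches


def consume_until_empty(
    batches: Iterable[List[FakeRecorded]],
    handle: Callable[[List[FakeRecorded]], None],
) -> Optional[int]:
    """Process batches until an empty one arrives; return the last position seen."""
    last_position: Optional[int] = None
    for batch in batches:
        if not batch:  # no more events at the moment
            break
        handle(batch)
        last_position = batch[-1].position  # assumes batches are ordered by position
    return last_position


sizes: List[int] = []
last = consume_until_empty(
    fake_batches(list(range(1, 8)), size=3),  # 7 events, batch size 3
    lambda batch: sizes.append(len(batch)),
)
```

Unlike the loop shown earlier, this version checks for an empty batch before handling it, so the handler never sees an empty list. Keeping track of the last position is one way to know how far processing got.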