Base class for event-sourced aggregates.
Aggregates encapsulate domain logic and state changes as a sequence of events.
They are responsible for applying events to mutate their state and for emitting new
events that represent business-relevant changes.
The Aggregate base class provides mechanisms for tracking unpersisted changes and
for applying events in a consistent way.
Attributes:
| Name | Type | Description |
|---|---|---|
| category | ClassVar[str] | StreamCategory for the aggregate type (groups its streams). |
| _changes | list[Event] | Events that have been applied but not yet persisted. |
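To make the division of labour concrete, here is a minimal sketch of a subclass. It assumes a stand-in `Event` class and a trimmed copy of the base class so that it runs on its own; `LightSwitch`, `SwitchedOn`, and `turn_on` are hypothetical names used only for illustration and are not part of the library.

```python
from contextlib import contextmanager
from dataclasses import dataclass
from typing import ClassVar, Iterator


class Event:
    """Stand-in for the library's Event type (assumption for this sketch)."""


class Aggregate:
    """Trimmed copy of the base class documented on this page."""

    category: ClassVar[str]
    _changes: list[Event]

    @contextmanager
    def __persisting_changes__(self) -> Iterator[Iterator[Event]]:
        yield iter(getattr(self, "_changes", []))
        self._changes = []

    def __apply__(self, event: Event) -> None:
        raise NotImplementedError

    def _emit(self, event: Event) -> None:
        if not hasattr(self, "_changes"):
            self._changes = []
        self.__apply__(event)
        self._changes.append(event)


@dataclass(frozen=True)
class SwitchedOn(Event):
    """Hypothetical domain event."""


class LightSwitch(Aggregate):
    """Hypothetical aggregate with a single command."""

    category = "light_switch"

    def __init__(self) -> None:
        self.shines = False

    def __apply__(self, event: Event) -> None:
        # State changes happen only here, driven by events.
        if isinstance(event, SwitchedOn):
            self.shines = True

    def turn_on(self) -> None:
        # Commands validate business rules, then emit an event.
        if self.shines:
            raise RuntimeError("light is already on")
        self._emit(SwitchedOn())


switch = LightSwitch()
switch.turn_on()
```

The command method never touches `shines` directly: it checks the rule and emits, and `_emit` both applies the event and records it in `_changes`.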
Source code in event_sourcery/event_sourcing/aggregate.py
    class Aggregate:
        """
        Base class for event-sourced aggregates.

        Aggregates encapsulate domain logic and state changes as a sequence of events.
        They are responsible for applying events to mutate their state and for emitting new
        events that represent business-relevant changes.

        The Aggregate base class provides mechanisms for tracking unpersisted changes and
        for applying events in a consistent way.

        Attributes:
            category (ClassVar[str]): StreamCategory for the aggregate type (group streams).
            _changes (list[Event]): List of yet not persisted events.
        """

        category: ClassVar[str]
        _changes: list[Event]

        @contextmanager
        def __persisting_changes__(self) -> Iterator[Iterator[Event]]:
            """
            Context manager for accessing and clearing unpersisted events.

            Yields an iterator over all events that have been applied but not yet persisted.
            After exiting the context, the list of unpersisted events is cleared.

            Returns:
                Iterator[Iterator[Event]]: Iterator over unpersisted events.
            """
            yield iter(getattr(self, "_changes", []))
            self._changes = []

        def __apply__(self, event: Event) -> None:
            """
            Applies a single event to mutate the aggregate's state.

            This method must be implemented by subclasses to define how each event type
            changes the aggregate's state.

            Args:
                event (Event): The event to apply.
            """
            raise NotImplementedError

        def _emit(self, event: Event) -> None:
            """
            Applies and tracks a new event as a pending change.

            Calls __apply__ to mutate the aggregate's state and appends the event to the list
            of unpersisted changes.

            Args:
                event (Event): The event to emit and track.
            """
            if not hasattr(self, "_changes"):
                self._changes = []
            self.__apply__(event)
            self._changes.append(event)
__apply__(event)
Applies a single event to mutate the aggregate's state.
This method must be implemented by subclasses to define how each event type
changes the aggregate's state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | Event | The event to apply. | required |
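The sketch below contrasts the two ways `__apply__` gets called: directly, when past events are replayed to rebuild state, and indirectly through `_emit`, when a new event is produced. It assumes a stand-in `Event` class and a trimmed stand-in base class; `Account`, `Deposited`, `Withdrawn`, and the replay loop are hypothetical and only illustrate how a loader might rebuild an aggregate, not how the library does it.

```python
from dataclasses import dataclass


class Event:
    """Stand-in for the library's Event type (assumption for this sketch)."""


class Aggregate:
    """Trimmed stand-in holding only the two methods this example uses."""

    def __apply__(self, event: Event) -> None:
        raise NotImplementedError

    def _emit(self, event: Event) -> None:
        if not hasattr(self, "_changes"):
            self._changes = []
        self.__apply__(event)
        self._changes.append(event)


@dataclass(frozen=True)
class Deposited(Event):
    amount: int


@dataclass(frozen=True)
class Withdrawn(Event):
    amount: int


class Account(Aggregate):
    """Hypothetical aggregate tracking a balance."""

    def __init__(self) -> None:
        self.balance = 0

    def __apply__(self, event: Event) -> None:
        # One branch per event type; unknown events are rejected loudly.
        if isinstance(event, Deposited):
            self.balance += event.amount
        elif isinstance(event, Withdrawn):
            self.balance -= event.amount
        else:
            raise TypeError(f"unsupported event: {type(event).__name__}")

    def withdraw(self, amount: int) -> None:
        if amount > self.balance:
            raise ValueError("insufficient funds")
        self._emit(Withdrawn(amount))


# Replaying stored history: __apply__ mutates state without tracking changes.
history = [Deposited(100), Withdrawn(30)]
account = Account()
for past_event in history:
    account.__apply__(past_event)

# A new command goes through _emit, so only its event becomes a pending change.
account.withdraw(20)
```

After this runs, the balance reflects all three events, while `_changes` holds only the `Withdrawn(20)` event produced by the new command.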
Source code in event_sourcery/event_sourcing/aggregate.py
    def __apply__(self, event: Event) -> None:
        """
        Applies a single event to mutate the aggregate's state.

        This method must be implemented by subclasses to define how each event type
        changes the aggregate's state.

        Args:
            event (Event): The event to apply.
        """
        raise NotImplementedError
__persisting_changes__()
Context manager for accessing and clearing unpersisted events.
Yields an iterator over all events that have been applied but not yet persisted.
After exiting the context, the list of unpersisted events is cleared.
Returns:
| Type | Description |
|---|---|
| Iterator[Iterator[Event]] | Iterator over unpersisted events. |
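As a rough illustration of how a persistence layer might use this context manager, the sketch below writes pending events to a plain list. It assumes a stand-in `Event` class and a trimmed copy of the base class; `Counter`, `save`, and the `fail` flag are hypothetical. Because the clearing statement sits after the `yield` without a `try`/`finally`, an exception raised inside the `with` block leaves the pending events in place, so a failed save can be retried.

```python
from contextlib import contextmanager
from typing import Iterator


class Event:
    """Stand-in for the library's Event type (assumption for this sketch)."""


class Aggregate:
    """Trimmed copy of the base class documented on this page."""

    @contextmanager
    def __persisting_changes__(self) -> Iterator[Iterator[Event]]:
        yield iter(getattr(self, "_changes", []))
        self._changes = []

    def __apply__(self, event: Event) -> None:
        raise NotImplementedError

    def _emit(self, event: Event) -> None:
        if not hasattr(self, "_changes"):
            self._changes = []
        self.__apply__(event)
        self._changes.append(event)


class Counter(Aggregate):
    """Hypothetical aggregate that counts applied events."""

    def __init__(self) -> None:
        self.value = 0

    def __apply__(self, event: Event) -> None:
        self.value += 1

    def increment(self) -> None:
        self._emit(Event())


def save(aggregate: Aggregate, store: list, fail: bool = False) -> None:
    """Hypothetical persistence step writing pending events to a list."""
    with aggregate.__persisting_changes__() as pending:
        if fail:
            raise IOError("storage unavailable")
        store.extend(pending)
    # Reaching this point means the block exited cleanly and _changes was reset.


counter = Counter()
counter.increment()
counter.increment()

store: list = []
try:
    save(counter, store, fail=True)
except IOError:
    pass
pending_after_failure = len(counter._changes)  # events survive the failed save

save(counter, store)  # the retry persists both events and clears the list
```

Note that the context manager yields an iterator, not a list, so the pending events can be consumed only once inside the block.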
Source code in event_sourcery/event_sourcing/aggregate.py
    @contextmanager
    def __persisting_changes__(self) -> Iterator[Iterator[Event]]:
        """
        Context manager for accessing and clearing unpersisted events.

        Yields an iterator over all events that have been applied but not yet persisted.
        After exiting the context, the list of unpersisted events is cleared.

        Returns:
            Iterator[Iterator[Event]]: Iterator over unpersisted events.
        """
        yield iter(getattr(self, "_changes", []))
        self._changes = []