Skip to content

Encryption

Integrates encryption logic and key management. Uses the KeyStorageStrategy and EncryptionStrategy interfaces. Responsible for: - Retrieving the key for a given subject_id - Encrypting and decrypting data - Masking data when the key is not available (crypto-shredding) - Processing event encryption/decryption with field annotations

Source code in event_sourcery/_event_store/event/encryption.py
@dataclass
class Encryption:
    """
    Integrates encryption logic and key management.
    Uses the KeyStorageStrategy and EncryptionStrategy interfaces.
    Responsible for:
      - Retrieving the key for a given subject_id
      - Encrypting and decrypting data
      - Masking data when the key is not available (crypto-shredding)
      - Processing event encryption/decryption with field annotations
    """

    registry: EventRegistry
    strategy: EncryptionStrategy
    key_storage: EncryptionKeyStorageStrategy

    def encrypt(self, event: BaseModel, stream_id: StreamId) -> dict[str, Any]:
        """
        Encrypts all fields of the event marked as encrypted.

        Args:
            event (BaseModel): The event instance to encrypt.
            stream_id (StreamId): The stream identifier used for subject resolution.

        Returns:
            dict[str, Any]: The event data with encrypted fields.

        Raises:
            NoSubjectIdFound: If the subject id cannot be determined for encryption.
            KeyNotFoundError: If the encryption key for a subject is missing.
        """
        event_type = type(event)
        data = NestedDict(event.model_dump(mode="json"))
        for field_name in self.registry.encrypted_fields(of=event_type).keys():
            subject_field = self.registry.subject_filed(field_name, of=event_type)
            subject_id = (
                subject_field and getattr(event, subject_field)
            ) or stream_id.name

            if subject_id is None:
                raise NoSubjectIdFound(stream_id)
            data[field_name] = self._encrypt_value(data[field_name], subject_id)
        return data.data

    def decrypt(
        self,
        event_type: type[BaseModel],
        raw: dict[str, Any],
        stream_id: StreamId,
    ) -> dict[str, Any]:
        """
        Decrypts all fields of the event marked as encrypted in the registry.

        Args:
            event_type (type[BaseModel]): The event class type.
            raw (dict[str, Any]): The raw event data with encrypted fields.
            stream_id (StreamId): The stream identifier used for subject resolution.

        Returns:
            dict[str, Any]: The event data with decrypted fields (or masked if no key).
        """
        data = NestedDict(raw)
        encrypted_fields = self.registry.encrypted_fields(of=event_type)
        for field_name, encrypted_config in encrypted_fields.items():
            subject_field = self.registry.subject_filed(field_name, of=event_type)
            subject_id = data[subject_field] if subject_field else stream_id.name or ""
            data[field_name] = self._decrypt_value(
                data[field_name],
                subject_id,
                encrypted_config.mask_value,
            )
        return data.data

    def _decrypt_value(self, value: str, subject_id: str, mask_value: Any) -> Any:
        key = self.key_storage.get(subject_id)
        if key is None:
            return mask_value
        decrypted = self.strategy.decrypt(value, key)
        try:
            return json.loads(decrypted)
        except (json.JSONDecodeError, TypeError):
            return decrypted

    def _encrypt_value(self, value: Any, subject_id: str) -> str:
        key = self.key_storage.get(subject_id)
        if key is None:
            raise KeyNotFoundError(subject_id)
        match value:
            case str():
                serialized = value
            case _:
                serialized = json.dumps(value)
        return self.strategy.encrypt(serialized, key)

    def shred(self, subject_id: str) -> None:
        """
        Deletes the encryption key for the given subject, effectively making all
        encrypted data for that subject unrecoverable (crypto-shredding).

        Args:
            subject_id (str): The subject identifier whose key should be deleted.
        """
        self.key_storage.delete(subject_id)

decrypt(event_type, raw, stream_id)

Decrypts all fields of the event marked as encrypted in the registry.

Parameters:

Name Type Description Default
event_type type[BaseModel]

The event class type.

required
raw dict[str, Any]

The raw event data with encrypted fields.

required
stream_id StreamId

The stream identifier used for subject resolution.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The event data with decrypted fields (or masked if no key).

Source code in event_sourcery/_event_store/event/encryption.py
def decrypt(
    self,
    event_type: type[BaseModel],
    raw: dict[str, Any],
    stream_id: StreamId,
) -> dict[str, Any]:
    """
    Decrypts all fields of the event marked as encrypted in the registry.

    Args:
        event_type (type[BaseModel]): The event class type.
        raw (dict[str, Any]): The raw event data with encrypted fields.
        stream_id (StreamId): The stream identifier used for subject resolution.

    Returns:
        dict[str, Any]: The event data with decrypted fields (or masked if no key).
    """
    data = NestedDict(raw)
    encrypted_fields = self.registry.encrypted_fields(of=event_type)
    for field_name, encrypted_config in encrypted_fields.items():
        subject_field = self.registry.subject_filed(field_name, of=event_type)
        subject_id = data[subject_field] if subject_field else stream_id.name or ""
        data[field_name] = self._decrypt_value(
            data[field_name],
            subject_id,
            encrypted_config.mask_value,
        )
    return data.data

encrypt(event, stream_id)

Encrypts all fields of the event marked as encrypted.

Parameters:

Name Type Description Default
event BaseModel

The event instance to encrypt.

required
stream_id StreamId

The stream identifier used for subject resolution.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The event data with encrypted fields.

Raises:

Type Description
NoSubjectIdFound

If the subject id cannot be determined for encryption.

KeyNotFoundError

If the encryption key for a subject is missing.

Source code in event_sourcery/_event_store/event/encryption.py
def encrypt(self, event: BaseModel, stream_id: StreamId) -> dict[str, Any]:
    """
    Encrypts all fields of the event marked as encrypted.

    Args:
        event (BaseModel): The event instance to encrypt.
        stream_id (StreamId): The stream identifier used for subject resolution.

    Returns:
        dict[str, Any]: The event data with encrypted fields.

    Raises:
        NoSubjectIdFound: If the subject id cannot be determined for encryption.
        KeyNotFoundError: If the encryption key for a subject is missing.
    """
    event_type = type(event)
    data = NestedDict(event.model_dump(mode="json"))
    for field_name in self.registry.encrypted_fields(of=event_type).keys():
        subject_field = self.registry.subject_filed(field_name, of=event_type)
        subject_id = (
            subject_field and getattr(event, subject_field)
        ) or stream_id.name

        if subject_id is None:
            raise NoSubjectIdFound(stream_id)
        data[field_name] = self._encrypt_value(data[field_name], subject_id)
    return data.data

shred(subject_id)

Deletes the encryption key for the given subject, effectively making all encrypted data for that subject unrecoverable (crypto-shredding).

Parameters:

Name Type Description Default
subject_id str

The subject identifier whose key should be deleted.

required
Source code in event_sourcery/_event_store/event/encryption.py
def shred(self, subject_id: str) -> None:
    """
    Deletes the encryption key for the given subject, effectively making all
    encrypted data for that subject unrecoverable (crypto-shredding).

    Args:
        subject_id (str): The subject identifier whose key should be deleted.
    """
    self.key_storage.delete(subject_id)