1# This file is dual licensed under the terms of the Apache License, Version 
    2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 
    3# for complete details. 
    4 
    5from __future__ import annotations 
    6 
    7import abc 
    8import typing 
    9 
    10from cryptography.hazmat.bindings._rust import openssl as rust_openssl 
    11from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm 
    12from cryptography.hazmat.primitives.ciphers import modes 
    13from cryptography.utils import Buffer 
    14 
    15 
    16class CipherContext(metaclass=abc.ABCMeta): 
    17    @abc.abstractmethod 
    18    def update(self, data: Buffer) -> bytes: 
    19        """ 
    20        Processes the provided bytes through the cipher and returns the results 
    21        as bytes. 
    22        """ 
    23 
    24    @abc.abstractmethod 
    25    def update_into(self, data: Buffer, buf: Buffer) -> int: 
    26        """ 
    27        Processes the provided bytes and writes the resulting data into the 
    28        provided buffer. Returns the number of bytes written. 
    29        """ 
    30 
    31    @abc.abstractmethod 
    32    def finalize(self) -> bytes: 
    33        """ 
    34        Returns the results of processing the final block as bytes. 
    35        """ 
    36 
    37    @abc.abstractmethod 
    38    def reset_nonce(self, nonce: bytes) -> None: 
    39        """ 
    40        Resets the nonce for the cipher context to the provided value. 
    41        Raises an exception if it does not support reset or if the 
    42        provided nonce does not have a valid length. 
    43        """ 
    44 
    45 
    46class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta): 
    47    @abc.abstractmethod 
    48    def authenticate_additional_data(self, data: Buffer) -> None: 
    49        """ 
    50        Authenticates the provided bytes. 
    51        """ 
    52 
    53 
    54class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta): 
    55    @abc.abstractmethod 
    56    def finalize_with_tag(self, tag: bytes) -> bytes: 
    57        """ 
    58        Returns the results of processing the final block as bytes and allows 
    59        delayed passing of the authentication tag. 
    60        """ 
    61 
    62 
    63class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta): 
    64    @property 
    65    @abc.abstractmethod 
    66    def tag(self) -> bytes: 
    67        """ 
    68        Returns tag bytes. This is only available after encryption is 
    69        finalized. 
    70        """ 
    71 
    72 
# Type parameter of the generic ``Cipher`` class: the type of the mode a
# cipher is constructed with. It is bounded by ``Optional[modes.Mode]``, so
# ``None`` is an admissible mode, and it is declared covariant.
Mode = typing.TypeVar(
    "Mode", bound=typing.Optional[modes.Mode], covariant=True
)
    76 
    77 
    78class Cipher(typing.Generic[Mode]): 
    79    def __init__( 
    80        self, 
    81        algorithm: CipherAlgorithm, 
    82        mode: Mode, 
    83        backend: typing.Any = None, 
    84    ) -> None: 
    85        if not isinstance(algorithm, CipherAlgorithm): 
    86            raise TypeError("Expected interface of CipherAlgorithm.") 
    87 
    88        if mode is not None: 
    89            # mypy needs this assert to narrow the type from our generic 
    90            # type. Maybe it won't some time in the future. 
    91            assert isinstance(mode, modes.Mode) 
    92            mode.validate_for_algorithm(algorithm) 
    93 
    94        self.algorithm = algorithm 
    95        self.mode = mode 
    96 
    97    @typing.overload 
    98    def encryptor( 
    99        self: Cipher[modes.ModeWithAuthenticationTag], 
    100    ) -> AEADEncryptionContext: ... 
    101 
    102    @typing.overload 
    103    def encryptor( 
    104        self: _CIPHER_TYPE, 
    105    ) -> CipherContext: ... 
    106 
    107    def encryptor(self): 
    108        if isinstance(self.mode, modes.ModeWithAuthenticationTag): 
    109            if self.mode.tag is not None: 
    110                raise ValueError( 
    111                    "Authentication tag must be None when encrypting." 
    112                ) 
    113 
    114        return rust_openssl.ciphers.create_encryption_ctx( 
    115            self.algorithm, self.mode 
    116        ) 
    117 
    118    @typing.overload 
    119    def decryptor( 
    120        self: Cipher[modes.ModeWithAuthenticationTag], 
    121    ) -> AEADDecryptionContext: ... 
    122 
    123    @typing.overload 
    124    def decryptor( 
    125        self: _CIPHER_TYPE, 
    126    ) -> CipherContext: ... 
    127 
    128    def decryptor(self): 
    129        return rust_openssl.ciphers.create_decryption_ctx( 
    130            self.algorithm, self.mode 
    131        ) 
    132 
    133 
# Alias for ``Cipher`` parameterized over the listed mode types (or ``None``).
# It is the ``self`` type of the ``encryptor``/``decryptor`` overloads that
# return a plain ``CipherContext``; ``modes.ModeWithAuthenticationTag`` is
# not part of this union and has its own overloads. The alias subscripts
# ``Cipher``, so it has to be defined after the class; the annotations that
# mention it are not evaluated eagerly because of
# ``from __future__ import annotations``.
_CIPHER_TYPE = Cipher[
    typing.Union[
        modes.ModeWithNonce,
        modes.ModeWithTweak,
        modes.ECB,
        modes.ModeWithInitializationVector,
        None,
    ]
]
    143 
# Register the context classes from the Rust OpenSSL bindings as virtual
# subclasses of the abstract interfaces declared in this module
# (``ABCMeta.register``), so that ``isinstance``/``issubclass`` checks
# against these ABCs accept them.
CipherContext.register(rust_openssl.ciphers.CipherContext)
AEADEncryptionContext.register(rust_openssl.ciphers.AEADEncryptionContext)
AEADDecryptionContext.register(rust_openssl.ciphers.AEADDecryptionContext)