1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import annotations
6
7import abc
8import typing
9
10from cryptography.hazmat.bindings._rust import openssl as rust_openssl
11from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
12from cryptography.hazmat.primitives.ciphers import modes
13from cryptography.utils import Buffer
14
15
class CipherContext(metaclass=abc.ABCMeta):
    """
    Abstract interface for a cipher context.

    Declares incremental processing (``update`` / ``update_into``),
    finalization (``finalize``) and nonce reset (``reset_nonce``).
    """

    @abc.abstractmethod
    def update(self, data: Buffer) -> bytes:
        """
        Processes the provided bytes through the cipher and returns the
        results as bytes.
        """

    @abc.abstractmethod
    def update_into(self, data: Buffer, buf: Buffer) -> int:
        """
        Processes the provided bytes and writes the resulting data into the
        provided buffer ``buf``. Returns the number of bytes written.
        """

    @abc.abstractmethod
    def finalize(self) -> bytes:
        """
        Returns the results of processing the final block as bytes.
        """

    @abc.abstractmethod
    def reset_nonce(self, nonce: bytes) -> None:
        """
        Resets the nonce for the cipher context to the provided value.

        Raises an exception if the context does not support reset or if the
        provided nonce does not have a valid length.
        """
44
45
class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
    """
    Abstract cipher context that can additionally authenticate data passed
    to ``authenticate_additional_data``.
    """

    @abc.abstractmethod
    def authenticate_additional_data(self, data: Buffer) -> None:
        """
        Authenticates the provided bytes. Returns nothing.
        """
52
53
class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    Abstract AEAD context for decryption; adds ``finalize_with_tag`` so the
    authentication tag can be supplied at finalization time.
    """

    @abc.abstractmethod
    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Returns the results of processing the final block as bytes and allows
        delayed passing of the authentication tag.
        """
61
62
class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    Abstract AEAD context for encryption; exposes the resulting
    authentication tag through the ``tag`` property.
    """

    @property
    @abc.abstractmethod
    def tag(self) -> bytes:
        """
        Returns tag bytes. This is only available after encryption is
        finalized.
        """
71
72
# Type parameter of the generic Cipher class. Its bound is
# Optional[modes.Mode], so it stands for either a modes.Mode or None, and it
# is declared covariant.
Mode = typing.TypeVar(
    "Mode", bound=typing.Optional[modes.Mode], covariant=True
)
76
77
class Cipher(typing.Generic[Mode]):
    """
    Pairs a cipher algorithm with an optional mode and creates encryption
    and decryption contexts for that pair.
    """

    def __init__(
        self,
        algorithm: CipherAlgorithm,
        mode: Mode,
        backend: typing.Any = None,
    ) -> None:
        """
        Validates and stores ``algorithm`` and ``mode``.

        ``backend`` is accepted but not used by this constructor. Raises
        TypeError if ``algorithm`` is not a CipherAlgorithm. A non-None
        ``mode`` is checked via ``mode.validate_for_algorithm(algorithm)``.
        """
        if not isinstance(algorithm, CipherAlgorithm):
            raise TypeError("Expected interface of CipherAlgorithm.")

        mode_given = mode is not None
        if mode_given:
            # The assert exists so mypy narrows the generic type parameter
            # to modes.Mode before the method call below.
            assert isinstance(mode, modes.Mode)
            mode.validate_for_algorithm(algorithm)

        self.algorithm = algorithm
        self.mode = mode

    # Typing only: a cipher whose mode carries an authentication tag yields
    # an AEAD context, every other cipher yields a plain CipherContext.
    @typing.overload
    def encryptor(
        self: Cipher[modes.ModeWithAuthenticationTag],
    ) -> AEADEncryptionContext: ...

    @typing.overload
    def encryptor(
        self: _CIPHER_TYPE,
    ) -> CipherContext: ...

    def encryptor(self):
        """
        Returns the context created by
        ``rust_openssl.ciphers.create_encryption_ctx`` for the stored
        algorithm and mode.

        Raises ValueError if the mode is a ModeWithAuthenticationTag whose
        ``tag`` is not None.
        """
        mode = self.mode
        tag_preset = (
            isinstance(mode, modes.ModeWithAuthenticationTag)
            and mode.tag is not None
        )
        if tag_preset:
            raise ValueError(
                "Authentication tag must be None when encrypting."
            )

        make_ctx = rust_openssl.ciphers.create_encryption_ctx
        return make_ctx(self.algorithm, mode)

    # Typing only: same split between AEAD and plain contexts as encryptor.
    @typing.overload
    def decryptor(
        self: Cipher[modes.ModeWithAuthenticationTag],
    ) -> AEADDecryptionContext: ...

    @typing.overload
    def decryptor(
        self: _CIPHER_TYPE,
    ) -> CipherContext: ...

    def decryptor(self):
        """
        Returns the context created by
        ``rust_openssl.ciphers.create_decryption_ctx`` for the stored
        algorithm and mode.
        """
        make_ctx = rust_openssl.ciphers.create_decryption_ctx
        return make_ctx(self.algorithm, self.mode)
132
133
# Self type used by the fallback encryptor()/decryptor() overloads on Cipher,
# i.e. the ones that return a plain CipherContext. It is defined after the
# class because the alias subscripts Cipher itself; the earlier references in
# annotations are not evaluated eagerly thanks to
# `from __future__ import annotations`.
_CIPHER_TYPE = Cipher[
    typing.Union[
        modes.ModeWithNonce,
        modes.ModeWithTweak,
        modes.ECB,
        modes.ModeWithInitializationVector,
        None,
    ]
]
143
# Register the context classes from the rust_openssl bindings as virtual
# subclasses of the abstract interfaces, so isinstance()/issubclass() checks
# against the ABCs accept them.
CipherContext.register(rust_openssl.ciphers.CipherContext)
AEADEncryptionContext.register(rust_openssl.ciphers.AEADEncryptionContext)
AEADDecryptionContext.register(rust_openssl.ciphers.AEADDecryptionContext)