Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/fernet.py: 26%
137 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5from __future__ import annotations
7import base64
8import binascii
9import os
10import time
11import typing
13from cryptography import utils
14from cryptography.exceptions import InvalidSignature
15from cryptography.hazmat.primitives import hashes, padding
16from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
17from cryptography.hazmat.primitives.hmac import HMAC
20class InvalidToken(Exception):
21 pass
24_MAX_CLOCK_SKEW = 60
27class Fernet:
28 def __init__(
29 self,
30 key: typing.Union[bytes, str],
31 backend: typing.Any = None,
32 ) -> None:
33 try:
34 key = base64.urlsafe_b64decode(key)
35 except binascii.Error as exc:
36 raise ValueError(
37 "Fernet key must be 32 url-safe base64-encoded bytes."
38 ) from exc
39 if len(key) != 32:
40 raise ValueError(
41 "Fernet key must be 32 url-safe base64-encoded bytes."
42 )
44 self._signing_key = key[:16]
45 self._encryption_key = key[16:]
47 @classmethod
48 def generate_key(cls) -> bytes:
49 return base64.urlsafe_b64encode(os.urandom(32))
51 def encrypt(self, data: bytes) -> bytes:
52 return self.encrypt_at_time(data, int(time.time()))
54 def encrypt_at_time(self, data: bytes, current_time: int) -> bytes:
55 iv = os.urandom(16)
56 return self._encrypt_from_parts(data, current_time, iv)
58 def _encrypt_from_parts(
59 self, data: bytes, current_time: int, iv: bytes
60 ) -> bytes:
61 utils._check_bytes("data", data)
63 padder = padding.PKCS7(algorithms.AES.block_size).padder()
64 padded_data = padder.update(data) + padder.finalize()
65 encryptor = Cipher(
66 algorithms.AES(self._encryption_key),
67 modes.CBC(iv),
68 ).encryptor()
69 ciphertext = encryptor.update(padded_data) + encryptor.finalize()
71 basic_parts = (
72 b"\x80"
73 + current_time.to_bytes(length=8, byteorder="big")
74 + iv
75 + ciphertext
76 )
78 h = HMAC(self._signing_key, hashes.SHA256())
79 h.update(basic_parts)
80 hmac = h.finalize()
81 return base64.urlsafe_b64encode(basic_parts + hmac)
83 def decrypt(
84 self, token: typing.Union[bytes, str], ttl: typing.Optional[int] = None
85 ) -> bytes:
86 timestamp, data = Fernet._get_unverified_token_data(token)
87 if ttl is None:
88 time_info = None
89 else:
90 time_info = (ttl, int(time.time()))
91 return self._decrypt_data(data, timestamp, time_info)
93 def decrypt_at_time(
94 self, token: typing.Union[bytes, str], ttl: int, current_time: int
95 ) -> bytes:
96 if ttl is None:
97 raise ValueError(
98 "decrypt_at_time() can only be used with a non-None ttl"
99 )
100 timestamp, data = Fernet._get_unverified_token_data(token)
101 return self._decrypt_data(data, timestamp, (ttl, current_time))
103 def extract_timestamp(self, token: typing.Union[bytes, str]) -> int:
104 timestamp, data = Fernet._get_unverified_token_data(token)
105 # Verify the token was not tampered with.
106 self._verify_signature(data)
107 return timestamp
109 @staticmethod
110 def _get_unverified_token_data(
111 token: typing.Union[bytes, str]
112 ) -> typing.Tuple[int, bytes]:
113 if not isinstance(token, (str, bytes)):
114 raise TypeError("token must be bytes or str")
116 try:
117 data = base64.urlsafe_b64decode(token)
118 except (TypeError, binascii.Error):
119 raise InvalidToken
121 if not data or data[0] != 0x80:
122 raise InvalidToken
124 if len(data) < 9:
125 raise InvalidToken
127 timestamp = int.from_bytes(data[1:9], byteorder="big")
128 return timestamp, data
130 def _verify_signature(self, data: bytes) -> None:
131 h = HMAC(self._signing_key, hashes.SHA256())
132 h.update(data[:-32])
133 try:
134 h.verify(data[-32:])
135 except InvalidSignature:
136 raise InvalidToken
138 def _decrypt_data(
139 self,
140 data: bytes,
141 timestamp: int,
142 time_info: typing.Optional[typing.Tuple[int, int]],
143 ) -> bytes:
144 if time_info is not None:
145 ttl, current_time = time_info
146 if timestamp + ttl < current_time:
147 raise InvalidToken
149 if current_time + _MAX_CLOCK_SKEW < timestamp:
150 raise InvalidToken
152 self._verify_signature(data)
154 iv = data[9:25]
155 ciphertext = data[25:-32]
156 decryptor = Cipher(
157 algorithms.AES(self._encryption_key), modes.CBC(iv)
158 ).decryptor()
159 plaintext_padded = decryptor.update(ciphertext)
160 try:
161 plaintext_padded += decryptor.finalize()
162 except ValueError:
163 raise InvalidToken
164 unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
166 unpadded = unpadder.update(plaintext_padded)
167 try:
168 unpadded += unpadder.finalize()
169 except ValueError:
170 raise InvalidToken
171 return unpadded
174class MultiFernet:
175 def __init__(self, fernets: typing.Iterable[Fernet]):
176 fernets = list(fernets)
177 if not fernets:
178 raise ValueError(
179 "MultiFernet requires at least one Fernet instance"
180 )
181 self._fernets = fernets
183 def encrypt(self, msg: bytes) -> bytes:
184 return self.encrypt_at_time(msg, int(time.time()))
186 def encrypt_at_time(self, msg: bytes, current_time: int) -> bytes:
187 return self._fernets[0].encrypt_at_time(msg, current_time)
189 def rotate(self, msg: typing.Union[bytes, str]) -> bytes:
190 timestamp, data = Fernet._get_unverified_token_data(msg)
191 for f in self._fernets:
192 try:
193 p = f._decrypt_data(data, timestamp, None)
194 break
195 except InvalidToken:
196 pass
197 else:
198 raise InvalidToken
200 iv = os.urandom(16)
201 return self._fernets[0]._encrypt_from_parts(p, timestamp, iv)
203 def decrypt(
204 self, msg: typing.Union[bytes, str], ttl: typing.Optional[int] = None
205 ) -> bytes:
206 for f in self._fernets:
207 try:
208 return f.decrypt(msg, ttl)
209 except InvalidToken:
210 pass
211 raise InvalidToken
213 def decrypt_at_time(
214 self, msg: typing.Union[bytes, str], ttl: int, current_time: int
215 ) -> bytes:
216 for f in self._fernets:
217 try:
218 return f.decrypt_at_time(msg, ttl, current_time)
219 except InvalidToken:
220 pass
221 raise InvalidToken