Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/fernet.py: 60%
137 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 07:26 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 07:26 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5from __future__ import annotations
7import base64
8import binascii
9import os
10import time
11import typing
13from cryptography import utils
14from cryptography.exceptions import InvalidSignature
15from cryptography.hazmat.primitives import hashes, padding
16from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
17from cryptography.hazmat.primitives.hmac import HMAC
20class InvalidToken(Exception):
21 pass
24_MAX_CLOCK_SKEW = 60
27class Fernet:
28 def __init__(
29 self,
30 key: bytes | str,
31 backend: typing.Any = None,
32 ) -> None:
33 try:
34 key = base64.urlsafe_b64decode(key)
35 except binascii.Error as exc:
36 raise ValueError(
37 "Fernet key must be 32 url-safe base64-encoded bytes."
38 ) from exc
39 if len(key) != 32:
40 raise ValueError(
41 "Fernet key must be 32 url-safe base64-encoded bytes."
42 )
44 self._signing_key = key[:16]
45 self._encryption_key = key[16:]
47 @classmethod
48 def generate_key(cls) -> bytes:
49 return base64.urlsafe_b64encode(os.urandom(32))
51 def encrypt(self, data: bytes) -> bytes:
52 return self.encrypt_at_time(data, int(time.time()))
54 def encrypt_at_time(self, data: bytes, current_time: int) -> bytes:
55 iv = os.urandom(16)
56 return self._encrypt_from_parts(data, current_time, iv)
58 def _encrypt_from_parts(
59 self, data: bytes, current_time: int, iv: bytes
60 ) -> bytes:
61 utils._check_bytes("data", data)
63 padder = padding.PKCS7(algorithms.AES.block_size).padder()
64 padded_data = padder.update(data) + padder.finalize()
65 encryptor = Cipher(
66 algorithms.AES(self._encryption_key),
67 modes.CBC(iv),
68 ).encryptor()
69 ciphertext = encryptor.update(padded_data) + encryptor.finalize()
71 basic_parts = (
72 b"\x80"
73 + current_time.to_bytes(length=8, byteorder="big")
74 + iv
75 + ciphertext
76 )
78 h = HMAC(self._signing_key, hashes.SHA256())
79 h.update(basic_parts)
80 hmac = h.finalize()
81 return base64.urlsafe_b64encode(basic_parts + hmac)
83 def decrypt(self, token: bytes | str, ttl: int | None = None) -> bytes:
84 timestamp, data = Fernet._get_unverified_token_data(token)
85 if ttl is None:
86 time_info = None
87 else:
88 time_info = (ttl, int(time.time()))
89 return self._decrypt_data(data, timestamp, time_info)
91 def decrypt_at_time(
92 self, token: bytes | str, ttl: int, current_time: int
93 ) -> bytes:
94 if ttl is None:
95 raise ValueError(
96 "decrypt_at_time() can only be used with a non-None ttl"
97 )
98 timestamp, data = Fernet._get_unverified_token_data(token)
99 return self._decrypt_data(data, timestamp, (ttl, current_time))
101 def extract_timestamp(self, token: bytes | str) -> int:
102 timestamp, data = Fernet._get_unverified_token_data(token)
103 # Verify the token was not tampered with.
104 self._verify_signature(data)
105 return timestamp
107 @staticmethod
108 def _get_unverified_token_data(token: bytes | str) -> tuple[int, bytes]:
109 if not isinstance(token, (str, bytes)):
110 raise TypeError("token must be bytes or str")
112 try:
113 data = base64.urlsafe_b64decode(token)
114 except (TypeError, binascii.Error):
115 raise InvalidToken
117 if not data or data[0] != 0x80:
118 raise InvalidToken
120 if len(data) < 9:
121 raise InvalidToken
123 timestamp = int.from_bytes(data[1:9], byteorder="big")
124 return timestamp, data
126 def _verify_signature(self, data: bytes) -> None:
127 h = HMAC(self._signing_key, hashes.SHA256())
128 h.update(data[:-32])
129 try:
130 h.verify(data[-32:])
131 except InvalidSignature:
132 raise InvalidToken
134 def _decrypt_data(
135 self,
136 data: bytes,
137 timestamp: int,
138 time_info: tuple[int, int] | None,
139 ) -> bytes:
140 if time_info is not None:
141 ttl, current_time = time_info
142 if timestamp + ttl < current_time:
143 raise InvalidToken
145 if current_time + _MAX_CLOCK_SKEW < timestamp:
146 raise InvalidToken
148 self._verify_signature(data)
150 iv = data[9:25]
151 ciphertext = data[25:-32]
152 decryptor = Cipher(
153 algorithms.AES(self._encryption_key), modes.CBC(iv)
154 ).decryptor()
155 plaintext_padded = decryptor.update(ciphertext)
156 try:
157 plaintext_padded += decryptor.finalize()
158 except ValueError:
159 raise InvalidToken
160 unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
162 unpadded = unpadder.update(plaintext_padded)
163 try:
164 unpadded += unpadder.finalize()
165 except ValueError:
166 raise InvalidToken
167 return unpadded
170class MultiFernet:
171 def __init__(self, fernets: typing.Iterable[Fernet]):
172 fernets = list(fernets)
173 if not fernets:
174 raise ValueError(
175 "MultiFernet requires at least one Fernet instance"
176 )
177 self._fernets = fernets
179 def encrypt(self, msg: bytes) -> bytes:
180 return self.encrypt_at_time(msg, int(time.time()))
182 def encrypt_at_time(self, msg: bytes, current_time: int) -> bytes:
183 return self._fernets[0].encrypt_at_time(msg, current_time)
185 def rotate(self, msg: bytes | str) -> bytes:
186 timestamp, data = Fernet._get_unverified_token_data(msg)
187 for f in self._fernets:
188 try:
189 p = f._decrypt_data(data, timestamp, None)
190 break
191 except InvalidToken:
192 pass
193 else:
194 raise InvalidToken
196 iv = os.urandom(16)
197 return self._fernets[0]._encrypt_from_parts(p, timestamp, iv)
199 def decrypt(self, msg: bytes | str, ttl: int | None = None) -> bytes:
200 for f in self._fernets:
201 try:
202 return f.decrypt(msg, ttl)
203 except InvalidToken:
204 pass
205 raise InvalidToken
207 def decrypt_at_time(
208 self, msg: bytes | str, ttl: int, current_time: int
209 ) -> bytes:
210 for f in self._fernets:
211 try:
212 return f.decrypt_at_time(msg, ttl, current_time)
213 except InvalidToken:
214 pass
215 raise InvalidToken