Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/itsdangerous/serializer.py: 67%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

120 statements  

1from __future__ import annotations 

2 

3import collections.abc as cabc 

4import json 

5import typing as t 

6 

7from .encoding import want_bytes 

8from .exc import BadPayload 

9from .exc import BadSignature 

10from .signer import _make_keys_list 

11from .signer import Signer 

12 

13if t.TYPE_CHECKING: 

14 import typing_extensions as te 

15 

16 # This should be either be str or bytes. To avoid having to specify the 

17 # bound type, it falls back to a union if structural matching fails. 

18 _TSerialized = te.TypeVar("_TSerialized", bound=str | bytes, default=str | bytes) 

19else: 

20 # Still available at runtime on Python < 3.13, but without the default. 

21 _TSerialized = t.TypeVar("_TSerialized", bound=str | bytes) 

22 

23 

class _PDataSerializer(t.Protocol[_TSerialized]):
    """Structural type for a data serializer object.

    A matching object provides ``loads`` and ``dumps`` that each take a
    single positional argument, where ``dumps`` returns the serialized
    type and ``loads`` accepts it.
    """

    def loads(self, payload: _TSerialized, /) -> t.Any: ...

    # A signature with additional arguments is not handled correctly by type
    # checkers right now, so an overload is used below for serializers that
    # don't match this strict protocol.
    def dumps(self, obj: t.Any, /) -> _TSerialized: ...

30 

31 

# Use TypeIs once it's available in typing_extensions or 3.13.
def is_text_serializer(
    serializer: _PDataSerializer[t.Any],
) -> te.TypeGuard[_PDataSerializer[str]]:
    """Checks whether a serializer generates text or binary.

    The check is made by calling ``serializer.dumps`` on an empty dict
    and testing whether the result is a ``str``.

    :param serializer: An object that provides a ``dumps`` method.
    :return: ``True`` if ``dumps`` returned ``str``, ``False`` otherwise.
    """
    return isinstance(serializer.dumps({}), str)

38 

39 

class Serializer(t.Generic[_TSerialized]):
    """A serializer wraps a :class:`~itsdangerous.signer.Signer` to
    enable serializing and securely signing data other than bytes. It
    can unsign to verify that the data hasn't been changed.

    The serializer provides :meth:`dumps` and :meth:`loads`, similar to
    :mod:`json`, and by default uses :mod:`json` internally to serialize
    the data to bytes.

    The secret key should be a random string of ``bytes`` and should not
    be saved to code or version control. Different salts should be used
    to distinguish signing in different contexts. See :doc:`/concepts`
    for information about the security of the secret key and salt.

    :param secret_key: The secret key to sign and verify with. Can be a
        list of keys, oldest to newest, to support key rotation.
    :param salt: Extra key to combine with ``secret_key`` to distinguish
        signatures in different contexts.
    :param serializer: An object that provides ``dumps`` and ``loads``
        methods for serializing data to a string. Defaults to
        :attr:`default_serializer`, which defaults to :mod:`json`.
    :param serializer_kwargs: Keyword arguments to pass when calling
        ``serializer.dumps``.
    :param signer: A ``Signer`` class to instantiate when signing data.
        Defaults to :attr:`default_signer`, which defaults to
        :class:`~itsdangerous.signer.Signer`.
    :param signer_kwargs: Keyword arguments to pass when instantiating
        the ``Signer`` class.
    :param fallback_signers: List of signer parameters to try when
        unsigning with the default signer fails. Each item can be a dict
        of ``signer_kwargs``, a ``Signer`` class, or a tuple of
        ``(signer, signer_kwargs)``. Defaults to
        :attr:`default_fallback_signers`.

    .. versionchanged:: 2.0
        Added support for key rotation by passing a list to
        ``secret_key``.

    .. versionchanged:: 2.0
        Removed the default SHA-512 fallback signer from
        ``default_fallback_signers``.

    .. versionchanged:: 1.1
        Added support for ``fallback_signers`` and configured a default
        SHA-512 fallback. This fallback is for users who used the yanked
        1.0.0 release which defaulted to SHA-512.

    .. versionchanged:: 0.14
        The ``signer`` and ``signer_kwargs`` parameters were added to
        the constructor.
    """

    #: The default serialization module to use to serialize data to a
    #: string internally. The default is :mod:`json`, but can be changed
    #: to any object that provides ``dumps`` and ``loads`` methods.
    default_serializer: _PDataSerializer[t.Any] = json

    #: The default ``Signer`` class to instantiate when signing data.
    #: The default is :class:`itsdangerous.signer.Signer`.
    default_signer: type[Signer] = Signer

    #: The default fallback signers to try when unsigning fails.
    default_fallback_signers: list[
        dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
    ] = []

    # Serializer[str] if no data serializer is provided, or if it returns str.
    @t.overload
    def __init__(
        self: Serializer[str],
        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
        salt: str | bytes | None = b"itsdangerous",
        serializer: None | _PDataSerializer[str] = None,
        serializer_kwargs: dict[str, t.Any] | None = None,
        signer: type[Signer] | None = None,
        signer_kwargs: dict[str, t.Any] | None = None,
        fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ]
        | None = None,
    ): ...

    # Serializer[bytes] with a bytes data serializer positional argument.
    @t.overload
    def __init__(
        self: Serializer[bytes],
        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
        salt: str | bytes | None,
        serializer: _PDataSerializer[bytes],
        serializer_kwargs: dict[str, t.Any] | None = None,
        signer: type[Signer] | None = None,
        signer_kwargs: dict[str, t.Any] | None = None,
        fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ]
        | None = None,
    ): ...

    # Serializer[bytes] with a bytes data serializer keyword argument.
    @t.overload
    def __init__(
        self: Serializer[bytes],
        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
        salt: str | bytes | None = b"itsdangerous",
        *,
        serializer: _PDataSerializer[bytes],
        serializer_kwargs: dict[str, t.Any] | None = None,
        signer: type[Signer] | None = None,
        signer_kwargs: dict[str, t.Any] | None = None,
        fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ]
        | None = None,
    ): ...

    # Fall back with a positional argument. If the strict signature of
    # _PDataSerializer doesn't match, fall back to a union, requiring the user
    # to specify the type.
    @t.overload
    def __init__(
        self,
        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
        salt: str | bytes | None,
        serializer: t.Any,
        serializer_kwargs: dict[str, t.Any] | None = None,
        signer: type[Signer] | None = None,
        signer_kwargs: dict[str, t.Any] | None = None,
        fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ]
        | None = None,
    ): ...

    # Fall back with a keyword argument.
    @t.overload
    def __init__(
        self,
        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
        salt: str | bytes | None = b"itsdangerous",
        *,
        serializer: t.Any,
        serializer_kwargs: dict[str, t.Any] | None = None,
        signer: type[Signer] | None = None,
        signer_kwargs: dict[str, t.Any] | None = None,
        fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ]
        | None = None,
    ): ...

    def __init__(
        self,
        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
        salt: str | bytes | None = b"itsdangerous",
        serializer: t.Any | None = None,
        serializer_kwargs: dict[str, t.Any] | None = None,
        signer: type[Signer] | None = None,
        signer_kwargs: dict[str, t.Any] | None = None,
        fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ]
        | None = None,
    ):
        #: The list of secret keys to try for verifying signatures, from
        #: oldest to newest. The newest (last) key is used for signing.
        #:
        #: This allows a key rotation system to keep a list of allowed
        #: keys and remove expired ones.
        self.secret_keys: list[bytes] = _make_keys_list(secret_key)

        if salt is not None:
            salt = want_bytes(salt)
            # if salt is None then the signer's default is used

        self.salt = salt

        if serializer is None:
            serializer = self.default_serializer

        self.serializer: _PDataSerializer[_TSerialized] = serializer
        # Probe the serializer once here so later calls can reuse the result.
        self.is_text_serializer: bool = is_text_serializer(serializer)

        if signer is None:
            signer = self.default_signer

        self.signer: type[Signer] = signer
        self.signer_kwargs: dict[str, t.Any] = signer_kwargs or {}

        if fallback_signers is None:
            # Copy so the instance does not share the class-level list.
            fallback_signers = list(self.default_fallback_signers)

        self.fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ] = fallback_signers
        self.serializer_kwargs: dict[str, t.Any] = serializer_kwargs or {}

    @property
    def secret_key(self) -> bytes:
        """The newest (last) entry in the :attr:`secret_keys` list. This
        is for compatibility from before key rotation support was added.
        """
        return self.secret_keys[-1]

    def load_payload(
        self, payload: bytes, serializer: _PDataSerializer[t.Any] | None = None
    ) -> t.Any:
        """Loads the encoded object. This function raises
        :class:`.BadPayload` if the payload is not valid. The
        ``serializer`` parameter can be used to override the serializer
        stored on the class. The encoded ``payload`` should always be
        bytes.
        """
        if serializer is None:
            use_serializer = self.serializer
            is_text = self.is_text_serializer
        else:
            use_serializer = serializer
            is_text = is_text_serializer(serializer)

        try:
            if is_text:
                # Text serializers are given the payload decoded as UTF-8.
                return use_serializer.loads(payload.decode("utf-8"))  # type: ignore[arg-type]

            return use_serializer.loads(payload)  # type: ignore[arg-type]
        except Exception as e:
            # Any failure while decoding or deserializing is reported
            # uniformly as BadPayload, keeping the original error attached.
            raise BadPayload(
                "Could not load the payload because an exception"
                " occurred on unserializing the data.",
                original_error=e,
            ) from e

    def dump_payload(self, obj: t.Any) -> bytes:
        """Dumps the encoded object. The return value is always bytes.
        If the internal serializer returns text, the value will be
        encoded as UTF-8.
        """
        return want_bytes(self.serializer.dumps(obj, **self.serializer_kwargs))

    def make_signer(self, salt: str | bytes | None = None) -> Signer:
        """Creates a new instance of the signer to be used. The default
        implementation uses the :class:`.Signer` base class.
        """
        if salt is None:
            salt = self.salt

        return self.signer(self.secret_keys, salt=salt, **self.signer_kwargs)

    def iter_unsigners(self, salt: str | bytes | None = None) -> cabc.Iterator[Signer]:
        """Iterates over all signers to be tried for unsigning. Starts
        with the configured signer, then constructs each signer
        specified in ``fallback_signers``.
        """
        if salt is None:
            salt = self.salt

        yield self.make_signer(salt)

        for fallback in self.fallback_signers:
            if isinstance(fallback, dict):
                # Only kwargs were given: use them with the configured signer class.
                kwargs = fallback
                fallback = self.signer
            elif isinstance(fallback, tuple):
                # Both a signer class and its kwargs were given.
                fallback, kwargs = fallback
            else:
                # Only a signer class was given: reuse the configured kwargs.
                kwargs = self.signer_kwargs

            # One fallback signer is constructed per secret key.
            for secret_key in self.secret_keys:
                yield fallback(secret_key, salt=salt, **kwargs)

    def dumps(self, obj: t.Any, salt: str | bytes | None = None) -> _TSerialized:
        """Returns a signed string serialized with the internal
        serializer. The return value can be either a byte or unicode
        string depending on the format of the internal serializer.
        """
        payload = want_bytes(self.dump_payload(obj))
        rv = self.make_signer(salt).sign(payload)

        if self.is_text_serializer:
            return rv.decode("utf-8")  # type: ignore[return-value]

        return rv  # type: ignore[return-value]

    def dump(self, obj: t.Any, f: t.IO[t.Any], salt: str | bytes | None = None) -> None:
        """Like :meth:`dumps` but dumps into a file. The file handle has
        to be compatible with what the internal serializer expects.
        """
        f.write(self.dumps(obj, salt))

    def loads(
        self, s: str | bytes, salt: str | bytes | None = None, **kwargs: t.Any
    ) -> t.Any:
        """Reverse of :meth:`dumps`. Raises :exc:`.BadSignature` if the
        signature validation fails.

        Extra keyword arguments are accepted but are not used by this
        implementation.
        """
        s = want_bytes(s)
        last_exception = None

        # Try each signer in turn; the first one that verifies wins.
        for signer in self.iter_unsigners(salt):
            try:
                return self.load_payload(signer.unsign(s))
            except BadSignature as err:
                last_exception = err

        # No signer verified the value: re-raise the most recent failure.
        raise t.cast(BadSignature, last_exception)

    def load(self, f: t.IO[t.Any], salt: str | bytes | None = None) -> t.Any:
        """Like :meth:`loads` but loads from a file."""
        return self.loads(f.read(), salt)

    def loads_unsafe(
        self, s: str | bytes, salt: str | bytes | None = None
    ) -> tuple[bool, t.Any]:
        """Like :meth:`loads` but without verifying the signature. This
        is potentially very dangerous to use depending on how your
        serializer works. The return value is ``(signature_valid,
        payload)`` instead of just the payload. The first item will be a
        boolean that indicates if the signature is valid. This function
        never fails.

        Use it for debugging only and if you know that your serializer
        module is not exploitable (for example, do not use it with a
        pickle serializer).

        .. versionadded:: 0.15
        """
        return self._loads_unsafe_impl(s, salt)

    def _loads_unsafe_impl(
        self,
        s: str | bytes,
        salt: str | bytes | None,
        load_kwargs: dict[str, t.Any] | None = None,
        load_payload_kwargs: dict[str, t.Any] | None = None,
    ) -> tuple[bool, t.Any]:
        """Low level helper function to implement :meth:`loads_unsafe`
        in serializer subclasses.
        """
        if load_kwargs is None:
            load_kwargs = {}

        try:
            return True, self.loads(s, salt=salt, **load_kwargs)
        except BadSignature as e:
            # Without a payload on the error there is nothing to decode.
            if e.payload is None:
                return False, None

            if load_payload_kwargs is None:
                load_payload_kwargs = {}

            try:
                return (
                    False,
                    self.load_payload(e.payload, **load_payload_kwargs),
                )
            except BadPayload:
                return False, None

    def load_unsafe(
        self, f: t.IO[t.Any], salt: str | bytes | None = None
    ) -> tuple[bool, t.Any]:
        """Like :meth:`loads_unsafe` but loads from a file.

        .. versionadded:: 0.15
        """
        return self.loads_unsafe(f.read(), salt=salt)