Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/itsdangerous/serializer.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

120 statements  

1from __future__ import annotations 

2 

3import collections.abc as cabc 

4import json 

5import typing as t 

6 

7from .encoding import want_bytes 

8from .exc import BadPayload 

9from .exc import BadSignature 

10from .signer import _make_keys_list 

11from .signer import Signer 

12 

13if t.TYPE_CHECKING: 

14 import typing_extensions as te 

15 

16 # This should be either be str or bytes. To avoid having to specify the 

17 # bound type, it falls back to a union if structural matching fails. 

18 _TSerialized = te.TypeVar( 

19 "_TSerialized", bound=t.Union[str, bytes], default=t.Union[str, bytes] 

20 ) 

21else: 

22 # Still available at runtime on Python < 3.13, but without the default. 

23 _TSerialized = t.TypeVar("_TSerialized", bound=t.Union[str, bytes]) 

24 

25 

class _PDataSerializer(t.Protocol[_TSerialized]):
    """Structural type for the data serializer wrapped by ``Serializer``.

    Any object with positional-only ``loads`` and ``dumps`` callables of
    the shapes below matches this protocol.
    """

    def loads(self, payload: _TSerialized, /) -> t.Any: ...

    # Type checkers do not currently handle a ``dumps`` signature that takes
    # additional arguments, so serializers that do not fit this strict
    # protocol are covered by a separate overload on ``Serializer.__init__``.
    def dumps(self, obj: t.Any, /) -> _TSerialized: ...

32 

33 

34# Use TypeIs once it's available in typing_extensions or 3.13. 

def is_text_serializer(
    serializer: _PDataSerializer[t.Any],
) -> te.TypeGuard[_PDataSerializer[str]]:
    """Report whether ``serializer`` produces text rather than binary.

    The check is done by dumping an empty dict and inspecting the type of
    the result.

    :param serializer: Object providing a ``dumps`` callable.
    :return: ``True`` if ``serializer.dumps({})`` is a ``str``, otherwise
        ``False``.
    """
    probe = serializer.dumps({})
    return isinstance(probe, str)

40 

41 

class Serializer(t.Generic[_TSerialized]):
    """A serializer wraps a :class:`~itsdangerous.signer.Signer` so that
    data other than bytes can be serialized and securely signed. It can
    unsign to verify that the data hasn't been changed.

    The serializer provides :meth:`dumps` and :meth:`loads`, similar to
    :mod:`json`, and by default uses :mod:`json` internally to serialize
    the data to bytes.

    The secret key should be a random string of ``bytes`` and should not
    be saved to code or version control. Different salts should be used
    to distinguish signing in different contexts. See :doc:`/concepts`
    for information about the security of the secret key and salt.

    :param secret_key: The secret key to sign and verify with. Can be a
        list of keys, oldest to newest, to support key rotation.
    :param salt: Extra key to combine with ``secret_key`` to distinguish
        signatures in different contexts.
    :param serializer: An object that provides ``dumps`` and ``loads``
        methods for serializing data to a string. Defaults to
        :attr:`default_serializer`, which defaults to :mod:`json`.
    :param serializer_kwargs: Keyword arguments to pass when calling
        ``serializer.dumps``.
    :param signer: A ``Signer`` class to instantiate when signing data.
        Defaults to :attr:`default_signer`, which defaults to
        :class:`~itsdangerous.signer.Signer`.
    :param signer_kwargs: Keyword arguments to pass when instantiating
        the ``Signer`` class.
    :param fallback_signers: List of signer parameters to try when
        unsigning with the default signer fails. Each item can be a dict
        of ``signer_kwargs``, a ``Signer`` class, or a tuple of
        ``(signer, signer_kwargs)``. Defaults to
        :attr:`default_fallback_signers`.

    .. versionchanged:: 2.0
        Added support for key rotation by passing a list to
        ``secret_key``.

    .. versionchanged:: 2.0
        Removed the default SHA-512 fallback signer from
        ``default_fallback_signers``.

    .. versionchanged:: 1.1
        Added support for ``fallback_signers`` and configured a default
        SHA-512 fallback. This fallback is for users who used the yanked
        1.0.0 release which defaulted to SHA-512.

    .. versionchanged:: 0.14
        The ``signer`` and ``signer_kwargs`` parameters were added to
        the constructor.
    """

    #: The default serialization module to use to serialize data to a
    #: string internally. The default is :mod:`json`, but can be changed
    #: to any object that provides ``dumps`` and ``loads`` methods.
    default_serializer: _PDataSerializer[t.Any] = json

    #: The default ``Signer`` class to instantiate when signing data.
    #: The default is :class:`itsdangerous.signer.Signer`.
    default_signer: type[Signer] = Signer

    #: The default fallback signers to try when unsigning fails.
    default_fallback_signers: list[
        dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
    ] = []

    # Serializer[str] if no data serializer is provided, or if it returns str.
    @t.overload
    def __init__(
        self: Serializer[str],
        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
        salt: str | bytes | None = b"itsdangerous",
        serializer: None | _PDataSerializer[str] = None,
        serializer_kwargs: dict[str, t.Any] | None = None,
        signer: type[Signer] | None = None,
        signer_kwargs: dict[str, t.Any] | None = None,
        fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ]
        | None = None,
    ): ...

    # Serializer[bytes] with a bytes data serializer positional argument.
    @t.overload
    def __init__(
        self: Serializer[bytes],
        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
        salt: str | bytes | None,
        serializer: _PDataSerializer[bytes],
        serializer_kwargs: dict[str, t.Any] | None = None,
        signer: type[Signer] | None = None,
        signer_kwargs: dict[str, t.Any] | None = None,
        fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ]
        | None = None,
    ): ...

    # Serializer[bytes] with a bytes data serializer keyword argument.
    @t.overload
    def __init__(
        self: Serializer[bytes],
        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
        salt: str | bytes | None = b"itsdangerous",
        *,
        serializer: _PDataSerializer[bytes],
        serializer_kwargs: dict[str, t.Any] | None = None,
        signer: type[Signer] | None = None,
        signer_kwargs: dict[str, t.Any] | None = None,
        fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ]
        | None = None,
    ): ...

    # Fall back with a positional argument. If the strict signature of
    # _PDataSerializer doesn't match, fall back to a union, requiring the user
    # to specify the type.
    @t.overload
    def __init__(
        self,
        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
        salt: str | bytes | None,
        serializer: t.Any,
        serializer_kwargs: dict[str, t.Any] | None = None,
        signer: type[Signer] | None = None,
        signer_kwargs: dict[str, t.Any] | None = None,
        fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ]
        | None = None,
    ): ...

    # Fall back with a keyword argument.
    @t.overload
    def __init__(
        self,
        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
        salt: str | bytes | None = b"itsdangerous",
        *,
        serializer: t.Any,
        serializer_kwargs: dict[str, t.Any] | None = None,
        signer: type[Signer] | None = None,
        signer_kwargs: dict[str, t.Any] | None = None,
        fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ]
        | None = None,
    ): ...

    def __init__(
        self,
        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
        salt: str | bytes | None = b"itsdangerous",
        serializer: t.Any | None = None,
        serializer_kwargs: dict[str, t.Any] | None = None,
        signer: type[Signer] | None = None,
        signer_kwargs: dict[str, t.Any] | None = None,
        fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ]
        | None = None,
    ):
        #: The list of secret keys to try for verifying signatures, from
        #: oldest to newest. The newest (last) key is used for signing.
        #:
        #: This allows a key rotation system to keep a list of allowed
        #: keys and remove expired ones.
        self.secret_keys: list[bytes] = _make_keys_list(secret_key)

        # A ``None`` salt is stored untouched so that the signer's own
        # default is used later; any other value is normalized to bytes.
        self.salt = None if salt is None else want_bytes(salt)

        data_serializer = self.default_serializer if serializer is None else serializer
        self.serializer: _PDataSerializer[_TSerialized] = data_serializer
        # Determined once here so later calls don't have to probe again.
        self.is_text_serializer: bool = is_text_serializer(data_serializer)

        self.signer: type[Signer] = self.default_signer if signer is None else signer
        self.signer_kwargs: dict[str, t.Any] = signer_kwargs or {}

        # The class-level default list is copied rather than shared.
        self.fallback_signers: list[
            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
        ] = (
            list(self.default_fallback_signers)
            if fallback_signers is None
            else fallback_signers
        )
        self.serializer_kwargs: dict[str, t.Any] = serializer_kwargs or {}

    @property
    def secret_key(self) -> bytes:
        """The newest (last) entry in the :attr:`secret_keys` list. Kept
        for compatibility with code written before key rotation support
        was added.
        """
        newest = self.secret_keys[-1]
        return newest

    def load_payload(
        self, payload: bytes, serializer: _PDataSerializer[t.Any] | None = None
    ) -> t.Any:
        """Decode ``payload`` with the data serializer and return the
        resulting object.

        :param payload: The encoded payload; it should always be bytes.
        :param serializer: Optional override for the serializer stored
            on the instance.
        :raises BadPayload: If decoding or deserializing raises any
            exception. The original exception is attached as
            ``original_error`` and chained as the cause.
        """
        if serializer is None:
            loader, text_mode = self.serializer, self.is_text_serializer
        else:
            loader, text_mode = serializer, is_text_serializer(serializer)

        try:
            # Text serializers are handed a UTF-8 decoded string; binary
            # serializers get the raw bytes.
            data = payload.decode("utf-8") if text_mode else payload
            return loader.loads(data)  # type: ignore[arg-type]
        except Exception as e:
            raise BadPayload(
                "Could not load the payload because an exception"
                " occurred on unserializing the data.",
                original_error=e,
            ) from e

    def dump_payload(self, obj: t.Any) -> bytes:
        """Serialize ``obj`` with the internal serializer, passing
        :attr:`serializer_kwargs` to its ``dumps``. The return value is
        always bytes: the serializer's output is passed through
        ``want_bytes``.
        """
        serialized = self.serializer.dumps(obj, **self.serializer_kwargs)
        return want_bytes(serialized)

    def make_signer(self, salt: str | bytes | None = None) -> Signer:
        """Create a new instance of the configured signer class.

        :param salt: Salt to sign with. When ``None``, the salt stored
            on this serializer is used.
        """
        chosen_salt = self.salt if salt is None else salt
        return self.signer(self.secret_keys, salt=chosen_salt, **self.signer_kwargs)

    def iter_unsigners(self, salt: str | bytes | None = None) -> cabc.Iterator[Signer]:
        """Iterate over all signers to be tried for unsigning. The
        configured signer comes first, followed by one signer per secret
        key for each entry in ``fallback_signers``.
        """
        chosen_salt = self.salt if salt is None else salt

        yield self.make_signer(chosen_salt)

        for entry in self.fallback_signers:
            if isinstance(entry, dict):
                # Only kwargs were given: reuse the configured signer class.
                signer_cls, kwargs = self.signer, entry
            elif isinstance(entry, tuple):
                signer_cls, kwargs = entry
            else:
                # Only a class was given: reuse the configured kwargs.
                signer_cls, kwargs = entry, self.signer_kwargs

            for key in self.secret_keys:
                yield signer_cls(key, salt=chosen_salt, **kwargs)

    def dumps(self, obj: t.Any, salt: str | bytes | None = None) -> _TSerialized:
        """Serialize ``obj`` and return it signed. The result is ``str``
        when the internal serializer is a text serializer, otherwise
        ``bytes``.
        """
        body = want_bytes(self.dump_payload(obj))
        signed = self.make_signer(salt).sign(body)

        if not self.is_text_serializer:
            return signed  # type: ignore[return-value]

        return signed.decode("utf-8")  # type: ignore[return-value]

    def dump(self, obj: t.Any, f: t.IO[t.Any], salt: str | bytes | None = None) -> None:
        """Like :meth:`dumps` but writes the result to the file ``f``.
        The file handle has to accept the type that :meth:`dumps`
        returns.
        """
        signed = self.dumps(obj, salt)
        f.write(signed)

    def loads(
        self, s: str | bytes, salt: str | bytes | None = None, **kwargs: t.Any
    ) -> t.Any:
        """Reverse of :meth:`dumps`. Each signer from
        :meth:`iter_unsigners` is tried in turn, and the payload of the
        first one that unsigns successfully is loaded and returned.

        :raises BadSignature: If no signer validates the signature; the
            error from the last signer tried is raised.
        """
        signed = want_bytes(s)
        failure = None

        for unsigner in self.iter_unsigners(salt):
            try:
                return self.load_payload(unsigner.unsign(signed))
            except BadSignature as err:
                # Remember the error and move on to the next candidate.
                failure = err

        raise t.cast(BadSignature, failure)

    def load(self, f: t.IO[t.Any], salt: str | bytes | None = None) -> t.Any:
        """Like :meth:`loads` but reads the signed data from a file."""
        signed = f.read()
        return self.loads(signed, salt)

    def loads_unsafe(
        self, s: str | bytes, salt: str | bytes | None = None
    ) -> tuple[bool, t.Any]:
        """Like :meth:`loads` but without verifying the signature. This
        is potentially very dangerous to use depending on how your
        serializer works. The return value is ``(signature_valid,
        payload)`` instead of just the payload. The first item is a
        boolean that indicates if the signature is valid; signature
        errors are reported through it instead of being raised.

        Use it for debugging only and if you know that your serializer
        module is not exploitable (for example, do not use it with a
        pickle serializer).

        .. versionadded:: 0.15
        """
        result = self._loads_unsafe_impl(s, salt)
        return result

    def _loads_unsafe_impl(
        self,
        s: str | bytes,
        salt: str | bytes | None,
        load_kwargs: dict[str, t.Any] | None = None,
        load_payload_kwargs: dict[str, t.Any] | None = None,
    ) -> tuple[bool, t.Any]:
        """Low level helper function to implement :meth:`loads_unsafe`
        in serializer subclasses.

        :param load_kwargs: Extra keyword arguments for :meth:`loads`.
        :param load_payload_kwargs: Extra keyword arguments for
            :meth:`load_payload` when decoding an unverified payload.
        :return: ``(True, obj)`` when :meth:`loads` succeeds; otherwise
            ``(False, obj)`` with the unverified payload, or
            ``(False, None)`` when there is no payload or it cannot be
            loaded.
        """
        extra_load = {} if load_kwargs is None else load_kwargs

        # NOTE(review): only BadSignature is caught around ``loads``. If
        # BadPayload is not a BadSignature subclass, a validly signed but
        # undecodable payload would propagate from here - confirm.
        try:
            return True, self.loads(s, salt=salt, **extra_load)
        except BadSignature as e:
            unverified = e.payload

            if unverified is None:
                return False, None

            extra_payload = {} if load_payload_kwargs is None else load_payload_kwargs

            try:
                return False, self.load_payload(unverified, **extra_payload)
            except BadPayload:
                return False, None

    def load_unsafe(
        self, f: t.IO[t.Any], salt: str | bytes | None = None
    ) -> tuple[bool, t.Any]:
        """Like :meth:`loads_unsafe` but reads the signed data from a
        file.

        .. versionadded:: 0.15
        """
        signed = f.read()
        return self.loads_unsafe(signed, salt=salt)