1from __future__ import annotations
2
3import collections.abc as cabc
4import json
5import typing as t
6
7from .encoding import want_bytes
8from .exc import BadPayload
9from .exc import BadSignature
10from .signer import _make_keys_list
11from .signer import Signer
12
13if t.TYPE_CHECKING:
14 import typing_extensions as te
15
16 # This should be either be str or bytes. To avoid having to specify the
17 # bound type, it falls back to a union if structural matching fails.
18 _TSerialized = te.TypeVar("_TSerialized", bound=str | bytes, default=str | bytes)
19else:
20 # Still available at runtime on Python < 3.13, but without the default.
21 _TSerialized = t.TypeVar("_TSerialized", bound=str | bytes)
22
23
24class _PDataSerializer(t.Protocol[_TSerialized]):
25 def loads(self, payload: _TSerialized, /) -> t.Any: ...
26 # A signature with additional arguments is not handled correctly by type
27 # checkers right now, so an overload is used below for serializers that
28 # don't match this strict protocol.
29 def dumps(self, obj: t.Any, /) -> _TSerialized: ...
30
31
32# Use TypeIs once it's available in typing_extensions or 3.13.
33def is_text_serializer(
34 serializer: _PDataSerializer[t.Any],
35) -> te.TypeGuard[_PDataSerializer[str]]:
36 """Checks whether a serializer generates text or binary."""
37 return isinstance(serializer.dumps({}), str)
38
39
40class Serializer(t.Generic[_TSerialized]):
41 """A serializer wraps a :class:`~itsdangerous.signer.Signer` to
42 enable serializing and securely signing data other than bytes. It
43 can unsign to verify that the data hasn't been changed.
44
45 The serializer provides :meth:`dumps` and :meth:`loads`, similar to
46 :mod:`json`, and by default uses :mod:`json` internally to serialize
47 the data to bytes.
48
49 The secret key should be a random string of ``bytes`` and should not
50 be saved to code or version control. Different salts should be used
51 to distinguish signing in different contexts. See :doc:`/concepts`
52 for information about the security of the secret key and salt.
53
54 :param secret_key: The secret key to sign and verify with. Can be a
55 list of keys, oldest to newest, to support key rotation.
56 :param salt: Extra key to combine with ``secret_key`` to distinguish
57 signatures in different contexts.
58 :param serializer: An object that provides ``dumps`` and ``loads``
59 methods for serializing data to a string. Defaults to
60 :attr:`default_serializer`, which defaults to :mod:`json`.
61 :param serializer_kwargs: Keyword arguments to pass when calling
62 ``serializer.dumps``.
63 :param signer: A ``Signer`` class to instantiate when signing data.
64 Defaults to :attr:`default_signer`, which defaults to
65 :class:`~itsdangerous.signer.Signer`.
66 :param signer_kwargs: Keyword arguments to pass when instantiating
67 the ``Signer`` class.
68 :param fallback_signers: List of signer parameters to try when
69 unsigning with the default signer fails. Each item can be a dict
70 of ``signer_kwargs``, a ``Signer`` class, or a tuple of
71 ``(signer, signer_kwargs)``. Defaults to
72 :attr:`default_fallback_signers`.
73
74 .. versionchanged:: 2.0
75 Added support for key rotation by passing a list to
76 ``secret_key``.
77
78 .. versionchanged:: 2.0
79 Removed the default SHA-512 fallback signer from
80 ``default_fallback_signers``.
81
82 .. versionchanged:: 1.1
83 Added support for ``fallback_signers`` and configured a default
84 SHA-512 fallback. This fallback is for users who used the yanked
85 1.0.0 release which defaulted to SHA-512.
86
87 .. versionchanged:: 0.14
88 The ``signer`` and ``signer_kwargs`` parameters were added to
89 the constructor.
90 """
91
92 #: The default serialization module to use to serialize data to a
93 #: string internally. The default is :mod:`json`, but can be changed
94 #: to any object that provides ``dumps`` and ``loads`` methods.
95 default_serializer: _PDataSerializer[t.Any] = json
96
97 #: The default ``Signer`` class to instantiate when signing data.
98 #: The default is :class:`itsdangerous.signer.Signer`.
99 default_signer: type[Signer] = Signer
100
101 #: The default fallback signers to try when unsigning fails.
102 default_fallback_signers: list[
103 dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
104 ] = []
105
106 # Serializer[str] if no data serializer is provided, or if it returns str.
107 @t.overload
108 def __init__(
109 self: Serializer[str],
110 secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
111 salt: str | bytes | None = b"itsdangerous",
112 serializer: None | _PDataSerializer[str] = None,
113 serializer_kwargs: dict[str, t.Any] | None = None,
114 signer: type[Signer] | None = None,
115 signer_kwargs: dict[str, t.Any] | None = None,
116 fallback_signers: list[
117 dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
118 ]
119 | None = None,
120 ): ...
121
122 # Serializer[bytes] with a bytes data serializer positional argument.
123 @t.overload
124 def __init__(
125 self: Serializer[bytes],
126 secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
127 salt: str | bytes | None,
128 serializer: _PDataSerializer[bytes],
129 serializer_kwargs: dict[str, t.Any] | None = None,
130 signer: type[Signer] | None = None,
131 signer_kwargs: dict[str, t.Any] | None = None,
132 fallback_signers: list[
133 dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
134 ]
135 | None = None,
136 ): ...
137
138 # Serializer[bytes] with a bytes data serializer keyword argument.
139 @t.overload
140 def __init__(
141 self: Serializer[bytes],
142 secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
143 salt: str | bytes | None = b"itsdangerous",
144 *,
145 serializer: _PDataSerializer[bytes],
146 serializer_kwargs: dict[str, t.Any] | None = None,
147 signer: type[Signer] | None = None,
148 signer_kwargs: dict[str, t.Any] | None = None,
149 fallback_signers: list[
150 dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
151 ]
152 | None = None,
153 ): ...
154
155 # Fall back with a positional argument. If the strict signature of
156 # _PDataSerializer doesn't match, fall back to a union, requiring the user
157 # to specify the type.
158 @t.overload
159 def __init__(
160 self,
161 secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
162 salt: str | bytes | None,
163 serializer: t.Any,
164 serializer_kwargs: dict[str, t.Any] | None = None,
165 signer: type[Signer] | None = None,
166 signer_kwargs: dict[str, t.Any] | None = None,
167 fallback_signers: list[
168 dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
169 ]
170 | None = None,
171 ): ...
172
173 # Fall back with a keyword argument.
174 @t.overload
175 def __init__(
176 self,
177 secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
178 salt: str | bytes | None = b"itsdangerous",
179 *,
180 serializer: t.Any,
181 serializer_kwargs: dict[str, t.Any] | None = None,
182 signer: type[Signer] | None = None,
183 signer_kwargs: dict[str, t.Any] | None = None,
184 fallback_signers: list[
185 dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
186 ]
187 | None = None,
188 ): ...
189
190 def __init__(
191 self,
192 secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes],
193 salt: str | bytes | None = b"itsdangerous",
194 serializer: t.Any | None = None,
195 serializer_kwargs: dict[str, t.Any] | None = None,
196 signer: type[Signer] | None = None,
197 signer_kwargs: dict[str, t.Any] | None = None,
198 fallback_signers: list[
199 dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
200 ]
201 | None = None,
202 ):
203 #: The list of secret keys to try for verifying signatures, from
204 #: oldest to newest. The newest (last) key is used for signing.
205 #:
206 #: This allows a key rotation system to keep a list of allowed
207 #: keys and remove expired ones.
208 self.secret_keys: list[bytes] = _make_keys_list(secret_key)
209
210 if salt is not None:
211 salt = want_bytes(salt)
212 # if salt is None then the signer's default is used
213
214 self.salt = salt
215
216 if serializer is None:
217 serializer = self.default_serializer
218
219 self.serializer: _PDataSerializer[_TSerialized] = serializer
220 self.is_text_serializer: bool = is_text_serializer(serializer)
221
222 if signer is None:
223 signer = self.default_signer
224
225 self.signer: type[Signer] = signer
226 self.signer_kwargs: dict[str, t.Any] = signer_kwargs or {}
227
228 if fallback_signers is None:
229 fallback_signers = list(self.default_fallback_signers)
230
231 self.fallback_signers: list[
232 dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer]
233 ] = fallback_signers
234 self.serializer_kwargs: dict[str, t.Any] = serializer_kwargs or {}
235
236 @property
237 def secret_key(self) -> bytes:
238 """The newest (last) entry in the :attr:`secret_keys` list. This
239 is for compatibility from before key rotation support was added.
240 """
241 return self.secret_keys[-1]
242
243 def load_payload(
244 self, payload: bytes, serializer: _PDataSerializer[t.Any] | None = None
245 ) -> t.Any:
246 """Loads the encoded object. This function raises
247 :class:`.BadPayload` if the payload is not valid. The
248 ``serializer`` parameter can be used to override the serializer
249 stored on the class. The encoded ``payload`` should always be
250 bytes.
251 """
252 if serializer is None:
253 use_serializer = self.serializer
254 is_text = self.is_text_serializer
255 else:
256 use_serializer = serializer
257 is_text = is_text_serializer(serializer)
258
259 try:
260 if is_text:
261 return use_serializer.loads(payload.decode("utf-8")) # type: ignore[arg-type]
262
263 return use_serializer.loads(payload) # type: ignore[arg-type]
264 except Exception as e:
265 raise BadPayload(
266 "Could not load the payload because an exception"
267 " occurred on unserializing the data.",
268 original_error=e,
269 ) from e
270
271 def dump_payload(self, obj: t.Any) -> bytes:
272 """Dumps the encoded object. The return value is always bytes.
273 If the internal serializer returns text, the value will be
274 encoded as UTF-8.
275 """
276 return want_bytes(self.serializer.dumps(obj, **self.serializer_kwargs))
277
278 def make_signer(self, salt: str | bytes | None = None) -> Signer:
279 """Creates a new instance of the signer to be used. The default
280 implementation uses the :class:`.Signer` base class.
281 """
282 if salt is None:
283 salt = self.salt
284
285 return self.signer(self.secret_keys, salt=salt, **self.signer_kwargs)
286
287 def iter_unsigners(self, salt: str | bytes | None = None) -> cabc.Iterator[Signer]:
288 """Iterates over all signers to be tried for unsigning. Starts
289 with the configured signer, then constructs each signer
290 specified in ``fallback_signers``.
291 """
292 if salt is None:
293 salt = self.salt
294
295 yield self.make_signer(salt)
296
297 for fallback in self.fallback_signers:
298 if isinstance(fallback, dict):
299 kwargs = fallback
300 fallback = self.signer
301 elif isinstance(fallback, tuple):
302 fallback, kwargs = fallback
303 else:
304 kwargs = self.signer_kwargs
305
306 for secret_key in self.secret_keys:
307 yield fallback(secret_key, salt=salt, **kwargs)
308
309 def dumps(self, obj: t.Any, salt: str | bytes | None = None) -> _TSerialized:
310 """Returns a signed string serialized with the internal
311 serializer. The return value can be either a byte or unicode
312 string depending on the format of the internal serializer.
313 """
314 payload = want_bytes(self.dump_payload(obj))
315 rv = self.make_signer(salt).sign(payload)
316
317 if self.is_text_serializer:
318 return rv.decode("utf-8") # type: ignore[return-value]
319
320 return rv # type: ignore[return-value]
321
322 def dump(self, obj: t.Any, f: t.IO[t.Any], salt: str | bytes | None = None) -> None:
323 """Like :meth:`dumps` but dumps into a file. The file handle has
324 to be compatible with what the internal serializer expects.
325 """
326 f.write(self.dumps(obj, salt))
327
328 def loads(
329 self, s: str | bytes, salt: str | bytes | None = None, **kwargs: t.Any
330 ) -> t.Any:
331 """Reverse of :meth:`dumps`. Raises :exc:`.BadSignature` if the
332 signature validation fails.
333 """
334 s = want_bytes(s)
335 last_exception = None
336
337 for signer in self.iter_unsigners(salt):
338 try:
339 return self.load_payload(signer.unsign(s))
340 except BadSignature as err:
341 last_exception = err
342
343 raise t.cast(BadSignature, last_exception)
344
345 def load(self, f: t.IO[t.Any], salt: str | bytes | None = None) -> t.Any:
346 """Like :meth:`loads` but loads from a file."""
347 return self.loads(f.read(), salt)
348
349 def loads_unsafe(
350 self, s: str | bytes, salt: str | bytes | None = None
351 ) -> tuple[bool, t.Any]:
352 """Like :meth:`loads` but without verifying the signature. This
353 is potentially very dangerous to use depending on how your
354 serializer works. The return value is ``(signature_valid,
355 payload)`` instead of just the payload. The first item will be a
356 boolean that indicates if the signature is valid. This function
357 never fails.
358
359 Use it for debugging only and if you know that your serializer
360 module is not exploitable (for example, do not use it with a
361 pickle serializer).
362
363 .. versionadded:: 0.15
364 """
365 return self._loads_unsafe_impl(s, salt)
366
367 def _loads_unsafe_impl(
368 self,
369 s: str | bytes,
370 salt: str | bytes | None,
371 load_kwargs: dict[str, t.Any] | None = None,
372 load_payload_kwargs: dict[str, t.Any] | None = None,
373 ) -> tuple[bool, t.Any]:
374 """Low level helper function to implement :meth:`loads_unsafe`
375 in serializer subclasses.
376 """
377 if load_kwargs is None:
378 load_kwargs = {}
379
380 try:
381 return True, self.loads(s, salt=salt, **load_kwargs)
382 except BadSignature as e:
383 if e.payload is None:
384 return False, None
385
386 if load_payload_kwargs is None:
387 load_payload_kwargs = {}
388
389 try:
390 return (
391 False,
392 self.load_payload(e.payload, **load_payload_kwargs),
393 )
394 except BadPayload:
395 return False, None
396
397 def load_unsafe(
398 self, f: t.IO[t.Any], salt: str | bytes | None = None
399 ) -> tuple[bool, t.Any]:
400 """Like :meth:`loads_unsafe` but loads from a file.
401
402 .. versionadded:: 0.15
403 """
404 return self.loads_unsafe(f.read(), salt=salt)