1from __future__ import annotations 
    2 
    3import collections.abc as cabc 
    4import json 
    5import typing as t 
    6 
    7from .encoding import want_bytes 
    8from .exc import BadPayload 
    9from .exc import BadSignature 
    10from .signer import _make_keys_list 
    11from .signer import Signer 
    12 
if t.TYPE_CHECKING:
    import typing_extensions as te

    # Type checkers see the typing_extensions TypeVar, which accepts a
    # ``default``. This should be either str or bytes. To avoid having to
    # specify the bound type, it falls back to a union if structural
    # matching fails.
    _TSerialized = te.TypeVar(
        "_TSerialized", bound=t.Union[str, bytes], default=t.Union[str, bytes]
    )
else:
    # At runtime the plain ``typing.TypeVar`` is used instead, so
    # typing_extensions is never imported outside of type checking.
    # Still available at runtime on Python < 3.13, but without the default.
    _TSerialized = t.TypeVar("_TSerialized", bound=t.Union[str, bytes])
    24 
    25 
    26class _PDataSerializer(t.Protocol[_TSerialized]): 
    27    def loads(self, payload: _TSerialized, /) -> t.Any: ... 
    28    # A signature with additional arguments is not handled correctly by type 
    29    # checkers right now, so an overload is used below for serializers that 
    30    # don't match this strict protocol. 
    31    def dumps(self, obj: t.Any, /) -> _TSerialized: ... 
    32 
    33 
    34# Use TypeIs once it's available in typing_extensions or 3.13. 
    35def is_text_serializer( 
    36    serializer: _PDataSerializer[t.Any], 
    37) -> te.TypeGuard[_PDataSerializer[str]]: 
    38    """Checks whether a serializer generates text or binary.""" 
    39    return isinstance(serializer.dumps({}), str) 
    40 
    41 
    42class Serializer(t.Generic[_TSerialized]): 
    43    """A serializer wraps a :class:`~itsdangerous.signer.Signer` to 
    44    enable serializing and securely signing data other than bytes. It 
    45    can unsign to verify that the data hasn't been changed. 
    46 
    47    The serializer provides :meth:`dumps` and :meth:`loads`, similar to 
    48    :mod:`json`, and by default uses :mod:`json` internally to serialize 
    49    the data to bytes. 
    50 
    51    The secret key should be a random string of ``bytes`` and should not 
    52    be saved to code or version control. Different salts should be used 
    53    to distinguish signing in different contexts. See :doc:`/concepts` 
    54    for information about the security of the secret key and salt. 
    55 
    56    :param secret_key: The secret key to sign and verify with. Can be a 
    57        list of keys, oldest to newest, to support key rotation. 
    58    :param salt: Extra key to combine with ``secret_key`` to distinguish 
    59        signatures in different contexts. 
    60    :param serializer: An object that provides ``dumps`` and ``loads`` 
    61        methods for serializing data to a string. Defaults to 
    62        :attr:`default_serializer`, which defaults to :mod:`json`. 
    63    :param serializer_kwargs: Keyword arguments to pass when calling 
    64        ``serializer.dumps``. 
    65    :param signer: A ``Signer`` class to instantiate when signing data. 
    66        Defaults to :attr:`default_signer`, which defaults to 
    67        :class:`~itsdangerous.signer.Signer`. 
    68    :param signer_kwargs: Keyword arguments to pass when instantiating 
    69        the ``Signer`` class. 
    70    :param fallback_signers: List of signer parameters to try when 
    71        unsigning with the default signer fails. Each item can be a dict 
    72        of ``signer_kwargs``, a ``Signer`` class, or a tuple of 
    73        ``(signer, signer_kwargs)``. Defaults to 
    74        :attr:`default_fallback_signers`. 
    75 
    76    .. versionchanged:: 2.0 
    77        Added support for key rotation by passing a list to 
    78        ``secret_key``. 
    79 
    80    .. versionchanged:: 2.0 
    81        Removed the default SHA-512 fallback signer from 
    82        ``default_fallback_signers``. 
    83 
    84    .. versionchanged:: 1.1 
    85        Added support for ``fallback_signers`` and configured a default 
    86        SHA-512 fallback. This fallback is for users who used the yanked 
    87        1.0.0 release which defaulted to SHA-512. 
    88 
    89    .. versionchanged:: 0.14 
    90        The ``signer`` and ``signer_kwargs`` parameters were added to 
    91        the constructor. 
    92    """ 
    93 
    94    #: The default serialization module to use to serialize data to a 
    95    #: string internally. The default is :mod:`json`, but can be changed 
    96    #: to any object that provides ``dumps`` and ``loads`` methods. 
    97    default_serializer: _PDataSerializer[t.Any] = json 
    98 
    99    #: The default ``Signer`` class to instantiate when signing data. 
    100    #: The default is :class:`itsdangerous.signer.Signer`. 
    101    default_signer: type[Signer] = Signer 
    102 
    103    #: The default fallback signers to try when unsigning fails. 
    104    default_fallback_signers: list[ 
    105        dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer] 
    106    ] = [] 
    107 
    108    # Serializer[str] if no data serializer is provided, or if it returns str. 
    109    @t.overload 
    110    def __init__( 
    111        self: Serializer[str], 
    112        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes], 
    113        salt: str | bytes | None = b"itsdangerous", 
    114        serializer: None | _PDataSerializer[str] = None, 
    115        serializer_kwargs: dict[str, t.Any] | None = None, 
    116        signer: type[Signer] | None = None, 
    117        signer_kwargs: dict[str, t.Any] | None = None, 
    118        fallback_signers: list[ 
    119            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer] 
    120        ] 
    121        | None = None, 
    122    ): ... 
    123 
    124    # Serializer[bytes] with a bytes data serializer positional argument. 
    125    @t.overload 
    126    def __init__( 
    127        self: Serializer[bytes], 
    128        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes], 
    129        salt: str | bytes | None, 
    130        serializer: _PDataSerializer[bytes], 
    131        serializer_kwargs: dict[str, t.Any] | None = None, 
    132        signer: type[Signer] | None = None, 
    133        signer_kwargs: dict[str, t.Any] | None = None, 
    134        fallback_signers: list[ 
    135            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer] 
    136        ] 
    137        | None = None, 
    138    ): ... 
    139 
    140    # Serializer[bytes] with a bytes data serializer keyword argument. 
    141    @t.overload 
    142    def __init__( 
    143        self: Serializer[bytes], 
    144        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes], 
    145        salt: str | bytes | None = b"itsdangerous", 
    146        *, 
    147        serializer: _PDataSerializer[bytes], 
    148        serializer_kwargs: dict[str, t.Any] | None = None, 
    149        signer: type[Signer] | None = None, 
    150        signer_kwargs: dict[str, t.Any] | None = None, 
    151        fallback_signers: list[ 
    152            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer] 
    153        ] 
    154        | None = None, 
    155    ): ... 
    156 
    157    # Fall back with a positional argument. If the strict signature of 
    158    # _PDataSerializer doesn't match, fall back to a union, requiring the user 
    159    # to specify the type. 
    160    @t.overload 
    161    def __init__( 
    162        self, 
    163        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes], 
    164        salt: str | bytes | None, 
    165        serializer: t.Any, 
    166        serializer_kwargs: dict[str, t.Any] | None = None, 
    167        signer: type[Signer] | None = None, 
    168        signer_kwargs: dict[str, t.Any] | None = None, 
    169        fallback_signers: list[ 
    170            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer] 
    171        ] 
    172        | None = None, 
    173    ): ... 
    174 
    175    # Fall back with a keyword argument. 
    176    @t.overload 
    177    def __init__( 
    178        self, 
    179        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes], 
    180        salt: str | bytes | None = b"itsdangerous", 
    181        *, 
    182        serializer: t.Any, 
    183        serializer_kwargs: dict[str, t.Any] | None = None, 
    184        signer: type[Signer] | None = None, 
    185        signer_kwargs: dict[str, t.Any] | None = None, 
    186        fallback_signers: list[ 
    187            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer] 
    188        ] 
    189        | None = None, 
    190    ): ... 
    191 
    192    def __init__( 
    193        self, 
    194        secret_key: str | bytes | cabc.Iterable[str] | cabc.Iterable[bytes], 
    195        salt: str | bytes | None = b"itsdangerous", 
    196        serializer: t.Any | None = None, 
    197        serializer_kwargs: dict[str, t.Any] | None = None, 
    198        signer: type[Signer] | None = None, 
    199        signer_kwargs: dict[str, t.Any] | None = None, 
    200        fallback_signers: list[ 
    201            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer] 
    202        ] 
    203        | None = None, 
    204    ): 
    205        #: The list of secret keys to try for verifying signatures, from 
    206        #: oldest to newest. The newest (last) key is used for signing. 
    207        #: 
    208        #: This allows a key rotation system to keep a list of allowed 
    209        #: keys and remove expired ones. 
    210        self.secret_keys: list[bytes] = _make_keys_list(secret_key) 
    211 
    212        if salt is not None: 
    213            salt = want_bytes(salt) 
    214            # if salt is None then the signer's default is used 
    215 
    216        self.salt = salt 
    217 
    218        if serializer is None: 
    219            serializer = self.default_serializer 
    220 
    221        self.serializer: _PDataSerializer[_TSerialized] = serializer 
    222        self.is_text_serializer: bool = is_text_serializer(serializer) 
    223 
    224        if signer is None: 
    225            signer = self.default_signer 
    226 
    227        self.signer: type[Signer] = signer 
    228        self.signer_kwargs: dict[str, t.Any] = signer_kwargs or {} 
    229 
    230        if fallback_signers is None: 
    231            fallback_signers = list(self.default_fallback_signers) 
    232 
    233        self.fallback_signers: list[ 
    234            dict[str, t.Any] | tuple[type[Signer], dict[str, t.Any]] | type[Signer] 
    235        ] = fallback_signers 
    236        self.serializer_kwargs: dict[str, t.Any] = serializer_kwargs or {} 
    237 
    238    @property 
    239    def secret_key(self) -> bytes: 
    240        """The newest (last) entry in the :attr:`secret_keys` list. This 
    241        is for compatibility from before key rotation support was added. 
    242        """ 
    243        return self.secret_keys[-1] 
    244 
    245    def load_payload( 
    246        self, payload: bytes, serializer: _PDataSerializer[t.Any] | None = None 
    247    ) -> t.Any: 
    248        """Loads the encoded object. This function raises 
    249        :class:`.BadPayload` if the payload is not valid. The 
    250        ``serializer`` parameter can be used to override the serializer 
    251        stored on the class. The encoded ``payload`` should always be 
    252        bytes. 
    253        """ 
    254        if serializer is None: 
    255            use_serializer = self.serializer 
    256            is_text = self.is_text_serializer 
    257        else: 
    258            use_serializer = serializer 
    259            is_text = is_text_serializer(serializer) 
    260 
    261        try: 
    262            if is_text: 
    263                return use_serializer.loads(payload.decode("utf-8"))  # type: ignore[arg-type] 
    264 
    265            return use_serializer.loads(payload)  # type: ignore[arg-type] 
    266        except Exception as e: 
    267            raise BadPayload( 
    268                "Could not load the payload because an exception" 
    269                " occurred on unserializing the data.", 
    270                original_error=e, 
    271            ) from e 
    272 
    273    def dump_payload(self, obj: t.Any) -> bytes: 
    274        """Dumps the encoded object. The return value is always bytes. 
    275        If the internal serializer returns text, the value will be 
    276        encoded as UTF-8. 
    277        """ 
    278        return want_bytes(self.serializer.dumps(obj, **self.serializer_kwargs)) 
    279 
    280    def make_signer(self, salt: str | bytes | None = None) -> Signer: 
    281        """Creates a new instance of the signer to be used. The default 
    282        implementation uses the :class:`.Signer` base class. 
    283        """ 
    284        if salt is None: 
    285            salt = self.salt 
    286 
    287        return self.signer(self.secret_keys, salt=salt, **self.signer_kwargs) 
    288 
    289    def iter_unsigners(self, salt: str | bytes | None = None) -> cabc.Iterator[Signer]: 
    290        """Iterates over all signers to be tried for unsigning. Starts 
    291        with the configured signer, then constructs each signer 
    292        specified in ``fallback_signers``. 
    293        """ 
    294        if salt is None: 
    295            salt = self.salt 
    296 
    297        yield self.make_signer(salt) 
    298 
    299        for fallback in self.fallback_signers: 
    300            if isinstance(fallback, dict): 
    301                kwargs = fallback 
    302                fallback = self.signer 
    303            elif isinstance(fallback, tuple): 
    304                fallback, kwargs = fallback 
    305            else: 
    306                kwargs = self.signer_kwargs 
    307 
    308            for secret_key in self.secret_keys: 
    309                yield fallback(secret_key, salt=salt, **kwargs) 
    310 
    311    def dumps(self, obj: t.Any, salt: str | bytes | None = None) -> _TSerialized: 
    312        """Returns a signed string serialized with the internal 
    313        serializer. The return value can be either a byte or unicode 
    314        string depending on the format of the internal serializer. 
    315        """ 
    316        payload = want_bytes(self.dump_payload(obj)) 
    317        rv = self.make_signer(salt).sign(payload) 
    318 
    319        if self.is_text_serializer: 
    320            return rv.decode("utf-8")  # type: ignore[return-value] 
    321 
    322        return rv  # type: ignore[return-value] 
    323 
    324    def dump(self, obj: t.Any, f: t.IO[t.Any], salt: str | bytes | None = None) -> None: 
    325        """Like :meth:`dumps` but dumps into a file. The file handle has 
    326        to be compatible with what the internal serializer expects. 
    327        """ 
    328        f.write(self.dumps(obj, salt)) 
    329 
    330    def loads( 
    331        self, s: str | bytes, salt: str | bytes | None = None, **kwargs: t.Any 
    332    ) -> t.Any: 
    333        """Reverse of :meth:`dumps`. Raises :exc:`.BadSignature` if the 
    334        signature validation fails. 
    335        """ 
    336        s = want_bytes(s) 
    337        last_exception = None 
    338 
    339        for signer in self.iter_unsigners(salt): 
    340            try: 
    341                return self.load_payload(signer.unsign(s)) 
    342            except BadSignature as err: 
    343                last_exception = err 
    344 
    345        raise t.cast(BadSignature, last_exception) 
    346 
    347    def load(self, f: t.IO[t.Any], salt: str | bytes | None = None) -> t.Any: 
    348        """Like :meth:`loads` but loads from a file.""" 
    349        return self.loads(f.read(), salt) 
    350 
    351    def loads_unsafe( 
    352        self, s: str | bytes, salt: str | bytes | None = None 
    353    ) -> tuple[bool, t.Any]: 
    354        """Like :meth:`loads` but without verifying the signature. This 
    355        is potentially very dangerous to use depending on how your 
    356        serializer works. The return value is ``(signature_valid, 
    357        payload)`` instead of just the payload. The first item will be a 
    358        boolean that indicates if the signature is valid. This function 
    359        never fails. 
    360 
    361        Use it for debugging only and if you know that your serializer 
    362        module is not exploitable (for example, do not use it with a 
    363        pickle serializer). 
    364 
    365        .. versionadded:: 0.15 
    366        """ 
    367        return self._loads_unsafe_impl(s, salt) 
    368 
    369    def _loads_unsafe_impl( 
    370        self, 
    371        s: str | bytes, 
    372        salt: str | bytes | None, 
    373        load_kwargs: dict[str, t.Any] | None = None, 
    374        load_payload_kwargs: dict[str, t.Any] | None = None, 
    375    ) -> tuple[bool, t.Any]: 
    376        """Low level helper function to implement :meth:`loads_unsafe` 
    377        in serializer subclasses. 
    378        """ 
    379        if load_kwargs is None: 
    380            load_kwargs = {} 
    381 
    382        try: 
    383            return True, self.loads(s, salt=salt, **load_kwargs) 
    384        except BadSignature as e: 
    385            if e.payload is None: 
    386                return False, None 
    387 
    388            if load_payload_kwargs is None: 
    389                load_payload_kwargs = {} 
    390 
    391            try: 
    392                return ( 
    393                    False, 
    394                    self.load_payload(e.payload, **load_payload_kwargs), 
    395                ) 
    396            except BadPayload: 
    397                return False, None 
    398 
    399    def load_unsafe( 
    400        self, f: t.IO[t.Any], salt: str | bytes | None = None 
    401    ) -> tuple[bool, t.Any]: 
    402        """Like :meth:`loads_unsafe` but loads from a file. 
    403 
    404        .. versionadded:: 0.15 
    405        """ 
    406        return self.loads_unsafe(f.read(), salt=salt)