1from __future__ import annotations
2
3import collections.abc as cabc
4import time
5import typing as t
6from datetime import datetime
7from datetime import timezone
8
9from .encoding import base64_decode
10from .encoding import base64_encode
11from .encoding import bytes_to_int
12from .encoding import int_to_bytes
13from .encoding import want_bytes
14from .exc import BadSignature
15from .exc import BadTimeSignature
16from .exc import SignatureExpired
17from .serializer import _TSerialized
18from .serializer import Serializer
19from .signer import Signer
20
21
22class TimestampSigner(Signer):
23 """Works like the regular :class:`.Signer` but also records the time
24 of the signing and can be used to expire signatures. The
25 :meth:`unsign` method can raise :exc:`.SignatureExpired` if the
26 unsigning failed because the signature is expired.
27 """
28
29 def get_timestamp(self) -> int:
30 """Returns the current timestamp. The function must return an
31 integer.
32 """
33 return int(time.time())
34
35 def timestamp_to_datetime(self, ts: int) -> datetime:
36 """Convert the timestamp from :meth:`get_timestamp` into an
37 aware :class`datetime.datetime` in UTC.
38
39 .. versionchanged:: 2.0
40 The timestamp is returned as a timezone-aware ``datetime``
41 in UTC rather than a naive ``datetime`` assumed to be UTC.
42 """
43 return datetime.fromtimestamp(ts, tz=timezone.utc)
44
45 def sign(self, value: str | bytes) -> bytes:
46 """Signs the given string and also attaches time information."""
47 value = want_bytes(value)
48 timestamp = base64_encode(int_to_bytes(self.get_timestamp()))
49 sep = want_bytes(self.sep)
50 value = value + sep + timestamp
51 return value + sep + self.get_signature(value)
52
53 # Ignore overlapping signatures check, return_timestamp is the only
54 # parameter that affects the return type.
55
56 @t.overload
57 def unsign( # type: ignore[overload-overlap]
58 self,
59 signed_value: str | bytes,
60 max_age: int | None = None,
61 return_timestamp: t.Literal[False] = False,
62 ) -> bytes: ...
63
64 @t.overload
65 def unsign(
66 self,
67 signed_value: str | bytes,
68 max_age: int | None = None,
69 return_timestamp: t.Literal[True] = True,
70 ) -> tuple[bytes, datetime]: ...
71
72 def unsign(
73 self,
74 signed_value: str | bytes,
75 max_age: int | None = None,
76 return_timestamp: bool = False,
77 ) -> tuple[bytes, datetime] | bytes:
78 """Works like the regular :meth:`.Signer.unsign` but can also
79 validate the time. See the base docstring of the class for
80 the general behavior. If ``return_timestamp`` is ``True`` the
81 timestamp of the signature will be returned as an aware
82 :class:`datetime.datetime` object in UTC.
83
84 .. versionchanged:: 2.0
85 The timestamp is returned as a timezone-aware ``datetime``
86 in UTC rather than a naive ``datetime`` assumed to be UTC.
87 """
88 try:
89 result = super().unsign(signed_value)
90 sig_error = None
91 except BadSignature as e:
92 sig_error = e
93 result = e.payload or b""
94
95 sep = want_bytes(self.sep)
96
97 # If there is no timestamp in the result there is something
98 # seriously wrong. In case there was a signature error, we raise
99 # that one directly, otherwise we have a weird situation in
100 # which we shouldn't have come except someone uses a time-based
101 # serializer on non-timestamp data, so catch that.
102 if sep not in result:
103 if sig_error:
104 raise sig_error
105
106 raise BadTimeSignature("timestamp missing", payload=result)
107
108 value, ts_bytes = result.rsplit(sep, 1)
109 ts_int: int | None = None
110 ts_dt: datetime | None = None
111
112 try:
113 ts_int = bytes_to_int(base64_decode(ts_bytes))
114 except Exception:
115 pass
116
117 # Signature is *not* okay. Raise a proper error now that we have
118 # split the value and the timestamp.
119 if sig_error is not None:
120 if ts_int is not None:
121 try:
122 ts_dt = self.timestamp_to_datetime(ts_int)
123 except (ValueError, OSError, OverflowError) as exc:
124 # Windows raises OSError
125 # 32-bit raises OverflowError
126 raise BadTimeSignature(
127 "Malformed timestamp", payload=value
128 ) from exc
129
130 raise BadTimeSignature(str(sig_error), payload=value, date_signed=ts_dt)
131
132 # Signature was okay but the timestamp is actually not there or
133 # malformed. Should not happen, but we handle it anyway.
134 if ts_int is None:
135 raise BadTimeSignature("Malformed timestamp", payload=value)
136
137 # Check timestamp is not older than max_age
138 if max_age is not None:
139 age = self.get_timestamp() - ts_int
140
141 if age > max_age:
142 raise SignatureExpired(
143 f"Signature age {age} > {max_age} seconds",
144 payload=value,
145 date_signed=self.timestamp_to_datetime(ts_int),
146 )
147
148 if age < 0:
149 raise SignatureExpired(
150 f"Signature age {age} < 0 seconds",
151 payload=value,
152 date_signed=self.timestamp_to_datetime(ts_int),
153 )
154
155 if return_timestamp:
156 return value, self.timestamp_to_datetime(ts_int)
157
158 return value
159
160 def validate(self, signed_value: str | bytes, max_age: int | None = None) -> bool:
161 """Only validates the given signed value. Returns ``True`` if
162 the signature exists and is valid."""
163 try:
164 self.unsign(signed_value, max_age=max_age)
165 return True
166 except BadSignature:
167 return False
168
169
170class TimedSerializer(Serializer[_TSerialized]):
171 """Uses :class:`TimestampSigner` instead of the default
172 :class:`.Signer`.
173 """
174
175 default_signer: type[TimestampSigner] = TimestampSigner
176
177 def iter_unsigners(
178 self, salt: str | bytes | None = None
179 ) -> cabc.Iterator[TimestampSigner]:
180 return t.cast("cabc.Iterator[TimestampSigner]", super().iter_unsigners(salt))
181
182 # TODO: Signature is incompatible because parameters were added
183 # before salt.
184
185 def loads( # type: ignore[override]
186 self,
187 s: str | bytes,
188 max_age: int | None = None,
189 return_timestamp: bool = False,
190 salt: str | bytes | None = None,
191 ) -> t.Any:
192 """Reverse of :meth:`dumps`, raises :exc:`.BadSignature` if the
193 signature validation fails. If a ``max_age`` is provided it will
194 ensure the signature is not older than that time in seconds. In
195 case the signature is outdated, :exc:`.SignatureExpired` is
196 raised. All arguments are forwarded to the signer's
197 :meth:`~TimestampSigner.unsign` method.
198 """
199 s = want_bytes(s)
200 last_exception = None
201
202 for signer in self.iter_unsigners(salt):
203 try:
204 base64d, timestamp = signer.unsign(
205 s, max_age=max_age, return_timestamp=True
206 )
207 payload = self.load_payload(base64d)
208
209 if return_timestamp:
210 return payload, timestamp
211
212 return payload
213 except SignatureExpired:
214 # The signature was unsigned successfully but was
215 # expired. Do not try the next signer.
216 raise
217 except BadSignature as err:
218 last_exception = err
219
220 raise t.cast(BadSignature, last_exception)
221
222 def loads_unsafe( # type: ignore[override]
223 self,
224 s: str | bytes,
225 max_age: int | None = None,
226 salt: str | bytes | None = None,
227 ) -> tuple[bool, t.Any]:
228 return self._loads_unsafe_impl(s, salt, load_kwargs={"max_age": max_age})