Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/itsdangerous/timed.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

96 statements  

1from __future__ import annotations 

2 

3import collections.abc as cabc 

4import time 

5import typing as t 

6from datetime import datetime 

7from datetime import timezone 

8 

9from .encoding import base64_decode 

10from .encoding import base64_encode 

11from .encoding import bytes_to_int 

12from .encoding import int_to_bytes 

13from .encoding import want_bytes 

14from .exc import BadSignature 

15from .exc import BadTimeSignature 

16from .exc import SignatureExpired 

17from .serializer import _TSerialized 

18from .serializer import Serializer 

19from .signer import Signer 

20 

21 

class TimestampSigner(Signer):
    """A :class:`.Signer` that embeds the signing time in the signed value.

    The embedded timestamp lets :meth:`unsign` reject values that are
    too old; in that case :exc:`.SignatureExpired` is raised.
    """

    def get_timestamp(self) -> int:
        """Return the current time as an integer number of seconds.

        Overrides must also return an integer.
        """
        now = time.time()
        return int(now)

    def timestamp_to_datetime(self, ts: int) -> datetime:
        """Turn a timestamp produced by :meth:`get_timestamp` into an
        aware :class:`datetime.datetime` in UTC.

        .. versionchanged:: 2.0
            The timestamp is returned as a timezone-aware ``datetime``
            in UTC rather than a naive ``datetime`` assumed to be UTC.
        """
        return datetime.fromtimestamp(ts, timezone.utc)

    def sign(self, value: str | bytes) -> bytes:
        """Sign ``value`` and attach the current timestamp to it.

        The result is laid out as ``value + sep + timestamp + sep +
        signature``, where the signature is computed over
        ``value + sep + timestamp``.
        """
        payload = want_bytes(value)
        encoded_ts = base64_encode(int_to_bytes(self.get_timestamp()))
        separator = want_bytes(self.sep)
        stamped = payload + separator + encoded_ts
        signature = self.get_signature(stamped)
        return stamped + separator + signature

    # Ignore overlapping signatures check, return_timestamp is the only
    # parameter that affects the return type.

    @t.overload
    def unsign(  # type: ignore[overload-overlap]
        self,
        signed_value: str | bytes,
        max_age: int | None = None,
        return_timestamp: t.Literal[False] = False,
    ) -> bytes: ...

    @t.overload
    def unsign(
        self,
        signed_value: str | bytes,
        max_age: int | None = None,
        return_timestamp: t.Literal[True] = True,
    ) -> tuple[bytes, datetime]: ...

    def unsign(
        self,
        signed_value: str | bytes,
        max_age: int | None = None,
        return_timestamp: bool = False,
    ) -> tuple[bytes, datetime] | bytes:
        """Verify ``signed_value`` like :meth:`.Signer.unsign` and, in
        addition, validate the embedded timestamp.

        :param signed_value: The value previously produced by
            :meth:`sign`.
        :param max_age: If given, the maximum accepted age of the
            signature in seconds. A signature older than this, or one
            whose timestamp lies in the future, raises
            :exc:`.SignatureExpired`.
        :param return_timestamp: If ``True``, return a tuple of the
            unsigned value and the signing time as an aware
            :class:`datetime.datetime` in UTC instead of only the value.
        :raises BadSignature: The signature is invalid and the payload
            contains no separator, so no timestamp can be split off.
        :raises BadTimeSignature: The signature is invalid, or the
            timestamp is missing or malformed.
        :raises SignatureExpired: The timestamp fails the ``max_age``
            check.

        .. versionchanged:: 2.0
            The timestamp is returned as a timezone-aware ``datetime``
            in UTC rather than a naive ``datetime`` assumed to be UTC.
        """
        # Remember a signature failure instead of propagating it right
        # away, so the error raised later can carry the split-off value
        # and, where decodable, the signing date.
        sig_error: BadSignature | None = None

        try:
            unsigned = super().unsign(signed_value)
        except BadSignature as err:
            sig_error = err
            unsigned = err.payload or b""

        separator = want_bytes(self.sep)

        # Without a separator there is no timestamp at all. Prefer the
        # original signature error if there was one; otherwise the
        # signature was fine but the data was not produced by a
        # timestamp signer.
        if separator not in unsigned:
            if sig_error:
                raise sig_error

            raise BadTimeSignature("timestamp missing", payload=unsigned)

        value, _, encoded_ts = unsigned.rpartition(separator)
        stamp: int | None

        # An undecodable timestamp is not fatal here; both branches
        # below report it through BadTimeSignature.
        try:
            stamp = bytes_to_int(base64_decode(encoded_ts))
        except Exception:
            stamp = None

        # The signature was bad. Re-raise as a time signature error now
        # that value and timestamp are separated.
        if sig_error is not None:
            signed_at: datetime | None = None

            if stamp is not None:
                try:
                    signed_at = self.timestamp_to_datetime(stamp)
                except (ValueError, OSError, OverflowError) as exc:
                    # Windows raises OSError
                    # 32-bit raises OverflowError
                    raise BadTimeSignature(
                        "Malformed timestamp", payload=value
                    ) from exc

            raise BadTimeSignature(
                str(sig_error), payload=value, date_signed=signed_at
            )

        # The signature was good but the timestamp could not be decoded.
        # Should not happen, but it is handled anyway.
        if stamp is None:
            raise BadTimeSignature("Malformed timestamp", payload=value)

        # Reject signatures older than max_age as well as signatures
        # dated in the future (negative age).
        if max_age is not None:
            age = self.get_timestamp() - stamp
            reason: str | None

            if age > max_age:
                reason = f"Signature age {age} > {max_age} seconds"
            elif age < 0:
                reason = f"Signature age {age} < 0 seconds"
            else:
                reason = None

            if reason is not None:
                raise SignatureExpired(
                    reason,
                    payload=value,
                    date_signed=self.timestamp_to_datetime(stamp),
                )

        if not return_timestamp:
            return value

        return value, self.timestamp_to_datetime(stamp)

    def validate(self, signed_value: str | bytes, max_age: int | None = None) -> bool:
        """Check ``signed_value`` without returning its content.

        Returns ``True`` if :meth:`unsign` accepts the value (including
        the ``max_age`` check) and ``False`` if it raises
        :exc:`.BadSignature`.
        """
        try:
            self.unsign(signed_value, max_age=max_age)
        except BadSignature:
            return False
        else:
            return True

168 

169 

class TimedSerializer(Serializer[_TSerialized]):
    """A :class:`.Serializer` whose default signer is
    :class:`TimestampSigner` rather than the plain :class:`.Signer`.
    """

    default_signer: type[TimestampSigner] = TimestampSigner

    def iter_unsigners(
        self, salt: str | bytes | None = None
    ) -> cabc.Iterator[TimestampSigner]:
        """Return the parent's unsigner iterator, typed as yielding
        :class:`TimestampSigner` instances.
        """
        unsigners = super().iter_unsigners(salt)
        return t.cast("cabc.Iterator[TimestampSigner]", unsigners)

    # TODO: Signature is incompatible because parameters were added
    # before salt.

    def loads(  # type: ignore[override]
        self,
        s: str | bytes,
        max_age: int | None = None,
        return_timestamp: bool = False,
        salt: str | bytes | None = None,
    ) -> t.Any:
        """Reverse of :meth:`dumps`.

        Each signer from :meth:`iter_unsigners` is tried in turn with
        :meth:`~TimestampSigner.unsign`; the first one that accepts the
        value has its payload loaded and returned.

        :param s: The signed, serialized value.
        :param max_age: If given, the signature must not be older than
            this many seconds, otherwise :exc:`.SignatureExpired` is
            raised.
        :param return_timestamp: If ``True``, return a tuple of the
            loaded payload and the signing timestamp instead of only
            the payload.
        :param salt: Passed to :meth:`iter_unsigners`.
        :raises BadSignature: No signer accepted the value; the error
            from the last signer tried is raised.
        """
        signed = want_bytes(s)
        failure = None

        for unsigner in self.iter_unsigners(salt):
            try:
                # Always request the timestamp; whether it is handed
                # back to the caller is decided below.
                raw, signed_at = unsigner.unsign(
                    signed, max_age=max_age, return_timestamp=True
                )
                loaded = self.load_payload(raw)
                return (loaded, signed_at) if return_timestamp else loaded
            except SignatureExpired:
                # The signature was unsigned successfully but was
                # expired. Do not try the next signer.
                raise
            except BadSignature as err:
                failure = err

        raise t.cast(BadSignature, failure)

    def loads_unsafe(  # type: ignore[override]
        self,
        s: str | bytes,
        max_age: int | None = None,
        salt: str | bytes | None = None,
    ) -> tuple[bool, t.Any]:
        """Delegate to ``_loads_unsafe_impl``, passing ``max_age`` on as
        a load keyword argument.
        """
        load_kwargs = {"max_age": max_age}
        return self._loads_unsafe_impl(s, salt, load_kwargs=load_kwargs)
228 return self._loads_unsafe_impl(s, salt, load_kwargs={"max_age": max_age})