Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/cookiejar.py: 33%

259 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1import asyncio 

2import calendar 

3import contextlib 

4import datetime 

5import os # noqa 

6import pathlib 

7import pickle 

8import re 

9import time 

10from collections import defaultdict 

11from http.cookies import BaseCookie, Morsel, SimpleCookie 

12from math import ceil 

13from typing import ( # noqa 

14 DefaultDict, 

15 Dict, 

16 Iterable, 

17 Iterator, 

18 List, 

19 Mapping, 

20 Optional, 

21 Set, 

22 Tuple, 

23 Union, 

24 cast, 

25) 

26 

27from yarl import URL 

28 

29from .abc import AbstractCookieJar, ClearCookiePredicate 

30from .helpers import is_ip_address 

31from .typedefs import LooseCookies, PathLike, StrOrURL 

32 

33__all__ = ("CookieJar", "DummyCookieJar") 

34 

35 

36CookieItem = Union[str, "Morsel[str]"] 

37 

38 

39class CookieJar(AbstractCookieJar): 

40 """Implements cookie storage adhering to RFC 6265.""" 

41 

42 DATE_TOKENS_RE = re.compile( 

43 r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*" 

44 r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)" 

45 ) 

46 

47 DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})") 

48 

49 DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})") 

50 

51 DATE_MONTH_RE = re.compile( 

52 "(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|" "(aug)|(sep)|(oct)|(nov)|(dec)", 

53 re.I, 

54 ) 

55 

56 DATE_YEAR_RE = re.compile(r"(\d{2,4})") 

57 

58 # calendar.timegm() fails for timestamps after datetime.datetime.max 

59 # Minus one as a loss of precision occurs when timestamp() is called. 

60 MAX_TIME = ( 

61 int(datetime.datetime.max.replace(tzinfo=datetime.timezone.utc).timestamp()) - 1 

62 ) 

63 try: 

64 calendar.timegm(time.gmtime(MAX_TIME)) 

65 except (OSError, ValueError): 

66 # Hit the maximum representable time on Windows 

67 # https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/localtime-localtime32-localtime64 

68 # Throws ValueError on PyPy 3.8 and 3.9, OSError elsewhere 

69 MAX_TIME = calendar.timegm((3000, 12, 31, 23, 59, 59, -1, -1, -1)) 

70 except OverflowError: 

71 # #4515: datetime.max may not be representable on 32-bit platforms 

72 MAX_TIME = 2**31 - 1 

73 # Avoid minuses in the future, 3x faster 

74 SUB_MAX_TIME = MAX_TIME - 1 

75 

76 def __init__( 

77 self, 

78 *, 

79 unsafe: bool = False, 

80 quote_cookie: bool = True, 

81 treat_as_secure_origin: Union[StrOrURL, List[StrOrURL], None] = None, 

82 loop: Optional[asyncio.AbstractEventLoop] = None, 

83 ) -> None: 

84 super().__init__(loop=loop) 

85 self._cookies: DefaultDict[Tuple[str, str], SimpleCookie] = defaultdict( 

86 SimpleCookie 

87 ) 

88 self._host_only_cookies: Set[Tuple[str, str]] = set() 

89 self._unsafe = unsafe 

90 self._quote_cookie = quote_cookie 

91 if treat_as_secure_origin is None: 

92 treat_as_secure_origin = [] 

93 elif isinstance(treat_as_secure_origin, URL): 

94 treat_as_secure_origin = [treat_as_secure_origin.origin()] 

95 elif isinstance(treat_as_secure_origin, str): 

96 treat_as_secure_origin = [URL(treat_as_secure_origin).origin()] 

97 else: 

98 treat_as_secure_origin = [ 

99 URL(url).origin() if isinstance(url, str) else url.origin() 

100 for url in treat_as_secure_origin 

101 ] 

102 self._treat_as_secure_origin = treat_as_secure_origin 

103 self._next_expiration: float = ceil(time.time()) 

104 self._expirations: Dict[Tuple[str, str, str], float] = {} 

105 

106 def save(self, file_path: PathLike) -> None: 

107 file_path = pathlib.Path(file_path) 

108 with file_path.open(mode="wb") as f: 

109 pickle.dump(self._cookies, f, pickle.HIGHEST_PROTOCOL) 

110 

111 def load(self, file_path: PathLike) -> None: 

112 file_path = pathlib.Path(file_path) 

113 with file_path.open(mode="rb") as f: 

114 self._cookies = pickle.load(f) 

115 

116 def clear(self, predicate: Optional[ClearCookiePredicate] = None) -> None: 

117 if predicate is None: 

118 self._next_expiration = ceil(time.time()) 

119 self._cookies.clear() 

120 self._host_only_cookies.clear() 

121 self._expirations.clear() 

122 return 

123 

124 to_del = [] 

125 now = time.time() 

126 for (domain, path), cookie in self._cookies.items(): 

127 for name, morsel in cookie.items(): 

128 key = (domain, path, name) 

129 if ( 

130 key in self._expirations and self._expirations[key] <= now 

131 ) or predicate(morsel): 

132 to_del.append(key) 

133 

134 for domain, path, name in to_del: 

135 self._host_only_cookies.discard((domain, name)) 

136 key = (domain, path, name) 

137 if key in self._expirations: 

138 del self._expirations[(domain, path, name)] 

139 self._cookies[(domain, path)].pop(name, None) 

140 

141 self._next_expiration = ( 

142 min(*self._expirations.values(), self.SUB_MAX_TIME) + 1 

143 if self._expirations 

144 else self.MAX_TIME 

145 ) 

146 

147 def clear_domain(self, domain: str) -> None: 

148 self.clear(lambda x: self._is_domain_match(domain, x["domain"])) 

149 

150 def __iter__(self) -> "Iterator[Morsel[str]]": 

151 self._do_expiration() 

152 for val in self._cookies.values(): 

153 yield from val.values() 

154 

155 def __len__(self) -> int: 

156 return sum(1 for i in self) 

157 

158 def _do_expiration(self) -> None: 

159 self.clear(lambda x: False) 

160 

161 def _expire_cookie(self, when: float, domain: str, path: str, name: str) -> None: 

162 self._next_expiration = min(self._next_expiration, when) 

163 self._expirations[(domain, path, name)] = when 

164 

165 def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None: 

166 """Update cookies.""" 

167 hostname = response_url.raw_host 

168 

169 if not self._unsafe and is_ip_address(hostname): 

170 # Don't accept cookies from IPs 

171 return 

172 

173 if isinstance(cookies, Mapping): 

174 cookies = cookies.items() 

175 

176 for name, cookie in cookies: 

177 if not isinstance(cookie, Morsel): 

178 tmp = SimpleCookie() 

179 tmp[name] = cookie # type: ignore[assignment] 

180 cookie = tmp[name] 

181 

182 domain = cookie["domain"] 

183 

184 # ignore domains with trailing dots 

185 if domain.endswith("."): 

186 domain = "" 

187 del cookie["domain"] 

188 

189 if not domain and hostname is not None: 

190 # Set the cookie's domain to the response hostname 

191 # and set its host-only-flag 

192 self._host_only_cookies.add((hostname, name)) 

193 domain = cookie["domain"] = hostname 

194 

195 if domain.startswith("."): 

196 # Remove leading dot 

197 domain = domain[1:] 

198 cookie["domain"] = domain 

199 

200 if hostname and not self._is_domain_match(domain, hostname): 

201 # Setting cookies for different domains is not allowed 

202 continue 

203 

204 path = cookie["path"] 

205 if not path or not path.startswith("/"): 

206 # Set the cookie's path to the response path 

207 path = response_url.path 

208 if not path.startswith("/"): 

209 path = "/" 

210 else: 

211 # Cut everything from the last slash to the end 

212 path = "/" + path[1 : path.rfind("/")] 

213 cookie["path"] = path 

214 

215 max_age = cookie["max-age"] 

216 if max_age: 

217 try: 

218 delta_seconds = int(max_age) 

219 max_age_expiration = min(time.time() + delta_seconds, self.MAX_TIME) 

220 self._expire_cookie(max_age_expiration, domain, path, name) 

221 except ValueError: 

222 cookie["max-age"] = "" 

223 

224 else: 

225 expires = cookie["expires"] 

226 if expires: 

227 expire_time = self._parse_date(expires) 

228 if expire_time: 

229 self._expire_cookie(expire_time, domain, path, name) 

230 else: 

231 cookie["expires"] = "" 

232 

233 self._cookies[(domain, path)][name] = cookie 

234 

235 self._do_expiration() 

236 

237 def filter_cookies(self, request_url: URL = URL()) -> "BaseCookie[str]": 

238 """Returns this jar's cookies filtered by their attributes.""" 

239 filtered: Union[SimpleCookie, "BaseCookie[str]"] = ( 

240 SimpleCookie() if self._quote_cookie else BaseCookie() 

241 ) 

242 if not self._cookies: 

243 # Skip do_expiration() if there are no cookies. 

244 return filtered 

245 self._do_expiration() 

246 if not self._cookies: 

247 # Skip rest of function if no non-expired cookies. 

248 return filtered 

249 request_url = URL(request_url) 

250 hostname = request_url.raw_host or "" 

251 

252 is_not_secure = request_url.scheme not in ("https", "wss") 

253 if is_not_secure and self._treat_as_secure_origin: 

254 request_origin = URL() 

255 with contextlib.suppress(ValueError): 

256 request_origin = request_url.origin() 

257 is_not_secure = request_origin not in self._treat_as_secure_origin 

258 

259 # Point 2: https://www.rfc-editor.org/rfc/rfc6265.html#section-5.4 

260 for cookie in sorted(self, key=lambda c: len(c["path"])): 

261 name = cookie.key 

262 domain = cookie["domain"] 

263 

264 # Send shared cookies 

265 if not domain: 

266 filtered[name] = cookie.value 

267 continue 

268 

269 if not self._unsafe and is_ip_address(hostname): 

270 continue 

271 

272 if (domain, name) in self._host_only_cookies: 

273 if domain != hostname: 

274 continue 

275 elif not self._is_domain_match(domain, hostname): 

276 continue 

277 

278 if not self._is_path_match(request_url.path, cookie["path"]): 

279 continue 

280 

281 if is_not_secure and cookie["secure"]: 

282 continue 

283 

284 # It's critical we use the Morsel so the coded_value 

285 # (based on cookie version) is preserved 

286 mrsl_val = cast("Morsel[str]", cookie.get(cookie.key, Morsel())) 

287 mrsl_val.set(cookie.key, cookie.value, cookie.coded_value) 

288 filtered[name] = mrsl_val 

289 

290 return filtered 

291 

292 @staticmethod 

293 def _is_domain_match(domain: str, hostname: str) -> bool: 

294 """Implements domain matching adhering to RFC 6265.""" 

295 if hostname == domain: 

296 return True 

297 

298 if not hostname.endswith(domain): 

299 return False 

300 

301 non_matching = hostname[: -len(domain)] 

302 

303 if not non_matching.endswith("."): 

304 return False 

305 

306 return not is_ip_address(hostname) 

307 

308 @staticmethod 

309 def _is_path_match(req_path: str, cookie_path: str) -> bool: 

310 """Implements path matching adhering to RFC 6265.""" 

311 if not req_path.startswith("/"): 

312 req_path = "/" 

313 

314 if req_path == cookie_path: 

315 return True 

316 

317 if not req_path.startswith(cookie_path): 

318 return False 

319 

320 if cookie_path.endswith("/"): 

321 return True 

322 

323 non_matching = req_path[len(cookie_path) :] 

324 

325 return non_matching.startswith("/") 

326 

327 @classmethod 

328 def _parse_date(cls, date_str: str) -> Optional[int]: 

329 """Implements date string parsing adhering to RFC 6265.""" 

330 if not date_str: 

331 return None 

332 

333 found_time = False 

334 found_day = False 

335 found_month = False 

336 found_year = False 

337 

338 hour = minute = second = 0 

339 day = 0 

340 month = 0 

341 year = 0 

342 

343 for token_match in cls.DATE_TOKENS_RE.finditer(date_str): 

344 

345 token = token_match.group("token") 

346 

347 if not found_time: 

348 time_match = cls.DATE_HMS_TIME_RE.match(token) 

349 if time_match: 

350 found_time = True 

351 hour, minute, second = (int(s) for s in time_match.groups()) 

352 continue 

353 

354 if not found_day: 

355 day_match = cls.DATE_DAY_OF_MONTH_RE.match(token) 

356 if day_match: 

357 found_day = True 

358 day = int(day_match.group()) 

359 continue 

360 

361 if not found_month: 

362 month_match = cls.DATE_MONTH_RE.match(token) 

363 if month_match: 

364 found_month = True 

365 assert month_match.lastindex is not None 

366 month = month_match.lastindex 

367 continue 

368 

369 if not found_year: 

370 year_match = cls.DATE_YEAR_RE.match(token) 

371 if year_match: 

372 found_year = True 

373 year = int(year_match.group()) 

374 

375 if 70 <= year <= 99: 

376 year += 1900 

377 elif 0 <= year <= 69: 

378 year += 2000 

379 

380 if False in (found_day, found_month, found_year, found_time): 

381 return None 

382 

383 if not 1 <= day <= 31: 

384 return None 

385 

386 if year < 1601 or hour > 23 or minute > 59 or second > 59: 

387 return None 

388 

389 return calendar.timegm((year, month, day, hour, minute, second, -1, -1, -1)) 

390 

391 

392class DummyCookieJar(AbstractCookieJar): 

393 """Implements a dummy cookie storage. 

394 

395 It can be used with the ClientSession when no cookie processing is needed. 

396 

397 """ 

398 

399 def __init__(self, *, loop: Optional[asyncio.AbstractEventLoop] = None) -> None: 

400 super().__init__(loop=loop) 

401 

402 def __iter__(self) -> "Iterator[Morsel[str]]": 

403 while False: 

404 yield None 

405 

406 def __len__(self) -> int: 

407 return 0 

408 

409 def clear(self, predicate: Optional[ClearCookiePredicate] = None) -> None: 

410 pass 

411 

412 def clear_domain(self, domain: str) -> None: 

413 pass 

414 

415 def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None: 

416 pass 

417 

418 def filter_cookies(self, request_url: URL) -> "BaseCookie[str]": 

419 return SimpleCookie()