Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/cookiejar.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

352 statements  

1import calendar 

2import contextlib 

3import datetime 

4import heapq 

5import itertools 

6import json 

7import os 

8import pathlib 

9import re 

10import time 

11import warnings 

12from collections import defaultdict 

13from collections.abc import Iterable, Iterator, Mapping 

14from http.cookies import BaseCookie, Morsel, SimpleCookie 

15from types import MappingProxyType 

16from typing import Union 

17 

18from yarl import URL 

19 

20from ._cookie_helpers import preserve_morsel_with_coded_value 

21from .abc import AbstractCookieJar, ClearCookiePredicate 

22from .helpers import is_ip_address 

23from .typedefs import LooseCookies, PathLike, StrOrURL 

24 

25__all__ = ("CookieJar", "DummyCookieJar") 

26 

27 

# A cookie passed to the jar is either a plain string value or a Morsel.
CookieItem = Union[str, "Morsel[str]"]

# Bound ``str.format`` methods, looked up once at import time because they are
# used in performance critical code.
_FORMAT_PATH = "{}/{}".format
_FORMAT_DOMAIN_REVERSED = "{1}.{0}".format

# Cleaning up the expiration heap is skipped until it holds more than this
# many entries, so small jars do not pay for frequent heap rebuilds.
_MIN_SCHEDULED_COOKIE_EXPIRATION = 100
_SIMPLE_COOKIE = SimpleCookie()

# Relative expiry attributes are not persisted; the absolute deadline is
# saved instead.
_RELATIVE_EXPIRY_ATTRS = frozenset({"max-age", "expires"})

42 

43 

class CookieJar(AbstractCookieJar):
    """Implements cookie storage adhering to RFC 6265.

    Cookies are stored per ``(domain, path)`` pair. Expirations are tracked
    in a heap plus a lookup dict and are applied lazily whenever the jar is
    updated, filtered or iterated.
    """

    DATE_TOKENS_RE = re.compile(
        r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*"
        r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)"
    )

    DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})")

    DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})")

    DATE_MONTH_RE = re.compile(
        "(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|(aug)|(sep)|(oct)|(nov)|(dec)",
        re.I,
    )

    DATE_YEAR_RE = re.compile(r"(\d{2,4})")

    # calendar.timegm() fails for timestamps after datetime.datetime.max
    # Minus one as a loss of precision occurs when timestamp() is called.
    MAX_TIME = (
        int(datetime.datetime.max.replace(tzinfo=datetime.timezone.utc).timestamp()) - 1
    )
    try:
        calendar.timegm(time.gmtime(MAX_TIME))
    except OSError:
        # Hit the maximum representable time on Windows
        # https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/localtime-localtime32-localtime64
        MAX_TIME = calendar.timegm((3000, 12, 31, 23, 59, 59, -1, -1, -1))
    except OverflowError:
        # #4515: datetime.max may not be representable on 32-bit platforms
        MAX_TIME = 2**31 - 1
    # Avoid minuses in the future, 3x faster
    SUB_MAX_TIME = MAX_TIME - 1

    def __init__(
        self,
        *,
        unsafe: bool = False,
        quote_cookie: bool = True,
        treat_as_secure_origin: StrOrURL | Iterable[StrOrURL] | None = None,
    ) -> None:
        """Create an empty jar.

        :param unsafe: When false, cookies from IP-address hosts are not
            stored, and only shared cookies are returned for requests to
            IP-address hosts.
        :param quote_cookie: Whether values are run through
            ``SimpleCookie.value_encode`` when building outgoing morsels.
        :param treat_as_secure_origin: A URL (or string), or an iterable of
            them, whose origins receive ``secure`` cookies even when the
            request scheme is not ``https``/``wss``.
        """
        self._cookies: defaultdict[tuple[str, str], SimpleCookie] = defaultdict(
            SimpleCookie
        )
        self._morsel_cache: defaultdict[tuple[str, str], dict[str, Morsel[str]]] = (
            defaultdict(dict)
        )
        self._host_only_cookies: set[tuple[str, str]] = set()
        self._unsafe = unsafe
        self._quote_cookie = quote_cookie
        if treat_as_secure_origin is None:
            self._treat_as_secure_origin: frozenset[URL] = frozenset()
        elif isinstance(treat_as_secure_origin, URL):
            self._treat_as_secure_origin = frozenset({treat_as_secure_origin.origin()})
        elif isinstance(treat_as_secure_origin, str):
            self._treat_as_secure_origin = frozenset(
                {URL(treat_as_secure_origin).origin()}
            )
        else:
            self._treat_as_secure_origin = frozenset(
                {
                    URL(url).origin() if isinstance(url, str) else url.origin()
                    for url in treat_as_secure_origin
                }
            )
        # Heap of (deadline, (domain, path, name)); may contain stale entries.
        self._expire_heap: list[tuple[float, tuple[str, str, str]]] = []
        # Authoritative deadline for each (domain, path, name).
        self._expirations: dict[tuple[str, str, str], float] = {}

    @property
    def unsafe(self) -> bool:
        """Return the ``unsafe`` flag given at construction."""
        return self._unsafe

    @property
    def quote_cookie(self) -> bool:
        """Return the ``quote_cookie`` flag given at construction."""
        return self._quote_cookie

    @property
    def cookies(self) -> MappingProxyType[tuple[str, str], SimpleCookie]:
        """Return the cookies stored in this jar."""
        return MappingProxyType(self._cookies)

    @property
    def host_only_cookies(self) -> frozenset[tuple[str, str]]:
        """Return the host-only cookies stored in this jar."""
        return frozenset(self._host_only_cookies)

    def save(self, file_path: PathLike) -> None:
        """Save cookies to a file using JSON format.

        :param file_path: Path to file where cookies will be serialized,
            :class:`str` or :class:`pathlib.Path` instance.
        """
        file_path = pathlib.Path(file_path)
        data: dict[str, dict[str, dict[str, str | bool | float]]] = {}
        for (domain, path), cookie in self._cookies.items():
            key = f"{domain}|{path}"
            data[key] = {}
            for name, morsel in cookie.items():
                morsel_data: dict[str, str | bool | float] = {
                    "key": morsel.key,
                    "value": morsel.value,
                    "coded_value": morsel.coded_value,
                }
                # Skip relative expiry; the absolute deadline is saved below.
                for attr in morsel._reserved:  # type: ignore[attr-defined]
                    if attr in _RELATIVE_EXPIRY_ATTRS:
                        continue
                    attr_val = morsel[attr]
                    if attr_val:
                        morsel_data[attr] = attr_val
                # Persist or it reloads as a domain cookie and leaks to subdomains.
                if (domain, name) in self._host_only_cookies:
                    morsel_data["host_only"] = True
                if (exp := self._expirations.get((domain, path, name))) is not None:
                    morsel_data["expires_timestamp"] = exp
                data[key][name] = morsel_data

        # Cookie persistence may include authentication/session tokens.
        # Use 0o600 at creation time to avoid umask-dependent overexposure
        # and enforce least-privilege access to sensitive credential data.
        with open(
            file_path,
            mode="w",
            encoding="utf-8",
            opener=lambda path, flags: os.open(path, flags, 0o600),
        ) as f:
            json.dump(data, f, indent=2)

    def load(self, file_path: PathLike) -> None:
        """Load cookies from a JSON file.

        Replaces the current jar contents; loaded cookies pass through the
        same acceptance rules as :meth:`update_cookies`.

        :param file_path: Path to file from where cookies will be
            imported, :class:`str` or :class:`pathlib.Path` instance.
        """
        file_path = pathlib.Path(file_path)
        with file_path.open(mode="r", encoding="utf-8") as f:
            data = json.load(f)
        self._load_json_data(data)

    def _load_json_data(
        self, data: dict[str, dict[str, dict[str, str | bool | float]]]
    ) -> None:
        """Replace contents, routing cookies through update_cookies()."""
        self.clear()
        for compound_key, cookie_data in data.items():
            domain, path = compound_key.split("|", 1)
            for name, morsel_data in cookie_data.items():
                morsel: Morsel[str] = Morsel()
                # Use __setstate__ to bypass validation, same pattern
                # used in _build_morsel and _cookie_helpers.
                morsel.__setstate__(  # type: ignore[attr-defined]
                    {
                        "key": morsel_data["key"],
                        "value": morsel_data["value"],
                        "coded_value": morsel_data["coded_value"],
                    }
                )
                # Restore morsel attributes
                for attr in morsel._reserved:  # type: ignore[attr-defined]
                    if attr in morsel_data and attr not in (
                        "key",
                        "value",
                        "coded_value",
                    ):
                        morsel[attr] = morsel_data[attr]
                # Drop the domain so update_cookies() re-marks it host-only.
                if morsel_data.get("host_only"):
                    morsel["domain"] = ""
                response_url = (
                    URL.build(scheme="https", host=domain) if domain else URL()
                )
                self.update_cookies({name: morsel}, response_url)
                # Restore the absolute deadline; update_cookies() schedules none.
                if (exp := morsel_data.get("expires_timestamp")) is not None:
                    self._expire_cookie(float(exp), domain, path, name)
        self._do_expiration()

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Remove cookies from the jar.

        With no predicate everything is dropped. Otherwise a cookie is
        removed when its deadline has passed or ``predicate(morsel)`` is true.
        """
        if predicate is None:
            self._expire_heap.clear()
            self._cookies.clear()
            self._morsel_cache.clear()
            self._host_only_cookies.clear()
            self._expirations.clear()
            return

        now = time.time()
        to_del = [
            key
            for (domain, path), cookie in self._cookies.items()
            for name, morsel in cookie.items()
            if (
                (key := (domain, path, name)) in self._expirations
                and self._expirations[key] <= now
            )
            or predicate(morsel)
        ]
        if to_del:
            self._delete_cookies(to_del)

    def clear_domain(self, domain: str) -> None:
        """Remove every cookie whose domain attribute domain-matches ``domain``."""
        self.clear(lambda x: self._is_domain_match(domain, x["domain"]))

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Iterate over all stored morsels after dropping expired ones."""
        self._do_expiration()
        for val in self._cookies.values():
            yield from val.values()

    def __len__(self) -> int:
        """Return number of cookies.

        This function does not iterate self to avoid unnecessary expiration
        checks.
        """
        return sum(len(cookie.values()) for cookie in self._cookies.values())

    def _do_expiration(self) -> None:
        """Remove expired cookies."""
        if not (expire_heap_len := len(self._expire_heap)):
            return

        # If the expiration heap grows larger than the number expirations
        # times two, we clean it up to avoid keeping expired entries in
        # the heap and consuming memory. We guard this with a minimum
        # threshold to avoid cleaning up the heap too often when there are
        # only a few scheduled expirations.
        if (
            expire_heap_len > _MIN_SCHEDULED_COOKIE_EXPIRATION
            and expire_heap_len > len(self._expirations) * 2
        ):
            # Remove any expired entries from the expiration heap
            # that do not match the expiration time in the expirations
            # as it means the cookie has been re-added to the heap
            # with a different expiration time.
            self._expire_heap = [
                entry
                for entry in self._expire_heap
                if self._expirations.get(entry[1]) == entry[0]
            ]
            heapq.heapify(self._expire_heap)

        now = time.time()
        to_del: list[tuple[str, str, str]] = []
        # Find any expired cookies and add them to the to-delete list
        while self._expire_heap:
            when, cookie_key = self._expire_heap[0]
            if when > now:
                break
            heapq.heappop(self._expire_heap)
            # Check if the cookie hasn't been re-added to the heap
            # with a different expiration time as it will be removed
            # later when it reaches the top of the heap and its
            # expiration time is met.
            if self._expirations.get(cookie_key) == when:
                to_del.append(cookie_key)

        if to_del:
            self._delete_cookies(to_del)

    def _delete_cookies(self, to_del: list[tuple[str, str, str]]) -> None:
        """Drop each ``(domain, path, name)`` from every internal structure."""
        for domain, path, name in to_del:
            self._host_only_cookies.discard((domain, name))
            self._cookies[(domain, path)].pop(name, None)
            self._morsel_cache[(domain, path)].pop(name, None)
            self._expirations.pop((domain, path, name), None)

    def _expire_cookie(self, when: float, domain: str, path: str, name: str) -> None:
        """Schedule the cookie ``(domain, path, name)`` to expire at ``when``."""
        cookie_key = (domain, path, name)
        if self._expirations.get(cookie_key) == when:
            # Avoid adding duplicates to the heap
            return
        heapq.heappush(self._expire_heap, (when, cookie_key))
        self._expirations[cookie_key] = when

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Update cookies.

        :param cookies: Mapping or iterable of ``(name, cookie)`` pairs;
            non-Morsel values are converted through a ``SimpleCookie``.
        :param response_url: URL the cookies were received from. Its host is
            used for host-only cookies and domain checks, and its path is the
            default cookie path.
        """
        hostname = response_url.raw_host

        if not self._unsafe and is_ip_address(hostname):
            # Don't accept cookies from IPs
            return

        if isinstance(cookies, Mapping):
            cookies = cookies.items()

        for name, cookie in cookies:
            if not isinstance(cookie, Morsel):
                tmp = SimpleCookie()
                tmp[name] = cookie  # type: ignore[assignment]
                cookie = tmp[name]

            domain = cookie["domain"]

            # ignore domains with trailing dots
            if domain and domain[-1] == ".":
                domain = ""
                del cookie["domain"]

            if not domain and hostname is not None:
                # Set the cookie's domain to the response hostname
                # and set its host-only-flag
                self._host_only_cookies.add((hostname, name))
                domain = cookie["domain"] = hostname

            if domain and domain[0] == ".":
                # Remove leading dot
                domain = domain[1:]
                cookie["domain"] = domain

            if hostname and not self._is_domain_match(domain, hostname):
                # Setting cookies for different domains is not allowed
                continue

            path = cookie["path"]
            if not path or path[0] != "/":
                # Set the cookie's path to the response path
                path = response_url.path
                if not path.startswith("/"):
                    path = "/"
                else:
                    # Cut everything from the last slash to the end
                    path = "/" + path[1 : path.rfind("/")]
                cookie["path"] = path
            path = path.rstrip("/")

            if max_age := cookie["max-age"]:
                try:
                    delta_seconds = int(max_age)
                except ValueError:
                    cookie["max-age"] = ""
                else:
                    # Clamp before adding to the float clock: an arbitrarily
                    # large integer would otherwise raise OverflowError when
                    # converted to float.
                    delta_seconds = max(
                        -self.MAX_TIME, min(delta_seconds, self.MAX_TIME)
                    )
                    max_age_expiration = min(time.time() + delta_seconds, self.MAX_TIME)
                    self._expire_cookie(max_age_expiration, domain, path, name)

            elif expires := cookie["expires"]:
                # Compare against None: a date that parses to the epoch
                # (timestamp 0) is valid and must still expire the cookie.
                if (expire_time := self._parse_date(expires)) is not None:
                    self._expire_cookie(expire_time, domain, path, name)
                else:
                    cookie["expires"] = ""

            key = (domain, path)
            if self._cookies[key].get(name) != cookie:
                # Don't blow away the cache if the same
                # cookie gets set again
                self._cookies[key][name] = cookie
                self._morsel_cache[key].pop(name, None)

        self._do_expiration()

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Returns this jar's cookies filtered by their attributes."""
        if not isinstance(request_url, URL):
            warnings.warn(  # type: ignore[unreachable]
                f"The method accepts yarl.URL instances only, got {type(request_url)}",
                DeprecationWarning,
            )
            request_url = URL(request_url)
        # We always use BaseCookie now since all
        # cookies set on filtered are fully constructed
        # Morsels, not just names and values.
        filtered: BaseCookie[str] = BaseCookie()
        if not self._cookies:
            # Skip do_expiration() if there are no cookies.
            return filtered
        self._do_expiration()
        if not self._cookies:
            # Skip rest of function if no non-expired cookies.
            return filtered
        hostname = request_url.raw_host or ""

        is_not_secure = request_url.scheme not in ("https", "wss")
        if is_not_secure and self._treat_as_secure_origin:
            request_origin = URL()
            with contextlib.suppress(ValueError):
                request_origin = request_url.origin()
            is_not_secure = request_origin not in self._treat_as_secure_origin

        # Send shared cookie
        key = ("", "")
        for c in self._cookies[key].values():
            # Check cache first
            if c.key in self._morsel_cache[key]:
                filtered[c.key] = self._morsel_cache[key][c.key]
                continue

            # Build and cache the morsel
            mrsl_val = self._build_morsel(c)
            self._morsel_cache[key][c.key] = mrsl_val
            filtered[c.key] = mrsl_val

        if is_ip_address(hostname):
            if not self._unsafe:
                return filtered
            domains: Iterable[str] = (hostname,)
        else:
            # Get all the subdomains that might match a cookie (e.g. "foo.bar.com", "bar.com", "com")
            domains = itertools.accumulate(
                reversed(hostname.split(".")), _FORMAT_DOMAIN_REVERSED
            )

        # Get all the path prefixes that might match a cookie (e.g. "", "/foo", "/foo/bar")
        paths = itertools.accumulate(request_url.path.split("/"), _FORMAT_PATH)
        # Create every combination of (domain, path) pairs.
        pairs = itertools.product(domains, paths)

        path_len = len(request_url.path)
        # Point 2: https://www.rfc-editor.org/rfc/rfc6265.html#section-5.4
        for p in pairs:
            if p not in self._cookies:
                continue
            for name, cookie in self._cookies[p].items():
                domain = cookie["domain"]

                if (domain, name) in self._host_only_cookies and domain != hostname:
                    continue

                # Skip edge case when the cookie has a trailing slash but request doesn't.
                if len(cookie["path"]) > path_len:
                    continue

                if is_not_secure and cookie["secure"]:
                    continue

                # We already built the Morsel so reuse it here
                if name in self._morsel_cache[p]:
                    filtered[name] = self._morsel_cache[p][name]
                    continue

                # Build and cache the morsel
                mrsl_val = self._build_morsel(cookie)
                self._morsel_cache[p][name] = mrsl_val
                filtered[name] = mrsl_val

        return filtered

    def _build_morsel(self, cookie: Morsel[str]) -> Morsel[str]:
        """Build a morsel for sending, respecting quote_cookie setting."""
        if self._quote_cookie and cookie.coded_value and cookie.coded_value[0] == '"':
            return preserve_morsel_with_coded_value(cookie)
        morsel: Morsel[str] = Morsel()
        if self._quote_cookie:
            value, coded_value = _SIMPLE_COOKIE.value_encode(cookie.value)
        else:
            coded_value = value = cookie.value
        # We use __setstate__ instead of the public set() API because it allows us to
        # bypass validation and set already validated state. This is more stable than
        # setting protected attributes directly.
        morsel.__setstate__({"key": cookie.key, "value": value, "coded_value": coded_value})  # type: ignore[attr-defined]
        return morsel

    @staticmethod
    def _is_domain_match(domain: str, hostname: str) -> bool:
        """Implements domain matching adhering to RFC 6265."""
        if hostname == domain:
            return True

        if not hostname.endswith(domain):
            return False

        non_matching = hostname[: -len(domain)]

        # The part of the hostname before the domain must end on a label
        # boundary, otherwise "xexample.com" would match "example.com".
        if not non_matching.endswith("."):
            return False

        return not is_ip_address(hostname)

    @classmethod
    def _parse_date(cls, date_str: str) -> int | None:
        """Implements date string parsing adhering to RFC 6265.

        Returns the UTC timestamp from ``calendar.timegm``, or ``None`` when
        the string is empty, a component is missing, or a component is out
        of range. A return value of ``0`` is a valid timestamp, so callers
        must compare against ``None`` rather than test truthiness.
        """
        if not date_str:
            return None

        found_time = False
        found_day = False
        found_month = False
        found_year = False

        hour = minute = second = 0
        day = 0
        month = 0
        year = 0

        # Each token is tried against the still-missing components in the
        # order time, day, month, year; the first match consumes the token.
        for token_match in cls.DATE_TOKENS_RE.finditer(date_str):
            token = token_match.group("token")

            if not found_time:
                time_match = cls.DATE_HMS_TIME_RE.match(token)
                if time_match:
                    found_time = True
                    hour, minute, second = (int(s) for s in time_match.groups())
                    continue

            if not found_day:
                day_match = cls.DATE_DAY_OF_MONTH_RE.match(token)
                if day_match:
                    found_day = True
                    day = int(day_match.group())
                    continue

            if not found_month:
                month_match = cls.DATE_MONTH_RE.match(token)
                if month_match:
                    found_month = True
                    assert month_match.lastindex is not None
                    # Group index 1..12 doubles as the month number.
                    month = month_match.lastindex
                    continue

            if not found_year:
                year_match = cls.DATE_YEAR_RE.match(token)
                if year_match:
                    found_year = True
                    year = int(year_match.group())

        # Two-digit years: 70-99 map to 19xx, 0-69 map to 20xx.
        if 70 <= year <= 99:
            year += 1900
        elif 0 <= year <= 69:
            year += 2000

        if False in (found_day, found_month, found_year, found_time):
            return None

        if not 1 <= day <= 31:
            return None

        if year < 1601 or hour > 23 or minute > 59 or second > 59:
            return None

        return calendar.timegm((year, month, day, hour, minute, second, -1, -1, -1))

576 

577 

class DummyCookieJar(AbstractCookieJar):
    """A cookie jar that never stores anything.

    It can be used with the ClientSession when no cookie processing is needed.
    """

    def __iter__(self) -> "Iterator[Morsel[str]]":
        # A generator that finishes immediately: there is nothing to yield.
        yield from ()

    def __len__(self) -> int:
        """Return ``0``; the jar is always empty."""
        return 0

    @property
    def unsafe(self) -> bool:
        """Always ``False``."""
        return False

    @property
    def quote_cookie(self) -> bool:
        """Always ``True``."""
        return True

    @property
    def cookies(self) -> MappingProxyType[tuple[str, str], SimpleCookie]:
        """Return a read-only view of an empty mapping."""
        return MappingProxyType({})

    @property
    def host_only_cookies(self) -> frozenset[tuple[str, str]]:
        """Return an empty frozenset."""
        return frozenset()

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Do nothing; there are no cookies to clear."""
        return None

    def clear_domain(self, domain: str) -> None:
        """Do nothing; there are no cookies to clear."""
        return None

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Discard the given cookies."""
        return None

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Return a new, empty ``SimpleCookie`` for any request URL."""
        return SimpleCookie()