Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/cookiejar.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

344 statements  

1import calendar 

2import contextlib 

3import datetime 

4import heapq 

5import itertools 

6import json 

7import os 

8import pathlib 

9import re 

10import time 

11import warnings 

12from collections import defaultdict 

13from collections.abc import Iterable, Iterator, Mapping 

14from http.cookies import BaseCookie, Morsel, SimpleCookie 

15from types import MappingProxyType 

16from typing import Union 

17 

18from yarl import URL 

19 

20from ._cookie_helpers import preserve_morsel_with_coded_value 

21from .abc import AbstractCookieJar, ClearCookiePredicate 

22from .helpers import is_ip_address 

23from .typedefs import LooseCookies, PathLike, StrOrURL 

24 

# Public names exported by this module.
__all__ = ("CookieJar", "DummyCookieJar")


# Type alias: either a plain string or a Morsel[str].
CookieItem = Union[str, "Morsel[str]"]

# We cache these string methods here as their use is in performance critical code.
# _FORMAT_PATH(a, b) yields "a/b"; CookieJar.filter_cookies() folds it over the
# "/"-separated pieces of the request path to produce every path prefix.
_FORMAT_PATH = "{}/{}".format
# _FORMAT_DOMAIN_REVERSED(a, b) yields "b.a"; CookieJar.filter_cookies() folds it
# over the reversed host labels to produce "com", "bar.com", "foo.bar.com", ...
_FORMAT_DOMAIN_REVERSED = "{1}.{0}".format

# The minimum number of scheduled cookie expirations before we start cleaning up
# the expiration heap. This is a performance optimization to avoid cleaning up the
# heap too often when there are only a few scheduled expirations.
_MIN_SCHEDULED_COOKIE_EXPIRATION = 100
# Shared SimpleCookie instance; CookieJar._build_morsel() calls its
# value_encode() so no new SimpleCookie is created for each cookie.
_SIMPLE_COOKIE = SimpleCookie()

39 

40 

class CookieJar(AbstractCookieJar):
    """Implements cookie storage adhering to RFC 6265.

    Cookies are stored per ``(domain, path)`` pair in ``SimpleCookie``
    containers. Expirations are tracked in a heap plus a lookup dict so that
    expired cookies can be dropped lazily whenever the jar is iterated,
    updated or filtered.
    """

    # Skips a run of RFC 6265 date delimiters, then captures the following
    # run of non-delimiter characters as the named group ``token``.
    DATE_TOKENS_RE = re.compile(
        r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*"
        r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)"
    )

    # hh:mm:ss, each field one or two digits.
    DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})")

    # One or two leading digits.
    DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})")

    # Case-insensitive month abbreviation; the index of the group that
    # matched (1..12) is used as the month number by _parse_date().
    DATE_MONTH_RE = re.compile(
        "(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|(aug)|(sep)|(oct)|(nov)|(dec)",
        re.I,
    )

    # Two to four leading digits.
    DATE_YEAR_RE = re.compile(r"(\d{2,4})")

    # calendar.timegm() fails for timestamps after datetime.datetime.max
    # Minus one as a loss of precision occurs when timestamp() is called.
    MAX_TIME = (
        int(datetime.datetime.max.replace(tzinfo=datetime.timezone.utc).timestamp()) - 1
    )
    try:
        calendar.timegm(time.gmtime(MAX_TIME))
    except OSError:
        # Hit the maximum representable time on Windows
        # https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/localtime-localtime32-localtime64
        MAX_TIME = calendar.timegm((3000, 12, 31, 23, 59, 59, -1, -1, -1))
    except OverflowError:
        # #4515: datetime.max may not be representable on 32-bit platforms
        MAX_TIME = 2**31 - 1
    # Avoid minuses in the future, 3x faster
    SUB_MAX_TIME = MAX_TIME - 1

    def __init__(
        self,
        *,
        unsafe: bool = False,
        quote_cookie: bool = True,
        treat_as_secure_origin: StrOrURL | Iterable[StrOrURL] | None = None,
    ) -> None:
        """Create an empty jar.

        :param unsafe: When true, cookies are accepted from and sent to
            hosts that are IP addresses.
        :param quote_cookie: When true, cookie values are encoded with
            ``SimpleCookie.value_encode`` when morsels are built for sending.
        :param treat_as_secure_origin: A URL (or string), or an iterable of
            them, whose origins are treated as secure when deciding whether
            to send cookies that carry the ``secure`` attribute.
        """
        # (domain, path) -> cookies stored for that pair.
        self._cookies: defaultdict[tuple[str, str], SimpleCookie] = defaultdict(
            SimpleCookie
        )
        # (domain, path) -> {name: morsel built by _build_morsel()}; lets
        # filter_cookies() reuse already-built morsels.
        self._morsel_cache: defaultdict[tuple[str, str], dict[str, Morsel[str]]] = (
            defaultdict(dict)
        )
        # (hostname, name) pairs of cookies that arrived without a domain.
        self._host_only_cookies: set[tuple[str, str]] = set()
        self._unsafe = unsafe
        self._quote_cookie = quote_cookie
        # Normalise every accepted input shape to a frozenset of origins.
        if treat_as_secure_origin is None:
            self._treat_as_secure_origin: frozenset[URL] = frozenset()
        elif isinstance(treat_as_secure_origin, URL):
            self._treat_as_secure_origin = frozenset({treat_as_secure_origin.origin()})
        elif isinstance(treat_as_secure_origin, str):
            self._treat_as_secure_origin = frozenset(
                {URL(treat_as_secure_origin).origin()}
            )
        else:
            self._treat_as_secure_origin = frozenset(
                {
                    URL(url).origin() if isinstance(url, str) else url.origin()
                    for url in treat_as_secure_origin
                }
            )
        # Min-heap of (when, (domain, path, name)); may contain outdated
        # entries, which are recognised by comparing against _expirations.
        self._expire_heap: list[tuple[float, tuple[str, str, str]]] = []
        # (domain, path, name) -> currently scheduled expiration time.
        self._expirations: dict[tuple[str, str, str], float] = {}

    @property
    def unsafe(self) -> bool:
        """Return the ``unsafe`` flag the jar was created with."""
        return self._unsafe

    @property
    def quote_cookie(self) -> bool:
        """Return the ``quote_cookie`` flag the jar was created with."""
        return self._quote_cookie

    @property
    def cookies(self) -> MappingProxyType[tuple[str, str], SimpleCookie]:
        """Return the cookies stored in this jar."""
        return MappingProxyType(self._cookies)

    @property
    def host_only_cookies(self) -> frozenset[tuple[str, str]]:
        """Return the host-only cookies stored in this jar."""
        return frozenset(self._host_only_cookies)

    def save(self, file_path: PathLike) -> None:
        """Save cookies to a file using JSON format.

        The top-level JSON object is keyed by ``"<domain>|<path>"``; each
        value maps a cookie name to its key, value, coded value and every
        reserved morsel attribute that has a truthy value.

        :param file_path: Path to file where cookies will be serialized,
            :class:`str` or :class:`pathlib.Path` instance.
        """
        file_path = pathlib.Path(file_path)
        data: dict[str, dict[str, dict[str, str | bool]]] = {}
        for (domain, path), cookie in self._cookies.items():
            key = f"{domain}|{path}"
            data[key] = {}
            for name, morsel in cookie.items():
                morsel_data: dict[str, str | bool] = {
                    "key": morsel.key,
                    "value": morsel.value,
                    "coded_value": morsel.coded_value,
                }
                # Save all morsel attributes that have values
                for attr in morsel._reserved:  # type: ignore[attr-defined]
                    attr_val = morsel[attr]
                    if attr_val:
                        morsel_data[attr] = attr_val
                data[key][name] = morsel_data

        # Cookie persistence may include authentication/session tokens.
        # Use 0o600 at creation time to avoid umask-dependent overexposure
        # and enforce least-privilege access to sensitive credential data.
        with open(
            file_path,
            mode="w",
            encoding="utf-8",
            opener=lambda path, flags: os.open(path, flags, 0o600),
        ) as f:
            json.dump(data, f, indent=2)

    def load(self, file_path: PathLike) -> None:
        """Load cookies from a JSON file, replacing the jar's stored cookies.

        :param file_path: Path to file from where cookies will be
            imported, :class:`str` or :class:`pathlib.Path` instance.
        """
        file_path = pathlib.Path(file_path)
        with file_path.open(mode="r", encoding="utf-8") as f:
            data = json.load(f)
        self._cookies = self._load_json_data(data)
        # The cache holds morsels built from the cookies that were just
        # replaced. Drop it so filter_cookies() rebuilds morsels from the
        # loaded data instead of returning stale values.
        self._morsel_cache.clear()
        # NOTE(review): save() does not persist the expiration schedule or
        # the host-only flags, so _expire_heap, _expirations and
        # _host_only_cookies are deliberately left untouched here.

    def _load_json_data(
        self, data: dict[str, dict[str, dict[str, str | bool]]]
    ) -> defaultdict[tuple[str, str], SimpleCookie]:
        """Load cookies from parsed JSON data.

        :param data: Mapping in the layout written by :meth:`save`.
        :return: A new ``(domain, path) -> SimpleCookie`` mapping; the jar
            itself is not modified.
        """
        cookies: defaultdict[tuple[str, str], SimpleCookie] = defaultdict(SimpleCookie)
        for compound_key, cookie_data in data.items():
            # Split on the first "|" only, so the path part may contain "|".
            domain, path = compound_key.split("|", 1)
            key = (domain, path)
            for name, morsel_data in cookie_data.items():
                morsel: Morsel[str] = Morsel()
                morsel_key = morsel_data["key"]
                morsel_value = morsel_data["value"]
                morsel_coded_value = morsel_data["coded_value"]
                # Use __setstate__ to bypass validation, same pattern
                # used in _build_morsel and _cookie_helpers.
                morsel.__setstate__(  # type: ignore[attr-defined]
                    {
                        "key": morsel_key,
                        "value": morsel_value,
                        "coded_value": morsel_coded_value,
                    }
                )
                # Restore morsel attributes
                for attr in morsel._reserved:  # type: ignore[attr-defined]
                    if attr in morsel_data and attr not in (
                        "key",
                        "value",
                        "coded_value",
                    ):
                        morsel[attr] = morsel_data[attr]
                cookies[key][name] = morsel
        return cookies

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Remove cookies from the jar.

        :param predicate: When ``None``, every cookie and all bookkeeping
            state is dropped. Otherwise it is called with each stored morsel,
            and the cookies that are already expired or for which it returns
            a truthy value are removed.
        """
        if predicate is None:
            self._expire_heap.clear()
            self._cookies.clear()
            self._morsel_cache.clear()
            self._host_only_cookies.clear()
            self._expirations.clear()
            return

        now = time.time()
        # ``key`` is bound by the walrus as the first step of the condition,
        # so it is always set by the time an item is selected.
        to_del = [
            key
            for (domain, path), cookie in self._cookies.items()
            for name, morsel in cookie.items()
            if (
                (key := (domain, path, name)) in self._expirations
                and self._expirations[key] <= now
            )
            or predicate(morsel)
        ]
        if to_del:
            self._delete_cookies(to_del)

    def clear_domain(self, domain: str) -> None:
        """Remove every cookie whose ``domain`` attribute domain-matches *domain*."""
        self.clear(lambda x: self._is_domain_match(domain, x["domain"]))

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Drop expired cookies, then yield every stored morsel."""
        self._do_expiration()
        for val in self._cookies.values():
            yield from val.values()

    def __len__(self) -> int:
        """Return number of cookies.

        This function does not iterate self to avoid unnecessary expiration
        checks.
        """
        return sum(len(cookie) for cookie in self._cookies.values())

    def _do_expiration(self) -> None:
        """Remove expired cookies."""
        if not (expire_heap_len := len(self._expire_heap)):
            return

        # If the expiration heap grows larger than the number expirations
        # times two, we clean it up to avoid keeping expired entries in
        # the heap and consuming memory. We guard this with a minimum
        # threshold to avoid cleaning up the heap too often when there are
        # only a few scheduled expirations.
        if (
            expire_heap_len > _MIN_SCHEDULED_COOKIE_EXPIRATION
            and expire_heap_len > len(self._expirations) * 2
        ):
            # Remove any expired entries from the expiration heap
            # that do not match the expiration time in the expirations
            # as it means the cookie has been re-added to the heap
            # with a different expiration time.
            self._expire_heap = [
                entry
                for entry in self._expire_heap
                if self._expirations.get(entry[1]) == entry[0]
            ]
            heapq.heapify(self._expire_heap)

        now = time.time()
        to_del: list[tuple[str, str, str]] = []
        # Find any expired cookies and add them to the to-delete list
        while self._expire_heap:
            when, cookie_key = self._expire_heap[0]
            if when > now:
                break
            heapq.heappop(self._expire_heap)
            # Check if the cookie hasn't been re-added to the heap
            # with a different expiration time as it will be removed
            # later when it reaches the top of the heap and its
            # expiration time is met.
            if self._expirations.get(cookie_key) == when:
                to_del.append(cookie_key)

        if to_del:
            self._delete_cookies(to_del)

    def _delete_cookies(self, to_del: list[tuple[str, str, str]]) -> None:
        """Remove the given ``(domain, path, name)`` cookies and their bookkeeping.

        Heap entries are not removed here; _do_expiration() discards them
        once they no longer match ``_expirations``.
        """
        for domain, path, name in to_del:
            self._host_only_cookies.discard((domain, name))
            self._cookies[(domain, path)].pop(name, None)
            self._morsel_cache[(domain, path)].pop(name, None)
            self._expirations.pop((domain, path, name), None)

    def _expire_cookie(self, when: float, domain: str, path: str, name: str) -> None:
        """Schedule the cookie ``(domain, path, name)`` to expire at *when*."""
        cookie_key = (domain, path, name)
        if self._expirations.get(cookie_key) == when:
            # Avoid adding duplicates to the heap
            return
        heapq.heappush(self._expire_heap, (when, cookie_key))
        self._expirations[cookie_key] = when

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Update cookies.

        :param cookies: A mapping of names to cookies, or an iterable of
            ``(name, cookie)`` pairs. Values that are not ``Morsel``
            instances are converted through a temporary ``SimpleCookie``.
        :param response_url: URL of the response that set the cookies; its
            host and path supply defaults for missing ``domain`` and
            ``path`` attributes.

        Nothing is stored when the host is an IP address and the jar is not
        ``unsafe``. Cookies whose domain does not match the response host
        are skipped.
        """
        hostname = response_url.raw_host

        if not self._unsafe and is_ip_address(hostname):
            # Don't accept cookies from IPs
            return

        if isinstance(cookies, Mapping):
            cookies = cookies.items()

        for name, cookie in cookies:
            if not isinstance(cookie, Morsel):
                tmp = SimpleCookie()
                tmp[name] = cookie  # type: ignore[assignment]
                cookie = tmp[name]

            domain = cookie["domain"]

            # ignore domains with trailing dots
            if domain and domain[-1] == ".":
                domain = ""
                del cookie["domain"]

            if not domain and hostname is not None:
                # Set the cookie's domain to the response hostname
                # and set its host-only-flag
                self._host_only_cookies.add((hostname, name))
                domain = cookie["domain"] = hostname

            if domain and domain[0] == ".":
                # Remove leading dot
                domain = domain[1:]
                cookie["domain"] = domain

            if hostname and not self._is_domain_match(domain, hostname):
                # Setting cookies for different domains is not allowed
                continue

            path = cookie["path"]
            if not path or path[0] != "/":
                # Set the cookie's path to the response path
                path = response_url.path
                if not path.startswith("/"):
                    path = "/"
                else:
                    # Cut everything from the last slash to the end
                    path = "/" + path[1 : path.rfind("/")]
                cookie["path"] = path
            # Storage keys use the path without trailing slashes.
            path = path.rstrip("/")

            # A non-empty max-age is handled first; expires is only
            # consulted when max-age is empty.
            if max_age := cookie["max-age"]:
                try:
                    delta_seconds = int(max_age)
                    max_age_expiration = min(time.time() + delta_seconds, self.MAX_TIME)
                    self._expire_cookie(max_age_expiration, domain, path, name)
                except ValueError:
                    cookie["max-age"] = ""

            elif expires := cookie["expires"]:
                if expire_time := self._parse_date(expires):
                    self._expire_cookie(expire_time, domain, path, name)
                else:
                    cookie["expires"] = ""

            key = (domain, path)
            if self._cookies[key].get(name) != cookie:
                # Don't blow away the cache if the same
                # cookie gets set again
                self._cookies[key][name] = cookie
                self._morsel_cache[key].pop(name, None)

        self._do_expiration()

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Returns this jar's cookies filtered by their attributes.

        :param request_url: URL of the outgoing request. A non-``URL`` value
            triggers a ``DeprecationWarning`` and is converted with ``URL()``.
        :return: A ``BaseCookie`` holding the morsels to send. It contains
            the cookies stored under the ``("", "")`` key plus those stored
            for a matching domain and path prefix, after skipping host-only
            cookies for other hosts and ``secure`` cookies on origins that
            are not secure.
        """
        if not isinstance(request_url, URL):
            warnings.warn(  # type: ignore[unreachable]
                f"The method accepts yarl.URL instances only, got {type(request_url)}",
                DeprecationWarning,
            )
            request_url = URL(request_url)
        # We always use BaseCookie now since all
        # cookies set on on filtered are fully constructed
        # Morsels, not just names and values.
        filtered: BaseCookie[str] = BaseCookie()
        if not self._cookies:
            # Skip do_expiration() if there are no cookies.
            return filtered
        self._do_expiration()
        if not self._cookies:
            # Skip rest of function if no non-expired cookies.
            return filtered
        hostname = request_url.raw_host or ""

        is_not_secure = request_url.scheme not in ("https", "wss")
        if is_not_secure and self._treat_as_secure_origin:
            # origin() may raise ValueError; in that case the empty URL is
            # used for the membership test below.
            request_origin = URL()
            with contextlib.suppress(ValueError):
                request_origin = request_url.origin()
            is_not_secure = request_origin not in self._treat_as_secure_origin

        # Send shared cookie
        key = ("", "")
        for c in self._cookies[key].values():
            # Check cache first
            if c.key in self._morsel_cache[key]:
                filtered[c.key] = self._morsel_cache[key][c.key]
                continue

            # Build and cache the morsel
            mrsl_val = self._build_morsel(c)
            self._morsel_cache[key][c.key] = mrsl_val
            filtered[c.key] = mrsl_val

        if is_ip_address(hostname):
            if not self._unsafe:
                return filtered
            domains: Iterable[str] = (hostname,)
        else:
            # Get all the subdomains that might match a cookie (e.g. "foo.bar.com", "bar.com", "com")
            domains = itertools.accumulate(
                reversed(hostname.split(".")), _FORMAT_DOMAIN_REVERSED
            )

        # Get all the path prefixes that might match a cookie (e.g. "", "/foo", "/foo/bar")
        paths = itertools.accumulate(request_url.path.split("/"), _FORMAT_PATH)
        # Create every combination of (domain, path) pairs.
        pairs = itertools.product(domains, paths)

        path_len = len(request_url.path)
        # Point 2: https://www.rfc-editor.org/rfc/rfc6265.html#section-5.4
        for p in pairs:
            if p not in self._cookies:
                continue
            for name, cookie in self._cookies[p].items():
                domain = cookie["domain"]

                if (domain, name) in self._host_only_cookies and domain != hostname:
                    continue

                # Skip edge case when the cookie has a trailing slash but request doesn't.
                if len(cookie["path"]) > path_len:
                    continue

                if is_not_secure and cookie["secure"]:
                    continue

                # We already built the Morsel so reuse it here
                if name in self._morsel_cache[p]:
                    filtered[name] = self._morsel_cache[p][name]
                    continue

                # Build and cache the morsel
                mrsl_val = self._build_morsel(cookie)
                self._morsel_cache[p][name] = mrsl_val
                filtered[name] = mrsl_val

        return filtered

    def _build_morsel(self, cookie: Morsel[str]) -> Morsel[str]:
        """Build a morsel for sending, respecting quote_cookie setting.

        The returned morsel carries only key, value and coded value, except
        when quoting is enabled and the stored coded value already starts
        with a double quote; that case is delegated to
        ``preserve_morsel_with_coded_value``.
        """
        if self._quote_cookie and cookie.coded_value and cookie.coded_value[0] == '"':
            return preserve_morsel_with_coded_value(cookie)
        morsel: Morsel[str] = Morsel()
        if self._quote_cookie:
            value, coded_value = _SIMPLE_COOKIE.value_encode(cookie.value)
        else:
            coded_value = value = cookie.value
        # We use __setstate__ instead of the public set() API because it allows us to
        # bypass validation and set already validated state. This is more stable than
        # setting protected attributes directly.
        morsel.__setstate__(  # type: ignore[attr-defined]
            {"key": cookie.key, "value": value, "coded_value": coded_value}
        )
        return morsel

    @staticmethod
    def _is_domain_match(domain: str, hostname: str) -> bool:
        """Implements domain matching adhering to RFC 6265.

        :return: ``True`` when *hostname* equals *domain*, or when it ends
            with ``"." + domain`` and is not an IP address.
        """
        if hostname == domain:
            return True

        if not hostname.endswith(domain):
            return False

        non_matching = hostname[: -len(domain)]

        # The suffix must start at a label boundary ("a.example.com" matches
        # "example.com", "badexample.com" does not).
        if not non_matching.endswith("."):
            return False

        return not is_ip_address(hostname)

    @classmethod
    def _parse_date(cls, date_str: str) -> int | None:
        """Implements date string parsing adhering to RFC 6265.

        :param date_str: Value of a cookie's ``expires`` attribute.
        :return: The timestamp computed by ``calendar.timegm`` from the
            parsed fields, or ``None`` when the string is empty, a
            component is missing, or a component is out of range.
        """
        if not date_str:
            return None

        found_time = False
        found_day = False
        found_month = False
        found_year = False

        hour = minute = second = 0
        day = 0
        month = 0
        year = 0

        # Each token fills the first still-missing component it matches,
        # tried in the order time, day, month, year.
        for token_match in cls.DATE_TOKENS_RE.finditer(date_str):
            token = token_match.group("token")

            if not found_time:
                time_match = cls.DATE_HMS_TIME_RE.match(token)
                if time_match:
                    found_time = True
                    hour, minute, second = (int(s) for s in time_match.groups())
                    continue

            if not found_day:
                day_match = cls.DATE_DAY_OF_MONTH_RE.match(token)
                if day_match:
                    found_day = True
                    day = int(day_match.group())
                    continue

            if not found_month:
                month_match = cls.DATE_MONTH_RE.match(token)
                if month_match:
                    found_month = True
                    assert month_match.lastindex is not None
                    # Group index 1..12 corresponds to jan..dec.
                    month = month_match.lastindex
                    continue

            if not found_year:
                year_match = cls.DATE_YEAR_RE.match(token)
                if year_match:
                    found_year = True
                    year = int(year_match.group())

        # Two-digit years: 70-99 map to 1970-1999, 0-69 map to 2000-2069.
        if 70 <= year <= 99:
            year += 1900
        elif 0 <= year <= 69:
            year += 2000

        if False in (found_day, found_month, found_year, found_time):
            return None

        if not 1 <= day <= 31:
            return None

        if year < 1601 or hour > 23 or minute > 59 or second > 59:
            return None

        return calendar.timegm((year, month, day, hour, minute, second, -1, -1, -1))

558 

559 

class DummyCookieJar(AbstractCookieJar):
    """Cookie jar that stores nothing and returns nothing.

    It can be used with the ClientSession when no cookie processing is needed.
    """

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Return a generator that yields no morsels."""
        # Delegating to an empty tuple keeps this a generator function
        # while producing no items.
        yield from ()

    def __len__(self) -> int:
        """Return ``0``; the jar never holds a cookie."""
        return 0

    @property
    def unsafe(self) -> bool:
        """Always ``False``."""
        return False

    @property
    def quote_cookie(self) -> bool:
        """Always ``True``."""
        return True

    @property
    def cookies(self) -> MappingProxyType[tuple[str, str], SimpleCookie]:
        """Return an empty mapping."""
        nothing: dict[tuple[str, str], SimpleCookie] = {}
        return MappingProxyType(nothing)

    @property
    def host_only_cookies(self) -> frozenset[tuple[str, str]]:
        """Return an empty frozenset."""
        return frozenset()

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Do nothing; there is never anything to clear."""
        return None

    def clear_domain(self, domain: str) -> None:
        """Do nothing; there is never anything to clear."""
        return None

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Ignore the given cookies."""
        return None

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Return a new, empty ``SimpleCookie`` for any request URL."""
        empty = SimpleCookie()
        return empty