Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/cookiejar.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

330 statements  

1import calendar 

2import contextlib 

3import datetime 

4import heapq 

5import itertools 

6import json 

7import pathlib 

8import re 

9import time 

10import warnings 

11from collections import defaultdict 

12from collections.abc import Iterable, Iterator, Mapping 

13from http.cookies import BaseCookie, Morsel, SimpleCookie 

14from typing import Union 

15 

16from yarl import URL 

17 

18from ._cookie_helpers import preserve_morsel_with_coded_value 

19from .abc import AbstractCookieJar, ClearCookiePredicate 

20from .helpers import is_ip_address 

21from .typedefs import LooseCookies, PathLike, StrOrURL 

22 

# Public names exported by ``from aiohttp.cookiejar import *``.
__all__ = ("CookieJar", "DummyCookieJar")

24 

25 

# A cookie supplied by a caller: either a plain string value or a parsed Morsel.
CookieItem = Union[str, "Morsel[str]"]

# We cache these bound ``str.format`` methods here as their use is in
# performance critical code.
# _FORMAT_PATH("/foo", "bar") -> "/foo/bar"
_FORMAT_PATH = "{}/{}".format
# _FORMAT_DOMAIN_REVERSED("com", "example") -> "example.com"
_FORMAT_DOMAIN_REVERSED = "{1}.{0}".format

# The minimum number of scheduled cookie expirations before we start cleaning up
# the expiration heap. This is a performance optimization to avoid cleaning up the
# heap too often when there are only a few scheduled expirations.
_MIN_SCHEDULED_COOKIE_EXPIRATION = 100

# Shared instance used only for its ``value_encode()`` method.
_SIMPLE_COOKIE = SimpleCookie()

37 

38 

class CookieJar(AbstractCookieJar):
    """Implements cookie storage adhering to RFC 6265.

    Cookies are stored per ``(domain, path)`` pair in ``_cookies``.  Three
    pieces of derived state are kept next to it:

    * ``_morsel_cache`` -- ready-to-send morsels built by ``filter_cookies()``
    * ``_host_only_cookies`` -- ``(domain, name)`` pairs that must only be
      sent back to the exact host that set them
    * ``_expire_heap`` / ``_expirations`` -- a min-heap of scheduled
      expirations plus the authoritative ``cookie key -> timestamp`` map
      (heap entries that disagree with the map are stale and ignored)
    """

    DATE_TOKENS_RE = re.compile(
        r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*"
        r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)"
    )

    DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})")

    DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})")

    DATE_MONTH_RE = re.compile(
        "(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|(aug)|(sep)|(oct)|(nov)|(dec)",
        re.I,
    )

    DATE_YEAR_RE = re.compile(r"(\d{2,4})")

    # calendar.timegm() fails for timestamps after datetime.datetime.max
    # Minus one as a loss of precision occurs when timestamp() is called.
    MAX_TIME = (
        int(datetime.datetime.max.replace(tzinfo=datetime.timezone.utc).timestamp()) - 1
    )
    try:
        calendar.timegm(time.gmtime(MAX_TIME))
    except OSError:
        # Hit the maximum representable time on Windows
        # https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/localtime-localtime32-localtime64
        MAX_TIME = calendar.timegm((3000, 12, 31, 23, 59, 59, -1, -1, -1))
    except OverflowError:
        # #4515: datetime.max may not be representable on 32-bit platforms
        MAX_TIME = 2**31 - 1
    # Avoid minuses in the future, 3x faster
    SUB_MAX_TIME = MAX_TIME - 1

    def __init__(
        self,
        *,
        unsafe: bool = False,
        quote_cookie: bool = True,
        treat_as_secure_origin: StrOrURL | Iterable[StrOrURL] | None = None,
    ) -> None:
        """Create an empty jar.

        :param unsafe: when true, cookies are accepted from and sent to
            IP-address hosts.
        :param quote_cookie: when true, outgoing cookie values are encoded
            with ``SimpleCookie.value_encode()``.
        :param treat_as_secure_origin: a URL (or ``str``), or an iterable of
            them, whose origins may receive ``Secure`` cookies even when the
            request scheme is not ``https``/``wss``.
        """
        self._cookies: defaultdict[tuple[str, str], SimpleCookie] = defaultdict(
            SimpleCookie
        )
        self._morsel_cache: defaultdict[tuple[str, str], dict[str, Morsel[str]]] = (
            defaultdict(dict)
        )
        self._host_only_cookies: set[tuple[str, str]] = set()
        self._unsafe = unsafe
        self._quote_cookie = quote_cookie
        if treat_as_secure_origin is None:
            self._treat_as_secure_origin: frozenset[URL] = frozenset()
        elif isinstance(treat_as_secure_origin, URL):
            self._treat_as_secure_origin = frozenset({treat_as_secure_origin.origin()})
        elif isinstance(treat_as_secure_origin, str):
            self._treat_as_secure_origin = frozenset(
                {URL(treat_as_secure_origin).origin()}
            )
        else:
            self._treat_as_secure_origin = frozenset(
                {
                    URL(url).origin() if isinstance(url, str) else url.origin()
                    for url in treat_as_secure_origin
                }
            )
        self._expire_heap: list[tuple[float, tuple[str, str, str]]] = []
        self._expirations: dict[tuple[str, str, str], float] = {}

    @property
    def unsafe(self) -> bool:
        """Whether cookies for IP-address hosts are processed."""
        return self._unsafe

    @property
    def quote_cookie(self) -> bool:
        """Whether outgoing cookie values are encoded/quoted."""
        return self._quote_cookie

    def save(self, file_path: PathLike) -> None:
        """Save cookies to a file using JSON format.

        :param file_path: Path to file where cookies will be serialized,
            :class:`str` or :class:`pathlib.Path` instance.
        """
        file_path = pathlib.Path(file_path)
        data: dict[str, dict[str, dict[str, str | bool]]] = {}
        for (domain, path), cookie in self._cookies.items():
            key = f"{domain}|{path}"
            data[key] = {}
            for name, morsel in cookie.items():
                morsel_data: dict[str, str | bool] = {
                    "key": morsel.key,
                    "value": morsel.value,
                    "coded_value": morsel.coded_value,
                }
                # Save all morsel attributes that have values
                for attr in morsel._reserved:  # type: ignore[attr-defined]
                    attr_val = morsel[attr]
                    if attr_val:
                        morsel_data[attr] = attr_val
                data[key][name] = morsel_data
        with file_path.open(mode="w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    def load(self, file_path: PathLike) -> None:
        """Load cookies from a JSON file, replacing the jar's contents.

        :param file_path: Path to file from where cookies will be
            imported, :class:`str` or :class:`pathlib.Path` instance.
        """
        file_path = pathlib.Path(file_path)
        with file_path.open(mode="r", encoding="utf-8") as f:
            data = json.load(f)
        # Parse first so a malformed file leaves the jar untouched.
        loaded = self._load_json_data(data)
        self._cookies = loaded
        # Everything below was derived from the previous cookie set; keeping
        # it would serve stale cached morsels and could delete or restrict
        # the freshly loaded cookies.
        self._morsel_cache.clear()
        self._host_only_cookies.clear()
        self._expire_heap.clear()
        self._expirations.clear()

    def _load_json_data(
        self, data: dict[str, dict[str, dict[str, str | bool]]]
    ) -> defaultdict[tuple[str, str], SimpleCookie]:
        """Load cookies from parsed JSON data.

        ``data`` maps ``"domain|path"`` to ``{cookie name: morsel fields}``,
        the layout written by :meth:`save`.
        """
        cookies: defaultdict[tuple[str, str], SimpleCookie] = defaultdict(SimpleCookie)
        for compound_key, cookie_data in data.items():
            # Split on the first "|" only, so the path part may contain "|".
            domain, path = compound_key.split("|", 1)
            key = (domain, path)
            for name, morsel_data in cookie_data.items():
                morsel: Morsel[str] = Morsel()
                morsel_key = morsel_data["key"]
                morsel_value = morsel_data["value"]
                morsel_coded_value = morsel_data["coded_value"]
                # Use __setstate__ to bypass validation, same pattern
                # used in _build_morsel and _cookie_helpers.
                morsel.__setstate__(  # type: ignore[attr-defined]
                    {
                        "key": morsel_key,
                        "value": morsel_value,
                        "coded_value": morsel_coded_value,
                    }
                )
                # Restore morsel attributes
                for attr in morsel._reserved:  # type: ignore[attr-defined]
                    if attr in morsel_data and attr not in (
                        "key",
                        "value",
                        "coded_value",
                    ):
                        morsel[attr] = morsel_data[attr]
                cookies[key][name] = morsel
        return cookies

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Remove cookies from the jar.

        With no ``predicate`` everything is removed.  Otherwise a cookie is
        removed when its scheduled expiration has passed or when
        ``predicate(morsel)`` is true.
        """
        if predicate is None:
            self._expire_heap.clear()
            self._cookies.clear()
            self._morsel_cache.clear()
            self._host_only_cookies.clear()
            self._expirations.clear()
            return

        now = time.time()
        # Collect first, delete afterwards: the containers must not change
        # size while they are being iterated.
        to_del = [
            key
            for (domain, path), cookie in self._cookies.items()
            for name, morsel in cookie.items()
            if (
                (key := (domain, path, name)) in self._expirations
                and self._expirations[key] <= now
            )
            or predicate(morsel)
        ]
        if to_del:
            self._delete_cookies(to_del)

    def clear_domain(self, domain: str) -> None:
        """Remove every cookie whose domain attribute matches ``domain``."""
        self.clear(lambda x: self._is_domain_match(domain, x["domain"]))

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Iterate over the stored morsels after dropping expired ones."""
        self._do_expiration()
        for val in self._cookies.values():
            yield from val.values()

    def __len__(self) -> int:
        """Return number of cookies.

        This function does not iterate self to avoid unnecessary expiration
        checks.
        """
        return sum(len(cookie) for cookie in self._cookies.values())

    def _do_expiration(self) -> None:
        """Remove expired cookies."""
        if not (expire_heap_len := len(self._expire_heap)):
            return

        # If the expiration heap grows larger than the number expirations
        # times two, we clean it up to avoid keeping expired entries in
        # the heap and consuming memory. We guard this with a minimum
        # threshold to avoid cleaning up the heap too often when there are
        # only a few scheduled expirations.
        if (
            expire_heap_len > _MIN_SCHEDULED_COOKIE_EXPIRATION
            and expire_heap_len > len(self._expirations) * 2
        ):
            # Remove any expired entries from the expiration heap
            # that do not match the expiration time in the expirations
            # as it means the cookie has been re-added to the heap
            # with a different expiration time.
            self._expire_heap = [
                entry
                for entry in self._expire_heap
                if self._expirations.get(entry[1]) == entry[0]
            ]
            heapq.heapify(self._expire_heap)

        now = time.time()
        to_del: list[tuple[str, str, str]] = []
        # Find any expired cookies and add them to the to-delete list
        while self._expire_heap:
            when, cookie_key = self._expire_heap[0]
            if when > now:
                break
            heapq.heappop(self._expire_heap)
            # Check if the cookie hasn't been re-added to the heap
            # with a different expiration time as it will be removed
            # later when it reaches the top of the heap and its
            # expiration time is met.
            if self._expirations.get(cookie_key) == when:
                to_del.append(cookie_key)

        if to_del:
            self._delete_cookies(to_del)

    def _delete_cookies(self, to_del: list[tuple[str, str, str]]) -> None:
        """Drop the given ``(domain, path, name)`` cookies and their state."""
        for domain, path, name in to_del:
            key = (domain, path)
            self._host_only_cookies.discard((domain, name))
            self._expirations.pop((domain, path, name), None)
            # Use .get() so a missing key does not create an empty container
            # in the defaultdict, and prune containers that become empty so
            # ``if not self._cookies`` keeps working as a fast path.
            if (stored := self._cookies.get(key)) is not None:
                stored.pop(name, None)
                if not stored:
                    del self._cookies[key]
            if (cached := self._morsel_cache.get(key)) is not None:
                cached.pop(name, None)
                if not cached:
                    del self._morsel_cache[key]

    def _expire_cookie(self, when: float, domain: str, path: str, name: str) -> None:
        """Schedule the cookie ``(domain, path, name)`` to expire at ``when``."""
        cookie_key = (domain, path, name)
        if self._expirations.get(cookie_key) == when:
            # Avoid adding duplicates to the heap
            return
        heapq.heappush(self._expire_heap, (when, cookie_key))
        self._expirations[cookie_key] = when

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Update cookies.

        :param cookies: a mapping or an iterable of ``(name, cookie)`` pairs;
            values that are not ``Morsel`` instances are wrapped in one.
        :param response_url: URL of the response that set the cookies; used
            for the default domain/path and for domain validation.
        """
        hostname = response_url.raw_host

        if not self._unsafe and is_ip_address(hostname):
            # Don't accept cookies from IPs
            return

        if isinstance(cookies, Mapping):
            cookies = cookies.items()

        for name, cookie in cookies:
            if not isinstance(cookie, Morsel):
                tmp = SimpleCookie()
                tmp[name] = cookie  # type: ignore[assignment]
                cookie = tmp[name]

            domain = cookie["domain"]

            # ignore domains with trailing dots
            if domain and domain[-1] == ".":
                domain = ""
                # Blank the attribute rather than deleting the key: later
                # ``cookie["domain"]`` lookups must not raise KeyError.
                cookie["domain"] = ""

            host_only = not domain and hostname is not None
            if host_only:
                # Set the cookie's domain to the response hostname;
                # its host-only-flag is recorded once the cookie is accepted.
                domain = cookie["domain"] = hostname

            if domain and domain[0] == ".":
                # Remove leading dot
                domain = domain[1:]
                cookie["domain"] = domain

            if hostname and not self._is_domain_match(domain, hostname):
                # Setting cookies for different domains is not allowed
                continue

            if host_only:
                self._host_only_cookies.add((domain, name))
            else:
                # An explicit Domain attribute supersedes a host-only flag
                # left behind by an earlier cookie of the same name.
                self._host_only_cookies.discard((domain, name))

            path = cookie["path"]
            if not path or path[0] != "/":
                # Set the cookie's path to the response path
                path = response_url.path
                if not path.startswith("/"):
                    path = "/"
                else:
                    # Cut everything from the last slash to the end
                    path = "/" + path[1 : path.rfind("/")]
                cookie["path"] = path
            path = path.rstrip("/")

            # Max-Age takes precedence over Expires.
            expiration: float | None = None
            if max_age := cookie["max-age"]:
                try:
                    delta_seconds = int(max_age)
                except ValueError:
                    cookie["max-age"] = ""
                else:
                    # Clamp before adding: ``float + huge int`` would raise
                    # OverflowError.
                    delta_seconds = max(
                        -self.MAX_TIME, min(delta_seconds, self.MAX_TIME)
                    )
                    expiration = min(time.time() + delta_seconds, self.MAX_TIME)
            elif expires := cookie["expires"]:
                # Compare against None below: a parsed timestamp of 0
                # (1970-01-01 00:00:00) is a valid, already-expired date.
                expiration = self._parse_date(expires)
                if expiration is None:
                    cookie["expires"] = ""

            if expiration is None:
                # The cookie is now a session cookie; forget any expiration
                # scheduled by a previous cookie with the same key.  A stale
                # heap entry is ignored because it no longer matches.
                self._expirations.pop((domain, path, name), None)
            else:
                self._expire_cookie(expiration, domain, path, name)

            key = (domain, path)
            if self._cookies[key].get(name) != cookie:
                # Don't blow away the cache if the same
                # cookie gets set again
                self._cookies[key][name] = cookie
                if (cached := self._morsel_cache.get(key)) is not None:
                    cached.pop(name, None)

        self._do_expiration()

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Returns this jar's cookies filtered by their attributes.

        :param request_url: URL of the outgoing request.  Passing anything
            other than a ``yarl.URL`` emits a ``DeprecationWarning`` and the
            value is converted.
        """
        if not isinstance(request_url, URL):
            warnings.warn(  # type: ignore[unreachable]
                f"The method accepts yarl.URL instances only, got {type(request_url)}",
                DeprecationWarning,
                stacklevel=2,
            )
            request_url = URL(request_url)
        # We always use BaseCookie now since all
        # cookies set on filtered are fully constructed
        # Morsels, not just names and values.
        filtered: BaseCookie[str] = BaseCookie()
        if not self._cookies:
            # Skip do_expiration() if there are no cookies.
            return filtered
        self._do_expiration()
        if not self._cookies:
            # Skip rest of function if no non-expired cookies.
            return filtered
        hostname = request_url.raw_host or ""

        is_not_secure = request_url.scheme not in ("https", "wss")
        if is_not_secure and self._treat_as_secure_origin:
            request_origin = URL()
            with contextlib.suppress(ValueError):
                request_origin = request_url.origin()
            is_not_secure = request_origin not in self._treat_as_secure_origin

        # Send shared cookie
        shared_key = ("", "")
        # .get() avoids planting an empty ("", "") entry in the defaultdict.
        if shared := self._cookies.get(shared_key):
            shared_cache = self._morsel_cache[shared_key]
            for c in shared.values():
                # Check cache first, build and cache the morsel otherwise
                mrsl_val = shared_cache.get(c.key)
                if mrsl_val is None:
                    mrsl_val = shared_cache[c.key] = self._build_morsel(c)
                filtered[c.key] = mrsl_val

        if is_ip_address(hostname):
            if not self._unsafe:
                return filtered
            domains: Iterable[str] = (hostname,)
        else:
            # Get all the subdomains that might match a cookie (e.g. "foo.bar.com", "bar.com", "com")
            domains = itertools.accumulate(
                reversed(hostname.split(".")), _FORMAT_DOMAIN_REVERSED
            )

        # Get all the path prefixes that might match a cookie (e.g. "", "/foo", "/foo/bar")
        paths = itertools.accumulate(request_url.path.split("/"), _FORMAT_PATH)
        # Create every combination of (domain, path) pairs.
        pairs = itertools.product(domains, paths)

        path_len = len(request_url.path)
        # Point 2: https://www.rfc-editor.org/rfc/rfc6265.html#section-5.4
        for p in pairs:
            stored = self._cookies.get(p)
            if not stored:
                continue
            for name, cookie in stored.items():
                domain = cookie["domain"]

                if (domain, name) in self._host_only_cookies and domain != hostname:
                    continue

                # Skip edge case when the cookie has a trailing slash but request doesn't.
                if len(cookie["path"]) > path_len:
                    continue

                if is_not_secure and cookie["secure"]:
                    continue

                # We already built the Morsel so reuse it here;
                # otherwise build and cache it.
                cache = self._morsel_cache[p]
                mrsl_val = cache.get(name)
                if mrsl_val is None:
                    mrsl_val = cache[name] = self._build_morsel(cookie)
                filtered[name] = mrsl_val

        return filtered

    def _build_morsel(self, cookie: Morsel[str]) -> Morsel[str]:
        """Build a morsel for sending, respecting quote_cookie setting."""
        if self._quote_cookie and cookie.coded_value and cookie.coded_value[0] == '"':
            return preserve_morsel_with_coded_value(cookie)
        morsel: Morsel[str] = Morsel()
        if self._quote_cookie:
            value, coded_value = _SIMPLE_COOKIE.value_encode(cookie.value)
        else:
            coded_value = value = cookie.value
        # We use __setstate__ instead of the public set() API because it allows us to
        # bypass validation and set already validated state. This is more stable than
        # setting protected attributes directly.
        morsel.__setstate__({"key": cookie.key, "value": value, "coded_value": coded_value})  # type: ignore[attr-defined]
        return morsel

    @staticmethod
    def _is_domain_match(domain: str, hostname: str) -> bool:
        """Implements domain matching adhering to RFC 6265.

        ``hostname`` matches when it equals ``domain`` or when it is a
        non-IP host name that ends with ``"." + domain``.
        """
        if hostname == domain:
            return True

        if not hostname.endswith(domain):
            return False

        non_matching = hostname[: -len(domain)]

        if not non_matching.endswith("."):
            return False

        return not is_ip_address(hostname)

    @classmethod
    def _parse_date(cls, date_str: str) -> int | None:
        """Implements date string parsing adhering to RFC 6265.

        Returns the UTC timestamp from ``calendar.timegm()``, or ``None``
        when a component is missing or out of range.  ``0`` is a valid
        result, so callers must compare against ``None``.
        """
        if not date_str:
            return None

        found_time = False
        found_day = False
        found_month = False
        found_year = False

        hour = minute = second = 0
        day = 0
        month = 0
        year = 0

        # Each token fills the first still-missing component it matches,
        # tried in the order: time, day-of-month, month, year.
        for token_match in cls.DATE_TOKENS_RE.finditer(date_str):
            token = token_match.group("token")

            if not found_time:
                time_match = cls.DATE_HMS_TIME_RE.match(token)
                if time_match:
                    found_time = True
                    hour, minute, second = (int(s) for s in time_match.groups())
                    continue

            if not found_day:
                day_match = cls.DATE_DAY_OF_MONTH_RE.match(token)
                if day_match:
                    found_day = True
                    day = int(day_match.group())
                    continue

            if not found_month:
                month_match = cls.DATE_MONTH_RE.match(token)
                if month_match:
                    found_month = True
                    assert month_match.lastindex is not None
                    # Group N of the alternation is month number N.
                    month = month_match.lastindex
                    continue

            if not found_year:
                year_match = cls.DATE_YEAR_RE.match(token)
                if year_match:
                    found_year = True
                    year = int(year_match.group())

        # Two-digit years: 70-99 -> 1970-1999, 0-69 -> 2000-2069.
        if 70 <= year <= 99:
            year += 1900
        elif 0 <= year <= 69:
            year += 2000

        if False in (found_day, found_month, found_year, found_time):
            return None

        if not 1 <= day <= 31:
            return None

        if year < 1601 or hour > 23 or minute > 59 or second > 59:
            return None

        return calendar.timegm((year, month, day, hour, minute, second, -1, -1, -1))

537 

538 

class DummyCookieJar(AbstractCookieJar):
    """Implements a dummy cookie storage.

    It can be used with the ClientSession when no cookie processing is needed.

    Every mutating method is a no-op and every query reports an empty jar.
    """

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Return an iterator that yields nothing."""
        yield from ()

    def __len__(self) -> int:
        """Return ``0``; the jar never holds cookies."""
        return 0

    @property
    def unsafe(self) -> bool:
        """Always ``False``."""
        return False

    @property
    def quote_cookie(self) -> bool:
        """Always ``True``."""
        return True

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Do nothing; there is nothing to clear."""

    def clear_domain(self, domain: str) -> None:
        """Do nothing; there is nothing to clear."""

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Discard the given cookies."""

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Return a new, empty ``SimpleCookie`` for any request URL."""
        return SimpleCookie()