Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/cookiejar.py: 22%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import calendar
2import contextlib
3import datetime
4import heapq
5import itertools
6import json
7import os
8import pathlib
9import re
10import time
11import warnings
12from collections import defaultdict
13from collections.abc import Iterable, Iterator, Mapping
14from http.cookies import BaseCookie, Morsel, SimpleCookie
15from types import MappingProxyType
16from typing import Union
18from yarl import URL
20from ._cookie_helpers import preserve_morsel_with_coded_value
21from .abc import AbstractCookieJar, ClearCookiePredicate
22from .helpers import is_ip_address
23from .typedefs import LooseCookies, PathLike, StrOrURL
__all__ = ("CookieJar", "DummyCookieJar")

# Either a raw string or an already parsed morsel.
CookieItem = Union[str, "Morsel[str]"]

# Bound ``str.format`` methods, looked up once at import time because they
# are called from performance critical code.
_FORMAT_PATH = "{}/{}".format
_FORMAT_DOMAIN_REVERSED = "{1}.{0}".format

# Below this many scheduled expirations the expiration heap is never compacted;
# with only a few entries the cleanup would cost more than it saves.
_MIN_SCHEDULED_COOKIE_EXPIRATION = 100

# Shared instance used only for its ``value_encode()`` method.
_SIMPLE_COOKIE = SimpleCookie()

# Relative expiry attributes are not persisted; the absolute deadline is
# saved instead.
_RELATIVE_EXPIRY_ATTRS = frozenset({"max-age", "expires"})
class CookieJar(AbstractCookieJar):
    """Implements cookie storage adhering to RFC 6265."""

    # Skips a run of delimiter characters, then captures one date token.
    DATE_TOKENS_RE = re.compile(
        r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*"
        r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)"
    )

    # hh:mm:ss, each field one or two digits.
    DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})")

    DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})")

    # One capture group per month, so ``lastindex`` is the month number.
    DATE_MONTH_RE = re.compile(
        "(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|(aug)|(sep)|(oct)|(nov)|(dec)",
        re.I,
    )

    DATE_YEAR_RE = re.compile(r"(\d{2,4})")

    # Largest expiration timestamp the jar will schedule.
    # calendar.timegm() fails for timestamps after datetime.datetime.max;
    # one second is subtracted because timestamp() loses precision.
    MAX_TIME = (
        int(datetime.datetime.max.replace(tzinfo=datetime.timezone.utc).timestamp()) - 1
    )
    try:
        # Probe whether the platform can actually represent MAX_TIME.
        calendar.timegm(time.gmtime(MAX_TIME))
    except OSError:
        # Hit the maximum representable time on Windows
        # https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/localtime-localtime32-localtime64
        MAX_TIME = calendar.timegm((3000, 12, 31, 23, 59, 59, -1, -1, -1))
    except OverflowError:
        # #4515: datetime.max may not be representable on 32-bit platforms
        MAX_TIME = 2**31 - 1
    # Precomputed so callers never have to subtract at run time (3x faster).
    SUB_MAX_TIME = MAX_TIME - 1
80 def __init__(
81 self,
82 *,
83 unsafe: bool = False,
84 quote_cookie: bool = True,
85 treat_as_secure_origin: StrOrURL | Iterable[StrOrURL] | None = None,
86 ) -> None:
87 self._cookies: defaultdict[tuple[str, str], SimpleCookie] = defaultdict(
88 SimpleCookie
89 )
90 self._morsel_cache: defaultdict[tuple[str, str], dict[str, Morsel[str]]] = (
91 defaultdict(dict)
92 )
93 self._host_only_cookies: set[tuple[str, str]] = set()
94 self._unsafe = unsafe
95 self._quote_cookie = quote_cookie
96 if treat_as_secure_origin is None:
97 self._treat_as_secure_origin: frozenset[URL] = frozenset()
98 elif isinstance(treat_as_secure_origin, URL):
99 self._treat_as_secure_origin = frozenset({treat_as_secure_origin.origin()})
100 elif isinstance(treat_as_secure_origin, str):
101 self._treat_as_secure_origin = frozenset(
102 {URL(treat_as_secure_origin).origin()}
103 )
104 else:
105 self._treat_as_secure_origin = frozenset(
106 {
107 URL(url).origin() if isinstance(url, str) else url.origin()
108 for url in treat_as_secure_origin
109 }
110 )
111 self._expire_heap: list[tuple[float, tuple[str, str, str]]] = []
112 self._expirations: dict[tuple[str, str, str], float] = {}
114 @property
115 def unsafe(self) -> bool:
116 return self._unsafe
118 @property
119 def quote_cookie(self) -> bool:
120 return self._quote_cookie
122 @property
123 def cookies(self) -> MappingProxyType[tuple[str, str], SimpleCookie]:
124 """Return the cookies stored in this jar."""
125 return MappingProxyType(self._cookies)
127 @property
128 def host_only_cookies(self) -> frozenset[tuple[str, str]]:
129 """Return the host-only cookies stored in this jar."""
130 return frozenset(self._host_only_cookies)
132 def save(self, file_path: PathLike) -> None:
133 """Save cookies to a file using JSON format.
135 :param file_path: Path to file where cookies will be serialized,
136 :class:`str` or :class:`pathlib.Path` instance.
137 """
138 file_path = pathlib.Path(file_path)
139 data: dict[str, dict[str, dict[str, str | bool | float]]] = {}
140 for (domain, path), cookie in self._cookies.items():
141 key = f"{domain}|{path}"
142 data[key] = {}
143 for name, morsel in cookie.items():
144 morsel_data: dict[str, str | bool | float] = {
145 "key": morsel.key,
146 "value": morsel.value,
147 "coded_value": morsel.coded_value,
148 }
149 # Skip relative expiry; the absolute deadline is saved below.
150 for attr in morsel._reserved: # type: ignore[attr-defined]
151 if attr in _RELATIVE_EXPIRY_ATTRS:
152 continue
153 attr_val = morsel[attr]
154 if attr_val:
155 morsel_data[attr] = attr_val
156 # Persist or it reloads as a domain cookie and leaks to subdomains.
157 if (domain, name) in self._host_only_cookies:
158 morsel_data["host_only"] = True
159 if (exp := self._expirations.get((domain, path, name))) is not None:
160 morsel_data["expires_timestamp"] = exp
161 data[key][name] = morsel_data
163 # Cookie persistence may include authentication/session tokens.
164 # Use 0o600 at creation time to avoid umask-dependent overexposure
165 # and enforce least-privilege access to sensitive credential data.
166 with open(
167 file_path,
168 mode="w",
169 encoding="utf-8",
170 opener=lambda path, flags: os.open(path, flags, 0o600),
171 ) as f:
172 json.dump(data, f, indent=2)
174 def load(self, file_path: PathLike) -> None:
175 """Load cookies from a JSON file.
177 Replaces the current jar contents; loaded cookies pass through the
178 same acceptance rules as :meth:`update_cookies`.
180 :param file_path: Path to file from where cookies will be
181 imported, :class:`str` or :class:`pathlib.Path` instance.
182 """
183 file_path = pathlib.Path(file_path)
184 with file_path.open(mode="r", encoding="utf-8") as f:
185 data = json.load(f)
186 self._load_json_data(data)
188 def _load_json_data(
189 self, data: dict[str, dict[str, dict[str, str | bool | float]]]
190 ) -> None:
191 """Replace contents, routing cookies through update_cookies()."""
192 self.clear()
193 for compound_key, cookie_data in data.items():
194 domain, path = compound_key.split("|", 1)
195 for name, morsel_data in cookie_data.items():
196 morsel: Morsel[str] = Morsel()
197 # Use __setstate__ to bypass validation, same pattern
198 # used in _build_morsel and _cookie_helpers.
199 morsel.__setstate__( # type: ignore[attr-defined]
200 {
201 "key": morsel_data["key"],
202 "value": morsel_data["value"],
203 "coded_value": morsel_data["coded_value"],
204 }
205 )
206 # Restore morsel attributes
207 for attr in morsel._reserved: # type: ignore[attr-defined]
208 if attr in morsel_data and attr not in (
209 "key",
210 "value",
211 "coded_value",
212 ):
213 morsel[attr] = morsel_data[attr]
214 # Drop the domain so update_cookies() re-marks it host-only.
215 if morsel_data.get("host_only"):
216 morsel["domain"] = ""
217 response_url = (
218 URL.build(scheme="https", host=domain) if domain else URL()
219 )
220 self.update_cookies({name: morsel}, response_url)
221 # Restore the absolute deadline; update_cookies() schedules none.
222 if (exp := morsel_data.get("expires_timestamp")) is not None:
223 self._expire_cookie(float(exp), domain, path, name)
224 self._do_expiration()
226 def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
227 if predicate is None:
228 self._expire_heap.clear()
229 self._cookies.clear()
230 self._morsel_cache.clear()
231 self._host_only_cookies.clear()
232 self._expirations.clear()
233 return
235 now = time.time()
236 to_del = [
237 key
238 for (domain, path), cookie in self._cookies.items()
239 for name, morsel in cookie.items()
240 if (
241 (key := (domain, path, name)) in self._expirations
242 and self._expirations[key] <= now
243 )
244 or predicate(morsel)
245 ]
246 if to_del:
247 self._delete_cookies(to_del)
249 def clear_domain(self, domain: str) -> None:
250 self.clear(lambda x: self._is_domain_match(domain, x["domain"]))
252 def __iter__(self) -> "Iterator[Morsel[str]]":
253 self._do_expiration()
254 for val in self._cookies.values():
255 yield from val.values()
257 def __len__(self) -> int:
258 """Return number of cookies.
260 This function does not iterate self to avoid unnecessary expiration
261 checks.
262 """
263 return sum(len(cookie.values()) for cookie in self._cookies.values())
265 def _do_expiration(self) -> None:
266 """Remove expired cookies."""
267 if not (expire_heap_len := len(self._expire_heap)):
268 return
270 # If the expiration heap grows larger than the number expirations
271 # times two, we clean it up to avoid keeping expired entries in
272 # the heap and consuming memory. We guard this with a minimum
273 # threshold to avoid cleaning up the heap too often when there are
274 # only a few scheduled expirations.
275 if (
276 expire_heap_len > _MIN_SCHEDULED_COOKIE_EXPIRATION
277 and expire_heap_len > len(self._expirations) * 2
278 ):
279 # Remove any expired entries from the expiration heap
280 # that do not match the expiration time in the expirations
281 # as it means the cookie has been re-added to the heap
282 # with a different expiration time.
283 self._expire_heap = [
284 entry
285 for entry in self._expire_heap
286 if self._expirations.get(entry[1]) == entry[0]
287 ]
288 heapq.heapify(self._expire_heap)
290 now = time.time()
291 to_del: list[tuple[str, str, str]] = []
292 # Find any expired cookies and add them to the to-delete list
293 while self._expire_heap:
294 when, cookie_key = self._expire_heap[0]
295 if when > now:
296 break
297 heapq.heappop(self._expire_heap)
298 # Check if the cookie hasn't been re-added to the heap
299 # with a different expiration time as it will be removed
300 # later when it reaches the top of the heap and its
301 # expiration time is met.
302 if self._expirations.get(cookie_key) == when:
303 to_del.append(cookie_key)
305 if to_del:
306 self._delete_cookies(to_del)
308 def _delete_cookies(self, to_del: list[tuple[str, str, str]]) -> None:
309 for domain, path, name in to_del:
310 self._host_only_cookies.discard((domain, name))
311 self._cookies[(domain, path)].pop(name, None)
312 self._morsel_cache[(domain, path)].pop(name, None)
313 self._expirations.pop((domain, path, name), None)
315 def _expire_cookie(self, when: float, domain: str, path: str, name: str) -> None:
316 cookie_key = (domain, path, name)
317 if self._expirations.get(cookie_key) == when:
318 # Avoid adding duplicates to the heap
319 return
320 heapq.heappush(self._expire_heap, (when, cookie_key))
321 self._expirations[cookie_key] = when
323 def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
324 """Update cookies."""
325 hostname = response_url.raw_host
327 if not self._unsafe and is_ip_address(hostname):
328 # Don't accept cookies from IPs
329 return
331 if isinstance(cookies, Mapping):
332 cookies = cookies.items()
334 for name, cookie in cookies:
335 if not isinstance(cookie, Morsel):
336 tmp = SimpleCookie()
337 tmp[name] = cookie # type: ignore[assignment]
338 cookie = tmp[name]
340 domain = cookie["domain"]
342 # ignore domains with trailing dots
343 if domain and domain[-1] == ".":
344 domain = ""
345 del cookie["domain"]
347 if not domain and hostname is not None:
348 # Set the cookie's domain to the response hostname
349 # and set its host-only-flag
350 self._host_only_cookies.add((hostname, name))
351 domain = cookie["domain"] = hostname
353 if domain and domain[0] == ".":
354 # Remove leading dot
355 domain = domain[1:]
356 cookie["domain"] = domain
358 if hostname and not self._is_domain_match(domain, hostname):
359 # Setting cookies for different domains is not allowed
360 continue
362 path = cookie["path"]
363 if not path or path[0] != "/":
364 # Set the cookie's path to the response path
365 path = response_url.path
366 if not path.startswith("/"):
367 path = "/"
368 else:
369 # Cut everything from the last slash to the end
370 path = "/" + path[1 : path.rfind("/")]
371 cookie["path"] = path
372 path = path.rstrip("/")
374 if max_age := cookie["max-age"]:
375 try:
376 delta_seconds = int(max_age)
377 max_age_expiration = min(time.time() + delta_seconds, self.MAX_TIME)
378 self._expire_cookie(max_age_expiration, domain, path, name)
379 except ValueError:
380 cookie["max-age"] = ""
382 elif expires := cookie["expires"]:
383 if expire_time := self._parse_date(expires):
384 self._expire_cookie(expire_time, domain, path, name)
385 else:
386 cookie["expires"] = ""
388 key = (domain, path)
389 if self._cookies[key].get(name) != cookie:
390 # Don't blow away the cache if the same
391 # cookie gets set again
392 self._cookies[key][name] = cookie
393 self._morsel_cache[key].pop(name, None)
395 self._do_expiration()
397 def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
398 """Returns this jar's cookies filtered by their attributes."""
399 if not isinstance(request_url, URL):
400 warnings.warn( # type: ignore[unreachable]
401 f"The method accepts yarl.URL instances only, got {type(request_url)}",
402 DeprecationWarning,
403 )
404 request_url = URL(request_url)
405 # We always use BaseCookie now since all
406 # cookies set on on filtered are fully constructed
407 # Morsels, not just names and values.
408 filtered: BaseCookie[str] = BaseCookie()
409 if not self._cookies:
410 # Skip do_expiration() if there are no cookies.
411 return filtered
412 self._do_expiration()
413 if not self._cookies:
414 # Skip rest of function if no non-expired cookies.
415 return filtered
416 hostname = request_url.raw_host or ""
418 is_not_secure = request_url.scheme not in ("https", "wss")
419 if is_not_secure and self._treat_as_secure_origin:
420 request_origin = URL()
421 with contextlib.suppress(ValueError):
422 request_origin = request_url.origin()
423 is_not_secure = request_origin not in self._treat_as_secure_origin
425 # Send shared cookie
426 key = ("", "")
427 for c in self._cookies[key].values():
428 # Check cache first
429 if c.key in self._morsel_cache[key]:
430 filtered[c.key] = self._morsel_cache[key][c.key]
431 continue
433 # Build and cache the morsel
434 mrsl_val = self._build_morsel(c)
435 self._morsel_cache[key][c.key] = mrsl_val
436 filtered[c.key] = mrsl_val
438 if is_ip_address(hostname):
439 if not self._unsafe:
440 return filtered
441 domains: Iterable[str] = (hostname,)
442 else:
443 # Get all the subdomains that might match a cookie (e.g. "foo.bar.com", "bar.com", "com")
444 domains = itertools.accumulate(
445 reversed(hostname.split(".")), _FORMAT_DOMAIN_REVERSED
446 )
448 # Get all the path prefixes that might match a cookie (e.g. "", "/foo", "/foo/bar")
449 paths = itertools.accumulate(request_url.path.split("/"), _FORMAT_PATH)
450 # Create every combination of (domain, path) pairs.
451 pairs = itertools.product(domains, paths)
453 path_len = len(request_url.path)
454 # Point 2: https://www.rfc-editor.org/rfc/rfc6265.html#section-5.4
455 for p in pairs:
456 if p not in self._cookies:
457 continue
458 for name, cookie in self._cookies[p].items():
459 domain = cookie["domain"]
461 if (domain, name) in self._host_only_cookies and domain != hostname:
462 continue
464 # Skip edge case when the cookie has a trailing slash but request doesn't.
465 if len(cookie["path"]) > path_len:
466 continue
468 if is_not_secure and cookie["secure"]:
469 continue
471 # We already built the Morsel so reuse it here
472 if name in self._morsel_cache[p]:
473 filtered[name] = self._morsel_cache[p][name]
474 continue
476 # Build and cache the morsel
477 mrsl_val = self._build_morsel(cookie)
478 self._morsel_cache[p][name] = mrsl_val
479 filtered[name] = mrsl_val
481 return filtered
483 def _build_morsel(self, cookie: Morsel[str]) -> Morsel[str]:
484 """Build a morsel for sending, respecting quote_cookie setting."""
485 if self._quote_cookie and cookie.coded_value and cookie.coded_value[0] == '"':
486 return preserve_morsel_with_coded_value(cookie)
487 morsel: Morsel[str] = Morsel()
488 if self._quote_cookie:
489 value, coded_value = _SIMPLE_COOKIE.value_encode(cookie.value)
490 else:
491 coded_value = value = cookie.value
492 # We use __setstate__ instead of the public set() API because it allows us to
493 # bypass validation and set already validated state. This is more stable than
494 # setting protected attributes directly.
495 morsel.__setstate__({"key": cookie.key, "value": value, "coded_value": coded_value}) # type: ignore[attr-defined]
496 return morsel
498 @staticmethod
499 def _is_domain_match(domain: str, hostname: str) -> bool:
500 """Implements domain matching adhering to RFC 6265."""
501 if hostname == domain:
502 return True
504 if not hostname.endswith(domain):
505 return False
507 non_matching = hostname[: -len(domain)]
509 if not non_matching.endswith("."):
510 return False
512 return not is_ip_address(hostname)
514 @classmethod
515 def _parse_date(cls, date_str: str) -> int | None:
516 """Implements date string parsing adhering to RFC 6265."""
517 if not date_str:
518 return None
520 found_time = False
521 found_day = False
522 found_month = False
523 found_year = False
525 hour = minute = second = 0
526 day = 0
527 month = 0
528 year = 0
530 for token_match in cls.DATE_TOKENS_RE.finditer(date_str):
531 token = token_match.group("token")
533 if not found_time:
534 time_match = cls.DATE_HMS_TIME_RE.match(token)
535 if time_match:
536 found_time = True
537 hour, minute, second = (int(s) for s in time_match.groups())
538 continue
540 if not found_day:
541 day_match = cls.DATE_DAY_OF_MONTH_RE.match(token)
542 if day_match:
543 found_day = True
544 day = int(day_match.group())
545 continue
547 if not found_month:
548 month_match = cls.DATE_MONTH_RE.match(token)
549 if month_match:
550 found_month = True
551 assert month_match.lastindex is not None
552 month = month_match.lastindex
553 continue
555 if not found_year:
556 year_match = cls.DATE_YEAR_RE.match(token)
557 if year_match:
558 found_year = True
559 year = int(year_match.group())
561 if 70 <= year <= 99:
562 year += 1900
563 elif 0 <= year <= 69:
564 year += 2000
566 if False in (found_day, found_month, found_year, found_time):
567 return None
569 if not 1 <= day <= 31:
570 return None
572 if year < 1601 or hour > 23 or minute > 59 or second > 59:
573 return None
575 return calendar.timegm((year, month, day, hour, minute, second, -1, -1, -1))
class DummyCookieJar(AbstractCookieJar):
    """Implements a dummy cookie storage.

    It can be used with the ClientSession when no cookie processing is needed.

    """

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Return an iterator that yields nothing."""
        yield from ()

    def __len__(self) -> int:
        """Return zero; this jar never stores anything."""
        return 0

    @property
    def unsafe(self) -> bool:
        """Always ``False``."""
        return False

    @property
    def quote_cookie(self) -> bool:
        """Always ``True``."""
        return True

    @property
    def cookies(self) -> MappingProxyType[tuple[str, str], SimpleCookie]:
        """Return an empty mapping."""
        return MappingProxyType({})

    @property
    def host_only_cookies(self) -> frozenset[tuple[str, str]]:
        """Return an empty frozenset."""
        return frozenset()

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Do nothing; there is never anything to clear."""

    def clear_domain(self, domain: str) -> None:
        """Do nothing; there is never anything to clear."""

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Ignore *cookies*."""

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Return a new, empty cookie container."""
        return SimpleCookie()