Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/cookiejar.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import calendar
3import contextlib
4import datetime
5import heapq
6import itertools
7import json
8import os
9import pathlib
10import pickle
11import re
12import time
13import warnings
14from collections import defaultdict
15from collections.abc import Iterable, Iterator, Mapping
16from http.cookies import BaseCookie, Morsel, SimpleCookie
17from types import MappingProxyType
18from typing import Union
20from yarl import URL
22from ._cookie_helpers import preserve_morsel_with_coded_value
23from .abc import AbstractCookieJar, ClearCookiePredicate
24from .helpers import is_ip_address
25from .typedefs import LooseCookies, PathLike, StrOrURL
__all__ = ("CookieJar", "DummyCookieJar")


CookieItem = Union[str, "Morsel[str]"]

# Bound ``str.format`` methods, looked up once at import time because they are
# called from performance critical code.
_FORMAT_PATH = "{}/{}".format
_FORMAT_DOMAIN_REVERSED = "{1}.{0}".format

# Size the expiration heap has to exceed before it is compacted. Compacting a
# heap that only holds a few scheduled expirations would cost more than it
# saves, so smaller heaps are left alone.
_MIN_SCHEDULED_COOKIE_EXPIRATION = 100
_SIMPLE_COOKIE = SimpleCookie()

# Relative expiry attributes are never persisted; the absolute deadline is
# written out in their place.
_RELATIVE_EXPIRY_ATTRS = frozenset({"max-age", "expires"})
class _RestrictedCookieUnpickler(pickle._Unpickler):
    """Unpickler that refuses every global outside a small allow-list.

    Legacy cookie files are pickles and may come from untrusted sources, so
    only the types expected inside a serialized CookieJar may be resolved;
    resolving anything else is rejected to prevent arbitrary code execution.

    The base class is :class:`pickle._Unpickler` (the pure-Python
    implementation) and not :class:`pickle.Unpickler`, because the accelerated
    unpickler on some implementations (notably PyPy) does not dispatch through
    :meth:`find_class` overrides.

    See: https://docs.python.org/3/library/pickle.html#restricting-globals
    """

    # ``(module, qualified name)`` pairs that may be resolved while loading.
    _ALLOWED_CLASSES: frozenset[tuple[str, str]] = frozenset(
        (
            # Core cookie types
            ("http.cookies", "SimpleCookie"),
            ("http.cookies", "Morsel"),
            # Container types used by CookieJar._cookies
            ("collections", "defaultdict"),
            # builtins that pickle uses for reconstruction
            ("builtins", "tuple"),
            ("builtins", "set"),
            ("builtins", "frozenset"),
            ("builtins", "dict"),
        )
    )

    def find_class(self, module: str, name: str) -> type:
        """Resolve ``module.name`` if allow-listed, otherwise refuse.

        :raises pickle.UnpicklingError: if the pair is not in
            ``_ALLOWED_CLASSES``.
        """
        if (module, name) in self._ALLOWED_CLASSES:
            return super().find_class(module, name)  # type: ignore[no-any-return]
        raise pickle.UnpicklingError(
            f"Forbidden class: {module}.{name}. "
            "CookieJar.load() only allows cookie-related types for security. "
            "See https://docs.python.org/3/library/pickle.html#restricting-globals"
        )
class CookieJar(AbstractCookieJar):
    """Implements cookie storage adhering to RFC 6265.

    Cookies are kept in ``_cookies``, keyed by ``(domain, path)``. Absolute
    expiration deadlines are tracked in ``_expirations`` (keyed by
    ``(domain, path, name)``) together with a heap of ``(deadline, key)``
    entries, so expired cookies can be found without scanning the jar.
    """

    DATE_TOKENS_RE = re.compile(
        r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*"
        r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)"
    )

    DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})")

    DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})")

    DATE_MONTH_RE = re.compile(
        "(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|(aug)|(sep)|(oct)|(nov)|(dec)",
        re.I,
    )

    DATE_YEAR_RE = re.compile(r"(\d{2,4})")

    # calendar.timegm() fails for timestamps after datetime.datetime.max
    # Minus one as a loss of precision occurs when timestamp() is called.
    MAX_TIME = (
        int(datetime.datetime.max.replace(tzinfo=datetime.timezone.utc).timestamp()) - 1
    )
    try:
        calendar.timegm(time.gmtime(MAX_TIME))
    except OSError:
        # Hit the maximum representable time on Windows
        # https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/localtime-localtime32-localtime64
        MAX_TIME = calendar.timegm((3000, 12, 31, 23, 59, 59, -1, -1, -1))
    except OverflowError:
        # #4515: datetime.max may not be representable on 32-bit platforms
        MAX_TIME = 2**31 - 1
    # Avoid minuses in the future, 3x faster
    SUB_MAX_TIME = MAX_TIME - 1

    def __init__(
        self,
        *,
        unsafe: bool = False,
        quote_cookie: bool = True,
        treat_as_secure_origin: StrOrURL | list[StrOrURL] | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Create an empty jar.

        :param unsafe: When true, cookies from and for IP-address hosts are
                       processed instead of being ignored.
        :param quote_cookie: When true, outgoing cookie values are encoded
                             with ``SimpleCookie.value_encode``.
        :param treat_as_secure_origin: One URL (or string), or a list of them,
                                       whose origins may receive ``secure``
                                       cookies even over a non-secure scheme.
        :param loop: Passed through to the base class.
        """
        super().__init__(loop=loop)
        self._cookies: defaultdict[tuple[str, str], SimpleCookie] = defaultdict(
            SimpleCookie
        )
        self._morsel_cache: defaultdict[tuple[str, str], dict[str, Morsel[str]]] = (
            defaultdict(dict)
        )
        self._host_only_cookies: set[tuple[str, str]] = set()
        self._unsafe = unsafe
        self._quote_cookie = quote_cookie
        # Normalise every accepted input form to a list of origin URLs.
        if treat_as_secure_origin is None:
            treat_as_secure_origin = []
        elif isinstance(treat_as_secure_origin, URL):
            treat_as_secure_origin = [treat_as_secure_origin.origin()]
        elif isinstance(treat_as_secure_origin, str):
            treat_as_secure_origin = [URL(treat_as_secure_origin).origin()]
        else:
            treat_as_secure_origin = [
                URL(url).origin() if isinstance(url, str) else url.origin()
                for url in treat_as_secure_origin
            ]
        self._treat_as_secure_origin = treat_as_secure_origin
        self._expire_heap: list[tuple[float, tuple[str, str, str]]] = []
        self._expirations: dict[tuple[str, str, str], float] = {}

    @property
    def unsafe(self) -> bool:
        """Return the ``unsafe`` flag this jar was created with."""
        return self._unsafe

    @property
    def quote_cookie(self) -> bool:
        """Return the ``quote_cookie`` flag this jar was created with."""
        return self._quote_cookie

    @property
    def cookies(self) -> MappingProxyType[tuple[str, str], SimpleCookie]:
        """Return the cookies stored in this jar."""
        return MappingProxyType(self._cookies)

    @property
    def host_only_cookies(self) -> frozenset[tuple[str, str]]:
        """Return the host-only cookies stored in this jar."""
        return frozenset(self._host_only_cookies)

    def save(self, file_path: PathLike) -> None:
        """Save cookies to a file using JSON format.

        :param file_path: Path to file where cookies will be serialized,
            :class:`str` or :class:`pathlib.Path` instance.
        """
        file_path = pathlib.Path(file_path)
        data: dict[str, dict[str, dict[str, str | bool | float]]] = {}
        for (domain, path), cookie in self._cookies.items():
            key = f"{domain}|{path}"
            data[key] = {}
            for name, morsel in cookie.items():
                morsel_data: dict[str, str | bool | float] = {
                    "key": morsel.key,
                    "value": morsel.value,
                    "coded_value": morsel.coded_value,
                }
                # Skip relative expiry; the absolute deadline is saved below.
                for attr in morsel._reserved:  # type: ignore[attr-defined]
                    if attr in _RELATIVE_EXPIRY_ATTRS:
                        continue
                    attr_val = morsel[attr]
                    if attr_val:
                        morsel_data[attr] = attr_val
                # Persist or it reloads as a domain cookie and leaks to subdomains.
                if (domain, name) in self._host_only_cookies:
                    morsel_data["host_only"] = True
                if (exp := self._expirations.get((domain, path, name))) is not None:
                    morsel_data["expires_timestamp"] = exp
                data[key][name] = morsel_data

        # Cookie persistence may include authentication/session tokens.
        # Use 0o600 at creation time to avoid umask-dependent overexposure
        # and enforce least-privilege access to sensitive credential data.
        with open(
            file_path,
            mode="w",
            encoding="utf-8",
            opener=lambda path, flags: os.open(path, flags, 0o600),
        ) as f:
            json.dump(data, f, indent=2)

    def load(self, file_path: PathLike) -> None:
        """Load cookies from a file.

        Tries to load JSON format first. Falls back to loading legacy
        pickle format (using a restricted unpickler) for backward
        compatibility with existing cookie files.

        Replaces the current jar contents; cookies loaded from JSON pass
        through the same acceptance rules as :meth:`update_cookies`.

        :param file_path: Path to file from where cookies will be
            imported, :class:`str` or :class:`pathlib.Path` instance.
        """
        file_path = pathlib.Path(file_path)
        # Try JSON format first
        try:
            with file_path.open(mode="r", encoding="utf-8") as f:
                data = json.load(f)
            self._load_json_data(data)
        except (json.JSONDecodeError, UnicodeDecodeError, ValueError):
            # Fall back to legacy pickle format with restricted unpickler
            with file_path.open(mode="rb") as f:
                legacy_cookies = _RestrictedCookieUnpickler(f).load()
            # Reset all bookkeeping (morsel cache, host-only flags, scheduled
            # expirations) before swapping in the loaded cookies; otherwise
            # state belonging to the previous contents would survive the load
            # and stale cached morsels could still be sent.
            self.clear()
            self._cookies = legacy_cookies

    def _load_json_data(
        self, data: dict[str, dict[str, dict[str, str | bool | float]]]
    ) -> None:
        """Replace contents, routing cookies through update_cookies().

        :param data: Mapping in the layout written by :meth:`save`:
            ``"domain|path"`` -> cookie name -> morsel fields.
        """
        self.clear()
        for compound_key, cookie_data in data.items():
            domain, path = compound_key.split("|", 1)
            for name, morsel_data in cookie_data.items():
                morsel: Morsel[str] = Morsel()
                # Use __setstate__ to bypass validation, same pattern
                # used in _build_morsel and _cookie_helpers.
                morsel.__setstate__(  # type: ignore[attr-defined]
                    {
                        "key": morsel_data["key"],
                        "value": morsel_data["value"],
                        "coded_value": morsel_data["coded_value"],
                    }
                )
                # Restore morsel attributes
                for attr in morsel._reserved:  # type: ignore[attr-defined]
                    if attr in morsel_data and attr not in (
                        "key",
                        "value",
                        "coded_value",
                    ):
                        morsel[attr] = morsel_data[attr]
                # Drop the domain so update_cookies() re-marks it host-only.
                if morsel_data.get("host_only"):
                    morsel["domain"] = ""
                response_url = (
                    URL.build(scheme="https", host=domain) if domain else URL()
                )
                self.update_cookies({name: morsel}, response_url)
                # Restore the absolute deadline; update_cookies() schedules none.
                if (exp := morsel_data.get("expires_timestamp")) is not None:
                    self._expire_cookie(float(exp), domain, path, name)
        self._do_expiration()

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Remove cookies from the jar.

        :param predicate: When ``None`` the jar is emptied completely.
            Otherwise every cookie for which ``predicate(morsel)`` is true is
            removed, along with every cookie whose deadline has passed.
        """
        if predicate is None:
            self._expire_heap.clear()
            self._cookies.clear()
            self._morsel_cache.clear()
            self._host_only_cookies.clear()
            self._expirations.clear()
            return

        now = time.time()
        to_del = [
            key
            for (domain, path), cookie in self._cookies.items()
            for name, morsel in cookie.items()
            if (
                (key := (domain, path, name)) in self._expirations
                and self._expirations[key] <= now
            )
            or predicate(morsel)
        ]
        if to_del:
            self._delete_cookies(to_del)

    def clear_domain(self, domain: str) -> None:
        """Remove every cookie whose domain attribute domain-matches *domain*."""
        self.clear(lambda x: self._is_domain_match(domain, x["domain"]))

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Iterate over all stored morsels after dropping expired ones."""
        self._do_expiration()
        for val in self._cookies.values():
            yield from val.values()

    def __len__(self) -> int:
        """Return number of cookies.

        This function does not iterate self to avoid unnecessary expiration
        checks.
        """
        return sum(len(cookie.values()) for cookie in self._cookies.values())

    def _do_expiration(self) -> None:
        """Remove expired cookies."""
        if not (expire_heap_len := len(self._expire_heap)):
            return

        # If the expiration heap grows larger than the number expirations
        # times two, we clean it up to avoid keeping expired entries in
        # the heap and consuming memory. We guard this with a minimum
        # threshold to avoid cleaning up the heap too often when there are
        # only a few scheduled expirations.
        if (
            expire_heap_len > _MIN_SCHEDULED_COOKIE_EXPIRATION
            and expire_heap_len > len(self._expirations) * 2
        ):
            # Remove any expired entries from the expiration heap
            # that do not match the expiration time in the expirations
            # as it means the cookie has been re-added to the heap
            # with a different expiration time.
            self._expire_heap = [
                entry
                for entry in self._expire_heap
                if self._expirations.get(entry[1]) == entry[0]
            ]
            heapq.heapify(self._expire_heap)

        now = time.time()
        to_del: list[tuple[str, str, str]] = []
        # Find any expired cookies and add them to the to-delete list
        while self._expire_heap:
            when, cookie_key = self._expire_heap[0]
            if when > now:
                break
            heapq.heappop(self._expire_heap)
            # Check if the cookie hasn't been re-added to the heap
            # with a different expiration time as it will be removed
            # later when it reaches the top of the heap and its
            # expiration time is met.
            if self._expirations.get(cookie_key) == when:
                to_del.append(cookie_key)

        if to_del:
            self._delete_cookies(to_del)

    def _delete_cookies(self, to_del: list[tuple[str, str, str]]) -> None:
        """Drop the given ``(domain, path, name)`` cookies and their bookkeeping."""
        for domain, path, name in to_del:
            self._host_only_cookies.discard((domain, name))
            key = (domain, path)
            # Use .get() rather than indexing: both containers are
            # defaultdicts, and indexing a key that was never stored would
            # insert an empty placeholder entry into the jar.
            if (stored := self._cookies.get(key)) is not None:
                stored.pop(name, None)
            if (cached := self._morsel_cache.get(key)) is not None:
                cached.pop(name, None)
            self._expirations.pop((domain, path, name), None)

    def _expire_cookie(self, when: float, domain: str, path: str, name: str) -> None:
        """Schedule the cookie ``(domain, path, name)`` to expire at *when*."""
        cookie_key = (domain, path, name)
        if self._expirations.get(cookie_key) == when:
            # Avoid adding duplicates to the heap
            return
        heapq.heappush(self._expire_heap, (when, cookie_key))
        self._expirations[cookie_key] = when

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Update cookies.

        :param cookies: Mapping or iterable of ``(name, cookie)`` pairs; a
            value that is not a :class:`~http.cookies.Morsel` is converted
            into one via :class:`~http.cookies.SimpleCookie`.
        :param response_url: URL of the response that set the cookies; its
            host and path supply defaults for missing ``domain`` and ``path``.
        """
        hostname = response_url.raw_host

        if not self._unsafe and is_ip_address(hostname):
            # Don't accept cookies from IPs
            return

        if isinstance(cookies, Mapping):
            cookies = cookies.items()

        for name, cookie in cookies:
            if not isinstance(cookie, Morsel):
                tmp = SimpleCookie()
                tmp[name] = cookie  # type: ignore[assignment]
                cookie = tmp[name]

            domain = cookie["domain"]

            # ignore domains with trailing dots
            if domain and domain[-1] == ".":
                domain = ""
                del cookie["domain"]

            if not domain and hostname is not None:
                # Set the cookie's domain to the response hostname
                # and set its host-only-flag
                self._host_only_cookies.add((hostname, name))
                domain = cookie["domain"] = hostname

            if domain and domain[0] == ".":
                # Remove leading dot
                domain = domain[1:]
                cookie["domain"] = domain

            if hostname and not self._is_domain_match(domain, hostname):
                # Setting cookies for different domains is not allowed
                continue

            path = cookie["path"]
            if not path or path[0] != "/":
                # Set the cookie's path to the response path
                path = response_url.path
                if not path.startswith("/"):
                    path = "/"
                else:
                    # Cut everything from the last slash to the end
                    path = "/" + path[1 : path.rfind("/")]
                cookie["path"] = path
            path = path.rstrip("/")

            if max_age := cookie["max-age"]:
                try:
                    delta_seconds = int(max_age)
                except ValueError:
                    cookie["max-age"] = ""
                else:
                    # Clamp before mixing with the float clock: an int with
                    # hundreds of digits parses fine but raises OverflowError
                    # when added to a float. The clamp cannot change the
                    # outcome, as anything beyond +/-MAX_TIME already means
                    # "capped at MAX_TIME" or "already expired".
                    delta_seconds = max(
                        -self.MAX_TIME, min(delta_seconds, self.MAX_TIME)
                    )
                    max_age_expiration = min(time.time() + delta_seconds, self.MAX_TIME)
                    self._expire_cookie(max_age_expiration, domain, path, name)

            elif expires := cookie["expires"]:
                if expire_time := self._parse_date(expires):
                    self._expire_cookie(expire_time, domain, path, name)
                else:
                    cookie["expires"] = ""

            key = (domain, path)
            if self._cookies[key].get(name) != cookie:
                # Don't blow away the cache if the same
                # cookie gets set again
                self._cookies[key][name] = cookie
                self._morsel_cache[key].pop(name, None)

        self._do_expiration()

    def filter_cookies(self, request_url: URL = URL()) -> "BaseCookie[str]":
        """Returns this jar's cookies filtered by their attributes.

        :param request_url: URL of the outgoing request. Passing anything
            other than a ``yarl.URL`` is deprecated; it is converted after a
            :class:`DeprecationWarning`.
        :return: A :class:`~http.cookies.BaseCookie` holding name/value-only
            morsels for the cookies that match the request's host, path and
            security context.
        """
        # We always use BaseCookie now since all
        # cookies set on filtered are fully constructed
        # Morsels, not just names and values.
        filtered: BaseCookie[str] = BaseCookie()
        if not self._cookies:
            # Skip do_expiration() if there are no cookies.
            return filtered
        self._do_expiration()
        if not self._cookies:
            # Skip rest of function if no non-expired cookies.
            return filtered
        if type(request_url) is not URL:
            warnings.warn(
                "filter_cookies expects yarl.URL instances only,"
                f"and will stop working in 4.x, got {type(request_url)}",
                DeprecationWarning,
                stacklevel=2,
            )
            request_url = URL(request_url)
        hostname = request_url.raw_host or ""

        is_not_secure = request_url.scheme not in ("https", "wss")
        if is_not_secure and self._treat_as_secure_origin:
            request_origin = URL()
            with contextlib.suppress(ValueError):
                request_origin = request_url.origin()
            is_not_secure = request_origin not in self._treat_as_secure_origin

        # Send shared cookie
        key = ("", "")
        for c in self._cookies[key].values():
            # Check cache first
            if c.key in self._morsel_cache[key]:
                filtered[c.key] = self._morsel_cache[key][c.key]
                continue

            # Build and cache the morsel
            mrsl_val = self._build_morsel(c)
            self._morsel_cache[key][c.key] = mrsl_val
            filtered[c.key] = mrsl_val

        if is_ip_address(hostname):
            if not self._unsafe:
                return filtered
            domains: Iterable[str] = (hostname,)
        else:
            # Get all the subdomains that might match a cookie (e.g. "foo.bar.com", "bar.com", "com")
            domains = itertools.accumulate(
                reversed(hostname.split(".")), _FORMAT_DOMAIN_REVERSED
            )

        # Get all the path prefixes that might match a cookie (e.g. "", "/foo", "/foo/bar")
        paths = itertools.accumulate(request_url.path.split("/"), _FORMAT_PATH)
        # Create every combination of (domain, path) pairs.
        pairs = itertools.product(domains, paths)

        path_len = len(request_url.path)
        # Point 2: https://www.rfc-editor.org/rfc/rfc6265.html#section-5.4
        for p in pairs:
            if p not in self._cookies:
                continue
            for name, cookie in self._cookies[p].items():
                domain = cookie["domain"]

                if (domain, name) in self._host_only_cookies and domain != hostname:
                    continue

                # Skip edge case when the cookie has a trailing slash but request doesn't.
                if len(cookie["path"]) > path_len:
                    continue

                if is_not_secure and cookie["secure"]:
                    continue

                # We already built the Morsel so reuse it here
                if name in self._morsel_cache[p]:
                    filtered[name] = self._morsel_cache[p][name]
                    continue

                # Build and cache the morsel
                mrsl_val = self._build_morsel(cookie)
                self._morsel_cache[p][name] = mrsl_val
                filtered[name] = mrsl_val

        return filtered

    def _build_morsel(self, cookie: Morsel[str]) -> Morsel[str]:
        """Build a morsel for sending, respecting quote_cookie setting."""
        if self._quote_cookie and cookie.coded_value and cookie.coded_value[0] == '"':
            return preserve_morsel_with_coded_value(cookie)
        morsel: Morsel[str] = Morsel()
        if self._quote_cookie:
            value, coded_value = _SIMPLE_COOKIE.value_encode(cookie.value)
        else:
            coded_value = value = cookie.value
        # We use __setstate__ instead of the public set() API because it allows us to
        # bypass validation and set already validated state. This is more stable than
        # setting protected attributes directly and unlikely to change since it would
        # break pickling.
        morsel.__setstate__({"key": cookie.key, "value": value, "coded_value": coded_value})  # type: ignore[attr-defined]
        return morsel

    @staticmethod
    def _is_domain_match(domain: str, hostname: str) -> bool:
        """Implements domain matching adhering to RFC 6265.

        Returns true when *hostname* equals *domain*, or when it ends with
        ``"." + domain`` and is not an IP address.
        """
        if hostname == domain:
            return True

        if not hostname.endswith(domain):
            return False

        non_matching = hostname[: -len(domain)]

        if not non_matching.endswith("."):
            return False

        return not is_ip_address(hostname)

    @classmethod
    def _parse_date(cls, date_str: str) -> int | None:
        """Implements date string parsing adhering to RFC 6265.

        :return: Seconds since the epoch (UTC), or ``None`` when the string
            is empty, lacks a time, day, month or year token, or holds an
            out-of-range field.
        """
        if not date_str:
            return None

        found_time = False
        found_day = False
        found_month = False
        found_year = False

        hour = minute = second = 0
        day = 0
        month = 0
        year = 0

        # Each token is tried against the still-missing fields in a fixed
        # order (time, day, month, year); the first match of each kind wins.
        for token_match in cls.DATE_TOKENS_RE.finditer(date_str):

            token = token_match.group("token")

            if not found_time:
                time_match = cls.DATE_HMS_TIME_RE.match(token)
                if time_match:
                    found_time = True
                    hour, minute, second = (int(s) for s in time_match.groups())
                    continue

            if not found_day:
                day_match = cls.DATE_DAY_OF_MONTH_RE.match(token)
                if day_match:
                    found_day = True
                    day = int(day_match.group())
                    continue

            if not found_month:
                month_match = cls.DATE_MONTH_RE.match(token)
                if month_match:
                    found_month = True
                    assert month_match.lastindex is not None
                    # The index of the matching group is the month number.
                    month = month_match.lastindex
                    continue

            if not found_year:
                year_match = cls.DATE_YEAR_RE.match(token)
                if year_match:
                    found_year = True
                    year = int(year_match.group())

        # Two-digit years: 70-99 map to 19xx, 0-69 map to 20xx.
        if 70 <= year <= 99:
            year += 1900
        elif 0 <= year <= 69:
            year += 2000

        if False in (found_day, found_month, found_year, found_time):
            return None

        if not 1 <= day <= 31:
            return None

        if year < 1601 or hour > 23 or minute > 59 or second > 59:
            return None

        return calendar.timegm((year, month, day, hour, minute, second, -1, -1, -1))
class DummyCookieJar(AbstractCookieJar):
    """A cookie jar that never stores and never returns cookies.

    It can be used with the ClientSession when no cookie processing is needed.

    """

    def __init__(self, *, loop: asyncio.AbstractEventLoop | None = None) -> None:
        super().__init__(loop=loop)

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Return a generator that finishes without yielding anything."""
        yield from ()

    def __len__(self) -> int:
        """Return 0; the jar is always empty."""
        return 0

    @property
    def unsafe(self) -> bool:
        """Always ``False``."""
        return False

    @property
    def quote_cookie(self) -> bool:
        """Always ``True``."""
        return True

    @property
    def cookies(self) -> MappingProxyType[tuple[str, str], SimpleCookie]:
        """Return a read-only view over an empty mapping."""
        nothing: dict[tuple[str, str], SimpleCookie] = {}
        return MappingProxyType(nothing)

    @property
    def host_only_cookies(self) -> frozenset[tuple[str, str]]:
        """Return a frozenset with no members."""
        return frozenset()

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Do nothing; there is nothing to remove."""

    def clear_domain(self, domain: str) -> None:
        """Do nothing; there is nothing to remove."""

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Discard the given cookies."""

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Return a new, empty :class:`~http.cookies.SimpleCookie`."""
        empty = SimpleCookie()
        return empty