Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/cookiejar.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import calendar
2import contextlib
3import datetime
4import heapq
5import itertools
6import json
7import os
8import pathlib
9import re
10import time
11import warnings
12from collections import defaultdict
13from collections.abc import Iterable, Iterator, Mapping
14from http.cookies import BaseCookie, Morsel, SimpleCookie
15from types import MappingProxyType
16from typing import Union
18from yarl import URL
20from ._cookie_helpers import preserve_morsel_with_coded_value
21from .abc import AbstractCookieJar, ClearCookiePredicate
22from .helpers import is_ip_address
23from .typedefs import LooseCookies, PathLike, StrOrURL
__all__ = ("CookieJar", "DummyCookieJar")


# A cookie supplied by a caller: either a plain string value or a Morsel.
CookieItem = Union[str, "Morsel[str]"]

# We cache these string methods here as their use is in performance critical code.
# _FORMAT_PATH joins two path pieces with "/" ("a", "b" -> "a/b").
_FORMAT_PATH = "{}/{}".format
# _FORMAT_DOMAIN_REVERSED prepends the second label to the first
# ("com", "bar" -> "bar.com").
_FORMAT_DOMAIN_REVERSED = "{1}.{0}".format

# The minimum number of scheduled cookie expirations before we start cleaning up
# the expiration heap. This is a performance optimization to avoid cleaning up the
# heap too often when there are only a few scheduled expirations.
_MIN_SCHEDULED_COOKIE_EXPIRATION = 100
# Shared SimpleCookie instance; only its value_encode() method is used here.
_SIMPLE_COOKIE = SimpleCookie()
class CookieJar(AbstractCookieJar):
    """Implements cookie storage adhering to RFC 6265.

    Cookies are stored per ``(domain, path)`` pair in ``SimpleCookie``
    containers. Expirations are tracked with a heap of ``(when, key)``
    entries plus a dict mapping each ``(domain, path, name)`` key to its
    current expiration time; heap entries that no longer agree with the
    dict are treated as stale and ignored.
    """

    # Splits a cookie date string into tokens, skipping leading delimiters.
    DATE_TOKENS_RE = re.compile(
        r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*"
        r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)"
    )

    DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})")

    DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})")

    # One capture group per month, so ``lastindex`` is the month number (1-12).
    DATE_MONTH_RE = re.compile(
        "(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|(aug)|(sep)|(oct)|(nov)|(dec)",
        re.I,
    )

    DATE_YEAR_RE = re.compile(r"(\d{2,4})")

    # calendar.timegm() fails for timestamps after datetime.datetime.max
    # Minus one as a loss of precision occurs when timestamp() is called.
    MAX_TIME = (
        int(datetime.datetime.max.replace(tzinfo=datetime.timezone.utc).timestamp()) - 1
    )
    try:
        calendar.timegm(time.gmtime(MAX_TIME))
    except OSError:
        # Hit the maximum representable time on Windows
        # https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/localtime-localtime32-localtime64
        MAX_TIME = calendar.timegm((3000, 12, 31, 23, 59, 59, -1, -1, -1))
    except OverflowError:
        # #4515: datetime.max may not be representable on 32-bit platforms
        MAX_TIME = 2**31 - 1
    # Avoid minuses in the future, 3x faster
    SUB_MAX_TIME = MAX_TIME - 1

    def __init__(
        self,
        *,
        unsafe: bool = False,
        quote_cookie: bool = True,
        treat_as_secure_origin: StrOrURL | Iterable[StrOrURL] | None = None,
    ) -> None:
        """Create an empty cookie jar.

        :param unsafe: When true, cookies from/for IP-address hosts are
                       accepted and sent instead of being ignored.
        :param quote_cookie: When true, outgoing cookie values are encoded
                             with ``SimpleCookie.value_encode``.
        :param treat_as_secure_origin: A URL (or string), or an iterable of
                                       them, whose origins may receive
                                       ``secure`` cookies even when the
                                       request scheme is not https/wss.
        """
        self._cookies: defaultdict[tuple[str, str], SimpleCookie] = defaultdict(
            SimpleCookie
        )
        # Per (domain, path) cache of the morsels built for sending.
        self._morsel_cache: defaultdict[tuple[str, str], dict[str, Morsel[str]]] = (
            defaultdict(dict)
        )
        # (domain, name) pairs of cookies that were set without a Domain attribute.
        self._host_only_cookies: set[tuple[str, str]] = set()
        self._unsafe = unsafe
        self._quote_cookie = quote_cookie
        if treat_as_secure_origin is None:
            self._treat_as_secure_origin: frozenset[URL] = frozenset()
        elif isinstance(treat_as_secure_origin, URL):
            self._treat_as_secure_origin = frozenset({treat_as_secure_origin.origin()})
        elif isinstance(treat_as_secure_origin, str):
            self._treat_as_secure_origin = frozenset(
                {URL(treat_as_secure_origin).origin()}
            )
        else:
            self._treat_as_secure_origin = frozenset(
                {
                    URL(url).origin() if isinstance(url, str) else url.origin()
                    for url in treat_as_secure_origin
                }
            )
        # Min-heap of (expiration time, (domain, path, name)).
        self._expire_heap: list[tuple[float, tuple[str, str, str]]] = []
        # Authoritative expiration time for each (domain, path, name).
        self._expirations: dict[tuple[str, str, str], float] = {}

    @property
    def unsafe(self) -> bool:
        """Return the ``unsafe`` flag given at construction."""
        return self._unsafe

    @property
    def quote_cookie(self) -> bool:
        """Return the ``quote_cookie`` flag given at construction."""
        return self._quote_cookie

    @property
    def cookies(self) -> MappingProxyType[tuple[str, str], SimpleCookie]:
        """Return the cookies stored in this jar."""
        return MappingProxyType(self._cookies)

    @property
    def host_only_cookies(self) -> frozenset[tuple[str, str]]:
        """Return the host-only cookies stored in this jar."""
        return frozenset(self._host_only_cookies)

    def save(self, file_path: PathLike) -> None:
        """Save cookies to a file using JSON format.

        Each ``(domain, path)`` bucket is written under the key
        ``"<domain>|<path>"``; every morsel is stored with its key, value,
        coded value and all reserved attributes that have a truthy value.

        :param file_path: Path to file where cookies will be serialized,
                          :class:`str` or :class:`pathlib.Path` instance.
        """
        file_path = pathlib.Path(file_path)
        data: dict[str, dict[str, dict[str, str | bool]]] = {}
        for (domain, path), cookie in self._cookies.items():
            key = f"{domain}|{path}"
            data[key] = {}
            for name, morsel in cookie.items():
                morsel_data: dict[str, str | bool] = {
                    "key": morsel.key,
                    "value": morsel.value,
                    "coded_value": morsel.coded_value,
                }
                # Save all morsel attributes that have values
                for attr in morsel._reserved:  # type: ignore[attr-defined]
                    attr_val = morsel[attr]
                    if attr_val:
                        morsel_data[attr] = attr_val
                data[key][name] = morsel_data

        # Cookie persistence may include authentication/session tokens.
        # Use 0o600 at creation time to avoid umask-dependent overexposure
        # and enforce least-privilege access to sensitive credential data.
        with open(
            file_path,
            mode="w",
            encoding="utf-8",
            opener=lambda path, flags: os.open(path, flags, 0o600),
        ) as f:
            json.dump(data, f, indent=2)

    def load(self, file_path: PathLike) -> None:
        """Load cookies from a JSON file.

        The stored cookies are replaced by the file's content. The cache of
        morsels built for sending is dropped as well, so that
        :meth:`filter_cookies` cannot return a morsel built from a cookie
        that was stored before the load. Host-only flags and scheduled
        expirations are left untouched.

        :param file_path: Path to file from where cookies will be
                          imported, :class:`str` or :class:`pathlib.Path` instance.
        """
        file_path = pathlib.Path(file_path)
        with file_path.open(mode="r", encoding="utf-8") as f:
            data = json.load(f)
        self._cookies = self._load_json_data(data)
        # The cache is keyed by (domain, path) -> name only; entries built
        # from the previous cookies would shadow the freshly loaded values.
        self._morsel_cache.clear()

    def _load_json_data(
        self, data: dict[str, dict[str, dict[str, str | bool]]]
    ) -> defaultdict[tuple[str, str], SimpleCookie]:
        """Load cookies from parsed JSON data.

        :param data: Mapping in the layout written by :meth:`save`:
                     ``"<domain>|<path>"`` -> cookie name -> morsel fields.
        :return: A new ``(domain, path)`` -> ``SimpleCookie`` mapping.
        """
        cookies: defaultdict[tuple[str, str], SimpleCookie] = defaultdict(SimpleCookie)
        for compound_key, cookie_data in data.items():
            # Split on the first "|" only; the rest belongs to the path.
            domain, path = compound_key.split("|", 1)
            key = (domain, path)
            for name, morsel_data in cookie_data.items():
                morsel: Morsel[str] = Morsel()
                morsel_key = morsel_data["key"]
                morsel_value = morsel_data["value"]
                morsel_coded_value = morsel_data["coded_value"]
                # Use __setstate__ to bypass validation, same pattern
                # used in _build_morsel and _cookie_helpers.
                morsel.__setstate__(  # type: ignore[attr-defined]
                    {
                        "key": morsel_key,
                        "value": morsel_value,
                        "coded_value": morsel_coded_value,
                    }
                )
                # Restore morsel attributes
                for attr in morsel._reserved:  # type: ignore[attr-defined]
                    if attr in morsel_data and attr not in (
                        "key",
                        "value",
                        "coded_value",
                    ):
                        morsel[attr] = morsel_data[attr]
                cookies[key][name] = morsel
        return cookies

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Remove cookies from the jar.

        Without a predicate everything (cookies, caches, host-only flags and
        scheduled expirations) is dropped. With a predicate, every cookie
        that is already expired or for which ``predicate(morsel)`` is true
        is removed.
        """
        if predicate is None:
            self._expire_heap.clear()
            self._cookies.clear()
            self._morsel_cache.clear()
            self._host_only_cookies.clear()
            self._expirations.clear()
            return

        now = time.time()
        # The walrus binds ``key`` before either side of the ``or`` is
        # decided, so it is always set for the element expression.
        to_del = [
            key
            for (domain, path), cookie in self._cookies.items()
            for name, morsel in cookie.items()
            if (
                (key := (domain, path, name)) in self._expirations
                and self._expirations[key] <= now
            )
            or predicate(morsel)
        ]
        if to_del:
            self._delete_cookies(to_del)

    def clear_domain(self, domain: str) -> None:
        """Remove every cookie whose ``domain`` attribute matches ``domain``."""
        self.clear(lambda x: self._is_domain_match(domain, x["domain"]))

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Iterate over all stored morsels after dropping expired ones."""
        self._do_expiration()
        for val in self._cookies.values():
            yield from val.values()

    def __len__(self) -> int:
        """Return number of cookies.

        This function does not iterate self to avoid unnecessary expiration
        checks.
        """
        return sum(len(cookie) for cookie in self._cookies.values())

    def _do_expiration(self) -> None:
        """Remove expired cookies."""
        if not (expire_heap_len := len(self._expire_heap)):
            return

        # If the expiration heap grows larger than twice the number of
        # expirations, we clean it up to avoid keeping stale entries in
        # the heap and consuming memory. We guard this with a minimum
        # threshold to avoid cleaning up the heap too often when there are
        # only a few scheduled expirations.
        if (
            expire_heap_len > _MIN_SCHEDULED_COOKIE_EXPIRATION
            and expire_heap_len > len(self._expirations) * 2
        ):
            # Remove any entries from the expiration heap
            # that do not match the expiration time in the expirations
            # as it means the cookie has been re-added to the heap
            # with a different expiration time.
            self._expire_heap = [
                entry
                for entry in self._expire_heap
                if self._expirations.get(entry[1]) == entry[0]
            ]
            heapq.heapify(self._expire_heap)

        now = time.time()
        to_del: list[tuple[str, str, str]] = []
        # Find any expired cookies and add them to the to-delete list
        while self._expire_heap:
            when, cookie_key = self._expire_heap[0]
            if when > now:
                break
            heapq.heappop(self._expire_heap)
            # Check if the cookie hasn't been re-added to the heap
            # with a different expiration time as it will be removed
            # later when it reaches the top of the heap and its
            # expiration time is met.
            if self._expirations.get(cookie_key) == when:
                to_del.append(cookie_key)

        if to_del:
            self._delete_cookies(to_del)

    def _delete_cookies(self, to_del: list[tuple[str, str, str]]) -> None:
        """Drop the given ``(domain, path, name)`` cookies and their bookkeeping."""
        for domain, path, name in to_del:
            self._host_only_cookies.discard((domain, name))
            self._cookies[(domain, path)].pop(name, None)
            self._morsel_cache[(domain, path)].pop(name, None)
            self._expirations.pop((domain, path, name), None)

    def _expire_cookie(self, when: float, domain: str, path: str, name: str) -> None:
        """Schedule the cookie ``(domain, path, name)`` to expire at ``when``."""
        cookie_key = (domain, path, name)
        if self._expirations.get(cookie_key) == when:
            # Avoid adding duplicates to the heap
            return
        heapq.heappush(self._expire_heap, (when, cookie_key))
        self._expirations[cookie_key] = when

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Update cookies.

        :param cookies: A mapping or an iterable of ``(name, cookie)`` pairs;
                        values that are not ``Morsel`` instances are wrapped
                        through a temporary ``SimpleCookie``.
        :param response_url: URL of the response that set the cookies; its
                             host and path supply the default domain and path.
        """
        hostname = response_url.raw_host

        if not self._unsafe and is_ip_address(hostname):
            # Don't accept cookies from IPs
            return

        if isinstance(cookies, Mapping):
            cookies = cookies.items()

        for name, cookie in cookies:
            if not isinstance(cookie, Morsel):
                tmp = SimpleCookie()
                tmp[name] = cookie  # type: ignore[assignment]
                cookie = tmp[name]

            domain = cookie["domain"]

            # ignore domains with trailing dots
            if domain and domain[-1] == ".":
                domain = ""
                del cookie["domain"]

            if not domain and hostname is not None:
                # Set the cookie's domain to the response hostname
                # and set its host-only-flag
                self._host_only_cookies.add((hostname, name))
                domain = cookie["domain"] = hostname

            if domain and domain[0] == ".":
                # Remove leading dot
                domain = domain[1:]
                cookie["domain"] = domain

            if hostname and not self._is_domain_match(domain, hostname):
                # Setting cookies for different domains is not allowed
                continue

            path = cookie["path"]
            if not path or path[0] != "/":
                # Set the cookie's path to the response path
                path = response_url.path
                if not path.startswith("/"):
                    path = "/"
                else:
                    # Cut everything from the last slash to the end
                    path = "/" + path[1 : path.rfind("/")]
                cookie["path"] = path
            # The storage key uses the path without trailing slashes.
            path = path.rstrip("/")

            if max_age := cookie["max-age"]:
                try:
                    delta_seconds = int(max_age)
                    max_age_expiration = min(time.time() + delta_seconds, self.MAX_TIME)
                    self._expire_cookie(max_age_expiration, domain, path, name)
                except ValueError:
                    cookie["max-age"] = ""

            elif expires := cookie["expires"]:
                if expire_time := self._parse_date(expires):
                    self._expire_cookie(expire_time, domain, path, name)
                else:
                    cookie["expires"] = ""

            key = (domain, path)
            if self._cookies[key].get(name) != cookie:
                # Don't blow away the cache if the same
                # cookie gets set again
                self._cookies[key][name] = cookie
                self._morsel_cache[key].pop(name, None)

        self._do_expiration()

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Returns this jar's cookies filtered by their attributes.

        :param request_url: URL of the outgoing request. A non-``URL`` value
                            triggers a ``DeprecationWarning`` and is converted.
        :return: A ``BaseCookie`` holding the morsels to send.
        """
        if not isinstance(request_url, URL):
            warnings.warn(  # type: ignore[unreachable]
                f"The method accepts yarl.URL instances only, got {type(request_url)}",
                DeprecationWarning,
            )
            request_url = URL(request_url)
        # We always use BaseCookie now since all
        # cookies set on filtered are fully constructed
        # Morsels, not just names and values.
        filtered: BaseCookie[str] = BaseCookie()
        if not self._cookies:
            # Skip do_expiration() if there are no cookies.
            return filtered
        self._do_expiration()
        if not self._cookies:
            # Skip rest of function if no non-expired cookies.
            return filtered
        hostname = request_url.raw_host or ""

        is_not_secure = request_url.scheme not in ("https", "wss")
        if is_not_secure and self._treat_as_secure_origin:
            request_origin = URL()
            with contextlib.suppress(ValueError):
                request_origin = request_url.origin()
            is_not_secure = request_origin not in self._treat_as_secure_origin

        # Send shared cookie
        key = ("", "")
        for c in self._cookies[key].values():
            # Check cache first
            if c.key in self._morsel_cache[key]:
                filtered[c.key] = self._morsel_cache[key][c.key]
                continue

            # Build and cache the morsel
            mrsl_val = self._build_morsel(c)
            self._morsel_cache[key][c.key] = mrsl_val
            filtered[c.key] = mrsl_val

        if is_ip_address(hostname):
            if not self._unsafe:
                return filtered
            domains: Iterable[str] = (hostname,)
        else:
            # Get all the subdomains that might match a cookie (e.g. "foo.bar.com", "bar.com", "com")
            domains = itertools.accumulate(
                reversed(hostname.split(".")), _FORMAT_DOMAIN_REVERSED
            )

        # Get all the path prefixes that might match a cookie (e.g. "", "/foo", "/foo/bar")
        paths = itertools.accumulate(request_url.path.split("/"), _FORMAT_PATH)
        # Create every combination of (domain, path) pairs.
        pairs = itertools.product(domains, paths)

        path_len = len(request_url.path)
        # Point 2: https://www.rfc-editor.org/rfc/rfc6265.html#section-5.4
        for p in pairs:
            if p not in self._cookies:
                continue
            for name, cookie in self._cookies[p].items():
                domain = cookie["domain"]

                if (domain, name) in self._host_only_cookies and domain != hostname:
                    continue

                # Skip edge case when the cookie has a trailing slash but request doesn't.
                if len(cookie["path"]) > path_len:
                    continue

                if is_not_secure and cookie["secure"]:
                    continue

                # We already built the Morsel so reuse it here
                if name in self._morsel_cache[p]:
                    filtered[name] = self._morsel_cache[p][name]
                    continue

                # Build and cache the morsel
                mrsl_val = self._build_morsel(cookie)
                self._morsel_cache[p][name] = mrsl_val
                filtered[name] = mrsl_val

        return filtered

    def _build_morsel(self, cookie: Morsel[str]) -> Morsel[str]:
        """Build a morsel for sending, respecting quote_cookie setting."""
        if self._quote_cookie and cookie.coded_value and cookie.coded_value[0] == '"':
            return preserve_morsel_with_coded_value(cookie)
        morsel: Morsel[str] = Morsel()
        if self._quote_cookie:
            value, coded_value = _SIMPLE_COOKIE.value_encode(cookie.value)
        else:
            coded_value = value = cookie.value
        # We use __setstate__ instead of the public set() API because it allows us to
        # bypass validation and set already validated state. This is more stable than
        # setting protected attributes directly.
        morsel.__setstate__(  # type: ignore[attr-defined]
            {"key": cookie.key, "value": value, "coded_value": coded_value}
        )
        return morsel

    @staticmethod
    def _is_domain_match(domain: str, hostname: str) -> bool:
        """Implements domain matching adhering to RFC 6265.

        ``hostname`` matches when it equals ``domain`` or when it ends with
        ``"." + domain`` and is not an IP address.
        """
        if hostname == domain:
            return True

        if not hostname.endswith(domain):
            return False

        non_matching = hostname[: -len(domain)]

        if not non_matching.endswith("."):
            return False

        return not is_ip_address(hostname)

    @classmethod
    def _parse_date(cls, date_str: str) -> int | None:
        """Implements date string parsing adhering to RFC 6265.

        :param date_str: Value of a cookie ``expires`` attribute.
        :return: Seconds since the epoch (UTC), or ``None`` when the string
                 is empty, lacks a time/day/month/year token, or holds an
                 out-of-range field.
        """
        if not date_str:
            return None

        found_time = False
        found_day = False
        found_month = False
        found_year = False

        hour = minute = second = 0
        day = 0
        month = 0
        year = 0

        # Each token fills the first still-missing field it matches,
        # tried in the order time, day, month, year.
        for token_match in cls.DATE_TOKENS_RE.finditer(date_str):
            token = token_match.group("token")

            if not found_time:
                time_match = cls.DATE_HMS_TIME_RE.match(token)
                if time_match:
                    found_time = True
                    hour, minute, second = (int(s) for s in time_match.groups())
                    continue

            if not found_day:
                day_match = cls.DATE_DAY_OF_MONTH_RE.match(token)
                if day_match:
                    found_day = True
                    day = int(day_match.group())
                    continue

            if not found_month:
                month_match = cls.DATE_MONTH_RE.match(token)
                if month_match:
                    found_month = True
                    assert month_match.lastindex is not None
                    # Index of the matching group equals the month number.
                    month = month_match.lastindex
                    continue

            if not found_year:
                year_match = cls.DATE_YEAR_RE.match(token)
                if year_match:
                    found_year = True
                    year = int(year_match.group())

        # Two-digit years: 70-99 -> 1970-1999, 0-69 -> 2000-2069.
        if 70 <= year <= 99:
            year += 1900
        elif 0 <= year <= 69:
            year += 2000

        if False in (found_day, found_month, found_year, found_time):
            return None

        if not 1 <= day <= 31:
            return None

        if year < 1601 or hour > 23 or minute > 59 or second > 59:
            return None

        return calendar.timegm((year, month, day, hour, minute, second, -1, -1, -1))
class DummyCookieJar(AbstractCookieJar):
    """Implements a dummy cookie storage.

    It can be used with the ClientSession when no cookie processing is needed.

    Every mutating method is a no-op and every accessor reports an empty jar.
    """

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Return an iterator that yields nothing."""
        # The never-executed ``yield`` makes this function a generator.
        while False:
            yield None  # type: ignore[unreachable]

    def __len__(self) -> int:
        """Return 0; the jar never holds cookies."""
        return 0

    @property
    def unsafe(self) -> bool:
        """Always ``False``."""
        return False

    @property
    def quote_cookie(self) -> bool:
        """Always ``True``."""
        return True

    @property
    def cookies(self) -> MappingProxyType[tuple[str, str], SimpleCookie]:
        """Return an empty mapping."""
        return MappingProxyType({})

    @property
    def host_only_cookies(self) -> frozenset[tuple[str, str]]:
        """Return an empty frozenset."""
        return frozenset()

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Do nothing; there is nothing to clear."""
        pass

    def clear_domain(self, domain: str) -> None:
        """Do nothing; there is nothing to clear."""
        pass

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Discard the given cookies."""
        pass

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Return a new, empty ``SimpleCookie``."""
        return SimpleCookie()