Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/cookiejar.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import calendar
2import contextlib
3import datetime
4import heapq
5import itertools
6import json
7import pathlib
8import re
9import time
10import warnings
11from collections import defaultdict
12from collections.abc import Iterable, Iterator, Mapping
13from http.cookies import BaseCookie, Morsel, SimpleCookie
14from typing import Union
16from yarl import URL
18from ._cookie_helpers import preserve_morsel_with_coded_value
19from .abc import AbstractCookieJar, ClearCookiePredicate
20from .helpers import is_ip_address
21from .typedefs import LooseCookies, PathLike, StrOrURL
__all__ = ("CookieJar", "DummyCookieJar")


CookieItem = Union[str, "Morsel[str]"]

# Bound ``str.format`` methods are created once at import time because they
# are called from performance critical code.
_FORMAT_PATH = "{}/{}".format
_FORMAT_DOMAIN_REVERSED = "{1}.{0}".format

# Size the expiration heap has to exceed before stale entries are purged from
# it. Purging is skipped below this size so that a jar with only a handful of
# scheduled expirations does not rebuild its heap over and over.
_MIN_SCHEDULED_COOKIE_EXPIRATION = 100
# Shared instance, used only for its ``value_encode()`` method.
_SIMPLE_COOKIE = SimpleCookie()
class CookieJar(AbstractCookieJar):
    """Implements cookie storage adhering to RFC 6265.

    Cookies are stored in ``SimpleCookie`` buckets keyed by ``(domain, path)``.
    Expiration times are tracked in a heap plus a lookup dict so that expired
    cookies can be dropped lazily whenever the jar is read or updated.
    """

    DATE_TOKENS_RE = re.compile(
        r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*"
        r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)"
    )

    DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})")

    DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})")

    DATE_MONTH_RE = re.compile(
        "(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|(aug)|(sep)|(oct)|(nov)|(dec)",
        re.I,
    )

    DATE_YEAR_RE = re.compile(r"(\d{2,4})")

    # calendar.timegm() fails for timestamps after datetime.datetime.max
    # Minus one as a loss of precision occurs when timestamp() is called.
    MAX_TIME = (
        int(datetime.datetime.max.replace(tzinfo=datetime.timezone.utc).timestamp()) - 1
    )
    try:
        calendar.timegm(time.gmtime(MAX_TIME))
    except OSError:
        # Hit the maximum representable time on Windows
        # https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/localtime-localtime32-localtime64
        MAX_TIME = calendar.timegm((3000, 12, 31, 23, 59, 59, -1, -1, -1))
    except OverflowError:
        # #4515: datetime.max may not be representable on 32-bit platforms
        MAX_TIME = 2**31 - 1
    # Avoid minuses in the future, 3x faster
    SUB_MAX_TIME = MAX_TIME - 1

    def __init__(
        self,
        *,
        unsafe: bool = False,
        quote_cookie: bool = True,
        treat_as_secure_origin: StrOrURL | Iterable[StrOrURL] | None = None,
    ) -> None:
        """Create an empty jar.

        :param unsafe: When true, cookies are also accepted from and sent to
           hosts that are IP addresses.
        :param quote_cookie: When true, outgoing cookie values are passed
           through ``SimpleCookie.value_encode()``.
        :param treat_as_secure_origin: A URL (or string), or an iterable of
           them, whose origins receive ``Secure`` cookies even when the
           request scheme is not ``https``/``wss``.
        """
        # Stored cookies, bucketed by (domain, path).
        self._cookies: defaultdict[tuple[str, str], SimpleCookie] = defaultdict(
            SimpleCookie
        )
        # Morsels already built for sending, bucketed like ``_cookies``.
        self._morsel_cache: defaultdict[tuple[str, str], dict[str, Morsel[str]]] = (
            defaultdict(dict)
        )
        # (hostname, cookie name) pairs that carry the host-only flag.
        self._host_only_cookies: set[tuple[str, str]] = set()
        self._unsafe = unsafe
        self._quote_cookie = quote_cookie
        if treat_as_secure_origin is None:
            self._treat_as_secure_origin: frozenset[URL] = frozenset()
        elif isinstance(treat_as_secure_origin, URL):
            self._treat_as_secure_origin = frozenset({treat_as_secure_origin.origin()})
        elif isinstance(treat_as_secure_origin, str):
            self._treat_as_secure_origin = frozenset(
                {URL(treat_as_secure_origin).origin()}
            )
        else:
            self._treat_as_secure_origin = frozenset(
                {
                    URL(url).origin() if isinstance(url, str) else url.origin()
                    for url in treat_as_secure_origin
                }
            )
        # Heap of (expiration time, (domain, path, name)); may hold stale
        # entries, ``_expirations`` is the source of truth.
        self._expire_heap: list[tuple[float, tuple[str, str, str]]] = []
        self._expirations: dict[tuple[str, str, str], float] = {}

    @property
    def unsafe(self) -> bool:
        """Whether the jar was created with ``unsafe=True``."""
        return self._unsafe

    @property
    def quote_cookie(self) -> bool:
        """Whether the jar was created with ``quote_cookie=True``."""
        return self._quote_cookie

    def save(self, file_path: PathLike) -> None:
        """Save cookies to a file using JSON format.

        :param file_path: Path to file where cookies will be serialized,
           :class:`str` or :class:`pathlib.Path` instance.
        """
        file_path = pathlib.Path(file_path)
        data: dict[str, dict[str, dict[str, str | bool]]] = {}
        for (domain, path), cookie in self._cookies.items():
            # The (domain, path) bucket key is flattened to "domain|path".
            key = f"{domain}|{path}"
            data[key] = {}
            for name, morsel in cookie.items():
                morsel_data: dict[str, str | bool] = {
                    "key": morsel.key,
                    "value": morsel.value,
                    "coded_value": morsel.coded_value,
                }
                # Save all morsel attributes that have values
                for attr in morsel._reserved:  # type: ignore[attr-defined]
                    attr_val = morsel[attr]
                    if attr_val:
                        morsel_data[attr] = attr_val
                data[key][name] = morsel_data
        with file_path.open(mode="w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    def load(self, file_path: PathLike) -> None:
        """Load cookies from a JSON file.

        The cookies currently held by the jar are replaced by the file's
        contents.

        :param file_path: Path to file from where cookies will be
           imported, :class:`str` or :class:`pathlib.Path` instance.
        """
        file_path = pathlib.Path(file_path)
        with file_path.open(mode="r", encoding="utf-8") as f:
            data = json.load(f)
        self._cookies = self._load_json_data(data)
        # The cached morsels were built from the cookies that have just been
        # replaced; keeping them would make filter_cookies() return stale
        # values for any (domain, path, name) that exists in both sets.
        self._morsel_cache.clear()

    def _load_json_data(
        self, data: dict[str, dict[str, dict[str, str | bool]]]
    ) -> defaultdict[tuple[str, str], SimpleCookie]:
        """Load cookies from parsed JSON data.

        :param data: Mapping in the layout written by :meth:`save`:
           ``"domain|path"`` -> cookie name -> morsel fields.
        :return: A new cookie store keyed by ``(domain, path)``.
        """
        cookies: defaultdict[tuple[str, str], SimpleCookie] = defaultdict(SimpleCookie)
        for compound_key, cookie_data in data.items():
            # Split on the first "|" only, so the path part may contain "|".
            domain, path = compound_key.split("|", 1)
            key = (domain, path)
            for name, morsel_data in cookie_data.items():
                morsel: Morsel[str] = Morsel()
                morsel_key = morsel_data["key"]
                morsel_value = morsel_data["value"]
                morsel_coded_value = morsel_data["coded_value"]
                # Use __setstate__ to bypass validation, same pattern
                # used in _build_morsel and _cookie_helpers.
                morsel.__setstate__(  # type: ignore[attr-defined]
                    {
                        "key": morsel_key,
                        "value": morsel_value,
                        "coded_value": morsel_coded_value,
                    }
                )
                # Restore morsel attributes
                for attr in morsel._reserved:  # type: ignore[attr-defined]
                    if attr in morsel_data and attr not in (
                        "key",
                        "value",
                        "coded_value",
                    ):
                        morsel[attr] = morsel_data[attr]
                cookies[key][name] = morsel
        return cookies

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Remove cookies from the jar.

        :param predicate: When ``None``, everything is removed. Otherwise a
           cookie is removed if its recorded expiration time has passed or
           if ``predicate(morsel)`` is true.
        """
        if predicate is None:
            self._expire_heap.clear()
            self._cookies.clear()
            self._morsel_cache.clear()
            self._host_only_cookies.clear()
            self._expirations.clear()
            return

        now = time.time()
        to_del = [
            key
            for (domain, path), cookie in self._cookies.items()
            for name, morsel in cookie.items()
            if (
                # The walrus binds ``key`` first, so it is always set by the
                # time the element expression above is evaluated.
                (key := (domain, path, name)) in self._expirations
                and self._expirations[key] <= now
            )
            or predicate(morsel)
        ]
        if to_del:
            self._delete_cookies(to_del)

    def clear_domain(self, domain: str) -> None:
        """Remove every cookie whose domain attribute matches *domain*."""
        self.clear(lambda x: self._is_domain_match(domain, x["domain"]))

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Iterate over the stored morsels after dropping expired ones."""
        self._do_expiration()
        for val in self._cookies.values():
            yield from val.values()

    def __len__(self) -> int:
        """Return number of cookies.

        This function does not iterate self to avoid unnecessary expiration
        checks.
        """
        return sum(len(cookie.values()) for cookie in self._cookies.values())

    def _do_expiration(self) -> None:
        """Remove expired cookies."""
        if not (expire_heap_len := len(self._expire_heap)):
            return

        # If the expiration heap grows larger than the number of expirations
        # times two, we clean it up to avoid keeping expired entries in
        # the heap and consuming memory. We guard this with a minimum
        # threshold to avoid cleaning up the heap too often when there are
        # only a few scheduled expirations.
        if (
            expire_heap_len > _MIN_SCHEDULED_COOKIE_EXPIRATION
            and expire_heap_len > len(self._expirations) * 2
        ):
            # Remove any expired entries from the expiration heap
            # that do not match the expiration time in the expirations
            # as it means the cookie has been re-added to the heap
            # with a different expiration time.
            self._expire_heap = [
                entry
                for entry in self._expire_heap
                if self._expirations.get(entry[1]) == entry[0]
            ]
            heapq.heapify(self._expire_heap)

        now = time.time()
        to_del: list[tuple[str, str, str]] = []
        # Find any expired cookies and add them to the to-delete list
        while self._expire_heap:
            when, cookie_key = self._expire_heap[0]
            if when > now:
                break
            heapq.heappop(self._expire_heap)
            # Check if the cookie hasn't been re-added to the heap
            # with a different expiration time as it will be removed
            # later when it reaches the top of the heap and its
            # expiration time is met.
            if self._expirations.get(cookie_key) == when:
                to_del.append(cookie_key)

        if to_del:
            self._delete_cookies(to_del)

    def _delete_cookies(self, to_del: list[tuple[str, str, str]]) -> None:
        """Drop the given ``(domain, path, name)`` cookies and their metadata."""
        for domain, path, name in to_del:
            self._host_only_cookies.discard((domain, name))
            self._cookies[(domain, path)].pop(name, None)
            self._morsel_cache[(domain, path)].pop(name, None)
            self._expirations.pop((domain, path, name), None)

    def _expire_cookie(self, when: float, domain: str, path: str, name: str) -> None:
        """Schedule the cookie ``(domain, path, name)`` to expire at *when*."""
        cookie_key = (domain, path, name)
        if self._expirations.get(cookie_key) == when:
            # Avoid adding duplicates to the heap
            return
        heapq.heappush(self._expire_heap, (when, cookie_key))
        self._expirations[cookie_key] = when

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Update cookies.

        :param cookies: A mapping or an iterable of ``(name, cookie)`` pairs;
           values that are not ``Morsel`` instances are converted through a
           temporary ``SimpleCookie``.
        :param response_url: URL of the response that set the cookies. Its
           host and path provide the defaults for cookies without a usable
           ``domain``/``path`` attribute.
        """
        hostname = response_url.raw_host

        if not self._unsafe and is_ip_address(hostname):
            # Don't accept cookies from IPs
            return

        if isinstance(cookies, Mapping):
            cookies = cookies.items()

        for name, cookie in cookies:
            if not isinstance(cookie, Morsel):
                tmp = SimpleCookie()
                tmp[name] = cookie  # type: ignore[assignment]
                cookie = tmp[name]

            domain = cookie["domain"]

            # ignore domains with trailing dots
            if domain and domain[-1] == ".":
                domain = ""
                # Reset to the Morsel's "unset" value rather than deleting the
                # key: save() and filter_cookies() index the attribute
                # unconditionally and would raise KeyError when no response
                # hostname is available to fill it in again below.
                cookie["domain"] = ""

            if not domain and hostname is not None:
                # Set the cookie's domain to the response hostname
                # and set its host-only-flag
                self._host_only_cookies.add((hostname, name))
                domain = cookie["domain"] = hostname

            if domain and domain[0] == ".":
                # Remove leading dot
                domain = domain[1:]
                cookie["domain"] = domain

            if hostname and not self._is_domain_match(domain, hostname):
                # Setting cookies for different domains is not allowed
                continue

            path = cookie["path"]
            if not path or path[0] != "/":
                # Set the cookie's path to the response path
                path = response_url.path
                if not path.startswith("/"):
                    path = "/"
                else:
                    # Cut everything from the last slash to the end
                    path = "/" + path[1 : path.rfind("/")]
                cookie["path"] = path
            path = path.rstrip("/")

            if max_age := cookie["max-age"]:
                try:
                    delta_seconds = int(max_age)
                except ValueError:
                    cookie["max-age"] = ""
                else:
                    try:
                        max_age_expiration = min(
                            time.time() + delta_seconds, self.MAX_TIME
                        )
                    except OverflowError:
                        # The integer is too large to be converted to float
                        # for the addition. Clamp instead of letting a hostile
                        # Max-Age value abort the whole update: far future for
                        # positive values, already expired for negative ones.
                        max_age_expiration = (
                            self.MAX_TIME if delta_seconds > 0 else 0.0
                        )
                    self._expire_cookie(max_age_expiration, domain, path, name)

            elif expires := cookie["expires"]:
                if expire_time := self._parse_date(expires):
                    self._expire_cookie(expire_time, domain, path, name)
                else:
                    cookie["expires"] = ""

            key = (domain, path)
            if self._cookies[key].get(name) != cookie:
                # Don't blow away the cache if the same
                # cookie gets set again
                self._cookies[key][name] = cookie
                self._morsel_cache[key].pop(name, None)

        self._do_expiration()

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Returns this jar's cookies filtered by their attributes.

        :param request_url: URL of the outgoing request. Passing anything
           other than a ``yarl.URL`` emits a ``DeprecationWarning`` and the
           value is converted.
        :return: A ``BaseCookie`` holding the morsels to send.
        """
        if not isinstance(request_url, URL):
            warnings.warn(  # type: ignore[unreachable]
                f"The method accepts yarl.URL instances only, got {type(request_url)}",
                DeprecationWarning,
            )
            request_url = URL(request_url)
        # We always use BaseCookie now since all
        # cookies set on filtered are fully constructed
        # Morsels, not just names and values.
        filtered: BaseCookie[str] = BaseCookie()
        if not self._cookies:
            # Skip do_expiration() if there are no cookies.
            return filtered
        self._do_expiration()
        if not self._cookies:
            # Skip rest of function if no non-expired cookies.
            return filtered
        hostname = request_url.raw_host or ""

        is_not_secure = request_url.scheme not in ("https", "wss")
        if is_not_secure and self._treat_as_secure_origin:
            request_origin = URL()
            with contextlib.suppress(ValueError):
                request_origin = request_url.origin()
            is_not_secure = request_origin not in self._treat_as_secure_origin

        # Send shared cookie
        key = ("", "")
        for c in self._cookies[key].values():
            # Check cache first
            if c.key in self._morsel_cache[key]:
                filtered[c.key] = self._morsel_cache[key][c.key]
                continue

            # Build and cache the morsel
            mrsl_val = self._build_morsel(c)
            self._morsel_cache[key][c.key] = mrsl_val
            filtered[c.key] = mrsl_val

        if is_ip_address(hostname):
            if not self._unsafe:
                return filtered
            domains: Iterable[str] = (hostname,)
        else:
            # Get all the subdomains that might match a cookie (e.g. "foo.bar.com", "bar.com", "com")
            domains = itertools.accumulate(
                reversed(hostname.split(".")), _FORMAT_DOMAIN_REVERSED
            )

        # Get all the path prefixes that might match a cookie (e.g. "", "/foo", "/foo/bar")
        paths = itertools.accumulate(request_url.path.split("/"), _FORMAT_PATH)
        # Create every combination of (domain, path) pairs.
        pairs = itertools.product(domains, paths)

        path_len = len(request_url.path)
        # Point 2: https://www.rfc-editor.org/rfc/rfc6265.html#section-5.4
        for p in pairs:
            if p not in self._cookies:
                continue
            for name, cookie in self._cookies[p].items():
                domain = cookie["domain"]

                if (domain, name) in self._host_only_cookies and domain != hostname:
                    continue

                # Skip edge case when the cookie has a trailing slash but request doesn't.
                if len(cookie["path"]) > path_len:
                    continue

                if is_not_secure and cookie["secure"]:
                    continue

                # We already built the Morsel so reuse it here
                if name in self._morsel_cache[p]:
                    filtered[name] = self._morsel_cache[p][name]
                    continue

                # Build and cache the morsel
                mrsl_val = self._build_morsel(cookie)
                self._morsel_cache[p][name] = mrsl_val
                filtered[name] = mrsl_val

        return filtered

    def _build_morsel(self, cookie: Morsel[str]) -> Morsel[str]:
        """Build a morsel for sending, respecting quote_cookie setting."""
        if self._quote_cookie and cookie.coded_value and cookie.coded_value[0] == '"':
            return preserve_morsel_with_coded_value(cookie)
        morsel: Morsel[str] = Morsel()
        if self._quote_cookie:
            value, coded_value = _SIMPLE_COOKIE.value_encode(cookie.value)
        else:
            coded_value = value = cookie.value
        # We use __setstate__ instead of the public set() API because it allows us to
        # bypass validation and set already validated state. This is more stable than
        # setting protected attributes directly.
        morsel.__setstate__({"key": cookie.key, "value": value, "coded_value": coded_value})  # type: ignore[attr-defined]
        return morsel

    @staticmethod
    def _is_domain_match(domain: str, hostname: str) -> bool:
        """Implements domain matching adhering to RFC 6265.

        Returns ``True`` when *hostname* equals *domain*, or when *hostname*
        ends with ``"." + domain`` and is not an IP address.
        """
        if hostname == domain:
            return True

        if not hostname.endswith(domain):
            return False

        non_matching = hostname[: -len(domain)]

        # The part in front of the suffix must end at a label boundary.
        if not non_matching.endswith("."):
            return False

        return not is_ip_address(hostname)

    @classmethod
    def _parse_date(cls, date_str: str) -> int | None:
        """Implements date string parsing adhering to RFC 6265.

        :param date_str: Value of a cookie's ``expires`` attribute.
        :return: The ``calendar.timegm()`` timestamp for the parsed date, or
           ``None`` when a time, day, month or year token is missing or a
           field is out of range.
        """
        if not date_str:
            return None

        found_time = False
        found_day = False
        found_month = False
        found_year = False

        hour = minute = second = 0
        day = 0
        month = 0
        year = 0

        # Each token is tried against the still-missing fields in a fixed
        # order: time, day of month, month, year.
        for token_match in cls.DATE_TOKENS_RE.finditer(date_str):
            token = token_match.group("token")

            if not found_time:
                time_match = cls.DATE_HMS_TIME_RE.match(token)
                if time_match:
                    found_time = True
                    hour, minute, second = (int(s) for s in time_match.groups())
                    continue

            if not found_day:
                day_match = cls.DATE_DAY_OF_MONTH_RE.match(token)
                if day_match:
                    found_day = True
                    day = int(day_match.group())
                    continue

            if not found_month:
                month_match = cls.DATE_MONTH_RE.match(token)
                if month_match:
                    found_month = True
                    assert month_match.lastindex is not None
                    # The index of the matching group is the month number.
                    month = month_match.lastindex
                    continue

            if not found_year:
                year_match = cls.DATE_YEAR_RE.match(token)
                if year_match:
                    found_year = True
                    year = int(year_match.group())

        # Expand two-digit years.
        if 70 <= year <= 99:
            year += 1900
        elif 0 <= year <= 69:
            year += 2000

        if False in (found_day, found_month, found_year, found_time):
            return None

        if not 1 <= day <= 31:
            return None

        if year < 1601 or hour > 23 or minute > 59 or second > 59:
            return None

        return calendar.timegm((year, month, day, hour, minute, second, -1, -1, -1))
class DummyCookieJar(AbstractCookieJar):
    """Implements a dummy cookie storage.

    It can be used with the ClientSession when no cookie processing is needed.

    """

    def __iter__(self) -> "Iterator[Morsel[str]]":
        # Still a generator function, but one that finishes immediately:
        # the dummy jar never holds a cookie.
        yield from ()

    def __len__(self) -> int:
        return 0

    @property
    def unsafe(self) -> bool:
        return False

    @property
    def quote_cookie(self) -> bool:
        return True

    def clear(self, predicate: ClearCookiePredicate | None = None) -> None:
        """Do nothing; there is nothing stored that could be cleared."""

    def clear_domain(self, domain: str) -> None:
        """Do nothing; there is nothing stored that could be cleared."""

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Ignore the given cookies."""

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Return a new, empty ``SimpleCookie`` for every request URL."""
        return SimpleCookie()