Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/cookiejar.py: 33%
259 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1import asyncio
2import calendar
3import contextlib
4import datetime
5import os # noqa
6import pathlib
7import pickle
8import re
9import time
10from collections import defaultdict
11from http.cookies import BaseCookie, Morsel, SimpleCookie
12from math import ceil
13from typing import ( # noqa
14 DefaultDict,
15 Dict,
16 Iterable,
17 Iterator,
18 List,
19 Mapping,
20 Optional,
21 Set,
22 Tuple,
23 Union,
24 cast,
25)
27from yarl import URL
29from .abc import AbstractCookieJar, ClearCookiePredicate
30from .helpers import is_ip_address
31from .typedefs import LooseCookies, PathLike, StrOrURL
# Names exported by ``from <this module> import *``.
__all__ = ("CookieJar", "DummyCookieJar")

# Alias for "a plain string or a parsed Morsel".
CookieItem = Union[str, "Morsel[str]"]
class CookieJar(AbstractCookieJar):
    """Implements cookie storage adhering to RFC 6265.

    Cookies are kept in ``_cookies``, keyed by ``(domain, path)``; each value
    is a ``SimpleCookie`` mapping cookie names to morsels. Expiration times
    (seconds since the epoch) are kept separately in ``_expirations``, keyed
    by ``(domain, path, name)``. Cookies that were set without an explicit
    domain are remembered in ``_host_only_cookies`` as ``(hostname, name)``.
    """

    # Splits a cookie date string into tokens, skipping delimiter characters.
    DATE_TOKENS_RE = re.compile(
        r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*"
        r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)"
    )

    DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})")

    DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})")

    # One capture group per month, so ``lastindex`` is the month number.
    DATE_MONTH_RE = re.compile(
        "(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|(aug)|(sep)|(oct)|(nov)|(dec)",
        re.I,
    )

    DATE_YEAR_RE = re.compile(r"(\d{2,4})")

    # calendar.timegm() fails for timestamps after datetime.datetime.max
    # Minus one as a loss of precision occurs when timestamp() is called.
    MAX_TIME = (
        int(datetime.datetime.max.replace(tzinfo=datetime.timezone.utc).timestamp()) - 1
    )
    try:
        calendar.timegm(time.gmtime(MAX_TIME))
    except (OSError, ValueError):
        # Hit the maximum representable time on Windows
        # https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/localtime-localtime32-localtime64
        # Throws ValueError on PyPy 3.8 and 3.9, OSError elsewhere
        MAX_TIME = calendar.timegm((3000, 12, 31, 23, 59, 59, -1, -1, -1))
    except OverflowError:
        # #4515: datetime.max may not be representable on 32-bit platforms
        MAX_TIME = 2**31 - 1
    # Avoid minuses in the future, 3x faster
    SUB_MAX_TIME = MAX_TIME - 1

    def __init__(
        self,
        *,
        unsafe: bool = False,
        quote_cookie: bool = True,
        treat_as_secure_origin: Union[StrOrURL, List[StrOrURL], None] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Create an empty jar.

        :param unsafe: when true, cookies are also accepted from and sent to
            hosts that are IP addresses.
        :param quote_cookie: selects the container returned by
            ``filter_cookies()``: ``SimpleCookie`` when true, ``BaseCookie``
            otherwise.
        :param treat_as_secure_origin: a URL (or string), or an iterable of
            them, whose origins may receive ``secure`` cookies even when the
            request scheme is not ``https``/``wss``.
        :param loop: forwarded unchanged to ``AbstractCookieJar``.
        """
        super().__init__(loop=loop)
        self._cookies: DefaultDict[Tuple[str, str], SimpleCookie] = defaultdict(
            SimpleCookie
        )
        self._host_only_cookies: Set[Tuple[str, str]] = set()
        self._unsafe = unsafe
        self._quote_cookie = quote_cookie

        # Normalise every accepted input shape to a list of origin URLs.
        if treat_as_secure_origin is None:
            secure_origins = []
        elif isinstance(treat_as_secure_origin, URL):
            secure_origins = [treat_as_secure_origin.origin()]
        elif isinstance(treat_as_secure_origin, str):
            secure_origins = [URL(treat_as_secure_origin).origin()]
        else:
            secure_origins = [
                URL(item).origin() if isinstance(item, str) else item.origin()
                for item in treat_as_secure_origin
            ]
        self._treat_as_secure_origin = secure_origins

        self._next_expiration: float = ceil(time.time())
        self._expirations: Dict[Tuple[str, str, str], float] = {}

    def save(self, file_path: PathLike) -> None:
        """Pickle the stored cookies to *file_path*.

        Only ``_cookies`` is written; expiration times and host-only flags
        are not part of the file.
        """
        target = pathlib.Path(file_path)
        with target.open(mode="wb") as f:
            pickle.dump(self._cookies, f, pickle.HIGHEST_PROTOCOL)

    def load(self, file_path: PathLike) -> None:
        """Replace the stored cookies with the pickle found at *file_path*.

        SECURITY: ``pickle.load`` can execute arbitrary code while
        deserialising. Only load files that come from a trusted source
        (for example, ones previously written by ``save()``).
        """
        source = pathlib.Path(file_path)
        with source.open(mode="rb") as f:
            self._cookies = pickle.load(f)

    def clear(self, predicate: Optional[ClearCookiePredicate] = None) -> None:
        """Remove cookies from the jar.

        With no *predicate* everything is dropped. Otherwise a cookie is
        removed when it has already expired or when ``predicate(morsel)``
        is true; the predicate is not consulted for expired cookies.
        """
        if predicate is None:
            self._next_expiration = ceil(time.time())
            self._cookies.clear()
            self._host_only_cookies.clear()
            self._expirations.clear()
            return

        now = time.time()
        expirations = self._expirations

        # Collect first, delete afterwards: the containers must not change
        # size while they are being iterated.
        doomed = []
        for (domain, path), cookie in self._cookies.items():
            for name, morsel in cookie.items():
                key = (domain, path, name)
                expires_at = expirations.get(key)
                if (expires_at is not None and expires_at <= now) or predicate(morsel):
                    doomed.append(key)

        for domain, path, name in doomed:
            self._host_only_cookies.discard((domain, name))
            expirations.pop((domain, path, name), None)
            self._cookies[(domain, path)].pop(name, None)

        if expirations:
            self._next_expiration = min(*expirations.values(), self.SUB_MAX_TIME) + 1
        else:
            self._next_expiration = self.MAX_TIME

    def clear_domain(self, domain: str) -> None:
        """Remove every cookie whose domain attribute matches *domain*."""
        self.clear(lambda x: self._is_domain_match(domain, x["domain"]))

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Yield every stored morsel after purging expired cookies."""
        self._do_expiration()
        for val in self._cookies.values():
            yield from val.values()

    def __len__(self) -> int:
        """Return the number of morsels produced by iterating the jar."""
        return sum(1 for _ in self)

    def _do_expiration(self) -> None:
        """Purge expired cookies (``clear()`` with a never-matching predicate)."""
        self.clear(lambda x: False)

    def _expire_cookie(self, when: float, domain: str, path: str, name: str) -> None:
        """Record that cookie ``(domain, path, name)`` expires at *when*."""
        self._next_expiration = min(self._next_expiration, when)
        self._expirations[(domain, path, name)] = when

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Update cookies.

        *cookies* is a mapping or an iterable of ``(name, value)`` pairs where
        each value is either a ``Morsel`` or a plain value that is wrapped in
        one. *response_url* supplies the default domain and path.

        Nothing is stored when the response host is an IP address and the
        jar is not ``unsafe``; cookies whose domain does not match the
        response host are skipped.
        """
        hostname = response_url.raw_host

        if not self._unsafe and is_ip_address(hostname):
            # Don't accept cookies from IPs
            return

        if isinstance(cookies, Mapping):
            cookies = cookies.items()

        for name, cookie in cookies:
            if not isinstance(cookie, Morsel):
                tmp = SimpleCookie()
                tmp[name] = cookie  # type: ignore[assignment]
                cookie = tmp[name]

            domain = cookie["domain"]

            # ignore domains with trailing dots
            if domain.endswith("."):
                domain = ""
                # Reset instead of deleting the key: later reads of
                # cookie["domain"] (filter_cookies, clear_domain) would
                # raise KeyError if the attribute were removed.
                cookie["domain"] = ""

            if not domain and hostname is not None:
                # Set the cookie's domain to the response hostname
                # and set its host-only-flag
                self._host_only_cookies.add((hostname, name))
                domain = cookie["domain"] = hostname

            if domain.startswith("."):
                # Remove leading dot
                domain = domain[1:]
                cookie["domain"] = domain

            if hostname and not self._is_domain_match(domain, hostname):
                # Setting cookies for different domains is not allowed
                continue

            path = cookie["path"]
            if not path or not path.startswith("/"):
                # Set the cookie's path to the response path
                path = response_url.path
                if not path.startswith("/"):
                    path = "/"
                else:
                    # Cut everything from the last slash to the end
                    path = "/" + path[1 : path.rfind("/")]
                cookie["path"] = path

            # Work out the expiry, if any. Max-Age wins over Expires.
            expire_at: Optional[float] = None
            max_age = cookie["max-age"]
            if max_age:
                try:
                    delta_seconds = int(max_age)
                except ValueError:
                    cookie["max-age"] = ""
                else:
                    try:
                        expire_at = min(time.time() + delta_seconds, self.MAX_TIME)
                    except OverflowError:
                        # The delta is too large to add to a float: clamp it
                        # to "as late as possible" or "already expired".
                        expire_at = self.MAX_TIME if delta_seconds > 0 else 0
            else:
                expires = cookie["expires"]
                if expires:
                    expire_at = self._parse_date(expires)
                    # Compare against None, not truthiness: the epoch
                    # ("01 Jan 1970 00:00:00 GMT") parses to 0 and is the
                    # usual way for a server to delete a cookie.
                    if expire_at is None:
                        cookie["expires"] = ""

            if expire_at is None:
                # The new cookie replaces any older one with the same key,
                # so an expiry recorded for the old cookie must not survive.
                self._expirations.pop((domain, path, name), None)
            else:
                self._expire_cookie(expire_at, domain, path, name)

            self._cookies[(domain, path)][name] = cookie

        self._do_expiration()

    def filter_cookies(self, request_url: URL = URL()) -> "BaseCookie[str]":
        """Returns this jar's cookies filtered by their attributes.

        The result contains the cookies applicable to *request_url*: cookies
        without a domain are always included; the others must match the
        request host and path, and ``secure`` cookies are only included for
        secure requests (``https``/``wss`` or an origin listed in
        ``treat_as_secure_origin``).
        """
        filtered: Union[SimpleCookie, "BaseCookie[str]"] = (
            SimpleCookie() if self._quote_cookie else BaseCookie()
        )
        if not self._cookies:
            # Skip do_expiration() if there are no cookies.
            return filtered
        self._do_expiration()
        if not self._cookies:
            # Skip rest of function if no non-expired cookies.
            return filtered
        request_url = URL(request_url)
        hostname = request_url.raw_host or ""
        request_path = request_url.path

        is_not_secure = request_url.scheme not in ("https", "wss")
        if is_not_secure and self._treat_as_secure_origin:
            request_origin = URL()
            with contextlib.suppress(ValueError):
                request_origin = request_url.origin()
            is_not_secure = request_origin not in self._treat_as_secure_origin

        # Loop-invariant: domain-bound cookies are never sent to an IP host
        # unless the jar is unsafe.
        skip_domain_cookies = not self._unsafe and is_ip_address(hostname)

        # Point 2: https://www.rfc-editor.org/rfc/rfc6265.html#section-5.4
        for cookie in sorted(self, key=lambda c: len(c["path"])):
            name = cookie.key
            domain = cookie["domain"]

            # Send shared cookies
            if not domain:
                filtered[name] = cookie.value
                continue

            if skip_domain_cookies:
                continue

            if (domain, name) in self._host_only_cookies:
                if domain != hostname:
                    continue
            elif not self._is_domain_match(domain, hostname):
                continue

            if not self._is_path_match(request_path, cookie["path"]):
                continue

            if is_not_secure and cookie["secure"]:
                continue

            # It's critical we use the Morsel so the coded_value
            # (based on cookie version) is preserved
            mrsl_val = cast("Morsel[str]", cookie.get(cookie.key, Morsel()))
            mrsl_val.set(cookie.key, cookie.value, cookie.coded_value)
            filtered[name] = mrsl_val

        return filtered

    @staticmethod
    def _is_domain_match(domain: str, hostname: str) -> bool:
        """Implements domain matching adhering to RFC 6265.

        True when *hostname* equals *domain*, or when *hostname* ends with
        ``"." + domain`` and is not an IP address.
        """
        if hostname == domain:
            return True

        if not hostname.endswith(domain):
            return False

        # Whatever precedes the domain must end at a label boundary.
        prefix = hostname[: -len(domain)]
        if not prefix.endswith("."):
            return False

        return not is_ip_address(hostname)

    @staticmethod
    def _is_path_match(req_path: str, cookie_path: str) -> bool:
        """Implements path matching adhering to RFC 6265.

        True when the paths are equal, or when *cookie_path* is a prefix of
        *req_path* that ends on a ``/`` boundary.
        """
        if not req_path.startswith("/"):
            req_path = "/"

        if req_path == cookie_path:
            return True

        if not req_path.startswith(cookie_path):
            return False

        if cookie_path.endswith("/"):
            return True

        # The first character after the prefix must start a new segment.
        remainder = req_path[len(cookie_path) :]
        return remainder.startswith("/")

    @classmethod
    def _parse_date(cls, date_str: str) -> Optional[int]:
        """Implements date string parsing adhering to RFC 6265.

        Returns the timestamp computed by ``calendar.timegm`` (which may be
        ``0``), or ``None`` when the string is empty, a component is missing,
        or a component is out of range.
        """
        if not date_str:
            return None

        found_time = found_day = found_month = found_year = False
        hour = minute = second = 0
        day = month = year = 0

        # Each token fills the first still-missing component it matches,
        # tried in the order: time, day of month, month, year.
        for token_match in cls.DATE_TOKENS_RE.finditer(date_str):
            token = token_match.group("token")

            if not found_time:
                time_match = cls.DATE_HMS_TIME_RE.match(token)
                if time_match:
                    found_time = True
                    hour, minute, second = (int(s) for s in time_match.groups())
                    continue

            if not found_day:
                day_match = cls.DATE_DAY_OF_MONTH_RE.match(token)
                if day_match:
                    found_day = True
                    day = int(day_match.group())
                    continue

            if not found_month:
                month_match = cls.DATE_MONTH_RE.match(token)
                if month_match:
                    found_month = True
                    assert month_match.lastindex is not None
                    month = month_match.lastindex
                    continue

            if not found_year:
                year_match = cls.DATE_YEAR_RE.match(token)
                if year_match:
                    found_year = True
                    year = int(year_match.group())

        # Expand two-digit years.
        if 70 <= year <= 99:
            year += 1900
        elif 0 <= year <= 69:
            year += 2000

        if not (found_day and found_month and found_year and found_time):
            return None

        if not 1 <= day <= 31:
            return None

        if year < 1601 or hour > 23 or minute > 59 or second > 59:
            return None

        return calendar.timegm((year, month, day, hour, minute, second, -1, -1, -1))
class DummyCookieJar(AbstractCookieJar):
    """Cookie storage that never stores anything.

    Intended for a ClientSession that needs no cookie processing: every
    mutating method is a no-op and every query reports an empty jar.
    """

    def __init__(self, *, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Forward *loop* to ``AbstractCookieJar``; there is no other state."""
        super().__init__(loop=loop)

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Return a generator that yields nothing."""
        yield from ()

    def __len__(self) -> int:
        """Return 0."""
        return 0

    def clear(self, predicate: Optional[ClearCookiePredicate] = None) -> None:
        """Do nothing."""

    def clear_domain(self, domain: str) -> None:
        """Do nothing."""

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Do nothing; the given cookies are discarded."""

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Return a new, empty ``SimpleCookie`` regardless of *request_url*."""
        return SimpleCookie()