Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/cookiejar.py: 39%
256 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
1import asyncio
2import contextlib
3import datetime
4import os # noqa
5import pathlib
6import pickle
7import re
8from collections import defaultdict
9from http.cookies import BaseCookie, Morsel, SimpleCookie
10from typing import ( # noqa
11 DefaultDict,
12 Dict,
13 Iterable,
14 Iterator,
15 List,
16 Mapping,
17 Optional,
18 Set,
19 Tuple,
20 Union,
21 cast,
22)
24from yarl import URL
26from .abc import AbstractCookieJar, ClearCookiePredicate
27from .helpers import is_ip_address, next_whole_second
28from .typedefs import LooseCookies, PathLike, StrOrURL
# Public names exported by this module.
__all__ = (
    "CookieJar",
    "DummyCookieJar",
)

# A cookie value is either a plain string or an already parsed Morsel.
CookieItem = Union[
    str,
    "Morsel[str]",
]
class CookieJar(AbstractCookieJar):
    """Implements cookie storage adhering to RFC 6265.

    Cookies live in ``SimpleCookie`` containers keyed by ``(domain, path)``.
    Expiration deadlines are tracked separately, keyed by
    ``(domain, path, name)``, and host-only cookies are remembered as
    ``(domain, name)`` pairs.
    """

    # An optional run of delimiter characters followed by one date-token.
    DATE_TOKENS_RE = re.compile(
        r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*"
        r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)"
    )

    # RFC 6265 (5.1.1) only lets the time, day-of-month and year productions
    # be followed by a non-digit.  The ``(?!\d)`` lookaheads enforce that, so a
    # four digit year such as "1994" is not mistaken for day 19, and over-long
    # digit runs are rejected instead of being silently truncated.
    DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})(?!\d)")

    DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})(?!\d)")

    # One capture group per month: ``lastindex`` of a match is the month number.
    DATE_MONTH_RE = re.compile(
        "(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|(aug)|(sep)|(oct)|(nov)|(dec)",
        re.I,
    )

    DATE_YEAR_RE = re.compile(r"(\d{2,4})(?!\d)")

    MAX_TIME = datetime.datetime.max.replace(tzinfo=datetime.timezone.utc)

    # Timezone-aware like MAX_TIME and every stored deadline: a naive value
    # here could not be compared with the aware datetimes used elsewhere.
    MAX_32BIT_TIME = datetime.datetime.fromtimestamp(
        2**31 - 1, datetime.timezone.utc
    )

    def __init__(
        self,
        *,
        unsafe: bool = False,
        quote_cookie: bool = True,
        treat_as_secure_origin: Union[StrOrURL, List[StrOrURL], None] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Create an empty jar.

        :param unsafe: when true, cookies are also accepted from and sent to
            hosts for which ``is_ip_address`` is true.
        :param quote_cookie: when true ``filter_cookies`` returns a
            ``SimpleCookie``, otherwise a ``BaseCookie``.
        :param treat_as_secure_origin: a URL (or string), or a list of them,
            whose origins may receive ``secure`` cookies even when the request
            scheme is not ``https``/``wss``.
        :param loop: forwarded unchanged to ``AbstractCookieJar.__init__``.
        """
        super().__init__(loop=loop)
        self._cookies: DefaultDict[Tuple[str, str], SimpleCookie[str]] = defaultdict(
            SimpleCookie
        )
        self._host_only_cookies: Set[Tuple[str, str]] = set()
        self._unsafe = unsafe
        self._quote_cookie = quote_cookie
        # Normalise every accepted input shape to a list of origin URLs.
        if treat_as_secure_origin is None:
            treat_as_secure_origin = []
        elif isinstance(treat_as_secure_origin, URL):
            treat_as_secure_origin = [treat_as_secure_origin.origin()]
        elif isinstance(treat_as_secure_origin, str):
            treat_as_secure_origin = [URL(treat_as_secure_origin).origin()]
        else:
            treat_as_secure_origin = [
                URL(url).origin() if isinstance(url, str) else url.origin()
                for url in treat_as_secure_origin
            ]
        self._treat_as_secure_origin = treat_as_secure_origin
        self._next_expiration = next_whole_second()
        self._expirations: Dict[Tuple[str, str, str], datetime.datetime] = {}
        # #4515: datetime.max may not be representable on 32-bit platforms
        self._max_time = self.MAX_TIME
        try:
            self._max_time.timestamp()
        except OverflowError:
            self._max_time = self.MAX_32BIT_TIME

    def save(self, file_path: PathLike) -> None:
        """Pickle the stored cookies into the file at ``file_path``."""
        file_path = pathlib.Path(file_path)
        with file_path.open(mode="wb") as f:
            pickle.dump(self._cookies, f, pickle.HIGHEST_PROTOCOL)

    def load(self, file_path: PathLike) -> None:
        """Replace the stored cookies with those unpickled from ``file_path``.

        Only the cookie mapping is replaced; host-only flags and expiration
        deadlines are left as they were.
        """
        file_path = pathlib.Path(file_path)
        with file_path.open(mode="rb") as f:
            # SECURITY: unpickling can execute arbitrary code.  Only load
            # files that were written by ``save`` and come from a trusted
            # location.
            self._cookies = pickle.load(f)

    def clear(self, predicate: Optional[ClearCookiePredicate] = None) -> None:
        """Remove cookies from the jar.

        Without ``predicate`` everything is dropped.  With a predicate, every
        cookie that is already expired or for which ``predicate(morsel)`` is
        true is removed, then the next expiration time is recomputed.
        """
        if predicate is None:
            self._next_expiration = next_whole_second()
            self._cookies.clear()
            self._host_only_cookies.clear()
            self._expirations.clear()
            return

        # Collect first, delete afterwards: the containers must not change
        # size while they are being iterated.
        to_del = []
        now = datetime.datetime.now(datetime.timezone.utc)
        for (domain, path), cookie in self._cookies.items():
            for name, morsel in cookie.items():
                key = (domain, path, name)
                if (
                    key in self._expirations and self._expirations[key] <= now
                ) or predicate(morsel):
                    to_del.append(key)

        for domain, path, name in to_del:
            self._host_only_cookies.discard((domain, name))
            self._expirations.pop((domain, path, name), None)
            self._cookies[(domain, path)].pop(name, None)

        next_expiration = min(self._expirations.values(), default=self._max_time)
        try:
            # Round up to the next whole second.
            self._next_expiration = next_expiration.replace(
                microsecond=0
            ) + datetime.timedelta(seconds=1)
        except OverflowError:
            self._next_expiration = self._max_time

    def clear_domain(self, domain: str) -> None:
        """Remove every cookie whose domain attribute domain-matches ``domain``."""
        self.clear(lambda x: self._is_domain_match(domain, x["domain"]))

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Yield every stored morsel after dropping the expired ones."""
        self._do_expiration()
        for val in self._cookies.values():
            yield from val.values()

    def __len__(self) -> int:
        """Return the number of morsels produced by iterating the jar."""
        return sum(1 for i in self)

    def _do_expiration(self) -> None:
        """Drop expired cookies (the predicate itself never matches)."""
        self.clear(lambda x: False)

    def _expire_cookie(
        self, when: datetime.datetime, domain: str, path: str, name: str
    ) -> None:
        """Record ``when`` as the deadline of cookie ``(domain, path, name)``."""
        self._next_expiration = min(self._next_expiration, when)
        self._expirations[(domain, path, name)] = when

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Update cookies.

        :param cookies: a mapping, or an iterable of ``(name, value)`` pairs,
            whose values are strings or ``Morsel`` objects.
        :param response_url: URL of the response that set the cookies; it
            supplies the default domain and path.
        """
        hostname = response_url.raw_host

        if not self._unsafe and is_ip_address(hostname):
            # Don't accept cookies from IPs
            return

        if isinstance(cookies, Mapping):
            cookies = cookies.items()

        for name, cookie in cookies:
            if not isinstance(cookie, Morsel):
                # Wrap a plain value in a Morsel via a throw-away SimpleCookie.
                tmp: SimpleCookie[str] = SimpleCookie()
                tmp[name] = cookie  # type: ignore[assignment]
                cookie = tmp[name]

            domain = cookie["domain"]

            # ignore domains with trailing dots
            if domain.endswith("."):
                domain = ""
                del cookie["domain"]

            if not domain and hostname is not None:
                # Set the cookie's domain to the response hostname
                # and set its host-only-flag
                self._host_only_cookies.add((hostname, name))
                domain = cookie["domain"] = hostname

            if domain.startswith("."):
                # Remove leading dot
                domain = domain[1:]
                cookie["domain"] = domain

            if hostname and not self._is_domain_match(domain, hostname):
                # Setting cookies for different domains is not allowed
                continue

            path = cookie["path"]
            if not path or not path.startswith("/"):
                # Set the cookie's path to the response path
                path = response_url.path
                if not path.startswith("/"):
                    path = "/"
                else:
                    # Cut everything from the last slash to the end
                    path = "/" + path[1 : path.rfind("/")]
                cookie["path"] = path

            # Max-Age is looked at first; Expires is only consulted when no
            # Max-Age value is present.
            max_age = cookie["max-age"]
            if max_age:
                try:
                    delta_seconds = int(max_age)
                    try:
                        max_age_expiration = datetime.datetime.now(
                            datetime.timezone.utc
                        ) + datetime.timedelta(seconds=delta_seconds)
                    except OverflowError:
                        max_age_expiration = self._max_time
                    self._expire_cookie(max_age_expiration, domain, path, name)
                except ValueError:
                    # Not an integer: blank the attribute.
                    cookie["max-age"] = ""

            else:
                expires = cookie["expires"]
                if expires:
                    expire_time = self._parse_date(expires)
                    if expire_time:
                        self._expire_cookie(expire_time, domain, path, name)
                    else:
                        # Unparseable date: blank the attribute.
                        cookie["expires"] = ""

            self._cookies[(domain, path)][name] = cookie

        self._do_expiration()

    def filter_cookies(
        self, request_url: URL = URL()
    ) -> Union["BaseCookie[str]", "SimpleCookie[str]"]:
        """Returns this jar's cookies filtered by their attributes.

        A cookie is included when it has no domain, or when its domain, path
        and ``secure`` attribute all allow it to be sent to ``request_url``.
        """
        self._do_expiration()
        request_url = URL(request_url)
        filtered: Union["SimpleCookie[str]", "BaseCookie[str]"] = (
            SimpleCookie() if self._quote_cookie else BaseCookie()
        )
        hostname = request_url.raw_host or ""
        request_origin = URL()
        # origin() raises ValueError for URLs it cannot build an origin from;
        # the empty URL is used in that case.
        with contextlib.suppress(ValueError):
            request_origin = request_url.origin()

        is_not_secure = (
            request_url.scheme not in ("https", "wss")
            and request_origin not in self._treat_as_secure_origin
        )

        for cookie in self:
            name = cookie.key
            domain = cookie["domain"]

            # Send shared cookies
            if not domain:
                filtered[name] = cookie.value
                continue

            if not self._unsafe and is_ip_address(hostname):
                continue

            if (domain, name) in self._host_only_cookies:
                # Host-only cookies require an exact host match.
                if domain != hostname:
                    continue
            elif not self._is_domain_match(domain, hostname):
                continue

            if not self._is_path_match(request_url.path, cookie["path"]):
                continue

            if is_not_secure and cookie["secure"]:
                continue

            # It's critical we use the Morsel so the coded_value
            # (based on cookie version) is preserved
            mrsl_val = cast("Morsel[str]", cookie.get(cookie.key, Morsel()))
            mrsl_val.set(cookie.key, cookie.value, cookie.coded_value)
            filtered[name] = mrsl_val

        return filtered

    @staticmethod
    def _is_domain_match(domain: str, hostname: str) -> bool:
        """Implements domain matching adhering to RFC 6265.

        True when ``hostname`` equals ``domain``, or is a dot-separated
        subdomain of it and is not an IP address.
        """
        if hostname == domain:
            return True

        if not hostname.endswith(domain):
            return False

        non_matching = hostname[: -len(domain)]

        # The part in front of the suffix must end at a label boundary.
        if not non_matching.endswith("."):
            return False

        return not is_ip_address(hostname)

    @staticmethod
    def _is_path_match(req_path: str, cookie_path: str) -> bool:
        """Implements path matching adhering to RFC 6265.

        True when ``cookie_path`` equals ``req_path`` or is a prefix of it
        that ends at a ``/`` boundary.
        """
        if not req_path.startswith("/"):
            req_path = "/"

        if req_path == cookie_path:
            return True

        if not req_path.startswith(cookie_path):
            return False

        if cookie_path.endswith("/"):
            return True

        non_matching = req_path[len(cookie_path) :]

        return non_matching.startswith("/")

    @classmethod
    def _parse_date(cls, date_str: str) -> Optional[datetime.datetime]:
        """Implements date string parsing adhering to RFC 6265.

        :return: a timezone-aware UTC ``datetime``, or ``None`` when the
            string is empty, lacks a time, day, month or year, or describes
            an out-of-range or impossible date.
        """
        if not date_str:
            return None

        found_time = False
        found_day = False
        found_month = False
        found_year = False

        hour = minute = second = 0
        day = 0
        month = 0
        year = 0

        # Each token fills the first still-missing field it matches, tried in
        # the order time, day, month, year.
        for token_match in cls.DATE_TOKENS_RE.finditer(date_str):

            token = token_match.group("token")

            if not found_time:
                time_match = cls.DATE_HMS_TIME_RE.match(token)
                if time_match:
                    found_time = True
                    hour, minute, second = (int(s) for s in time_match.groups())
                    continue

            if not found_day:
                day_match = cls.DATE_DAY_OF_MONTH_RE.match(token)
                if day_match:
                    found_day = True
                    day = int(day_match.group())
                    continue

            if not found_month:
                month_match = cls.DATE_MONTH_RE.match(token)
                if month_match:
                    found_month = True
                    assert month_match.lastindex is not None
                    # Group N of DATE_MONTH_RE is the N-th month.
                    month = month_match.lastindex
                    continue

            if not found_year:
                year_match = cls.DATE_YEAR_RE.match(token)
                if year_match:
                    found_year = True
                    year = int(year_match.group())

        # Two digit years: 70-99 mean 19xx, 00-69 mean 20xx.
        if 70 <= year <= 99:
            year += 1900
        elif 0 <= year <= 69:
            year += 2000

        if False in (found_day, found_month, found_year, found_time):
            return None

        if not 1 <= day <= 31:
            return None

        if year < 1601 or hour > 23 or minute > 59 or second > 59:
            return None

        try:
            return datetime.datetime(
                year, month, day, hour, minute, second, tzinfo=datetime.timezone.utc
            )
        except ValueError:
            # The range checks above still admit impossible dates such as
            # 31 February; treat them as unparseable instead of raising.
            return None
class DummyCookieJar(AbstractCookieJar):
    """Cookie jar that never stores anything.

    Suitable for a ClientSession that needs no cookie processing: updates are
    discarded and lookups always come back empty.

    """

    def __init__(self, *, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Pass ``loop`` on to the base class; there is no state to set up."""
        super().__init__(loop=loop)

    def __iter__(self) -> "Iterator[Morsel[str]]":
        """Return a generator that finishes without yielding anything."""
        yield from ()

    def __len__(self) -> int:
        """Return zero: the jar is always empty."""
        nothing_stored = 0
        return nothing_stored

    def clear(self, predicate: Optional[ClearCookiePredicate] = None) -> None:
        """Do nothing; ``predicate`` is ignored."""
        return None

    def clear_domain(self, domain: str) -> None:
        """Do nothing; ``domain`` is ignored."""
        return None

    def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None:
        """Discard ``cookies`` without storing them."""
        return None

    def filter_cookies(self, request_url: URL) -> "BaseCookie[str]":
        """Return a new, empty ``SimpleCookie`` for any ``request_url``."""
        empty: "BaseCookie[str]" = SimpleCookie()
        return empty