import re
import time
from http.cookiejar import Cookie
from http.cookiejar import CookieJar as _CookieJar
from http.cookiejar import DefaultCookiePolicy
from typing import Sequence

from scrapy import Request
from scrapy.http import Response
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.python import to_unicode

# Defined in the http.cookiejar module, but undocumented:
# https://github.com/python/cpython/blob/v3.9.0/Lib/http/cookiejar.py#L527
IPV4_RE = re.compile(r"\.\d+$", re.ASCII)
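# e.g. IPV4_RE.search("127.0.0.1") matches (the host ends in ".<digits>"), while
# IPV4_RE.search("www.example.com") does not, so add_cookie_header() below only
# expands potential domain matches for non-IP hostnames.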


class CookieJar:
    def __init__(self, policy=None, check_expired_frequency=10000):
        self.policy = policy or DefaultCookiePolicy()
        self.jar = _CookieJar(self.policy)
        self.jar._cookies_lock = _DummyLock()
        self.check_expired_frequency = check_expired_frequency
        self.processed = 0

    def extract_cookies(self, response, request):
        wreq = WrappedRequest(request)
        wrsp = WrappedResponse(response)
        return self.jar.extract_cookies(wrsp, wreq)

    def add_cookie_header(self, request: Request) -> None:
        wreq = WrappedRequest(request)
        self.policy._now = self.jar._now = int(time.time())

        # the cookiejar implementation iterates through all domains
        # instead we restrict to potential matches on the domain
        req_host = urlparse_cached(request).hostname
        if not req_host:
            return

        if not IPV4_RE.search(req_host):
            hosts = potential_domain_matches(req_host)
            if "." not in req_host:
                hosts += [req_host + ".local"]
        else:
            hosts = [req_host]

        cookies = []
        for host in hosts:
            if host in self.jar._cookies:
                cookies += self.jar._cookies_for_domain(host, wreq)

        attrs = self.jar._cookie_attrs(cookies)
        if attrs:
            if not wreq.has_header("Cookie"):
                wreq.add_unredirected_header("Cookie", "; ".join(attrs))

        self.processed += 1
        if self.processed % self.check_expired_frequency == 0:
            # This is still quite inefficient for a large number of cookies
            self.jar.clear_expired_cookies()

    @property
    def _cookies(self):
        return self.jar._cookies

    def clear_session_cookies(self, *args, **kwargs):
        return self.jar.clear_session_cookies(*args, **kwargs)

    def clear(self, domain=None, path=None, name=None):
        return self.jar.clear(domain, path, name)

    def __iter__(self):
        return iter(self.jar)

    def __len__(self):
        return len(self.jar)

    def set_policy(self, pol):
        return self.jar.set_policy(pol)

    def make_cookies(self, response: Response, request: Request) -> Sequence[Cookie]:
        wreq = WrappedRequest(request)
        wrsp = WrappedResponse(response)
        return self.jar.make_cookies(wrsp, wreq)

    def set_cookie(self, cookie):
        self.jar.set_cookie(cookie)

    def set_cookie_if_ok(self, cookie: Cookie, request: Request) -> None:
        self.jar.set_cookie_if_ok(cookie, WrappedRequest(request))
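

# Illustrative usage sketch (not part of the original module): roughly how
# Scrapy's cookies middleware drives this jar. The URL and cookie value are
# hypothetical, and this helper is never called by the module itself.
def _example_cookiejar_roundtrip():
    jar = CookieJar()
    request = Request("https://www.example.com/")
    response = Response(
        "https://www.example.com/",
        headers={"Set-Cookie": "sessionid=abc123; Path=/"},
    )
    # Store the cookies set by the response, keyed by domain inside the jar.
    jar.extract_cookies(response, request)
    # Attach a matching Cookie header to a follow-up request to the same host.
    next_request = Request("https://www.example.com/account")
    jar.add_cookie_header(next_request)
    return next_request.headers.get("Cookie")  # expected: b"sessionid=abc123"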


def potential_domain_matches(domain):
    """Potential domain matches for a cookie

    >>> potential_domain_matches('www.example.com')
    ['www.example.com', 'example.com', '.www.example.com', '.example.com']

    """
    matches = [domain]
    try:
        start = domain.index(".") + 1
        end = domain.rindex(".")
        while start < end:
            matches.append(domain[start:])
            start = domain.index(".", start) + 1
    except ValueError:
        pass
    return matches + ["." + d for d in matches]


class _DummyLock:
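    """No-op replacement for the threading lock created by the stdlib CookieJar;
    Scrapy accesses the jar from a single thread, so no real locking is needed."""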
    def acquire(self):
        pass

    def release(self):
        pass


class WrappedRequest:
    """Wraps a scrapy Request with the methods defined by the urllib2.Request
    class, so that it can interact with the CookieJar class.

    see http://docs.python.org/library/urllib2.html#urllib2.Request
    """

    def __init__(self, request):
        self.request = request

    def get_full_url(self):
        return self.request.url

    def get_host(self):
        return urlparse_cached(self.request).netloc

    def get_type(self):
        return urlparse_cached(self.request).scheme

    def is_unverifiable(self):
        """Unverifiable should indicate whether the request is unverifiable, as defined by RFC 2965.

        It defaults to False. An unverifiable request is one whose URL the user did not have the
        option to approve. For example, if the request is for an image in an
        HTML document, and the user had no option to approve the automatic
        fetching of the image, this should be true.
        """
        return self.request.meta.get("is_unverifiable", False)
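    # For example, a hypothetical Request("https://www.example.com/img.png",
    # meta={"is_unverifiable": True}) would make is_unverifiable() return True.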

    @property
    def full_url(self):
        return self.get_full_url()

    @property
    def host(self):
        return self.get_host()

    @property
    def type(self):
        return self.get_type()

    @property
    def unverifiable(self):
        return self.is_unverifiable()

    @property
    def origin_req_host(self):
        return urlparse_cached(self.request).hostname

    def has_header(self, name):
        return name in self.request.headers

    def get_header(self, name, default=None):
        return to_unicode(self.request.headers.get(name, default), errors="replace")

    def header_items(self):
        return [
            (
                to_unicode(k, errors="replace"),
                [to_unicode(x, errors="replace") for x in v],
            )
            for k, v in self.request.headers.items()
        ]

    def add_unredirected_header(self, name, value):
        self.request.headers.appendlist(name, value)
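

# Illustrative sketch (not part of the original module): WrappedRequest exposes
# the attribute surface that http.cookiejar expects from a urllib.request.Request.
# The URL and header are hypothetical, and this helper is never called here.
def _example_wrapped_request():
    request = Request("https://www.example.com/index.html", headers={"User-Agent": "scrapy"})
    wreq = WrappedRequest(request)
    assert wreq.full_url == "https://www.example.com/index.html"
    assert wreq.host == "www.example.com"
    assert wreq.type == "https"
    assert wreq.unverifiable is False
    assert wreq.has_header("User-Agent")
    return wreq.header_items()  # [("User-Agent", ["scrapy"])]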


class WrappedResponse:
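    """Wraps a scrapy Response with the minimal interface that http.cookiejar
    expects from a response object: ``info()`` returning a message-like object
    and ``get_all()`` returning the values of a header (e.g. Set-Cookie)."""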

    def __init__(self, response):
        self.response = response

    def info(self):
        return self

    def get_all(self, name, default=None):
        return [
            to_unicode(v, errors="replace") for v in self.response.headers.getlist(name)
        ]
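

# Illustrative sketch (not part of the original module): how http.cookiejar reads
# Set-Cookie headers through WrappedResponse. The URL and cookie are hypothetical.
def _example_wrapped_response():
    response = Response(
        "https://www.example.com/",
        headers={"Set-Cookie": "sessionid=abc123; Path=/"},
    )
    wrsp = WrappedResponse(response)
    # The stdlib jar calls info() to obtain a message-like object, then get_all().
    return wrsp.info().get_all("Set-Cookie")  # ["sessionid=abc123; Path=/"]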