Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scrapy/http/cookies.py: 9%

122 statements  

coverage.py v7.4.1, created at 2024-02-07 06:38 +0000

import re
import time
from http.cookiejar import Cookie
from http.cookiejar import CookieJar as _CookieJar
from http.cookiejar import DefaultCookiePolicy
from typing import Sequence

from scrapy import Request
from scrapy.http import Response
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.python import to_unicode

# Defined in the http.cookiejar module, but undocumented:
# https://github.com/python/cpython/blob/v3.9.0/Lib/http/cookiejar.py#L527
IPV4_RE = re.compile(r"\.\d+$", re.ASCII)
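
# Illustration (not part of the original module): the pattern only checks
# that the host ends in a dot followed by digits, which is enough to tell an
# IPv4 literal apart from a domain name:
#
#     >>> bool(IPV4_RE.search("192.168.0.1"))
#     True
#     >>> bool(IPV4_RE.search("example.com"))
#     False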

class CookieJar:
    def __init__(self, policy=None, check_expired_frequency=10000):
        self.policy = policy or DefaultCookiePolicy()
        self.jar = _CookieJar(self.policy)
        self.jar._cookies_lock = _DummyLock()
        # Expired cookies are purged once every `check_expired_frequency`
        # requests processed by add_cookie_header.
        self.check_expired_frequency = check_expired_frequency
        self.processed = 0
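
    # Usage sketch (illustrative, not from the original source): any
    # http.cookiejar policy can be passed in, e.g. the stdlib
    # DefaultCookiePolicy configured to block a domain:
    #
    #     >>> policy = DefaultCookiePolicy(blocked_domains=["ads.example.com"])
    #     >>> jar = CookieJar(policy=policy, check_expired_frequency=1000)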

    def extract_cookies(self, response, request):
        wreq = WrappedRequest(request)
        wrsp = WrappedResponse(response)
        return self.jar.extract_cookies(wrsp, wreq)

    def add_cookie_header(self, request: Request) -> None:
        wreq = WrappedRequest(request)
        self.policy._now = self.jar._now = int(time.time())

        # The stdlib cookiejar implementation iterates through all stored
        # domains; instead, restrict the lookup to domains that could
        # plausibly match the request host.
        req_host = urlparse_cached(request).hostname
        if not req_host:
            return

        if not IPV4_RE.search(req_host):
            hosts = potential_domain_matches(req_host)
            if "." not in req_host:
                hosts += [req_host + ".local"]
        else:
            hosts = [req_host]

        cookies = []
        for host in hosts:
            if host in self.jar._cookies:
                cookies += self.jar._cookies_for_domain(host, wreq)

        attrs = self.jar._cookie_attrs(cookies)
        if attrs:
            if not wreq.has_header("Cookie"):
                wreq.add_unredirected_header("Cookie", "; ".join(attrs))

        self.processed += 1
        if self.processed % self.check_expired_frequency == 0:
            # This is still quite inefficient for a large number of cookies
            self.jar.clear_expired_cookies()
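
    # Round-trip sketch (illustrative; the URL and cookie value are made up):
    # extract cookies from one response, then attach them to a later request
    # for the same site.
    #
    #     >>> from scrapy import Request
    #     >>> from scrapy.http import Response
    #     >>> jar = CookieJar()
    #     >>> req = Request("https://example.com/")
    #     >>> resp = Response("https://example.com/", headers={"Set-Cookie": "sid=1"})
    #     >>> jar.extract_cookies(resp, req)
    #     >>> later = Request("https://example.com/page")
    #     >>> jar.add_cookie_header(later)  # later.headers now carries "Cookie: sid=1"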

    @property
    def _cookies(self):
        return self.jar._cookies

    def clear_session_cookies(self, *args, **kwargs):
        return self.jar.clear_session_cookies(*args, **kwargs)

    def clear(self, domain=None, path=None, name=None):
        return self.jar.clear(domain, path, name)

    def __iter__(self):
        return iter(self.jar)

    def __len__(self):
        return len(self.jar)

    def set_policy(self, pol):
        return self.jar.set_policy(pol)

    def make_cookies(self, response: Response, request: Request) -> Sequence[Cookie]:
        wreq = WrappedRequest(request)
        wrsp = WrappedResponse(response)
        return self.jar.make_cookies(wrsp, wreq)

    def set_cookie(self, cookie):
        self.jar.set_cookie(cookie)

    def set_cookie_if_ok(self, cookie: Cookie, request: Request) -> None:
        self.jar.set_cookie_if_ok(cookie, WrappedRequest(request))


def potential_domain_matches(domain):
    """Potential domain matches for a cookie

    >>> potential_domain_matches('www.example.com')
    ['www.example.com', 'example.com', '.www.example.com', '.example.com']

    """
    matches = [domain]
    try:
        start = domain.index(".") + 1
        end = domain.rindex(".")
        while start < end:
            matches.append(domain[start:])
            start = domain.index(".", start) + 1
    except ValueError:
        pass
    return matches + ["." + d for d in matches]
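
# Note (illustrative): a dotless host yields only itself and its dotted form,
# which is why add_cookie_header additionally tries "<host>.local":
#
#     >>> potential_domain_matches('localhost')
#     ['localhost', '.localhost']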


# No-op stand-in for the thread lock that the stdlib CookieJar acquires
# around every operation; Scrapy runs in Twisted's single-threaded reactor,
# so the locking is unnecessary overhead.
class _DummyLock:
    def acquire(self):
        pass

    def release(self):
        pass


class WrappedRequest:
    """Wraps a scrapy Request with the methods defined by the urllib2.Request
    class, so that it can interact with the CookieJar class

    see http://docs.python.org/library/urllib2.html#urllib2.Request
    """

    def __init__(self, request):
        self.request = request

    def get_full_url(self):
        return self.request.url

    def get_host(self):
        return urlparse_cached(self.request).netloc

    def get_type(self):
        return urlparse_cached(self.request).scheme

    def is_unverifiable(self):
        """Return whether the request is unverifiable, as defined by RFC 2965.

        It defaults to False. An unverifiable request is one whose URL the
        user did not have the option to approve. For example, if the request
        is for an image in an HTML document, and the user had no option to
        approve the automatic fetching of the image, this should be True.
        """
        return self.request.meta.get("is_unverifiable", False)
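
    # For instance (illustrative; "is_unverifiable" is the meta key this
    # method reads):
    #
    #     >>> r = Request("https://example.com", meta={"is_unverifiable": True})
    #     >>> WrappedRequest(r).unverifiable
    #     True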

    @property
    def full_url(self):
        return self.get_full_url()

    @property
    def host(self):
        return self.get_host()

    @property
    def type(self):
        return self.get_type()

    @property
    def unverifiable(self):
        return self.is_unverifiable()

    @property
    def origin_req_host(self):
        return urlparse_cached(self.request).hostname

    def has_header(self, name):
        return name in self.request.headers

    def get_header(self, name, default=None):
        value = self.request.headers.get(name, default)
        # Guard against a missing header with no default: to_unicode raises
        # on None.
        return to_unicode(value, errors="replace") if value is not None else None

    def header_items(self):
        return [
            (
                to_unicode(k, errors="replace"),
                [to_unicode(x, errors="replace") for x in v],
            )
            for k, v in self.request.headers.items()
        ]

    def add_unredirected_header(self, name, value):
        # Appends to any existing values for this header rather than
        # replacing them.
        self.request.headers.appendlist(name, value)
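
# Quick check (illustrative): the wrapper exposes the urllib2-style accessors
# that http.cookiejar expects, backed by the Scrapy request:
#
#     >>> wreq = WrappedRequest(Request("https://example.com/a?b=1"))
#     >>> wreq.host, wreq.type, wreq.unverifiable
#     ('example.com', 'https', False)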

class WrappedResponse:
    def __init__(self, response):
        self.response = response

    def info(self):
        # http.cookiejar calls response.info().get_all("Set-Cookie", ...);
        # both methods live on this object, so info() just returns self.
        return self

    def get_all(self, name, default=None):
        # `default` is accepted for interface compatibility but unused.
        return [
            to_unicode(v, errors="replace") for v in self.response.headers.getlist(name)
        ]