Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/cookiejar.py: 39%

256 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:56 +0000

1import asyncio 

2import contextlib 

3import datetime 

4import os # noqa 

5import pathlib 

6import pickle 

7import re 

8from collections import defaultdict 

9from http.cookies import BaseCookie, Morsel, SimpleCookie 

10from typing import ( # noqa 

11 DefaultDict, 

12 Dict, 

13 Iterable, 

14 Iterator, 

15 List, 

16 Mapping, 

17 Optional, 

18 Set, 

19 Tuple, 

20 Union, 

21 cast, 

22) 

23 

24from yarl import URL 

25 

26from .abc import AbstractCookieJar, ClearCookiePredicate 

27from .helpers import is_ip_address, next_whole_second 

28from .typedefs import LooseCookies, PathLike, StrOrURL 

29 

30__all__ = ("CookieJar", "DummyCookieJar") 

31 

32 

33CookieItem = Union[str, "Morsel[str]"] 

34 

35 

36class CookieJar(AbstractCookieJar): 

37 """Implements cookie storage adhering to RFC 6265.""" 

38 

39 DATE_TOKENS_RE = re.compile( 

40 r"[\x09\x20-\x2F\x3B-\x40\x5B-\x60\x7B-\x7E]*" 

41 r"(?P<token>[\x00-\x08\x0A-\x1F\d:a-zA-Z\x7F-\xFF]+)" 

42 ) 

43 

44 DATE_HMS_TIME_RE = re.compile(r"(\d{1,2}):(\d{1,2}):(\d{1,2})") 

45 

46 DATE_DAY_OF_MONTH_RE = re.compile(r"(\d{1,2})") 

47 

48 DATE_MONTH_RE = re.compile( 

49 "(jan)|(feb)|(mar)|(apr)|(may)|(jun)|(jul)|" "(aug)|(sep)|(oct)|(nov)|(dec)", 

50 re.I, 

51 ) 

52 

53 DATE_YEAR_RE = re.compile(r"(\d{2,4})") 

54 

55 MAX_TIME = datetime.datetime.max.replace(tzinfo=datetime.timezone.utc) 

56 

57 MAX_32BIT_TIME = datetime.datetime.utcfromtimestamp(2**31 - 1) 

58 

59 def __init__( 

60 self, 

61 *, 

62 unsafe: bool = False, 

63 quote_cookie: bool = True, 

64 treat_as_secure_origin: Union[StrOrURL, List[StrOrURL], None] = None, 

65 loop: Optional[asyncio.AbstractEventLoop] = None, 

66 ) -> None: 

67 super().__init__(loop=loop) 

68 self._cookies: DefaultDict[Tuple[str, str], SimpleCookie[str]] = defaultdict( 

69 SimpleCookie 

70 ) 

71 self._host_only_cookies: Set[Tuple[str, str]] = set() 

72 self._unsafe = unsafe 

73 self._quote_cookie = quote_cookie 

74 if treat_as_secure_origin is None: 

75 treat_as_secure_origin = [] 

76 elif isinstance(treat_as_secure_origin, URL): 

77 treat_as_secure_origin = [treat_as_secure_origin.origin()] 

78 elif isinstance(treat_as_secure_origin, str): 

79 treat_as_secure_origin = [URL(treat_as_secure_origin).origin()] 

80 else: 

81 treat_as_secure_origin = [ 

82 URL(url).origin() if isinstance(url, str) else url.origin() 

83 for url in treat_as_secure_origin 

84 ] 

85 self._treat_as_secure_origin = treat_as_secure_origin 

86 self._next_expiration = next_whole_second() 

87 self._expirations: Dict[Tuple[str, str, str], datetime.datetime] = {} 

88 # #4515: datetime.max may not be representable on 32-bit platforms 

89 self._max_time = self.MAX_TIME 

90 try: 

91 self._max_time.timestamp() 

92 except OverflowError: 

93 self._max_time = self.MAX_32BIT_TIME 

94 

95 def save(self, file_path: PathLike) -> None: 

96 file_path = pathlib.Path(file_path) 

97 with file_path.open(mode="wb") as f: 

98 pickle.dump(self._cookies, f, pickle.HIGHEST_PROTOCOL) 

99 

100 def load(self, file_path: PathLike) -> None: 

101 file_path = pathlib.Path(file_path) 

102 with file_path.open(mode="rb") as f: 

103 self._cookies = pickle.load(f) 

104 

105 def clear(self, predicate: Optional[ClearCookiePredicate] = None) -> None: 

106 if predicate is None: 

107 self._next_expiration = next_whole_second() 

108 self._cookies.clear() 

109 self._host_only_cookies.clear() 

110 self._expirations.clear() 

111 return 

112 

113 to_del = [] 

114 now = datetime.datetime.now(datetime.timezone.utc) 

115 for (domain, path), cookie in self._cookies.items(): 

116 for name, morsel in cookie.items(): 

117 key = (domain, path, name) 

118 if ( 

119 key in self._expirations and self._expirations[key] <= now 

120 ) or predicate(morsel): 

121 to_del.append(key) 

122 

123 for domain, path, name in to_del: 

124 self._host_only_cookies.discard((domain, name)) 

125 key = (domain, path, name) 

126 if key in self._expirations: 

127 del self._expirations[(domain, path, name)] 

128 self._cookies[(domain, path)].pop(name, None) 

129 

130 next_expiration = min(self._expirations.values(), default=self._max_time) 

131 try: 

132 self._next_expiration = next_expiration.replace( 

133 microsecond=0 

134 ) + datetime.timedelta(seconds=1) 

135 except OverflowError: 

136 self._next_expiration = self._max_time 

137 

138 def clear_domain(self, domain: str) -> None: 

139 self.clear(lambda x: self._is_domain_match(domain, x["domain"])) 

140 

141 def __iter__(self) -> "Iterator[Morsel[str]]": 

142 self._do_expiration() 

143 for val in self._cookies.values(): 

144 yield from val.values() 

145 

146 def __len__(self) -> int: 

147 return sum(1 for i in self) 

148 

149 def _do_expiration(self) -> None: 

150 self.clear(lambda x: False) 

151 

152 def _expire_cookie( 

153 self, when: datetime.datetime, domain: str, path: str, name: str 

154 ) -> None: 

155 self._next_expiration = min(self._next_expiration, when) 

156 self._expirations[(domain, path, name)] = when 

157 

158 def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None: 

159 """Update cookies.""" 

160 hostname = response_url.raw_host 

161 

162 if not self._unsafe and is_ip_address(hostname): 

163 # Don't accept cookies from IPs 

164 return 

165 

166 if isinstance(cookies, Mapping): 

167 cookies = cookies.items() 

168 

169 for name, cookie in cookies: 

170 if not isinstance(cookie, Morsel): 

171 tmp: SimpleCookie[str] = SimpleCookie() 

172 tmp[name] = cookie # type: ignore[assignment] 

173 cookie = tmp[name] 

174 

175 domain = cookie["domain"] 

176 

177 # ignore domains with trailing dots 

178 if domain.endswith("."): 

179 domain = "" 

180 del cookie["domain"] 

181 

182 if not domain and hostname is not None: 

183 # Set the cookie's domain to the response hostname 

184 # and set its host-only-flag 

185 self._host_only_cookies.add((hostname, name)) 

186 domain = cookie["domain"] = hostname 

187 

188 if domain.startswith("."): 

189 # Remove leading dot 

190 domain = domain[1:] 

191 cookie["domain"] = domain 

192 

193 if hostname and not self._is_domain_match(domain, hostname): 

194 # Setting cookies for different domains is not allowed 

195 continue 

196 

197 path = cookie["path"] 

198 if not path or not path.startswith("/"): 

199 # Set the cookie's path to the response path 

200 path = response_url.path 

201 if not path.startswith("/"): 

202 path = "/" 

203 else: 

204 # Cut everything from the last slash to the end 

205 path = "/" + path[1 : path.rfind("/")] 

206 cookie["path"] = path 

207 

208 max_age = cookie["max-age"] 

209 if max_age: 

210 try: 

211 delta_seconds = int(max_age) 

212 try: 

213 max_age_expiration = datetime.datetime.now( 

214 datetime.timezone.utc 

215 ) + datetime.timedelta(seconds=delta_seconds) 

216 except OverflowError: 

217 max_age_expiration = self._max_time 

218 self._expire_cookie(max_age_expiration, domain, path, name) 

219 except ValueError: 

220 cookie["max-age"] = "" 

221 

222 else: 

223 expires = cookie["expires"] 

224 if expires: 

225 expire_time = self._parse_date(expires) 

226 if expire_time: 

227 self._expire_cookie(expire_time, domain, path, name) 

228 else: 

229 cookie["expires"] = "" 

230 

231 self._cookies[(domain, path)][name] = cookie 

232 

233 self._do_expiration() 

234 

235 def filter_cookies( 

236 self, request_url: URL = URL() 

237 ) -> Union["BaseCookie[str]", "SimpleCookie[str]"]: 

238 """Returns this jar's cookies filtered by their attributes.""" 

239 self._do_expiration() 

240 request_url = URL(request_url) 

241 filtered: Union["SimpleCookie[str]", "BaseCookie[str]"] = ( 

242 SimpleCookie() if self._quote_cookie else BaseCookie() 

243 ) 

244 hostname = request_url.raw_host or "" 

245 request_origin = URL() 

246 with contextlib.suppress(ValueError): 

247 request_origin = request_url.origin() 

248 

249 is_not_secure = ( 

250 request_url.scheme not in ("https", "wss") 

251 and request_origin not in self._treat_as_secure_origin 

252 ) 

253 

254 for cookie in self: 

255 name = cookie.key 

256 domain = cookie["domain"] 

257 

258 # Send shared cookies 

259 if not domain: 

260 filtered[name] = cookie.value 

261 continue 

262 

263 if not self._unsafe and is_ip_address(hostname): 

264 continue 

265 

266 if (domain, name) in self._host_only_cookies: 

267 if domain != hostname: 

268 continue 

269 elif not self._is_domain_match(domain, hostname): 

270 continue 

271 

272 if not self._is_path_match(request_url.path, cookie["path"]): 

273 continue 

274 

275 if is_not_secure and cookie["secure"]: 

276 continue 

277 

278 # It's critical we use the Morsel so the coded_value 

279 # (based on cookie version) is preserved 

280 mrsl_val = cast("Morsel[str]", cookie.get(cookie.key, Morsel())) 

281 mrsl_val.set(cookie.key, cookie.value, cookie.coded_value) 

282 filtered[name] = mrsl_val 

283 

284 return filtered 

285 

286 @staticmethod 

287 def _is_domain_match(domain: str, hostname: str) -> bool: 

288 """Implements domain matching adhering to RFC 6265.""" 

289 if hostname == domain: 

290 return True 

291 

292 if not hostname.endswith(domain): 

293 return False 

294 

295 non_matching = hostname[: -len(domain)] 

296 

297 if not non_matching.endswith("."): 

298 return False 

299 

300 return not is_ip_address(hostname) 

301 

302 @staticmethod 

303 def _is_path_match(req_path: str, cookie_path: str) -> bool: 

304 """Implements path matching adhering to RFC 6265.""" 

305 if not req_path.startswith("/"): 

306 req_path = "/" 

307 

308 if req_path == cookie_path: 

309 return True 

310 

311 if not req_path.startswith(cookie_path): 

312 return False 

313 

314 if cookie_path.endswith("/"): 

315 return True 

316 

317 non_matching = req_path[len(cookie_path) :] 

318 

319 return non_matching.startswith("/") 

320 

321 @classmethod 

322 def _parse_date(cls, date_str: str) -> Optional[datetime.datetime]: 

323 """Implements date string parsing adhering to RFC 6265.""" 

324 if not date_str: 

325 return None 

326 

327 found_time = False 

328 found_day = False 

329 found_month = False 

330 found_year = False 

331 

332 hour = minute = second = 0 

333 day = 0 

334 month = 0 

335 year = 0 

336 

337 for token_match in cls.DATE_TOKENS_RE.finditer(date_str): 

338 

339 token = token_match.group("token") 

340 

341 if not found_time: 

342 time_match = cls.DATE_HMS_TIME_RE.match(token) 

343 if time_match: 

344 found_time = True 

345 hour, minute, second = (int(s) for s in time_match.groups()) 

346 continue 

347 

348 if not found_day: 

349 day_match = cls.DATE_DAY_OF_MONTH_RE.match(token) 

350 if day_match: 

351 found_day = True 

352 day = int(day_match.group()) 

353 continue 

354 

355 if not found_month: 

356 month_match = cls.DATE_MONTH_RE.match(token) 

357 if month_match: 

358 found_month = True 

359 assert month_match.lastindex is not None 

360 month = month_match.lastindex 

361 continue 

362 

363 if not found_year: 

364 year_match = cls.DATE_YEAR_RE.match(token) 

365 if year_match: 

366 found_year = True 

367 year = int(year_match.group()) 

368 

369 if 70 <= year <= 99: 

370 year += 1900 

371 elif 0 <= year <= 69: 

372 year += 2000 

373 

374 if False in (found_day, found_month, found_year, found_time): 

375 return None 

376 

377 if not 1 <= day <= 31: 

378 return None 

379 

380 if year < 1601 or hour > 23 or minute > 59 or second > 59: 

381 return None 

382 

383 return datetime.datetime( 

384 year, month, day, hour, minute, second, tzinfo=datetime.timezone.utc 

385 ) 

386 

387 

388class DummyCookieJar(AbstractCookieJar): 

389 """Implements a dummy cookie storage. 

390 

391 It can be used with the ClientSession when no cookie processing is needed. 

392 

393 """ 

394 

395 def __init__(self, *, loop: Optional[asyncio.AbstractEventLoop] = None) -> None: 

396 super().__init__(loop=loop) 

397 

398 def __iter__(self) -> "Iterator[Morsel[str]]": 

399 while False: 

400 yield None 

401 

402 def __len__(self) -> int: 

403 return 0 

404 

405 def clear(self, predicate: Optional[ClearCookiePredicate] = None) -> None: 

406 pass 

407 

408 def clear_domain(self, domain: str) -> None: 

409 pass 

410 

411 def update_cookies(self, cookies: LooseCookies, response_url: URL = URL()) -> None: 

412 pass 

413 

414 def filter_cookies(self, request_url: URL) -> "BaseCookie[str]": 

415 return SimpleCookie()