Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_vendor/cachecontrol/controller.py: 12%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

242 statements  

1# SPDX-FileCopyrightText: 2015 Eric Larson 

2# 

3# SPDX-License-Identifier: Apache-2.0 

4 

5""" 

6The httplib2 algorithms ported for use with requests. 

7""" 

8 

9from __future__ import annotations 

10 

11import calendar 

12import logging 

13import re 

14import time 

15import weakref 

16from email.utils import parsedate_tz 

17from typing import TYPE_CHECKING, Collection, Mapping 

18 

19from pip._vendor.requests.structures import CaseInsensitiveDict 

20 

21from pip._vendor.cachecontrol.cache import DictCache, SeparateBodyBaseCache 

22from pip._vendor.cachecontrol.serialize import Serializer 

23 

24if TYPE_CHECKING: 

25 from typing import Literal 

26 

27 from pip._vendor.requests import PreparedRequest 

28 from pip._vendor.urllib3 import HTTPResponse 

29 

30 from pip._vendor.cachecontrol.cache import BaseCache 

31 

logger = logging.getLogger(__name__)

# Regular expression from Appendix B of RFC 3986 for splitting a URI into
# (scheme, authority, path, query, fragment) components.
URI = re.compile(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?")

# Redirect statuses that are cacheable without date/etag checks
# (see RFC 7231 section 6.4.2 and RFC 7238).
PERMANENT_REDIRECT_STATUSES = (301, 308)

37 

38 

def parse_uri(uri: str) -> tuple[str | None, str | None, str, str | None, str | None]:
    """Parses a URI using the regex given in Appendix B of RFC 3986.

    (scheme, authority, path, query, fragment) = parse_uri(uri)

    Components absent from *uri* are returned as None; the path
    component always matches (possibly as the empty string), so it is
    never None.
    """
    match = URI.match(uri)
    # Every part of the pattern is optional, so it matches any string.
    assert match is not None
    groups = match.groups()
    # Groups 1/3/6/8 are the inner captures (without the ":" / "//" /
    # "?" / "#" delimiters); group 4 is the raw path.
    return (groups[1], groups[3], groups[4], groups[6], groups[8])

48 

49 

class CacheController:
    """An interface to see if a request should be cached or not.

    Implements the HTTP caching rules (RFC 7234), originally ported from
    httplib2 for use with requests/urllib3 response objects.
    """

    def __init__(
        self,
        cache: BaseCache | None = None,
        cache_etags: bool = True,
        serializer: Serializer | None = None,
        status_codes: Collection[int] | None = None,
    ):
        # Fall back to a simple in-memory dict cache when none is supplied.
        self.cache = DictCache() if cache is None else cache
        # When True, responses bearing an ETag are stored so they can be
        # revalidated later with conditional requests.
        self.cache_etags = cache_etags
        self.serializer = serializer or Serializer()
        # Response status codes that are eligible for caching at all.
        self.cacheable_status_codes = status_codes or (200, 203, 300, 301, 308)

    @classmethod
    def _urlnorm(cls, uri: str) -> str:
        """Normalize the URL to create a safe key for the cache"""
        (scheme, authority, path, query, fragment) = parse_uri(uri)
        if not scheme or not authority:
            raise Exception("Only absolute URIs are allowed. uri = %s" % uri)

        # Scheme and host are case-insensitive (RFC 3986 section 6.2.2.1).
        scheme = scheme.lower()
        authority = authority.lower()

        if not path:
            path = "/"

        # Could do syntax based normalization of the URI before
        # computing the digest. See Section 6.2.2 of Std 66.
        request_uri = query and "?".join([path, query]) or path
        # The fragment is intentionally dropped: it is never sent to the
        # server, so it must not differentiate cache entries.
        defrag_uri = scheme + "://" + authority + request_uri

        return defrag_uri

    @classmethod
    def cache_url(cls, uri: str) -> str:
        """Return the normalized cache key for *uri*."""
        return cls._urlnorm(uri)

    def parse_cache_control(self, headers: Mapping[str, str]) -> dict[str, int | None]:
        """Parse a Cache-Control header into a directive -> value dict.

        Unknown directives are ignored with a debug log. Valued
        directives (e.g. max-age) map to an int; valueless ones map to
        None. Malformed values are logged and skipped.
        """
        known_directives = {
            # https://tools.ietf.org/html/rfc7234#section-5.2
            # directive name -> (value type or None, value required?)
            "max-age": (int, True),
            "max-stale": (int, False),
            "min-fresh": (int, True),
            "no-cache": (None, False),
            "no-store": (None, False),
            "no-transform": (None, False),
            "only-if-cached": (None, False),
            "must-revalidate": (None, False),
            "public": (None, False),
            "private": (None, False),
            "proxy-revalidate": (None, False),
            "s-maxage": (int, True),
        }

        # *headers* may be a plain dict rather than case-insensitive,
        # so try both common spellings.
        cc_headers = headers.get("cache-control", headers.get("Cache-Control", ""))

        retval: dict[str, int | None] = {}

        for cc_directive in cc_headers.split(","):
            if not cc_directive.strip():
                continue

            parts = cc_directive.split("=", 1)
            directive = parts[0].strip()

            try:
                typ, required = known_directives[directive]
            except KeyError:
                logger.debug("Ignoring unknown cache-control directive: %s", directive)
                continue

            # Valueless or optional-value directives default to None;
            # if a typed value is present below it overwrites this.
            if not typ or not required:
                retval[directive] = None
            if typ:
                try:
                    retval[directive] = typ(parts[1].strip())
                except IndexError:
                    # No "=value" part was given.
                    if required:
                        logger.debug(
                            "Missing value for cache-control " "directive: %s",
                            directive,
                        )
                except ValueError:
                    # Value present but not convertible (e.g. non-numeric max-age).
                    logger.debug(
                        "Invalid value for cache-control directive " "%s, must be %s",
                        directive,
                        typ.__name__,
                    )

        return retval

    def _load_from_cache(self, request: PreparedRequest) -> HTTPResponse | None:
        """
        Load a cached response, or return None if it's not available.
        """
        # We do not support caching of partial content: so if the request contains a
        # Range header then we don't want to load anything from the cache.
        if "Range" in request.headers:
            return None

        cache_url = request.url
        assert cache_url is not None
        cache_data = self.cache.get(cache_url)
        if cache_data is None:
            logger.debug("No cache entry available")
            return None

        # Some cache backends store the body separately from the metadata.
        if isinstance(self.cache, SeparateBodyBaseCache):
            body_file = self.cache.get_body(cache_url)
        else:
            body_file = None

        result = self.serializer.loads(request, cache_data, body_file)
        if result is None:
            logger.warning("Cache entry deserialization failed, entry ignored")
        return result

    def cached_request(self, request: PreparedRequest) -> HTTPResponse | Literal[False]:
        """
        Return a cached response if it exists in the cache, otherwise
        return False.

        Applies the request's Cache-Control directives and the cached
        response's freshness information (max-age / Expires / Date) to
        decide whether the stored entry may be served.
        """
        assert request.url is not None
        cache_url = self.cache_url(request.url)
        logger.debug('Looking up "%s" in the cache', cache_url)
        cc = self.parse_cache_control(request.headers)

        # Bail out if the request insists on fresh data
        if "no-cache" in cc:
            logger.debug('Request header has "no-cache", cache bypassed')
            return False

        if "max-age" in cc and cc["max-age"] == 0:
            logger.debug('Request header has "max_age" as 0, cache bypassed')
            return False

        # Check whether we can load the response from the cache:
        resp = self._load_from_cache(request)
        if not resp:
            return False

        # If we have a cached permanent redirect, return it immediately. We
        # don't need to test our response for other headers b/c it is
        # intrinsically "cacheable" as it is Permanent.
        #
        # See:
        #   https://tools.ietf.org/html/rfc7231#section-6.4.2
        #
        # Client can try to refresh the value by repeating the request
        # with cache busting headers as usual (ie no-cache).
        if int(resp.status) in PERMANENT_REDIRECT_STATUSES:
            msg = (
                "Returning cached permanent redirect response "
                "(ignoring date and etag information)"
            )
            logger.debug(msg)
            return resp

        headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(resp.headers)
        if not headers or "date" not in headers:
            if "etag" not in headers:
                # Without date or etag, the cached response can never be used
                # and should be deleted.
                logger.debug("Purging cached response: no date or etag")
                self.cache.delete(cache_url)
            logger.debug("Ignoring cached response: no date")
            return False

        now = time.time()
        time_tuple = parsedate_tz(headers["date"])
        assert time_tuple is not None
        date = calendar.timegm(time_tuple[:6])
        current_age = max(0, now - date)
        logger.debug("Current age based on date: %i", current_age)

        # TODO: There is an assumption that the result will be a
        #       urllib3 response object. This may not be best since we
        #       could probably avoid instantiating or constructing the
        #       response until we know we need it.
        resp_cc = self.parse_cache_control(headers)

        # determine freshness
        freshness_lifetime = 0

        # Check the max-age pragma in the cache control header
        max_age = resp_cc.get("max-age")
        if max_age is not None:
            freshness_lifetime = max_age
            logger.debug("Freshness lifetime from max-age: %i", freshness_lifetime)

        # If there isn't a max-age, check for an expires header
        elif "expires" in headers:
            expires = parsedate_tz(headers["expires"])
            if expires is not None:
                expire_time = calendar.timegm(expires[:6]) - date
                freshness_lifetime = max(0, expire_time)
                logger.debug("Freshness lifetime from expires: %i", freshness_lifetime)

        # Determine if we are setting freshness limit in the
        # request. Note, this overrides what was in the response.
        max_age = cc.get("max-age")
        if max_age is not None:
            freshness_lifetime = max_age
            logger.debug(
                "Freshness lifetime from request max-age: %i", freshness_lifetime
            )

        min_fresh = cc.get("min-fresh")
        if min_fresh is not None:
            # adjust our current age by our min fresh
            current_age += min_fresh
            logger.debug("Adjusted current age from min-fresh: %i", current_age)

        # Return entry if it is fresh enough
        if freshness_lifetime > current_age:
            logger.debug('The response is "fresh", returning cached response')
            logger.debug("%i > %i", freshness_lifetime, current_age)
            return resp

        # we're not fresh. If we don't have an Etag, clear it out
        if "etag" not in headers:
            logger.debug('The cached response is "stale" with no etag, purging')
            self.cache.delete(cache_url)

        # return the original handler
        return False

    def conditional_headers(self, request: PreparedRequest) -> dict[str, str]:
        """Build revalidation headers from any cached response.

        Returns If-None-Match and/or If-Modified-Since entries when the
        cached response carries ETag / Last-Modified; empty dict otherwise.
        """
        resp = self._load_from_cache(request)
        new_headers = {}

        if resp:
            headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(resp.headers)

            if "etag" in headers:
                new_headers["If-None-Match"] = headers["ETag"]

            if "last-modified" in headers:
                new_headers["If-Modified-Since"] = headers["Last-Modified"]

        return new_headers

    def _cache_set(
        self,
        cache_url: str,
        request: PreparedRequest,
        response: HTTPResponse,
        body: bytes | None = None,
        expires_time: int | None = None,
    ) -> None:
        """
        Store the data in the cache.

        Uses the separate-body protocol when the backend supports it,
        otherwise serializes metadata and body together.
        """
        if isinstance(self.cache, SeparateBodyBaseCache):
            # We pass in the body separately; just put a placeholder empty
            # string in the metadata.
            self.cache.set(
                cache_url,
                self.serializer.dumps(request, response, b""),
                expires=expires_time,
            )
            # body is None can happen when, for example, we're only updating
            # headers, as is the case in update_cached_response().
            if body is not None:
                self.cache.set_body(cache_url, body)
        else:
            self.cache.set(
                cache_url,
                self.serializer.dumps(request, response, body),
                expires=expires_time,
            )

    def cache_response(
        self,
        request: PreparedRequest,
        response_or_ref: HTTPResponse | weakref.ReferenceType[HTTPResponse],
        body: bytes | None = None,
        status_codes: Collection[int] | None = None,
    ) -> None:
        """
        Algorithm for caching requests.

        This assumes a requests Response object.  Honors no-store on both
        sides, skips uncacheable statuses and "Vary: *", and stores
        entries based on ETag, permanent-redirect status, max-age, or
        Expires — in that order of preference.
        """
        if isinstance(response_or_ref, weakref.ReferenceType):
            response = response_or_ref()
            if response is None:
                # The weakref can be None only in case the user used streamed request
                # and did not consume or close it, and holds no reference to requests.Response.
                # In such case, we don't want to cache the response.
                return
        else:
            response = response_or_ref

        # From httplib2: Don't cache 206's since we aren't going to
        #                handle byte range requests
        cacheable_status_codes = status_codes or self.cacheable_status_codes
        if response.status not in cacheable_status_codes:
            logger.debug(
                "Status code %s not in %s", response.status, cacheable_status_codes
            )
            return

        response_headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(
            response.headers
        )

        if "date" in response_headers:
            time_tuple = parsedate_tz(response_headers["date"])
            assert time_tuple is not None
            date = calendar.timegm(time_tuple[:6])
        else:
            date = 0

        # If we've been given a body, our response has a Content-Length, that
        # Content-Length is valid then we can check to see if the body we've
        # been given matches the expected size, and if it doesn't we'll just
        # skip trying to cache it.
        if (
            body is not None
            and "content-length" in response_headers
            and response_headers["content-length"].isdigit()
            and int(response_headers["content-length"]) != len(body)
        ):
            return

        cc_req = self.parse_cache_control(request.headers)
        cc = self.parse_cache_control(response_headers)

        assert request.url is not None
        cache_url = self.cache_url(request.url)
        logger.debug('Updating cache with response from "%s"', cache_url)

        # Delete it from the cache if we happen to have it stored there
        no_store = False
        if "no-store" in cc:
            no_store = True
            logger.debug('Response header has "no-store"')
        if "no-store" in cc_req:
            no_store = True
            logger.debug('Request header has "no-store"')
        if no_store and self.cache.get(cache_url):
            logger.debug('Purging existing cache entry to honor "no-store"')
            self.cache.delete(cache_url)
        if no_store:
            return

        # https://tools.ietf.org/html/rfc7234#section-4.1:
        # A Vary header field-value of "*" always fails to match.
        # Storing such a response leads to a deserialization warning
        # during cache lookup and is not allowed to ever be served,
        # so storing it can be avoided.
        if "*" in response_headers.get("vary", ""):
            logger.debug('Response header has "Vary: *"')
            return

        # If we've been given an etag, then keep the response
        if self.cache_etags and "etag" in response_headers:
            expires_time = 0
            if response_headers.get("expires"):
                expires = parsedate_tz(response_headers["expires"])
                if expires is not None:
                    expires_time = calendar.timegm(expires[:6]) - date

            # Keep ETagged entries for at least 14 days so they remain
            # available for conditional revalidation.
            expires_time = max(expires_time, 14 * 86400)

            logger.debug(f"etag object cached for {expires_time} seconds")
            logger.debug("Caching due to etag")
            self._cache_set(cache_url, request, response, body, expires_time)

        # Add to the cache any permanent redirects. We do this before looking
        # that the Date headers.
        elif int(response.status) in PERMANENT_REDIRECT_STATUSES:
            logger.debug("Caching permanent redirect")
            # Redirects carry no useful body; store an empty one.
            self._cache_set(cache_url, request, response, b"")

        # Add to the cache if the response headers demand it. If there
        # is no date header then we can't do anything about expiring
        # the cache.
        elif "date" in response_headers:
            time_tuple = parsedate_tz(response_headers["date"])
            assert time_tuple is not None
            date = calendar.timegm(time_tuple[:6])
            # cache when there is a max-age > 0
            max_age = cc.get("max-age")
            if max_age is not None and max_age > 0:
                logger.debug("Caching b/c date exists and max-age > 0")
                expires_time = max_age
                self._cache_set(
                    cache_url,
                    request,
                    response,
                    body,
                    expires_time,
                )

            # If the request can expire, it means we should cache it
            # in the meantime.
            elif "expires" in response_headers:
                if response_headers["expires"]:
                    expires = parsedate_tz(response_headers["expires"])
                    if expires is not None:
                        expires_time = calendar.timegm(expires[:6]) - date
                    else:
                        expires_time = None

                    logger.debug(
                        "Caching b/c of expires header. expires in {} seconds".format(
                            expires_time
                        )
                    )
                    self._cache_set(
                        cache_url,
                        request,
                        response,
                        body,
                        expires_time,
                    )

    def update_cached_response(
        self, request: PreparedRequest, response: HTTPResponse
    ) -> HTTPResponse:
        """On a 304 we will get a new set of headers that we want to
        update our cached value with, assuming we have one.

        This should only ever be called when we've sent an ETag and
        gotten a 304 as the response.
        """
        assert request.url is not None
        cache_url = self.cache_url(request.url)
        cached_response = self._load_from_cache(request)

        if not cached_response:
            # we didn't have a cached response
            return response

        # Lets update our headers with the headers from the new request:
        # http://tools.ietf.org/html/draft-ietf-httpbis-p4-conditional-26#section-4.1
        #
        # The server isn't supposed to send headers that would make
        # the cached body invalid. But... just in case, we'll be sure
        # to strip out ones we know that might be problematic due to
        # typical assumptions.
        excluded_headers = ["content-length"]

        cached_response.headers.update(
            {
                k: v
                for k, v in response.headers.items()
                if k.lower() not in excluded_headers
            }
        )

        # we want a 200 b/c we have content via the cache
        cached_response.status = 200

        # update our cache
        self._cache_set(cache_url, request, cached_response)

        return cached_response