# SPDX-FileCopyrightText: 2015 Eric Larson
#
# SPDX-License-Identifier: Apache-2.0

"""
The httplib2 algorithms ported for use with requests.
"""
from __future__ import annotations

import calendar
import logging
import re
import time
from email.utils import parsedate_tz
from typing import TYPE_CHECKING, Collection, Mapping

from pip._vendor.requests.structures import CaseInsensitiveDict

from pip._vendor.cachecontrol.cache import DictCache, SeparateBodyBaseCache
from pip._vendor.cachecontrol.serialize import Serializer

if TYPE_CHECKING:
    from typing import Literal

    from pip._vendor.requests import PreparedRequest
    from pip._vendor.urllib3 import HTTPResponse

    from pip._vendor.cachecontrol.cache import BaseCache


logger = logging.getLogger(__name__)

URI = re.compile(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?")

PERMANENT_REDIRECT_STATUSES = (301, 308)


def parse_uri(uri: str) -> tuple[str, str, str, str, str]:
    """Parses a URI using the regex given in Appendix B of RFC 3986.

    (scheme, authority, path, query, fragment) = parse_uri(uri)
    """
    match = URI.match(uri)
    assert match is not None
    groups = match.groups()
    return (groups[1], groups[3], groups[4], groups[6], groups[8])
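
# For illustration only (not exercised by this module): the Appendix B regex
# splits a URI into its five top-level components, e.g.
#
#   parse_uri("http://example.com/path?q=1#frag")
#   # -> ("http", "example.com", "/path", "q=1", "frag")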


class CacheController:
    """An interface to see if a request should be cached or not."""

    def __init__(
        self,
        cache: BaseCache | None = None,
        cache_etags: bool = True,
        serializer: Serializer | None = None,
        status_codes: Collection[int] | None = None,
    ):
        self.cache = DictCache() if cache is None else cache
        self.cache_etags = cache_etags
        self.serializer = serializer or Serializer()
        self.cacheable_status_codes = status_codes or (200, 203, 300, 301, 308)

    @classmethod
    def _urlnorm(cls, uri: str) -> str:
        """Normalize the URL to create a safe key for the cache"""
        (scheme, authority, path, query, fragment) = parse_uri(uri)
        if not scheme or not authority:
            raise Exception("Only absolute URIs are allowed. uri = %s" % uri)

        scheme = scheme.lower()
        authority = authority.lower()

        if not path:
            path = "/"

        # Could do syntax based normalization of the URI before
        # computing the digest. See Section 6.2.2 of Std 66.
        request_uri = query and "?".join([path, query]) or path
        defrag_uri = scheme + "://" + authority + request_uri

        return defrag_uri

    @classmethod
    def cache_url(cls, uri: str) -> str:
        return cls._urlnorm(uri)
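
    # A couple of illustrative normalizations (scheme and host are lowercased,
    # an empty path becomes "/", and the fragment is dropped):
    #
    #   cache_url("HTTP://Example.COM")          -> "http://example.com/"
    #   cache_url("http://example.com/A?b=1#c")  -> "http://example.com/A?b=1"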

    def parse_cache_control(self, headers: Mapping[str, str]) -> dict[str, int | None]:
        known_directives = {
            # https://tools.ietf.org/html/rfc7234#section-5.2
            "max-age": (int, True),
            "max-stale": (int, False),
            "min-fresh": (int, True),
            "no-cache": (None, False),
            "no-store": (None, False),
            "no-transform": (None, False),
            "only-if-cached": (None, False),
            "must-revalidate": (None, False),
            "public": (None, False),
            "private": (None, False),
            "proxy-revalidate": (None, False),
            "s-maxage": (int, True),
        }

        cc_headers = headers.get("cache-control", headers.get("Cache-Control", ""))

        retval: dict[str, int | None] = {}

        for cc_directive in cc_headers.split(","):
            if not cc_directive.strip():
                continue

            parts = cc_directive.split("=", 1)
            directive = parts[0].strip()

            try:
                typ, required = known_directives[directive]
            except KeyError:
                logger.debug("Ignoring unknown cache-control directive: %s", directive)
                continue

            if not typ or not required:
                retval[directive] = None
            if typ:
                try:
                    retval[directive] = typ(parts[1].strip())
                except IndexError:
                    if required:
                        logger.debug(
                            "Missing value for cache-control directive: %s",
                            directive,
                        )
                except ValueError:
                    logger.debug(
                        "Invalid value for cache-control directive %s, must be %s",
                        directive,
                        typ.__name__,
                    )

        return retval
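
    # An illustrative parse (directives with no value map to None):
    #
    #   parse_cache_control({"cache-control": "max-age=3600, no-cache, private"})
    #   # -> {"max-age": 3600, "no-cache": None, "private": None}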

    def _load_from_cache(self, request: PreparedRequest) -> HTTPResponse | None:
        """
        Load a cached response, or return None if it's not available.
        """
        cache_url = request.url
        assert cache_url is not None
        cache_data = self.cache.get(cache_url)
        if cache_data is None:
            logger.debug("No cache entry available")
            return None

        if isinstance(self.cache, SeparateBodyBaseCache):
            body_file = self.cache.get_body(cache_url)
        else:
            body_file = None

        result = self.serializer.loads(request, cache_data, body_file)
        if result is None:
            logger.warning("Cache entry deserialization failed, entry ignored")
        return result
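
    # Note: in this package's Serializer, loads() returns None both for
    # entries it cannot deserialize and for entries whose stored Vary headers
    # no longer match the request, so None here simply means "cache miss".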

    def cached_request(self, request: PreparedRequest) -> HTTPResponse | Literal[False]:
        """
        Return a cached response if it exists in the cache, otherwise
        return False.
        """
        assert request.url is not None
        cache_url = self.cache_url(request.url)
        logger.debug('Looking up "%s" in the cache', cache_url)
        cc = self.parse_cache_control(request.headers)

        # Bail out if the request insists on fresh data
        if "no-cache" in cc:
            logger.debug('Request header has "no-cache", cache bypassed')
            return False

        if "max-age" in cc and cc["max-age"] == 0:
            logger.debug('Request header has "max-age" as 0, cache bypassed')
            return False

        # Check whether we can load the response from the cache:
        resp = self._load_from_cache(request)
        if not resp:
            return False

        # If we have a cached permanent redirect, return it immediately. We
        # don't need to test our response for other headers b/c it is
        # intrinsically "cacheable" as it is Permanent.
        #
        # See:
        #   https://tools.ietf.org/html/rfc7231#section-6.4.2
        #
        # Client can try to refresh the value by repeating the request
        # with cache busting headers as usual (ie no-cache).
        if int(resp.status) in PERMANENT_REDIRECT_STATUSES:
            msg = (
                "Returning cached permanent redirect response "
                "(ignoring date and etag information)"
            )
            logger.debug(msg)
            return resp

        headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(resp.headers)
        if not headers or "date" not in headers:
            if "etag" not in headers:
                # Without date or etag, the cached response can never be used
                # and should be deleted.
                logger.debug("Purging cached response: no date or etag")
                self.cache.delete(cache_url)
            logger.debug("Ignoring cached response: no date")
            return False

        now = time.time()
        time_tuple = parsedate_tz(headers["date"])
        assert time_tuple is not None
        date = calendar.timegm(time_tuple[:6])
        current_age = max(0, now - date)
        logger.debug("Current age based on date: %i", current_age)

        # TODO: There is an assumption that the result will be a
        #       urllib3 response object. This may not be best since we
        #       could probably avoid instantiating or constructing the
        #       response until we know we need it.
        resp_cc = self.parse_cache_control(headers)

        # determine freshness
        freshness_lifetime = 0

        # Check the max-age pragma in the cache control header
        max_age = resp_cc.get("max-age")
        if max_age is not None:
            freshness_lifetime = max_age
            logger.debug("Freshness lifetime from max-age: %i", freshness_lifetime)

        # If there isn't a max-age, check for an expires header
        elif "expires" in headers:
            expires = parsedate_tz(headers["expires"])
            if expires is not None:
                expire_time = calendar.timegm(expires[:6]) - date
                freshness_lifetime = max(0, expire_time)
                logger.debug("Freshness lifetime from expires: %i", freshness_lifetime)

        # Determine if we are setting freshness limit in the
        # request. Note, this overrides what was in the response.
        max_age = cc.get("max-age")
        if max_age is not None:
            freshness_lifetime = max_age
            logger.debug(
                "Freshness lifetime from request max-age: %i", freshness_lifetime
            )

        min_fresh = cc.get("min-fresh")
        if min_fresh is not None:
            # adjust our current age by our min fresh
            current_age += min_fresh
            logger.debug("Adjusted current age from min-fresh: %i", current_age)

        # Return entry if it is fresh enough
        if freshness_lifetime > current_age:
            logger.debug('The response is "fresh", returning cached response')
            logger.debug("%i > %i", freshness_lifetime, current_age)
            return resp

        # we're not fresh. If we don't have an Etag, clear it out
        if "etag" not in headers:
            logger.debug('The cached response is "stale" with no etag, purging')
            self.cache.delete(cache_url)

        # return the original handler
        return False
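
    # A worked freshness check (illustrative numbers): a response stored with
    # "Date: <100 seconds ago>" and "Cache-Control: max-age=3600" has
    # current_age == 100 and freshness_lifetime == 3600; 3600 > 100, so the
    # cached response is served. With a request "min-fresh=3600", current_age
    # becomes 3700 and the entry is treated as stale instead.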

    def conditional_headers(self, request: PreparedRequest) -> dict[str, str]:
        resp = self._load_from_cache(request)
        new_headers = {}

        if resp:
            headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(resp.headers)

            if "etag" in headers:
                new_headers["If-None-Match"] = headers["ETag"]

            if "last-modified" in headers:
                new_headers["If-Modified-Since"] = headers["Last-Modified"]

        return new_headers
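
    # For a cached response carrying 'ETag: "abc123"' and a Last-Modified
    # date, this would yield (illustrative values):
    #
    #   {"If-None-Match": '"abc123"',
    #    "If-Modified-Since": "Mon, 26 Feb 2024 06:33:00 GMT"}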

    def _cache_set(
        self,
        cache_url: str,
        request: PreparedRequest,
        response: HTTPResponse,
        body: bytes | None = None,
        expires_time: int | None = None,
    ) -> None:
        """
        Store the data in the cache.
        """
        if isinstance(self.cache, SeparateBodyBaseCache):
            # We pass in the body separately; just put a placeholder empty
            # string in the metadata.
            self.cache.set(
                cache_url,
                self.serializer.dumps(request, response, b""),
                expires=expires_time,
            )
            # body is None can happen when, for example, we're only updating
            # headers, as is the case in update_cached_response().
            if body is not None:
                self.cache.set_body(cache_url, body)
        else:
            self.cache.set(
                cache_url,
                self.serializer.dumps(request, response, body),
                expires=expires_time,
            )
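
    # The two storage layouts, in brief: a plain BaseCache stores one blob per
    # URL (serialized headers and body together), while a
    # SeparateBodyBaseCache keeps the metadata under set()/get() and the raw
    # body bytes under set_body()/get_body(), so large bodies don't have to
    # round-trip through the serializer.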

    def cache_response(
        self,
        request: PreparedRequest,
        response: HTTPResponse,
        body: bytes | None = None,
        status_codes: Collection[int] | None = None,
    ) -> None:
        """
        Algorithm for caching responses.

        This assumes a urllib3 HTTPResponse object.
        """
        # From httplib2: Don't cache 206's since we aren't going to
        # handle byte range requests
        cacheable_status_codes = status_codes or self.cacheable_status_codes
        if response.status not in cacheable_status_codes:
            logger.debug(
                "Status code %s not in %s", response.status, cacheable_status_codes
            )
            return

        response_headers: CaseInsensitiveDict[str] = CaseInsensitiveDict(
            response.headers
        )

        if "date" in response_headers:
            time_tuple = parsedate_tz(response_headers["date"])
            assert time_tuple is not None
            date = calendar.timegm(time_tuple[:6])
        else:
            date = 0

        # If we've been given a body and the response carries a valid
        # Content-Length, check that the body matches the expected size;
        # if it doesn't, skip caching it.
        if (
            body is not None
            and "content-length" in response_headers
            and response_headers["content-length"].isdigit()
            and int(response_headers["content-length"]) != len(body)
        ):
            return

        cc_req = self.parse_cache_control(request.headers)
        cc = self.parse_cache_control(response_headers)

        assert request.url is not None
        cache_url = self.cache_url(request.url)
        logger.debug('Updating cache with response from "%s"', cache_url)

        # Delete it from the cache if we happen to have it stored there
        no_store = False
        if "no-store" in cc:
            no_store = True
            logger.debug('Response header has "no-store"')
        if "no-store" in cc_req:
            no_store = True
            logger.debug('Request header has "no-store"')
        if no_store and self.cache.get(cache_url):
            logger.debug('Purging existing cache entry to honor "no-store"')
            self.cache.delete(cache_url)
        if no_store:
            return

        # https://tools.ietf.org/html/rfc7234#section-4.1:
        # A Vary header field-value of "*" always fails to match.
        # Storing such a response leads to a deserialization warning
        # during cache lookup and is not allowed to ever be served,
        # so storing it can be avoided.
        if "*" in response_headers.get("vary", ""):
            logger.debug('Response header has "Vary: *"')
            return

        # If we've been given an etag, then keep the response
        if self.cache_etags and "etag" in response_headers:
            expires_time = 0
            if response_headers.get("expires"):
                expires = parsedate_tz(response_headers["expires"])
                if expires is not None:
                    expires_time = calendar.timegm(expires[:6]) - date

            expires_time = max(expires_time, 14 * 86400)

            logger.debug(f"etag object cached for {expires_time} seconds")
            logger.debug("Caching due to etag")
            self._cache_set(cache_url, request, response, body, expires_time)

        # Add to the cache any permanent redirects. We do this before looking
        # at the Date headers.
        elif int(response.status) in PERMANENT_REDIRECT_STATUSES:
            logger.debug("Caching permanent redirect")
            self._cache_set(cache_url, request, response, b"")

        # Add to the cache if the response headers demand it. If there
        # is no date header then we can't do anything about expiring
        # the cache.
        elif "date" in response_headers:
            time_tuple = parsedate_tz(response_headers["date"])
            assert time_tuple is not None
            date = calendar.timegm(time_tuple[:6])
            # cache when there is a max-age > 0
            max_age = cc.get("max-age")
            if max_age is not None and max_age > 0:
                logger.debug("Caching b/c date exists and max-age > 0")
                expires_time = max_age
                self._cache_set(
                    cache_url,
                    request,
                    response,
                    body,
                    expires_time,
                )

            # If the request can expire, it means we should cache it
            # in the meantime.
            elif "expires" in response_headers:
                if response_headers["expires"]:
                    expires = parsedate_tz(response_headers["expires"])
                    if expires is not None:
                        expires_time = calendar.timegm(expires[:6]) - date
                    else:
                        expires_time = None

                    logger.debug(
                        "Caching b/c of expires header. expires in {} seconds".format(
                            expires_time
                        )
                    )
                    self._cache_set(
                        cache_url,
                        request,
                        response,
                        body,
                        expires_time,
                    )
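
    # A worked example for the etag branch above: a 200 response with
    # 'ETag: "abc123"' and no Expires header gets expires_time ==
    # max(0, 14 * 86400) == 1209600, i.e. the entry is kept for at least
    # 14 days so that later requests can still revalidate against the etag.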

    def update_cached_response(
        self, request: PreparedRequest, response: HTTPResponse
    ) -> HTTPResponse:
        """On a 304 we will get a new set of headers that we want to
        update our cached value with, assuming we have one.

        This should only ever be called when we've sent an ETag and
        gotten a 304 as the response.
        """
        assert request.url is not None
        cache_url = self.cache_url(request.url)
        cached_response = self._load_from_cache(request)

        if not cached_response:
            # we didn't have a cached response
            return response

        # Let's update our headers with the headers from the new response:
        # http://tools.ietf.org/html/draft-ietf-httpbis-p4-conditional-26#section-4.1
        #
        # The server isn't supposed to send headers that would make
        # the cached body invalid. But... just in case, we'll be sure
        # to strip out ones we know that might be problematic due to
        # typical assumptions.
        excluded_headers = ["content-length"]

        cached_response.headers.update(
            {
                k: v
                for k, v in response.headers.items()  # type: ignore[no-untyped-call]
                if k.lower() not in excluded_headers
            }
        )

        # we want a 200 b/c we have content via the cache
        cached_response.status = 200

        # update our cache
        self._cache_set(cache_url, request, cached_response)

        return cached_response
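

# A sketch of how these pieces fit together (illustrative; in this package the
# CacheControlAdapter wires these calls into a requests Session):
#
#   controller = CacheController()
#   resp = controller.cached_request(prepared_request)  # fresh hit or False
#   if resp is False:
#       headers = controller.conditional_headers(prepared_request)
#       # ...send the request with these headers...
#       # on a 304: resp = controller.update_cached_response(prepared_request, response)
#       # on a cacheable status: controller.cache_response(prepared_request, response, body)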