1"""HTTP cache implementation."""
2
3from __future__ import annotations
4
5import os
6import shutil
7from collections.abc import Generator
8from contextlib import contextmanager
9from datetime import datetime
10from typing import Any, BinaryIO, Callable
11
12from pip._vendor.cachecontrol.cache import SeparateBodyBaseCache
13from pip._vendor.cachecontrol.caches import SeparateBodyFileCache
14from pip._vendor.requests.models import Response
15
16from pip._internal.utils.filesystem import (
17 adjacent_tmp_file,
18 copy_directory_permissions,
19 replace,
20)
21from pip._internal.utils.misc import ensure_dir
22
23
def is_from_cache(response: Response) -> bool:
    """Report whether *response* carries a truthy ``from_cache`` attribute.

    A response without a ``from_cache`` attribute is treated as not having
    come from the cache.
    """
    try:
        return response.from_cache
    except AttributeError:
        # The attribute was never set on this response.
        return False
26
27
@contextmanager
def suppressed_cache_errors() -> Generator[None, None, None]:
    """Swallow any ``OSError`` raised inside the managed block.

    An inaccessible cache is not fatal: the request can simply be processed
    as if caching were disabled. Exceptions other than ``OSError`` still
    propagate to the caller.
    """
    try:
        yield
    except OSError:
        # Cache access failed; carry on without the cache.
        return
37
38
class SafeFileCache(SeparateBodyBaseCache):
    """
    A file based cache which is safe to use even when the target directory may
    not be accessible or writable.

    There is a race condition when two processes try to write and/or read the
    same entry at the same time, since each entry consists of two separate
    files (https://github.com/psf/cachecontrol/issues/324). We therefore have
    additional logic that makes sure that both files are present before
    returning an entry; this fixes the read side of the race condition.

    For the write side, we assume that the server will only ever return the
    same data for the same URL, which ought to be the case for files pip is
    downloading. PyPI does not have a mechanism to swap out a wheel for
    another wheel, for example. If this assumption is not true, the
    CacheControl issue will need to be fixed.
    """

    def __init__(self, directory: str) -> None:
        """Create a cache whose entries are stored below *directory*."""
        assert directory is not None, "Cache directory must not be None."
        super().__init__()
        self.directory = directory

    def _get_cache_path(self, name: str) -> str:
        """Return the path of the metadata file for the entry *name*.

        The body of the same entry is stored next to it, at this path with a
        ``.body`` suffix appended.
        """
        # From cachecontrol.caches.file_cache.FileCache._fn, brought into our
        # class for backwards-compatibility and to avoid using a non-public
        # method.
        hashed = SeparateBodyFileCache.encode(name)
        # The first five characters of the hash each become one directory
        # level; the full hash is the file name.
        parts = list(hashed[:5]) + [hashed]
        return os.path.join(self.directory, *parts)

    def get(self, key: str) -> bytes | None:
        """Return the stored metadata for *key*.

        Returns ``None`` when the metadata file or the body file is missing,
        or when reading the metadata fails with an ``OSError``.
        """
        # The cache entry is only valid if both metadata and body exist.
        metadata_path = self._get_cache_path(key)
        body_path = metadata_path + ".body"
        if not (os.path.exists(metadata_path) and os.path.exists(body_path)):
            return None
        with suppressed_cache_errors():
            with open(metadata_path, "rb") as f:
                return f.read()
        # Reached only when an OSError was suppressed above.
        return None

    def _write_to_file(self, path: str, writer_func: Callable[[BinaryIO], Any]) -> None:
        """Write a cache file at *path* via a temporary file next to it.

        *writer_func* receives the open temporary file and must write the
        desired content to it. Once it has been written and given the cache
        directory's permissions, the temporary file is moved over *path* with
        ``replace``. Any ``OSError`` is suppressed, so a failed write simply
        leaves the cache without this file.

        If any step fails, the temporary file is removed (best effort) so
        failed writes do not accumulate stray files in the cache directory.
        """
        with suppressed_cache_errors():
            ensure_dir(os.path.dirname(path))

            tmp_path: str | None = None
            try:
                with adjacent_tmp_file(path) as f:
                    tmp_path = f.name
                    writer_func(f)
                    # Inherit the read/write permissions of the cache directory
                    # to enable multi-user cache use-cases.
                    copy_directory_permissions(self.directory, f)

                replace(tmp_path, path)
            except BaseException:
                # Runs after the ``with`` block has exited, so the temporary
                # file is no longer open when we try to delete it.
                if tmp_path is not None:
                    with suppressed_cache_errors():
                        os.remove(tmp_path)
                # Re-raise: an OSError is then swallowed by the outer
                # suppressed_cache_errors(), anything else propagates as before.
                raise

    def _write(self, path: str, data: bytes) -> None:
        """Write the bytes *data* to the cache file at *path*."""
        self._write_to_file(path, lambda f: f.write(data))

    def _write_from_io(self, path: str, source_file: BinaryIO) -> None:
        """Copy the contents of *source_file* to the cache file at *path*."""
        self._write_to_file(path, lambda f: shutil.copyfileobj(source_file, f))

    def set(
        self, key: str, value: bytes, expires: int | datetime | None = None
    ) -> None:
        """Store *value* as the metadata for *key*.

        *expires* is accepted for interface compatibility but is not used.
        """
        path = self._get_cache_path(key)
        self._write(path, value)

    def delete(self, key: str) -> None:
        """Remove the metadata and body files for *key*, ignoring ``OSError``."""
        path = self._get_cache_path(key)
        # Each removal is guarded separately so that a failure to delete the
        # metadata file does not prevent the attempt to delete the body.
        with suppressed_cache_errors():
            os.remove(path)
        with suppressed_cache_errors():
            os.remove(path + ".body")

    def get_body(self, key: str) -> BinaryIO | None:
        """Return the body for *key* as a file opened for binary reading.

        Returns ``None`` when the metadata file or the body file is missing,
        or when opening the body fails with an ``OSError``. The file is
        returned open; closing it is left to the caller.
        """
        # The cache entry is only valid if both metadata and body exist.
        metadata_path = self._get_cache_path(key)
        body_path = metadata_path + ".body"
        if not (os.path.exists(metadata_path) and os.path.exists(body_path)):
            return None
        with suppressed_cache_errors():
            return open(body_path, "rb")
        # Reached only when an OSError was suppressed above.
        return None

    def set_body(self, key: str, body: bytes) -> None:
        """Store the bytes *body* as the body of the cache entry for *key*."""
        path = self._get_cache_path(key) + ".body"
        self._write(path, body)

    def set_body_from_io(self, key: str, body_file: BinaryIO) -> None:
        """Set the body of the cache entry from a file object."""
        path = self._get_cache_path(key) + ".body"
        self._write_from_io(path, body_file)