1# SPDX-FileCopyrightText: 2015 Eric Larson
2#
3# SPDX-License-Identifier: Apache-2.0
4from __future__ import annotations
5
6import hashlib
7import os
8import tempfile
9from textwrap import dedent
10from typing import IO, TYPE_CHECKING
11from pathlib import Path
12
13from pip._vendor.cachecontrol.cache import BaseCache, SeparateBodyBaseCache
14from pip._vendor.cachecontrol.controller import CacheController
15
16if TYPE_CHECKING:
17 from datetime import datetime
18
19 from filelock import BaseFileLock
20
21
22class _FileCacheMixin:
23 """Shared implementation for both FileCache variants."""
24
25 def __init__(
26 self,
27 directory: str | Path,
28 forever: bool = False,
29 filemode: int = 0o0600,
30 dirmode: int = 0o0700,
31 lock_class: type[BaseFileLock] | None = None,
32 ) -> None:
33 try:
34 if lock_class is None:
35 from filelock import FileLock
36
37 lock_class = FileLock
38 except ImportError:
39 notice = dedent(
40 """
41 NOTE: In order to use the FileCache you must have
42 filelock installed. You can install it via pip:
43 pip install cachecontrol[filecache]
44 """
45 )
46 raise ImportError(notice)
47
48 self.directory = directory
49 self.forever = forever
50 self.filemode = filemode
51 self.dirmode = dirmode
52 self.lock_class = lock_class
53
54 @staticmethod
55 def encode(x: str) -> str:
56 return hashlib.sha224(x.encode()).hexdigest()
57
58 def _fn(self, name: str) -> str:
59 # NOTE: This method should not change as some may depend on it.
60 # See: https://github.com/ionrock/cachecontrol/issues/63
61 hashed = self.encode(name)
62 parts = list(hashed[:5]) + [hashed]
63 return os.path.join(self.directory, *parts)
64
65 def get(self, key: str) -> bytes | None:
66 name = self._fn(key)
67 try:
68 with open(name, "rb") as fh:
69 return fh.read()
70
71 except FileNotFoundError:
72 return None
73
74 def set(
75 self, key: str, value: bytes, expires: int | datetime | None = None
76 ) -> None:
77 name = self._fn(key)
78 self._write(name, value)
79
80 def _write(self, path: str, data: bytes) -> None:
81 """
82 Safely write the data to the given path.
83 """
84 # Make sure the directory exists
85 dirname = os.path.dirname(path)
86 os.makedirs(dirname, self.dirmode, exist_ok=True)
87
88 with self.lock_class(path + ".lock"):
89 # Write our actual file
90 (fd, name) = tempfile.mkstemp(dir=dirname)
91 try:
92 os.write(fd, data)
93 finally:
94 os.close(fd)
95 os.chmod(name, self.filemode)
96 os.replace(name, path)
97
98 def _delete(self, key: str, suffix: str) -> None:
99 name = self._fn(key) + suffix
100 if not self.forever:
101 try:
102 os.remove(name)
103 except FileNotFoundError:
104 pass
105
106
107class FileCache(_FileCacheMixin, BaseCache):
108 """
109 Traditional FileCache: body is stored in memory, so not suitable for large
110 downloads.
111 """
112
113 def delete(self, key: str) -> None:
114 self._delete(key, "")
115
116
117class SeparateBodyFileCache(_FileCacheMixin, SeparateBodyBaseCache):
118 """
119 Memory-efficient FileCache: body is stored in a separate file, reducing
120 peak memory usage.
121 """
122
123 def get_body(self, key: str) -> IO[bytes] | None:
124 name = self._fn(key) + ".body"
125 try:
126 return open(name, "rb")
127 except FileNotFoundError:
128 return None
129
130 def set_body(self, key: str, body: bytes) -> None:
131 name = self._fn(key) + ".body"
132 self._write(name, body)
133
134 def delete(self, key: str) -> None:
135 self._delete(key, "")
136 self._delete(key, ".body")
137
138
139def url_to_file_path(url: str, filecache: FileCache) -> str:
140 """Return the file cache path based on the URL.
141
142 This does not ensure the file exists!
143 """
144 key = CacheController.cache_url(url)
145 return filecache._fn(key)