Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_vendor/cachecontrol/caches/file_cache.py: 27%
92 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
1# SPDX-FileCopyrightText: 2015 Eric Larson
2#
3# SPDX-License-Identifier: Apache-2.0
4from __future__ import annotations
6import hashlib
7import os
8from textwrap import dedent
9from typing import IO, TYPE_CHECKING
11from pip._vendor.cachecontrol.cache import BaseCache, SeparateBodyBaseCache
12from pip._vendor.cachecontrol.controller import CacheController
14if TYPE_CHECKING:
15 from datetime import datetime
17 from filelock import BaseFileLock
20def _secure_open_write(filename: str, fmode: int) -> IO[bytes]:
21 # We only want to write to this file, so open it in write only mode
22 flags = os.O_WRONLY
24 # os.O_CREAT | os.O_EXCL will fail if the file already exists, so we only
25 # will open *new* files.
26 # We specify this because we want to ensure that the mode we pass is the
27 # mode of the file.
28 flags |= os.O_CREAT | os.O_EXCL
30 # Do not follow symlinks to prevent someone from making a symlink that
31 # we follow and insecurely open a cache file.
32 if hasattr(os, "O_NOFOLLOW"):
33 flags |= os.O_NOFOLLOW
35 # On Windows we'll mark this file as binary
36 if hasattr(os, "O_BINARY"):
37 flags |= os.O_BINARY
39 # Before we open our file, we want to delete any existing file that is
40 # there
41 try:
42 os.remove(filename)
43 except OSError:
44 # The file must not exist already, so we can just skip ahead to opening
45 pass
47 # Open our file, the use of os.O_CREAT | os.O_EXCL will ensure that if a
48 # race condition happens between the os.remove and this line, that an
49 # error will be raised. Because we utilize a lockfile this should only
50 # happen if someone is attempting to attack us.
51 fd = os.open(filename, flags, fmode)
52 try:
53 return os.fdopen(fd, "wb")
55 except:
56 # An error occurred wrapping our FD in a file object
57 os.close(fd)
58 raise
61class _FileCacheMixin:
62 """Shared implementation for both FileCache variants."""
64 def __init__(
65 self,
66 directory: str,
67 forever: bool = False,
68 filemode: int = 0o0600,
69 dirmode: int = 0o0700,
70 lock_class: type[BaseFileLock] | None = None,
71 ) -> None:
72 try:
73 if lock_class is None:
74 from filelock import FileLock
76 lock_class = FileLock
77 except ImportError:
78 notice = dedent(
79 """
80 NOTE: In order to use the FileCache you must have
81 filelock installed. You can install it via pip:
82 pip install filelock
83 """
84 )
85 raise ImportError(notice)
87 self.directory = directory
88 self.forever = forever
89 self.filemode = filemode
90 self.dirmode = dirmode
91 self.lock_class = lock_class
93 @staticmethod
94 def encode(x: str) -> str:
95 return hashlib.sha224(x.encode()).hexdigest()
97 def _fn(self, name: str) -> str:
98 # NOTE: This method should not change as some may depend on it.
99 # See: https://github.com/ionrock/cachecontrol/issues/63
100 hashed = self.encode(name)
101 parts = list(hashed[:5]) + [hashed]
102 return os.path.join(self.directory, *parts)
104 def get(self, key: str) -> bytes | None:
105 name = self._fn(key)
106 try:
107 with open(name, "rb") as fh:
108 return fh.read()
110 except FileNotFoundError:
111 return None
113 def set(
114 self, key: str, value: bytes, expires: int | datetime | None = None
115 ) -> None:
116 name = self._fn(key)
117 self._write(name, value)
119 def _write(self, path: str, data: bytes) -> None:
120 """
121 Safely write the data to the given path.
122 """
123 # Make sure the directory exists
124 try:
125 os.makedirs(os.path.dirname(path), self.dirmode)
126 except OSError:
127 pass
129 with self.lock_class(path + ".lock"):
130 # Write our actual file
131 with _secure_open_write(path, self.filemode) as fh:
132 fh.write(data)
134 def _delete(self, key: str, suffix: str) -> None:
135 name = self._fn(key) + suffix
136 if not self.forever:
137 try:
138 os.remove(name)
139 except FileNotFoundError:
140 pass
143class FileCache(_FileCacheMixin, BaseCache):
144 """
145 Traditional FileCache: body is stored in memory, so not suitable for large
146 downloads.
147 """
149 def delete(self, key: str) -> None:
150 self._delete(key, "")
153class SeparateBodyFileCache(_FileCacheMixin, SeparateBodyBaseCache):
154 """
155 Memory-efficient FileCache: body is stored in a separate file, reducing
156 peak memory usage.
157 """
159 def get_body(self, key: str) -> IO[bytes] | None:
160 name = self._fn(key) + ".body"
161 try:
162 return open(name, "rb")
163 except FileNotFoundError:
164 return None
166 def set_body(self, key: str, body: bytes) -> None:
167 name = self._fn(key) + ".body"
168 self._write(name, body)
170 def delete(self, key: str) -> None:
171 self._delete(key, "")
172 self._delete(key, ".body")
175def url_to_file_path(url: str, filecache: FileCache) -> str:
176 """Return the file cache path based on the URL.
178 This does not ensure the file exists!
179 """
180 key = CacheController.cache_url(url)
181 return filecache._fn(key)