Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_vendor/cachecontrol/caches/file_cache.py: 27%

92 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-26 06:33 +0000

1# SPDX-FileCopyrightText: 2015 Eric Larson 

2# 

3# SPDX-License-Identifier: Apache-2.0 

4from __future__ import annotations 

5 

6import hashlib 

7import os 

8from textwrap import dedent 

9from typing import IO, TYPE_CHECKING 

10 

11from pip._vendor.cachecontrol.cache import BaseCache, SeparateBodyBaseCache 

12from pip._vendor.cachecontrol.controller import CacheController 

13 

14if TYPE_CHECKING: 

15 from datetime import datetime 

16 

17 from filelock import BaseFileLock 

18 

19 

20def _secure_open_write(filename: str, fmode: int) -> IO[bytes]: 

21 # We only want to write to this file, so open it in write only mode 

22 flags = os.O_WRONLY 

23 

24 # os.O_CREAT | os.O_EXCL will fail if the file already exists, so we only 

25 # will open *new* files. 

26 # We specify this because we want to ensure that the mode we pass is the 

27 # mode of the file. 

28 flags |= os.O_CREAT | os.O_EXCL 

29 

30 # Do not follow symlinks to prevent someone from making a symlink that 

31 # we follow and insecurely open a cache file. 

32 if hasattr(os, "O_NOFOLLOW"): 

33 flags |= os.O_NOFOLLOW 

34 

35 # On Windows we'll mark this file as binary 

36 if hasattr(os, "O_BINARY"): 

37 flags |= os.O_BINARY 

38 

39 # Before we open our file, we want to delete any existing file that is 

40 # there 

41 try: 

42 os.remove(filename) 

43 except OSError: 

44 # The file must not exist already, so we can just skip ahead to opening 

45 pass 

46 

47 # Open our file, the use of os.O_CREAT | os.O_EXCL will ensure that if a 

48 # race condition happens between the os.remove and this line, that an 

49 # error will be raised. Because we utilize a lockfile this should only 

50 # happen if someone is attempting to attack us. 

51 fd = os.open(filename, flags, fmode) 

52 try: 

53 return os.fdopen(fd, "wb") 

54 

55 except: 

56 # An error occurred wrapping our FD in a file object 

57 os.close(fd) 

58 raise 

59 

60 

61class _FileCacheMixin: 

62 """Shared implementation for both FileCache variants.""" 

63 

64 def __init__( 

65 self, 

66 directory: str, 

67 forever: bool = False, 

68 filemode: int = 0o0600, 

69 dirmode: int = 0o0700, 

70 lock_class: type[BaseFileLock] | None = None, 

71 ) -> None: 

72 try: 

73 if lock_class is None: 

74 from filelock import FileLock 

75 

76 lock_class = FileLock 

77 except ImportError: 

78 notice = dedent( 

79 """ 

80 NOTE: In order to use the FileCache you must have 

81 filelock installed. You can install it via pip: 

82 pip install filelock 

83 """ 

84 ) 

85 raise ImportError(notice) 

86 

87 self.directory = directory 

88 self.forever = forever 

89 self.filemode = filemode 

90 self.dirmode = dirmode 

91 self.lock_class = lock_class 

92 

93 @staticmethod 

94 def encode(x: str) -> str: 

95 return hashlib.sha224(x.encode()).hexdigest() 

96 

97 def _fn(self, name: str) -> str: 

98 # NOTE: This method should not change as some may depend on it. 

99 # See: https://github.com/ionrock/cachecontrol/issues/63 

100 hashed = self.encode(name) 

101 parts = list(hashed[:5]) + [hashed] 

102 return os.path.join(self.directory, *parts) 

103 

104 def get(self, key: str) -> bytes | None: 

105 name = self._fn(key) 

106 try: 

107 with open(name, "rb") as fh: 

108 return fh.read() 

109 

110 except FileNotFoundError: 

111 return None 

112 

113 def set( 

114 self, key: str, value: bytes, expires: int | datetime | None = None 

115 ) -> None: 

116 name = self._fn(key) 

117 self._write(name, value) 

118 

119 def _write(self, path: str, data: bytes) -> None: 

120 """ 

121 Safely write the data to the given path. 

122 """ 

123 # Make sure the directory exists 

124 try: 

125 os.makedirs(os.path.dirname(path), self.dirmode) 

126 except OSError: 

127 pass 

128 

129 with self.lock_class(path + ".lock"): 

130 # Write our actual file 

131 with _secure_open_write(path, self.filemode) as fh: 

132 fh.write(data) 

133 

134 def _delete(self, key: str, suffix: str) -> None: 

135 name = self._fn(key) + suffix 

136 if not self.forever: 

137 try: 

138 os.remove(name) 

139 except FileNotFoundError: 

140 pass 

141 

142 

class FileCache(_FileCacheMixin, BaseCache):
    """
    Classic file-backed cache. The whole entry, body included, is handled as
    one in-memory value, which makes it a poor fit for large downloads.
    """

    def delete(self, key: str) -> None:
        """Remove the entry stored for ``key`` (see ``_delete``)."""
        self._delete(key, suffix="")

151 

152 

class SeparateBodyFileCache(_FileCacheMixin, SeparateBodyBaseCache):
    """
    File-backed cache that keeps the body in its own ``.body`` file next to
    the main entry, which lowers peak memory usage.
    """

    def get_body(self, key: str) -> IO[bytes] | None:
        """Return the body file for ``key`` opened for binary reading.

        Returns ``None`` when no body file exists. The returned file object
        is left open for the caller.
        """
        body_path = self._fn(key) + ".body"
        try:
            stream = open(body_path, "rb")
        except FileNotFoundError:
            stream = None
        return stream

    def set_body(self, key: str, body: bytes) -> None:
        """Write ``body`` to the ``.body`` file belonging to ``key``."""
        self._write(self._fn(key) + ".body", body)

    def delete(self, key: str) -> None:
        """Remove the main entry for ``key`` and then its ``.body`` file."""
        for suffix in ("", ".body"):
            self._delete(key, suffix)

173 

174 

def url_to_file_path(url: str, filecache: FileCache) -> str:
    """Map ``url`` to the path ``filecache`` would use to store it.

    The URL is first turned into a cache key with
    ``CacheController.cache_url``. Nothing is read or created on disk, so
    the returned path may not exist.
    """
    return filecache._fn(CacheController.cache_url(url))

181 return filecache._fn(key)