Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_vendor/cachecontrol/caches/file_cache.py: 25%

96 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1# SPDX-FileCopyrightText: 2015 Eric Larson 

2# 

3# SPDX-License-Identifier: Apache-2.0 

4 

5import hashlib 

6import os 

7from textwrap import dedent 

8 

9from ..cache import BaseCache, SeparateBodyBaseCache 

10from ..controller import CacheController 

11 

12try: 

13 FileNotFoundError 

14except NameError: 

15 # py2.X 

16 FileNotFoundError = (IOError, OSError) 

17 

18 

19def _secure_open_write(filename, fmode): 

20 # We only want to write to this file, so open it in write only mode 

21 flags = os.O_WRONLY 

22 

23 # os.O_CREAT | os.O_EXCL will fail if the file already exists, so we only 

24 # will open *new* files. 

25 # We specify this because we want to ensure that the mode we pass is the 

26 # mode of the file. 

27 flags |= os.O_CREAT | os.O_EXCL 

28 

29 # Do not follow symlinks to prevent someone from making a symlink that 

30 # we follow and insecurely open a cache file. 

31 if hasattr(os, "O_NOFOLLOW"): 

32 flags |= os.O_NOFOLLOW 

33 

34 # On Windows we'll mark this file as binary 

35 if hasattr(os, "O_BINARY"): 

36 flags |= os.O_BINARY 

37 

38 # Before we open our file, we want to delete any existing file that is 

39 # there 

40 try: 

41 os.remove(filename) 

42 except (IOError, OSError): 

43 # The file must not exist already, so we can just skip ahead to opening 

44 pass 

45 

46 # Open our file, the use of os.O_CREAT | os.O_EXCL will ensure that if a 

47 # race condition happens between the os.remove and this line, that an 

48 # error will be raised. Because we utilize a lockfile this should only 

49 # happen if someone is attempting to attack us. 

50 fd = os.open(filename, flags, fmode) 

51 try: 

52 return os.fdopen(fd, "wb") 

53 

54 except: 

55 # An error occurred wrapping our FD in a file object 

56 os.close(fd) 

57 raise 

58 

59 

60class _FileCacheMixin: 

61 """Shared implementation for both FileCache variants.""" 

62 

63 def __init__( 

64 self, 

65 directory, 

66 forever=False, 

67 filemode=0o0600, 

68 dirmode=0o0700, 

69 use_dir_lock=None, 

70 lock_class=None, 

71 ): 

72 

73 if use_dir_lock is not None and lock_class is not None: 

74 raise ValueError("Cannot use use_dir_lock and lock_class together") 

75 

76 try: 

77 from lockfile import LockFile 

78 from lockfile.mkdirlockfile import MkdirLockFile 

79 except ImportError: 

80 notice = dedent( 

81 """ 

82 NOTE: In order to use the FileCache you must have 

83 lockfile installed. You can install it via pip: 

84 pip install lockfile 

85 """ 

86 ) 

87 raise ImportError(notice) 

88 

89 else: 

90 if use_dir_lock: 

91 lock_class = MkdirLockFile 

92 

93 elif lock_class is None: 

94 lock_class = LockFile 

95 

96 self.directory = directory 

97 self.forever = forever 

98 self.filemode = filemode 

99 self.dirmode = dirmode 

100 self.lock_class = lock_class 

101 

102 @staticmethod 

103 def encode(x): 

104 return hashlib.sha224(x.encode()).hexdigest() 

105 

106 def _fn(self, name): 

107 # NOTE: This method should not change as some may depend on it. 

108 # See: https://github.com/ionrock/cachecontrol/issues/63 

109 hashed = self.encode(name) 

110 parts = list(hashed[:5]) + [hashed] 

111 return os.path.join(self.directory, *parts) 

112 

113 def get(self, key): 

114 name = self._fn(key) 

115 try: 

116 with open(name, "rb") as fh: 

117 return fh.read() 

118 

119 except FileNotFoundError: 

120 return None 

121 

122 def set(self, key, value, expires=None): 

123 name = self._fn(key) 

124 self._write(name, value) 

125 

126 def _write(self, path, data: bytes): 

127 """ 

128 Safely write the data to the given path. 

129 """ 

130 # Make sure the directory exists 

131 try: 

132 os.makedirs(os.path.dirname(path), self.dirmode) 

133 except (IOError, OSError): 

134 pass 

135 

136 with self.lock_class(path) as lock: 

137 # Write our actual file 

138 with _secure_open_write(lock.path, self.filemode) as fh: 

139 fh.write(data) 

140 

141 def _delete(self, key, suffix): 

142 name = self._fn(key) + suffix 

143 if not self.forever: 

144 try: 

145 os.remove(name) 

146 except FileNotFoundError: 

147 pass 

148 

149 

150class FileCache(_FileCacheMixin, BaseCache): 

151 """ 

152 Traditional FileCache: body is stored in memory, so not suitable for large 

153 downloads. 

154 """ 

155 

156 def delete(self, key): 

157 self._delete(key, "") 

158 

159 

160class SeparateBodyFileCache(_FileCacheMixin, SeparateBodyBaseCache): 

161 """ 

162 Memory-efficient FileCache: body is stored in a separate file, reducing 

163 peak memory usage. 

164 """ 

165 

166 def get_body(self, key): 

167 name = self._fn(key) + ".body" 

168 try: 

169 return open(name, "rb") 

170 except FileNotFoundError: 

171 return None 

172 

173 def set_body(self, key, body): 

174 name = self._fn(key) + ".body" 

175 self._write(name, body) 

176 

177 def delete(self, key): 

178 self._delete(key, "") 

179 self._delete(key, ".body") 

180 

181 

182def url_to_file_path(url, filecache): 

183 """Return the file cache path based on the URL. 

184 

185 This does not ensure the file exists! 

186 """ 

187 key = CacheController.cache_url(url) 

188 return filecache._fn(key)