Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/core/files/storage/memory.py: 34%

173 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-01-17 06:13 +0000

1""" 

2Based on dj-inmemorystorage (BSD) by Cody Soyland, Seán Hayes, Tore Birkeland, 

3and Nick Presta. 

4""" 

5 

6import errno 

7import io 

8import os 

9import pathlib 

10from urllib.parse import urljoin 

11 

12from django.conf import settings 

13from django.core.files.base import ContentFile 

14from django.core.signals import setting_changed 

15from django.utils._os import safe_join 

16from django.utils.deconstruct import deconstructible 

17from django.utils.encoding import filepath_to_uri 

18from django.utils.functional import cached_property 

19from django.utils.timezone import now 

20 

21from .base import Storage 

22from .mixins import StorageSettingsMixin 

23 

24__all__ = ("InMemoryStorage",) 

25 

26 

27class TimingMixin: 

28 def _initialize_times(self): 

29 self.created_time = now() 

30 self.accessed_time = self.created_time 

31 self.modified_time = self.created_time 

32 

33 def _update_accessed_time(self): 

34 self.accessed_time = now() 

35 

36 def _update_modified_time(self): 

37 self.modified_time = now() 

38 

39 

40class InMemoryFileNode(ContentFile, TimingMixin): 

41 """ 

42 Helper class representing an in-memory file node. 

43 

44 Handle unicode/bytes conversion during I/O operations and record creation, 

45 modification, and access times. 

46 """ 

47 

48 def __init__(self, content="", name=""): 

49 self.file = None 

50 self._content_type = type(content) 

51 self._initialize_stream() 

52 self._initialize_times() 

53 

54 def open(self, mode): 

55 self._convert_stream_content(mode) 

56 self._update_accessed_time() 

57 return super().open(mode) 

58 

59 def write(self, data): 

60 super().write(data) 

61 self._update_modified_time() 

62 

63 def _initialize_stream(self): 

64 """Initialize underlying stream according to the content type.""" 

65 self.file = io.BytesIO() if self._content_type == bytes else io.StringIO() 

66 

67 def _convert_stream_content(self, mode): 

68 """Convert actual file content according to the opening mode.""" 

69 new_content_type = bytes if "b" in mode else str 

70 # No conversion needed. 

71 if self._content_type == new_content_type: 

72 return 

73 

74 content = self.file.getvalue() 

75 content = content.encode() if isinstance(content, str) else content.decode() 

76 self._content_type = new_content_type 

77 self._initialize_stream() 

78 

79 self.file.write(content) 

80 

81 

82class InMemoryDirNode(TimingMixin): 

83 """ 

84 Helper class representing an in-memory directory node. 

85 

86 Handle path navigation of directory trees, creating missing nodes if 

87 needed. 

88 """ 

89 

90 def __init__(self): 

91 self._children = {} 

92 self._initialize_times() 

93 

94 def resolve(self, path, create_if_missing=False, leaf_cls=None, check_exists=True): 

95 """ 

96 Navigate current directory tree, returning node matching path or 

97 creating a new one, if missing. 

98 - path: path of the node to search 

99 - create_if_missing: create nodes if not exist. Defaults to False. 

100 - leaf_cls: expected type of leaf node. Defaults to None. 

101 - check_exists: if True and the leaf node does not exist, raise a 

102 FileNotFoundError. Defaults to True. 

103 """ 

104 path_segments = list(pathlib.Path(path).parts) 

105 current_node = self 

106 

107 while path_segments: 

108 path_segment = path_segments.pop(0) 

109 # If current node is a file node and there are unprocessed 

110 # segments, raise an error. 

111 if isinstance(current_node, InMemoryFileNode): 

112 path_segments = os.path.split(path) 

113 current_path = "/".join( 

114 path_segments[: path_segments.index(path_segment)] 

115 ) 

116 raise NotADirectoryError( 

117 errno.ENOTDIR, os.strerror(errno.ENOTDIR), current_path 

118 ) 

119 current_node = current_node._resolve_child( 

120 path_segment, 

121 create_if_missing, 

122 leaf_cls if len(path_segments) == 0 else InMemoryDirNode, 

123 ) 

124 if current_node is None: 

125 break 

126 

127 if current_node is None and check_exists: 

128 raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path) 

129 

130 # If a leaf_cls is not None, check if leaf node is of right type. 

131 if leaf_cls and not isinstance(current_node, leaf_cls): 

132 error_cls, error_code = ( 

133 (NotADirectoryError, errno.ENOTDIR) 

134 if leaf_cls is InMemoryDirNode 

135 else (IsADirectoryError, errno.EISDIR) 

136 ) 

137 raise error_cls(error_code, os.strerror(error_code), path) 

138 

139 return current_node 

140 

141 def _resolve_child(self, path_segment, create_if_missing, child_cls): 

142 if create_if_missing: 

143 self._update_accessed_time() 

144 self._update_modified_time() 

145 return self._children.setdefault(path_segment, child_cls()) 

146 return self._children.get(path_segment) 

147 

148 def listdir(self): 

149 directories, files = [], [] 

150 for name, entry in self._children.items(): 

151 if isinstance(entry, InMemoryDirNode): 

152 directories.append(name) 

153 else: 

154 files.append(name) 

155 return directories, files 

156 

157 def remove_child(self, name): 

158 if name in self._children: 

159 self._update_accessed_time() 

160 self._update_modified_time() 

161 del self._children[name] 

162 

163 

164@deconstructible(path="django.core.files.storage.InMemoryStorage") 

165class InMemoryStorage(Storage, StorageSettingsMixin): 

166 """A storage saving files in memory.""" 

167 

168 def __init__( 

169 self, 

170 location=None, 

171 base_url=None, 

172 file_permissions_mode=None, 

173 directory_permissions_mode=None, 

174 ): 

175 self._location = location 

176 self._base_url = base_url 

177 self._file_permissions_mode = file_permissions_mode 

178 self._directory_permissions_mode = directory_permissions_mode 

179 self._root = InMemoryDirNode() 

180 self._resolve( 

181 self.base_location, create_if_missing=True, leaf_cls=InMemoryDirNode 

182 ) 

183 setting_changed.connect(self._clear_cached_properties) 

184 

185 @cached_property 

186 def base_location(self): 

187 return self._value_or_setting(self._location, settings.MEDIA_ROOT) 

188 

189 @cached_property 

190 def location(self): 

191 return os.path.abspath(self.base_location) 

192 

193 @cached_property 

194 def base_url(self): 

195 if self._base_url is not None and not self._base_url.endswith("/"): 

196 self._base_url += "/" 

197 return self._value_or_setting(self._base_url, settings.MEDIA_URL) 

198 

199 @cached_property 

200 def file_permissions_mode(self): 

201 return self._value_or_setting( 

202 self._file_permissions_mode, settings.FILE_UPLOAD_PERMISSIONS 

203 ) 

204 

205 @cached_property 

206 def directory_permissions_mode(self): 

207 return self._value_or_setting( 

208 self._directory_permissions_mode, settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS 

209 ) 

210 

211 def _relative_path(self, name): 

212 full_path = self.path(name) 

213 return os.path.relpath(full_path, self.location) 

214 

215 def _resolve(self, name, create_if_missing=False, leaf_cls=None, check_exists=True): 

216 try: 

217 relative_path = self._relative_path(name) 

218 return self._root.resolve( 

219 relative_path, 

220 create_if_missing=create_if_missing, 

221 leaf_cls=leaf_cls, 

222 check_exists=check_exists, 

223 ) 

224 except NotADirectoryError as exc: 

225 absolute_path = self.path(exc.filename) 

226 raise FileExistsError(f"{absolute_path} exists and is not a directory.") 

227 

228 def _open(self, name, mode="rb"): 

229 create_if_missing = "w" in mode 

230 file_node = self._resolve( 

231 name, create_if_missing=create_if_missing, leaf_cls=InMemoryFileNode 

232 ) 

233 return file_node.open(mode) 

234 

235 def _save(self, name, content): 

236 file_node = self._resolve( 

237 name, create_if_missing=True, leaf_cls=InMemoryFileNode 

238 ) 

239 fd = None 

240 for chunk in content.chunks(): 

241 if fd is None: 

242 mode = "wb" if isinstance(chunk, bytes) else "wt" 

243 fd = file_node.open(mode) 

244 fd.write(chunk) 

245 

246 if hasattr(content, "temporary_file_path"): 

247 os.remove(content.temporary_file_path()) 

248 

249 file_node.modified_time = now() 

250 return self._relative_path(name).replace("\\", "/") 

251 

252 def path(self, name): 

253 return safe_join(self.location, name) 

254 

255 def delete(self, name): 

256 path, filename = os.path.split(name) 

257 dir_node = self._resolve(path, check_exists=False) 

258 if dir_node is None: 

259 return None 

260 dir_node.remove_child(filename) 

261 

262 def exists(self, name): 

263 return self._resolve(name, check_exists=False) is not None 

264 

265 def listdir(self, path): 

266 node = self._resolve(path, leaf_cls=InMemoryDirNode) 

267 return node.listdir() 

268 

269 def size(self, name): 

270 return len(self._open(name, "rb").file.getvalue()) 

271 

272 def url(self, name): 

273 if self.base_url is None: 

274 raise ValueError("This file is not accessible via a URL.") 

275 url = filepath_to_uri(name) 

276 if url is not None: 

277 url = url.lstrip("/") 

278 return urljoin(self.base_url, url) 

279 

280 def get_accessed_time(self, name): 

281 file_node = self._resolve(name) 

282 return file_node.accessed_time 

283 

284 def get_created_time(self, name): 

285 file_node = self._resolve(name) 

286 return file_node.created_time 

287 

288 def get_modified_time(self, name): 

289 file_node = self._resolve(name) 

290 return file_node.modified_time