Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/django/core/files/storage/memory.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

175 statements  

1""" 

2Based on dj-inmemorystorage (BSD) by Cody Soyland, Seán Hayes, Tore Birkeland, 

3and Nick Presta. 

4""" 

5 

6import errno 

7import io 

8import os 

9import pathlib 

10from urllib.parse import urljoin 

11 

12from django.conf import settings 

13from django.core.files.base import ContentFile 

14from django.core.signals import setting_changed 

15from django.utils._os import safe_join 

16from django.utils.deconstruct import deconstructible 

17from django.utils.encoding import filepath_to_uri 

18from django.utils.functional import cached_property 

19from django.utils.timezone import now 

20 

21from .base import Storage 

22from .mixins import StorageSettingsMixin 

23 

24__all__ = ("InMemoryStorage",) 

25 

26 

27class TimingMixin: 

28 def _initialize_times(self): 

29 self.created_time = now() 

30 self.accessed_time = self.created_time 

31 self.modified_time = self.created_time 

32 

33 def _update_accessed_time(self): 

34 self.accessed_time = now() 

35 

36 def _update_modified_time(self): 

37 self.modified_time = now() 

38 

39 

40class InMemoryFileNode(ContentFile, TimingMixin): 

41 """ 

42 Helper class representing an in-memory file node. 

43 

44 Handle unicode/bytes conversion during I/O operations and record creation, 

45 modification, and access times. 

46 """ 

47 

48 def __init__(self, content="", name=None): 

49 super().__init__(content, name) 

50 self._content_type = type(content) 

51 self._initialize_times() 

52 

53 def open(self, mode): 

54 self._convert_stream_content(mode) 

55 self._update_accessed_time() 

56 return super().open(mode) 

57 

58 def write(self, data): 

59 super().write(data) 

60 self._update_modified_time() 

61 

62 def _initialize_stream(self): 

63 """Initialize underlying stream according to the content type.""" 

64 self.file = io.BytesIO() if self._content_type == bytes else io.StringIO() 

65 

66 def _convert_stream_content(self, mode): 

67 """Convert actual file content according to the opening mode.""" 

68 new_content_type = bytes if "b" in mode else str 

69 # No conversion needed. 

70 if self._content_type == new_content_type: 

71 return 

72 

73 content = self.file.getvalue() 

74 content = content.encode() if isinstance(content, str) else content.decode() 

75 self._content_type = new_content_type 

76 self._initialize_stream() 

77 

78 self.file.write(content) 

79 

80 

81class InMemoryDirNode(TimingMixin): 

82 """ 

83 Helper class representing an in-memory directory node. 

84 

85 Handle path navigation of directory trees, creating missing nodes if 

86 needed. 

87 """ 

88 

89 def __init__(self): 

90 self._children = {} 

91 self._initialize_times() 

92 

93 def resolve(self, path, create_if_missing=False, leaf_cls=None, check_exists=True): 

94 """ 

95 Navigate current directory tree, returning node matching path or 

96 creating a new one, if missing. 

97 - path: path of the node to search 

98 - create_if_missing: create nodes if not exist. Defaults to False. 

99 - leaf_cls: expected type of leaf node. Defaults to None. 

100 - check_exists: if True and the leaf node does not exist, raise a 

101 FileNotFoundError. Defaults to True. 

102 """ 

103 path_segments = list(pathlib.Path(path).parts) 

104 current_node = self 

105 

106 while path_segments: 

107 path_segment = path_segments.pop(0) 

108 # If current node is a file node and there are unprocessed 

109 # segments, raise an error. 

110 if isinstance(current_node, InMemoryFileNode): 

111 path_segments = os.path.split(path) 

112 current_path = "/".join( 

113 path_segments[: path_segments.index(path_segment)] 

114 ) 

115 raise NotADirectoryError( 

116 errno.ENOTDIR, os.strerror(errno.ENOTDIR), current_path 

117 ) 

118 current_node = current_node._resolve_child( 

119 path_segment, 

120 create_if_missing, 

121 leaf_cls if len(path_segments) == 0 else InMemoryDirNode, 

122 ) 

123 if current_node is None: 

124 break 

125 

126 if current_node is None and check_exists: 

127 raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path) 

128 

129 # If a leaf_cls is not None, check if leaf node is of right type. 

130 if leaf_cls and not isinstance(current_node, leaf_cls): 

131 error_cls, error_code = ( 

132 (NotADirectoryError, errno.ENOTDIR) 

133 if leaf_cls is InMemoryDirNode 

134 else (IsADirectoryError, errno.EISDIR) 

135 ) 

136 raise error_cls(error_code, os.strerror(error_code), path) 

137 

138 return current_node 

139 

140 def _resolve_child(self, path_segment, create_if_missing, child_cls): 

141 if create_if_missing: 

142 self._update_accessed_time() 

143 self._update_modified_time() 

144 if child_cls is InMemoryFileNode: 

145 child = child_cls(name=path_segment) 

146 else: 

147 child = child_cls() 

148 return self._children.setdefault(path_segment, child) 

149 return self._children.get(path_segment) 

150 

151 def listdir(self): 

152 directories, files = [], [] 

153 for name, entry in self._children.items(): 

154 if isinstance(entry, InMemoryDirNode): 

155 directories.append(name) 

156 else: 

157 files.append(name) 

158 return directories, files 

159 

160 def remove_child(self, name): 

161 if name in self._children: 

162 self._update_accessed_time() 

163 self._update_modified_time() 

164 del self._children[name] 

165 

166 

167@deconstructible(path="django.core.files.storage.InMemoryStorage") 

168class InMemoryStorage(Storage, StorageSettingsMixin): 

169 """A storage saving files in memory.""" 

170 

171 def __init__( 

172 self, 

173 location=None, 

174 base_url=None, 

175 file_permissions_mode=None, 

176 directory_permissions_mode=None, 

177 ): 

178 self._location = location 

179 self._base_url = base_url 

180 self._file_permissions_mode = file_permissions_mode 

181 self._directory_permissions_mode = directory_permissions_mode 

182 self._root = InMemoryDirNode() 

183 self._resolve( 

184 self.base_location, create_if_missing=True, leaf_cls=InMemoryDirNode 

185 ) 

186 setting_changed.connect(self._clear_cached_properties) 

187 

188 @cached_property 

189 def base_location(self): 

190 return self._value_or_setting(self._location, settings.MEDIA_ROOT) 

191 

192 @cached_property 

193 def location(self): 

194 return os.path.abspath(self.base_location) 

195 

196 @cached_property 

197 def base_url(self): 

198 if self._base_url is not None and not self._base_url.endswith("/"): 

199 self._base_url += "/" 

200 return self._value_or_setting(self._base_url, settings.MEDIA_URL) 

201 

202 @cached_property 

203 def file_permissions_mode(self): 

204 return self._value_or_setting( 

205 self._file_permissions_mode, settings.FILE_UPLOAD_PERMISSIONS 

206 ) 

207 

208 @cached_property 

209 def directory_permissions_mode(self): 

210 return self._value_or_setting( 

211 self._directory_permissions_mode, settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS 

212 ) 

213 

214 def _relative_path(self, name): 

215 full_path = self.path(name) 

216 return os.path.relpath(full_path, self.location) 

217 

218 def _resolve(self, name, create_if_missing=False, leaf_cls=None, check_exists=True): 

219 try: 

220 relative_path = self._relative_path(name) 

221 return self._root.resolve( 

222 relative_path, 

223 create_if_missing=create_if_missing, 

224 leaf_cls=leaf_cls, 

225 check_exists=check_exists, 

226 ) 

227 except NotADirectoryError as exc: 

228 absolute_path = self.path(exc.filename) 

229 raise FileExistsError(f"{absolute_path} exists and is not a directory.") 

230 

231 def _open(self, name, mode="rb"): 

232 create_if_missing = "w" in mode 

233 file_node = self._resolve( 

234 name, create_if_missing=create_if_missing, leaf_cls=InMemoryFileNode 

235 ) 

236 return file_node.open(mode) 

237 

238 def _save(self, name, content): 

239 file_node = self._resolve( 

240 name, create_if_missing=True, leaf_cls=InMemoryFileNode 

241 ) 

242 fd = None 

243 for chunk in content.chunks(): 

244 if fd is None: 

245 mode = "wb" if isinstance(chunk, bytes) else "wt" 

246 fd = file_node.open(mode) 

247 fd.write(chunk) 

248 

249 if hasattr(content, "temporary_file_path"): 

250 os.remove(content.temporary_file_path()) 

251 

252 file_node.modified_time = now() 

253 return self._relative_path(name).replace("\\", "/") 

254 

255 def path(self, name): 

256 return safe_join(self.location, name) 

257 

258 def delete(self, name): 

259 path, filename = os.path.split(name) 

260 dir_node = self._resolve(path, check_exists=False) 

261 if dir_node is None: 

262 return None 

263 dir_node.remove_child(filename) 

264 

265 def exists(self, name): 

266 return self._resolve(name, check_exists=False) is not None 

267 

268 def listdir(self, path): 

269 node = self._resolve(path, leaf_cls=InMemoryDirNode) 

270 return node.listdir() 

271 

272 def size(self, name): 

273 return len(self._open(name, "rb").file.getvalue()) 

274 

275 def url(self, name): 

276 if self.base_url is None: 

277 raise ValueError("This file is not accessible via a URL.") 

278 url = filepath_to_uri(name) 

279 if url is not None: 

280 url = url.lstrip("/") 

281 return urljoin(self.base_url, url) 

282 

283 def get_accessed_time(self, name): 

284 file_node = self._resolve(name) 

285 return file_node.accessed_time 

286 

287 def get_created_time(self, name): 

288 file_node = self._resolve(name) 

289 return file_node.created_time 

290 

291 def get_modified_time(self, name): 

292 file_node = self._resolve(name) 

293 return file_node.modified_time