1"""
2Based on dj-inmemorystorage (BSD) by Cody Soyland, Seán Hayes, Tore Birkeland,
3and Nick Presta.
4"""
5
6import errno
7import io
8import os
9import pathlib
10from urllib.parse import urljoin
11
12from django.conf import settings
13from django.core.files.base import ContentFile
14from django.core.signals import setting_changed
15from django.utils._os import safe_join
16from django.utils.deconstruct import deconstructible
17from django.utils.encoding import filepath_to_uri
18from django.utils.functional import cached_property
19from django.utils.timezone import now
20
21from .base import Storage
22from .mixins import StorageSettingsMixin
23
24__all__ = ("InMemoryStorage",)
25
26
class TimingMixin:
    """Mixin keeping creation, access, and modification timestamps."""

    def _initialize_times(self):
        """Set the three timestamps to one shared "current time" value."""
        timestamp = now()
        self.created_time = timestamp
        self.accessed_time = timestamp
        self.modified_time = timestamp

    def _update_accessed_time(self):
        """Record the current time as the latest access."""
        self.accessed_time = now()

    def _update_modified_time(self):
        """Record the current time as the latest modification."""
        self.modified_time = now()
38
39
class InMemoryFileNode(ContentFile, TimingMixin):
    """
    Helper class representing an in-memory file node.

    Handle unicode/bytes conversion during I/O operations and record creation,
    modification, and access times.
    """

    def __init__(self, content="", name=None):
        """
        Create the node with the given initial content and name.

        - content: initial content. Defaults to an empty string.
        - name: name of the file. Defaults to None.
        """
        super().__init__(content, name)
        # Track only the *kind* of stream (text or binary), never the exact
        # type of `content`. Storing type(content) made str subclasses and
        # other bytes-like objects look like neither str nor bytes, which
        # triggered a bogus conversion (and a TypeError) on the next open().
        # NOTE(review): assumes the parent class backs non-str content with a
        # binary stream - confirm against ContentFile.
        self._content_type = str if isinstance(content, str) else bytes
        self._initialize_times()

    def open(self, mode):
        """
        Convert the content to match mode, record the access, and delegate
        to the parent class' open().
        """
        self._convert_stream_content(mode)
        self._update_accessed_time()
        return super().open(mode)

    def write(self, data):
        """
        Write data to the underlying stream and record the modification.

        Return whatever the parent class' write() returns, so the node keeps
        behaving like the stream it wraps.
        """
        written = super().write(data)
        self._update_modified_time()
        return written

    def _initialize_stream(self):
        """Initialize underlying stream according to the content type."""
        self.file = io.BytesIO() if self._content_type is bytes else io.StringIO()

    def _convert_stream_content(self, mode):
        """Convert actual file content according to the opening mode."""
        new_content_type = bytes if "b" in mode else str
        # No conversion needed.
        if self._content_type is new_content_type:
            return

        content = self.file.getvalue()
        content = content.encode() if isinstance(content, str) else content.decode()
        self._content_type = new_content_type
        # Replace the stream with an empty one of the new kind before
        # writing the converted content back.
        self._initialize_stream()

        self.file.write(content)
79
80
class InMemoryDirNode(TimingMixin):
    """
    Helper class representing an in-memory directory node.

    Handle path navigation of directory trees, creating missing nodes if
    needed.
    """

    def __init__(self):
        # Maps a path segment (child name) to its file or directory node.
        self._children = {}
        self._initialize_times()

    def resolve(self, path, create_if_missing=False, leaf_cls=None, check_exists=True):
        """
        Navigate current directory tree, returning node matching path or
        creating a new one, if missing.
        - path: path of the node to search
        - create_if_missing: create nodes if not exist. Defaults to False.
        - leaf_cls: expected type of leaf node. Defaults to None.
        - check_exists: if True and the leaf node does not exist, raise a
          FileNotFoundError. Defaults to True.

        Return the matching node, or None if it does not exist and
        check_exists is False.

        Raise NotADirectoryError if a file node is reached before the path is
        exhausted, or if leaf_cls is InMemoryDirNode and the leaf is of
        another type. Raise IsADirectoryError if any other leaf_cls is given
        and the leaf is not an instance of it.
        """
        path_segments = pathlib.Path(path).parts
        last_index = len(path_segments) - 1
        current_node = self

        for index, path_segment in enumerate(path_segments):
            # If current node is a file node and there are unprocessed
            # segments, raise an error.
            if isinstance(current_node, InMemoryFileNode):
                # Report the part of the path consumed so far, i.e. the path
                # of the file node. Slicing by position (instead of looking
                # the segment up by value) works for any depth and for
                # repeated segment names.
                current_path = "/".join(path_segments[:index])
                raise NotADirectoryError(
                    errno.ENOTDIR, os.strerror(errno.ENOTDIR), current_path
                )
            current_node = current_node._resolve_child(
                path_segment,
                create_if_missing,
                # Only the last segment may be of the requested leaf type;
                # every intermediate segment has to be a directory.
                leaf_cls if index == last_index else InMemoryDirNode,
            )
            if current_node is None:
                break

        if current_node is None:
            if check_exists:
                raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)
            # A missing node is neither a file nor a directory: skip the leaf
            # type check instead of reporting a misleading type error.
            return None

        # If a leaf_cls is not None, check if leaf node is of right type.
        if leaf_cls and not isinstance(current_node, leaf_cls):
            error_cls, error_code = (
                (NotADirectoryError, errno.ENOTDIR)
                if leaf_cls is InMemoryDirNode
                else (IsADirectoryError, errno.EISDIR)
            )
            raise error_cls(error_code, os.strerror(error_code), path)

        return current_node

    def _resolve_child(self, path_segment, create_if_missing, child_cls):
        """
        Return the child named path_segment, or None if it is missing and
        create_if_missing is False.

        If create_if_missing is True, update this directory's accessed and
        modified times and create a child_cls node when no child of that name
        exists yet. An existing child is returned as is, whatever its type.
        """
        if not create_if_missing:
            return self._children.get(path_segment)

        self._update_accessed_time()
        self._update_modified_time()
        try:
            return self._children[path_segment]
        except KeyError:
            # Build a node only when it is going to be stored.
            if child_cls is InMemoryFileNode:
                child = child_cls(name=path_segment)
            else:
                child = child_cls()
            self._children[path_segment] = child
            return child

    def listdir(self):
        """
        Return a (directories, files) tuple holding the names of the direct
        children of this node.
        """
        directories, files = [], []
        for name, entry in self._children.items():
            if isinstance(entry, InMemoryDirNode):
                directories.append(name)
            else:
                files.append(name)
        return directories, files

    def remove_child(self, name):
        """
        Remove the direct child called name, updating this directory's
        accessed and modified times. Do nothing if there is no such child.
        """
        if name in self._children:
            self._update_accessed_time()
            self._update_modified_time()
            del self._children[name]
165
166
@deconstructible(path="django.core.files.storage.InMemoryStorage")
class InMemoryStorage(Storage, StorageSettingsMixin):
    """A storage saving files in memory."""

    def __init__(
        self,
        location=None,
        base_url=None,
        file_permissions_mode=None,
        directory_permissions_mode=None,
    ):
        """
        Keep the given options, create the root directory node and the node
        of the base location, and subscribe to the setting_changed signal.
        """
        self._location = location
        self._base_url = base_url
        self._file_permissions_mode = file_permissions_mode
        self._directory_permissions_mode = directory_permissions_mode
        self._root = InMemoryDirNode()
        # Make sure the directory node of the base location exists.
        self._resolve(
            self.base_location,
            create_if_missing=True,
            leaf_cls=InMemoryDirNode,
        )
        setting_changed.connect(self._clear_cached_properties)

    @cached_property
    def base_location(self):
        """Location resolved from the given one and settings.MEDIA_ROOT."""
        return self._value_or_setting(self._location, settings.MEDIA_ROOT)

    @cached_property
    def location(self):
        """Absolute form of base_location."""
        return os.path.abspath(self.base_location)

    @cached_property
    def base_url(self):
        """
        Base URL resolved from the given one and settings.MEDIA_URL. A given
        URL gets a trailing slash appended if it lacks one.
        """
        given_url = self._base_url
        if given_url is not None and not given_url.endswith("/"):
            self._base_url = given_url + "/"
        return self._value_or_setting(self._base_url, settings.MEDIA_URL)

    @cached_property
    def file_permissions_mode(self):
        """
        File permissions resolved from the given ones and
        settings.FILE_UPLOAD_PERMISSIONS.
        """
        return self._value_or_setting(
            self._file_permissions_mode,
            settings.FILE_UPLOAD_PERMISSIONS,
        )

    @cached_property
    def directory_permissions_mode(self):
        """
        Directory permissions resolved from the given ones and
        settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS.
        """
        return self._value_or_setting(
            self._directory_permissions_mode,
            settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS,
        )

    def _relative_path(self, name):
        """Return the path of name relative to the storage location."""
        return os.path.relpath(self.path(name), self.location)

    def _resolve(self, name, create_if_missing=False, leaf_cls=None, check_exists=True):
        """
        Look up the node called name in the tree, forwarding the options to
        the root node's resolve().

        Raise FileExistsError if the lookup fails with a NotADirectoryError.
        """
        try:
            return self._root.resolve(
                self._relative_path(name),
                create_if_missing=create_if_missing,
                leaf_cls=leaf_cls,
                check_exists=check_exists,
            )
        except NotADirectoryError as exc:
            # Report the offending node using its full path.
            absolute_path = self.path(exc.filename)
            raise FileExistsError(f"{absolute_path} exists and is not a directory.")

    def _open(self, name, mode="rb"):
        """
        Open the file node called name with the given mode. The node is
        created if missing only when mode contains "w".
        """
        node = self._resolve(
            name,
            create_if_missing="w" in mode,
            leaf_cls=InMemoryFileNode,
        )
        return node.open(mode)

    def _save(self, name, content):
        """
        Write the chunks of content into the file node called name, creating
        it if missing, and return its "/"-separated path relative to the
        storage location.
        """
        file_node = self._resolve(
            name, create_if_missing=True, leaf_cls=InMemoryFileNode
        )
        stream = None
        for chunk in content.chunks():
            if stream is None:
                # Open lazily: the type of the first chunk picks the mode.
                stream = file_node.open("wb" if isinstance(chunk, bytes) else "wt")
            stream.write(chunk)

        # Remove the temporary file backing the content, if there is one.
        if hasattr(content, "temporary_file_path"):
            os.remove(content.temporary_file_path())

        file_node.modified_time = now()
        relative_path = self._relative_path(name)
        return relative_path.replace("\\", "/")

    def path(self, name):
        """Return name safely joined to the storage location."""
        return safe_join(self.location, name)

    def delete(self, name):
        """
        Remove the node called name from its parent directory. Do nothing if
        the parent directory does not exist.
        """
        parent_path, child_name = os.path.split(name)
        parent_node = self._resolve(parent_path, check_exists=False)
        if parent_node is not None:
            parent_node.remove_child(child_name)

    def exists(self, name):
        """Return True if a node called name exists, False otherwise."""
        node = self._resolve(name, check_exists=False)
        return node is not None

    def listdir(self, path):
        """Return a (directories, files) tuple for the directory at path."""
        return self._resolve(path, leaf_cls=InMemoryDirNode).listdir()

    def size(self, name):
        """Return the length of the content of name, opened in "rb" mode."""
        opened_node = self._open(name, "rb")
        return len(opened_node.file.getvalue())

    def url(self, name):
        """
        Return the URL of name, built on top of base_url.

        Raise ValueError if base_url is None.
        """
        if self.base_url is None:
            raise ValueError("This file is not accessible via a URL.")
        uri = filepath_to_uri(name)
        # Drop leading slashes before joining the URI with base_url.
        relative_url = None if uri is None else uri.lstrip("/")
        return urljoin(self.base_url, relative_url)

    def get_accessed_time(self, name):
        """Return the accessed time recorded on the node called name."""
        return self._resolve(name).accessed_time

    def get_created_time(self, name):
        """Return the created time recorded on the node called name."""
        return self._resolve(name).created_time

    def get_modified_time(self, name):
        """Return the modified time recorded on the node called name."""
        return self._resolve(name).modified_time