Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/core/files/storage/memory.py: 34%
173 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
1"""
2Based on dj-inmemorystorage (BSD) by Cody Soyland, Seán Hayes, Tore Birkeland,
3and Nick Presta.
4"""
6import errno
7import io
8import os
9import pathlib
10from urllib.parse import urljoin
12from django.conf import settings
13from django.core.files.base import ContentFile
14from django.core.signals import setting_changed
15from django.utils._os import safe_join
16from django.utils.deconstruct import deconstructible
17from django.utils.encoding import filepath_to_uri
18from django.utils.functional import cached_property
19from django.utils.timezone import now
21from .base import Storage
22from .mixins import StorageSettingsMixin
24__all__ = ("InMemoryStorage",)
class TimingMixin:
    """Mixin keeping creation, access, and modification timestamps."""

    def _initialize_times(self):
        """Set all three timestamps to one shared ``now()`` value."""
        self.created_time = self.accessed_time = self.modified_time = now()

    def _update_accessed_time(self):
        """Refresh the access timestamp."""
        self.accessed_time = now()

    def _update_modified_time(self):
        """Refresh the modification timestamp."""
        self.modified_time = now()
class InMemoryFileNode(ContentFile, TimingMixin):
    """
    Helper class representing an in-memory file node.

    Handle unicode/bytes conversion during I/O operations and record creation,
    modification, and access times.
    """

    def __init__(self, content="", name=""):
        """
        Create a file node.

        - content: initial content (text or bytes-like). It selects whether
          the underlying stream is textual or binary and is written to it.
        - name: name stored on the node.
        """
        # The stream is built by _initialize_stream() rather than by the
        # parent constructor (unchanged from the previous implementation), so
        # the name has to be stored explicitly here.
        self.name = name
        # Normalize to exactly ``str`` or ``bytes`` so that bytes-like input
        # such as ``bytearray`` is treated as binary instead of falling
        # through to the text branch of _initialize_stream().
        self._content_type = str if isinstance(content, str) else bytes
        self._initialize_stream()
        if content:
            # Previously the initial content was silently discarded.
            self.file.write(content)
            self.file.seek(0)
        self._initialize_times()

    def open(self, mode):
        """
        Convert the stored content to match ``mode``, record the access time,
        and delegate to the parent class ``open()``.
        """
        self._convert_stream_content(mode)
        self._update_accessed_time()
        return super().open(mode)

    def write(self, data):
        """
        Write ``data`` through the parent class and record the modification
        time. Return whatever the underlying write returned.
        """
        written = super().write(data)
        self._update_modified_time()
        return written

    def _initialize_stream(self):
        """Initialize underlying stream according to the content type."""
        self.file = io.BytesIO() if self._content_type is bytes else io.StringIO()

    def _convert_stream_content(self, mode):
        """Convert actual file content according to the opening mode."""
        new_content_type = bytes if "b" in mode else str
        # No conversion needed.
        if self._content_type is new_content_type:
            return

        content = self.file.getvalue()
        # encode()/decode() are called without arguments, so the default
        # codec is used in both directions.
        content = content.encode() if isinstance(content, str) else content.decode()
        self._content_type = new_content_type
        # Replace the stream with an empty one of the new type and refill it.
        self._initialize_stream()

        self.file.write(content)
class InMemoryDirNode(TimingMixin):
    """
    Helper class representing an in-memory directory node.

    Handle path navigation of directory trees, creating missing nodes if
    needed.
    """

    def __init__(self):
        """Create an empty directory node and stamp its timestamps."""
        # Maps a child name to its InMemoryDirNode / InMemoryFileNode.
        self._children = {}
        self._initialize_times()

    def resolve(self, path, create_if_missing=False, leaf_cls=None, check_exists=True):
        """
        Navigate current directory tree, returning node matching path or
        creating a new one, if missing.
        - path: path of the node to search
        - create_if_missing: create nodes if not exist. Defaults to False.
        - leaf_cls: expected type of leaf node. Defaults to None.
        - check_exists: if True and the leaf node does not exist, raise a
          FileNotFoundError. Defaults to True.

        Return the matching node, or None when it is missing and
        check_exists is False.

        Raise NotADirectoryError if a file node is found where a directory is
        required (either in the middle of the path or as the leaf when
        leaf_cls is InMemoryDirNode), and IsADirectoryError if the leaf is
        not an instance of any other requested leaf_cls.
        """
        path_segments = pathlib.Path(path).parts
        last_index = len(path_segments) - 1
        current_node = self

        for index, path_segment in enumerate(path_segments):
            # If current node is a file node and there are unprocessed
            # segments, raise an error naming the path walked so far.
            if isinstance(current_node, InMemoryFileNode):
                current_path = "/".join(path_segments[:index])
                raise NotADirectoryError(
                    errno.ENOTDIR, os.strerror(errno.ENOTDIR), current_path
                )
            current_node = current_node._resolve_child(
                path_segment,
                create_if_missing,
                # Only the final segment may be of the requested leaf type;
                # every intermediate segment is a directory.
                leaf_cls if index == last_index else InMemoryDirNode,
            )
            if current_node is None:
                break

        if current_node is None:
            if check_exists:
                raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)
            # A missing node has no type to validate; report it as absent
            # rather than as a node of the wrong kind.
            return None

        # If a leaf_cls is not None, check if leaf node is of right type.
        if leaf_cls and not isinstance(current_node, leaf_cls):
            error_cls, error_code = (
                (NotADirectoryError, errno.ENOTDIR)
                if leaf_cls is InMemoryDirNode
                else (IsADirectoryError, errno.EISDIR)
            )
            raise error_cls(error_code, os.strerror(error_code), path)

        return current_node

    def _resolve_child(self, path_segment, create_if_missing, child_cls):
        """
        Return the direct child named ``path_segment``.

        When ``create_if_missing`` is true, refresh this directory's access
        and modification times and create the child from ``child_cls`` if it
        does not exist yet. Otherwise return None for an unknown name.
        """
        if not create_if_missing:
            return self._children.get(path_segment)

        self._update_accessed_time()
        self._update_modified_time()
        child = self._children.get(path_segment)
        if child is None:
            # Instantiate only when needed instead of building a throwaway
            # node on every lookup.
            child = self._children[path_segment] = child_cls()
        return child

    def listdir(self):
        """Return a ``(directories, files)`` pair of child name lists."""
        directories, files = [], []
        for name, entry in self._children.items():
            if isinstance(entry, InMemoryDirNode):
                directories.append(name)
            else:
                files.append(name)
        return directories, files

    def remove_child(self, name):
        """
        Remove the child called ``name`` and refresh this directory's access
        and modification times. Do nothing if there is no such child.
        """
        if name in self._children:
            self._update_accessed_time()
            self._update_modified_time()
            del self._children[name]
@deconstructible(path="django.core.files.storage.InMemoryStorage")
class InMemoryStorage(Storage, StorageSettingsMixin):
    """Storage backend keeping its files in an in-memory node tree."""

    def __init__(
        self,
        location=None,
        base_url=None,
        file_permissions_mode=None,
        directory_permissions_mode=None,
    ):
        """
        Remember the explicit options, build the root directory node, make
        sure the node for the base location exists, and subscribe to the
        ``setting_changed`` signal.
        """
        self._location = location
        self._base_url = base_url
        self._file_permissions_mode = file_permissions_mode
        self._directory_permissions_mode = directory_permissions_mode
        self._root = InMemoryDirNode()
        # Create the directory node for the base location up front.
        self._resolve(
            self.base_location,
            create_if_missing=True,
            leaf_cls=InMemoryDirNode,
        )
        setting_changed.connect(self._clear_cached_properties)

    @cached_property
    def base_location(self):
        """Explicit location, or ``settings.MEDIA_ROOT`` as fallback."""
        return self._value_or_setting(self._location, settings.MEDIA_ROOT)

    @cached_property
    def location(self):
        """Absolute form of ``base_location``."""
        return os.path.abspath(self.base_location)

    @cached_property
    def base_url(self):
        """
        Explicit base URL (with a trailing slash appended when missing), or
        ``settings.MEDIA_URL`` as fallback.
        """
        explicit_url = self._base_url
        if explicit_url is not None and not explicit_url.endswith("/"):
            self._base_url = explicit_url + "/"
        return self._value_or_setting(self._base_url, settings.MEDIA_URL)

    @cached_property
    def file_permissions_mode(self):
        """Explicit mode, or ``settings.FILE_UPLOAD_PERMISSIONS``."""
        return self._value_or_setting(
            self._file_permissions_mode,
            settings.FILE_UPLOAD_PERMISSIONS,
        )

    @cached_property
    def directory_permissions_mode(self):
        """Explicit mode, or ``settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS``."""
        return self._value_or_setting(
            self._directory_permissions_mode,
            settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS,
        )

    def _relative_path(self, name):
        """Return the path of ``name`` relative to the storage location."""
        return os.path.relpath(self.path(name), self.location)

    def _resolve(self, name, create_if_missing=False, leaf_cls=None, check_exists=True):
        """
        Look up the node for ``name`` in the root tree, forwarding the
        options to ``InMemoryDirNode.resolve()``.

        A NotADirectoryError from the lookup is re-raised as FileExistsError
        carrying the absolute path of the offending node.
        """
        try:
            node_path = self._relative_path(name)
            return self._root.resolve(
                node_path,
                create_if_missing=create_if_missing,
                leaf_cls=leaf_cls,
                check_exists=check_exists,
            )
        except NotADirectoryError as exc:
            absolute_path = self.path(exc.filename)
            raise FileExistsError(f"{absolute_path} exists and is not a directory.")

    def _open(self, name, mode="rb"):
        """
        Open the file node for ``name``; the node is created when ``mode``
        contains "w".
        """
        node = self._resolve(
            name,
            create_if_missing="w" in mode,
            leaf_cls=InMemoryFileNode,
        )
        return node.open(mode)

    def _save(self, name, content):
        """
        Write the chunks of ``content`` into the file node for ``name`` and
        return the location-relative name using forward slashes.
        """
        node = self._resolve(name, create_if_missing=True, leaf_cls=InMemoryFileNode)
        stream = None
        for chunk in content.chunks():
            # The node is opened lazily: the first chunk decides between
            # binary and text mode.
            if stream is None:
                stream = node.open("wb" if isinstance(chunk, bytes) else "wt")
            stream.write(chunk)

        # Content exposing a temporary file path has that file removed once
        # its chunks have been consumed.
        if hasattr(content, "temporary_file_path"):
            os.remove(content.temporary_file_path())

        node.modified_time = now()
        return self._relative_path(name).replace("\\", "/")

    def path(self, name):
        """Return ``name`` joined onto the storage location by ``safe_join``."""
        return safe_join(self.location, name)

    def delete(self, name):
        """Remove ``name`` from its parent node; a missing parent is ignored."""
        parent_path, child_name = os.path.split(name)
        parent_node = self._resolve(parent_path, check_exists=False)
        if parent_node is not None:
            parent_node.remove_child(child_name)

    def exists(self, name):
        """Return True if a node is found for ``name``."""
        node = self._resolve(name, check_exists=False)
        return node is not None

    def listdir(self, path):
        """Return the ``(directories, files)`` pair of the node at ``path``."""
        return self._resolve(path, leaf_cls=InMemoryDirNode).listdir()

    def size(self, name):
        """Return the length of the file's content as read in binary mode."""
        opened = self._open(name, "rb")
        return len(opened.file.getvalue())

    def url(self, name):
        """
        Return ``name`` joined onto the base URL.

        Raise ValueError when no base URL is available.
        """
        if self.base_url is None:
            raise ValueError("This file is not accessible via a URL.")
        relative_url = filepath_to_uri(name)
        if relative_url is not None:
            # Strip leading slashes before joining onto the base URL.
            relative_url = relative_url.lstrip("/")
        return urljoin(self.base_url, relative_url)

    def get_accessed_time(self, name):
        """Return the access timestamp of the node for ``name``."""
        return self._resolve(name).accessed_time

    def get_created_time(self, name):
        """Return the creation timestamp of the node for ``name``."""
        return self._resolve(name).created_time

    def get_modified_time(self, name):
        """Return the modification timestamp of the node for ``name``."""
        return self._resolve(name).modified_time