1import os
2from datetime import datetime, timezone
3from urllib.parse import urljoin
4
5from django.conf import settings
6from django.core.files import File, locks
7from django.core.files.move import file_move_safe
8from django.core.signals import setting_changed
9from django.utils._os import safe_join
10from django.utils.deconstruct import deconstructible
11from django.utils.encoding import filepath_to_uri
12from django.utils.functional import cached_property
13
14from .base import Storage
15from .mixins import StorageSettingsMixin
16
17
18@deconstructible(path="django.core.files.storage.FileSystemStorage")
19class FileSystemStorage(Storage, StorageSettingsMixin):
20 """
21 Standard filesystem storage
22 """
23
24 def __init__(
25 self,
26 location=None,
27 base_url=None,
28 file_permissions_mode=None,
29 directory_permissions_mode=None,
30 allow_overwrite=False,
31 ):
32 self._location = location
33 self._base_url = base_url
34 self._file_permissions_mode = file_permissions_mode
35 self._directory_permissions_mode = directory_permissions_mode
36 self._allow_overwrite = allow_overwrite
37 setting_changed.connect(self._clear_cached_properties)
38
39 @cached_property
40 def base_location(self):
41 return self._value_or_setting(self._location, settings.MEDIA_ROOT)
42
43 @cached_property
44 def location(self):
45 return os.path.abspath(self.base_location)
46
47 @cached_property
48 def base_url(self):
49 if self._base_url is not None and not self._base_url.endswith("/"):
50 self._base_url += "/"
51 return self._value_or_setting(self._base_url, settings.MEDIA_URL)
52
53 @cached_property
54 def file_permissions_mode(self):
55 return self._value_or_setting(
56 self._file_permissions_mode, settings.FILE_UPLOAD_PERMISSIONS
57 )
58
59 @cached_property
60 def directory_permissions_mode(self):
61 return self._value_or_setting(
62 self._directory_permissions_mode, settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS
63 )
64
65 def _open(self, name, mode="rb"):
66 return File(open(self.path(name), mode))
67
68 def _save(self, name, content):
69 full_path = self.path(name)
70
71 # Create any intermediate directories that do not exist.
72 directory = os.path.dirname(full_path)
73 try:
74 if self.directory_permissions_mode is not None:
75 # Set the umask because os.makedirs() doesn't apply the "mode"
76 # argument to intermediate-level directories.
77 old_umask = os.umask(0o777 & ~self.directory_permissions_mode)
78 try:
79 os.makedirs(
80 directory, self.directory_permissions_mode, exist_ok=True
81 )
82 finally:
83 os.umask(old_umask)
84 else:
85 os.makedirs(directory, exist_ok=True)
86 except FileExistsError:
87 raise FileExistsError("%s exists and is not a directory." % directory)
88
89 # There's a potential race condition between get_available_name and
90 # saving the file; it's possible that two threads might return the
91 # same name, at which point all sorts of fun happens. So we need to
92 # try to create the file, but if it already exists we have to go back
93 # to get_available_name() and try again.
94
95 while True:
96 try:
97 # This file has a file path that we can move.
98 if hasattr(content, "temporary_file_path"):
99 file_move_safe(
100 content.temporary_file_path(),
101 full_path,
102 allow_overwrite=self._allow_overwrite,
103 )
104
105 # This is a normal uploadedfile that we can stream.
106 else:
107 # The combination of O_CREAT and O_EXCL makes os.open() raises an
108 # OSError if the file already exists before it's opened.
109 open_flags = (
110 os.O_WRONLY
111 | os.O_CREAT
112 | os.O_EXCL
113 | getattr(os, "O_BINARY", 0)
114 )
115 if self._allow_overwrite:
116 open_flags = open_flags & ~os.O_EXCL
117 fd = os.open(full_path, open_flags, 0o666)
118 _file = None
119 try:
120 locks.lock(fd, locks.LOCK_EX)
121 for chunk in content.chunks():
122 if _file is None:
123 mode = "wb" if isinstance(chunk, bytes) else "wt"
124 _file = os.fdopen(fd, mode)
125 _file.write(chunk)
126 finally:
127 locks.unlock(fd)
128 if _file is not None:
129 _file.close()
130 else:
131 os.close(fd)
132 except FileExistsError:
133 # A new name is needed if the file exists.
134 name = self.get_available_name(name)
135 full_path = self.path(name)
136 else:
137 # OK, the file save worked. Break out of the loop.
138 break
139
140 if self.file_permissions_mode is not None:
141 os.chmod(full_path, self.file_permissions_mode)
142
143 # Ensure the saved path is always relative to the storage root.
144 name = os.path.relpath(full_path, self.location)
145 # Ensure the moved file has the same gid as the storage root.
146 self._ensure_location_group_id(full_path)
147 # Store filenames with forward slashes, even on Windows.
148 return str(name).replace("\\", "/")
149
150 def _ensure_location_group_id(self, full_path):
151 if os.name == "posix":
152 file_gid = os.stat(full_path).st_gid
153 location_gid = os.stat(self.location).st_gid
154 if file_gid != location_gid:
155 try:
156 os.chown(full_path, uid=-1, gid=location_gid)
157 except PermissionError:
158 pass
159
160 def delete(self, name):
161 if not name:
162 raise ValueError("The name must be given to delete().")
163 name = self.path(name)
164 # If the file or directory exists, delete it from the filesystem.
165 try:
166 if os.path.isdir(name):
167 os.rmdir(name)
168 else:
169 os.remove(name)
170 except FileNotFoundError:
171 # FileNotFoundError is raised if the file or directory was removed
172 # concurrently.
173 pass
174
175 def is_name_available(self, name, max_length=None):
176 if self._allow_overwrite:
177 return not (max_length and len(name) > max_length)
178 return super().is_name_available(name, max_length=max_length)
179
180 def get_alternative_name(self, file_root, file_ext):
181 if self._allow_overwrite:
182 return f"{file_root}{file_ext}"
183 return super().get_alternative_name(file_root, file_ext)
184
185 def exists(self, name):
186 return os.path.lexists(self.path(name))
187
188 def listdir(self, path):
189 path = self.path(path)
190 directories, files = [], []
191 with os.scandir(path) as entries:
192 for entry in entries:
193 if entry.is_dir():
194 directories.append(entry.name)
195 else:
196 files.append(entry.name)
197 return directories, files
198
199 def path(self, name):
200 return safe_join(self.location, name)
201
202 def size(self, name):
203 return os.path.getsize(self.path(name))
204
205 def url(self, name):
206 if self.base_url is None:
207 raise ValueError("This file is not accessible via a URL.")
208 url = filepath_to_uri(name)
209 if url is not None:
210 url = url.lstrip("/")
211 return urljoin(self.base_url, url)
212
213 def _datetime_from_timestamp(self, ts):
214 """
215 If timezone support is enabled, make an aware datetime object in UTC;
216 otherwise make a naive one in the local timezone.
217 """
218 tz = timezone.utc if settings.USE_TZ else None
219 return datetime.fromtimestamp(ts, tz=tz)
220
221 def get_accessed_time(self, name):
222 return self._datetime_from_timestamp(os.path.getatime(self.path(name)))
223
224 def get_created_time(self, name):
225 return self._datetime_from_timestamp(os.path.getctime(self.path(name)))
226
227 def get_modified_time(self, name):
228 return self._datetime_from_timestamp(os.path.getmtime(self.path(name)))