Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/django/core/files/storage/filesystem.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

140 statements  

1import os 

2from datetime import datetime, timezone 

3from urllib.parse import urljoin 

4 

5from django.conf import settings 

6from django.core.files import File, locks 

7from django.core.files.move import file_move_safe 

8from django.core.signals import setting_changed 

9from django.utils._os import safe_join 

10from django.utils.deconstruct import deconstructible 

11from django.utils.encoding import filepath_to_uri 

12from django.utils.functional import cached_property 

13 

14from .base import Storage 

15from .mixins import StorageSettingsMixin 

16 

17 

18@deconstructible(path="django.core.files.storage.FileSystemStorage") 

19class FileSystemStorage(Storage, StorageSettingsMixin): 

20 """ 

21 Standard filesystem storage 

22 """ 

23 

24 def __init__( 

25 self, 

26 location=None, 

27 base_url=None, 

28 file_permissions_mode=None, 

29 directory_permissions_mode=None, 

30 allow_overwrite=False, 

31 ): 

32 self._location = location 

33 self._base_url = base_url 

34 self._file_permissions_mode = file_permissions_mode 

35 self._directory_permissions_mode = directory_permissions_mode 

36 self._allow_overwrite = allow_overwrite 

37 setting_changed.connect(self._clear_cached_properties) 

38 

39 @cached_property 

40 def base_location(self): 

41 return self._value_or_setting(self._location, settings.MEDIA_ROOT) 

42 

43 @cached_property 

44 def location(self): 

45 return os.path.abspath(self.base_location) 

46 

47 @cached_property 

48 def base_url(self): 

49 if self._base_url is not None and not self._base_url.endswith("/"): 

50 self._base_url += "/" 

51 return self._value_or_setting(self._base_url, settings.MEDIA_URL) 

52 

53 @cached_property 

54 def file_permissions_mode(self): 

55 return self._value_or_setting( 

56 self._file_permissions_mode, settings.FILE_UPLOAD_PERMISSIONS 

57 ) 

58 

59 @cached_property 

60 def directory_permissions_mode(self): 

61 return self._value_or_setting( 

62 self._directory_permissions_mode, settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS 

63 ) 

64 

65 def _open(self, name, mode="rb"): 

66 return File(open(self.path(name), mode)) 

67 

68 def _save(self, name, content): 

69 full_path = self.path(name) 

70 

71 # Create any intermediate directories that do not exist. 

72 directory = os.path.dirname(full_path) 

73 try: 

74 if self.directory_permissions_mode is not None: 

75 # Set the umask because os.makedirs() doesn't apply the "mode" 

76 # argument to intermediate-level directories. 

77 old_umask = os.umask(0o777 & ~self.directory_permissions_mode) 

78 try: 

79 os.makedirs( 

80 directory, self.directory_permissions_mode, exist_ok=True 

81 ) 

82 finally: 

83 os.umask(old_umask) 

84 else: 

85 os.makedirs(directory, exist_ok=True) 

86 except FileExistsError: 

87 raise FileExistsError("%s exists and is not a directory." % directory) 

88 

89 # There's a potential race condition between get_available_name and 

90 # saving the file; it's possible that two threads might return the 

91 # same name, at which point all sorts of fun happens. So we need to 

92 # try to create the file, but if it already exists we have to go back 

93 # to get_available_name() and try again. 

94 

95 while True: 

96 try: 

97 # This file has a file path that we can move. 

98 if hasattr(content, "temporary_file_path"): 

99 file_move_safe( 

100 content.temporary_file_path(), 

101 full_path, 

102 allow_overwrite=self._allow_overwrite, 

103 ) 

104 

105 # This is a normal uploadedfile that we can stream. 

106 else: 

107 # The combination of O_CREAT and O_EXCL makes os.open() raises an 

108 # OSError if the file already exists before it's opened. 

109 open_flags = ( 

110 os.O_WRONLY 

111 | os.O_CREAT 

112 | os.O_EXCL 

113 | getattr(os, "O_BINARY", 0) 

114 ) 

115 if self._allow_overwrite: 

116 open_flags = open_flags & ~os.O_EXCL 

117 fd = os.open(full_path, open_flags, 0o666) 

118 _file = None 

119 try: 

120 locks.lock(fd, locks.LOCK_EX) 

121 for chunk in content.chunks(): 

122 if _file is None: 

123 mode = "wb" if isinstance(chunk, bytes) else "wt" 

124 _file = os.fdopen(fd, mode) 

125 _file.write(chunk) 

126 finally: 

127 locks.unlock(fd) 

128 if _file is not None: 

129 _file.close() 

130 else: 

131 os.close(fd) 

132 except FileExistsError: 

133 # A new name is needed if the file exists. 

134 name = self.get_available_name(name) 

135 full_path = self.path(name) 

136 else: 

137 # OK, the file save worked. Break out of the loop. 

138 break 

139 

140 if self.file_permissions_mode is not None: 

141 os.chmod(full_path, self.file_permissions_mode) 

142 

143 # Ensure the saved path is always relative to the storage root. 

144 name = os.path.relpath(full_path, self.location) 

145 # Ensure the moved file has the same gid as the storage root. 

146 self._ensure_location_group_id(full_path) 

147 # Store filenames with forward slashes, even on Windows. 

148 return str(name).replace("\\", "/") 

149 

150 def _ensure_location_group_id(self, full_path): 

151 if os.name == "posix": 

152 file_gid = os.stat(full_path).st_gid 

153 location_gid = os.stat(self.location).st_gid 

154 if file_gid != location_gid: 

155 try: 

156 os.chown(full_path, uid=-1, gid=location_gid) 

157 except PermissionError: 

158 pass 

159 

160 def delete(self, name): 

161 if not name: 

162 raise ValueError("The name must be given to delete().") 

163 name = self.path(name) 

164 # If the file or directory exists, delete it from the filesystem. 

165 try: 

166 if os.path.isdir(name): 

167 os.rmdir(name) 

168 else: 

169 os.remove(name) 

170 except FileNotFoundError: 

171 # FileNotFoundError is raised if the file or directory was removed 

172 # concurrently. 

173 pass 

174 

175 def is_name_available(self, name, max_length=None): 

176 if self._allow_overwrite: 

177 return not (max_length and len(name) > max_length) 

178 return super().is_name_available(name, max_length=max_length) 

179 

180 def get_alternative_name(self, file_root, file_ext): 

181 if self._allow_overwrite: 

182 return f"{file_root}{file_ext}" 

183 return super().get_alternative_name(file_root, file_ext) 

184 

185 def exists(self, name): 

186 return os.path.lexists(self.path(name)) 

187 

188 def listdir(self, path): 

189 path = self.path(path) 

190 directories, files = [], [] 

191 with os.scandir(path) as entries: 

192 for entry in entries: 

193 if entry.is_dir(): 

194 directories.append(entry.name) 

195 else: 

196 files.append(entry.name) 

197 return directories, files 

198 

199 def path(self, name): 

200 return safe_join(self.location, name) 

201 

202 def size(self, name): 

203 return os.path.getsize(self.path(name)) 

204 

205 def url(self, name): 

206 if self.base_url is None: 

207 raise ValueError("This file is not accessible via a URL.") 

208 url = filepath_to_uri(name) 

209 if url is not None: 

210 url = url.lstrip("/") 

211 return urljoin(self.base_url, url) 

212 

213 def _datetime_from_timestamp(self, ts): 

214 """ 

215 If timezone support is enabled, make an aware datetime object in UTC; 

216 otherwise make a naive one in the local timezone. 

217 """ 

218 tz = timezone.utc if settings.USE_TZ else None 

219 return datetime.fromtimestamp(ts, tz=tz) 

220 

221 def get_accessed_time(self, name): 

222 return self._datetime_from_timestamp(os.path.getatime(self.path(name))) 

223 

224 def get_created_time(self, name): 

225 return self._datetime_from_timestamp(os.path.getctime(self.path(name))) 

226 

227 def get_modified_time(self, name): 

228 return self._datetime_from_timestamp(os.path.getmtime(self.path(name)))