Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/core/files/storage/filesystem.py: 31%

128 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-01-17 06:13 +0000

1import os 

2from datetime import datetime, timezone 

3from urllib.parse import urljoin 

4 

5from django.conf import settings 

6from django.core.files import File, locks 

7from django.core.files.move import file_move_safe 

8from django.core.signals import setting_changed 

9from django.utils._os import safe_join 

10from django.utils.deconstruct import deconstructible 

11from django.utils.encoding import filepath_to_uri 

12from django.utils.functional import cached_property 

13 

14from .base import Storage 

15from .mixins import StorageSettingsMixin 

16 

17 

@deconstructible(path="django.core.files.storage.FileSystemStorage")
class FileSystemStorage(Storage, StorageSettingsMixin):
    """
    Storage backend that reads and writes files on the local filesystem.

    The four constructor arguments are stored as given and resolved lazily
    through ``_value_or_setting()`` against MEDIA_ROOT, MEDIA_URL,
    FILE_UPLOAD_PERMISSIONS and FILE_UPLOAD_DIRECTORY_PERMISSIONS.
    NOTE(review): ``_value_or_setting()`` lives in StorageSettingsMixin and
    presumably prefers the explicit value over the setting -- confirm there.
    """

    # O_CREAT together with O_EXCL makes os.open() fail with an OSError when
    # the target already exists, instead of silently reusing it.
    OS_OPEN_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)

    def __init__(
        self,
        location=None,
        base_url=None,
        file_permissions_mode=None,
        directory_permissions_mode=None,
    ):
        """Remember the raw options and subscribe to ``setting_changed``."""
        self._location = location
        self._base_url = base_url
        self._file_permissions_mode = file_permissions_mode
        self._directory_permissions_mode = directory_permissions_mode
        # NOTE(review): the handler comes from the mixin; by its name it drops
        # the cached properties below when a setting changes -- confirm.
        setting_changed.connect(self._clear_cached_properties)

    @cached_property
    def base_location(self):
        """Storage root as configured (explicit location or MEDIA_ROOT)."""
        return self._value_or_setting(self._location, settings.MEDIA_ROOT)

    @cached_property
    def location(self):
        """Absolute form of ``base_location``."""
        return os.path.abspath(self.base_location)

    @cached_property
    def base_url(self):
        """
        URL prefix for stored files (explicit base URL or MEDIA_URL).

        An explicit base URL lacking a trailing slash gets one appended; the
        stored ``_base_url`` attribute itself is updated.
        """
        explicit = self._base_url
        if explicit is not None and not explicit.endswith("/"):
            self._base_url = explicit + "/"
        return self._value_or_setting(self._base_url, settings.MEDIA_URL)

    @cached_property
    def file_permissions_mode(self):
        """Mode passed to chmod for saved files, or None to skip chmod."""
        return self._value_or_setting(
            self._file_permissions_mode, settings.FILE_UPLOAD_PERMISSIONS
        )

    @cached_property
    def directory_permissions_mode(self):
        """Mode used for directories created on save, or None for defaults."""
        return self._value_or_setting(
            self._directory_permissions_mode, settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS
        )

    def _open(self, name, mode="rb"):
        """Open the stored file ``name`` and wrap the handle in a ``File``."""
        handle = open(self.path(name), mode)
        return File(handle)

    def _save(self, name, content):
        """
        Write ``content`` under ``name`` and return the name actually used.

        The returned name is relative to the storage root and always uses
        forward slashes. Raises FileExistsError when the parent "directory"
        already exists but is not a directory.
        """
        full_path = self.path(name)

        # Make sure every missing parent directory exists before writing.
        directory = os.path.dirname(full_path)
        dir_mode = self.directory_permissions_mode
        try:
            if dir_mode is None:
                os.makedirs(directory, exist_ok=True)
            else:
                # os.makedirs() does not apply "mode" to intermediate-level
                # directories, so narrow the umask for the duration instead.
                previous_umask = os.umask(0o777 & ~dir_mode)
                try:
                    os.makedirs(directory, dir_mode, exist_ok=True)
                finally:
                    os.umask(previous_umask)
        except FileExistsError:
            raise FileExistsError("%s exists and is not a directory." % directory)

        # get_available_name() and the actual write are not atomic: two
        # threads may be handed the same name. Creation is therefore attempted
        # exclusively and, on a clash, a new name is requested and retried.
        while True:
            try:
                if not hasattr(content, "temporary_file_path"):
                    # Ordinary upload: stream it chunk by chunk.
                    # os.open() masks the 0o666 below with the current umask.
                    fd = os.open(full_path, self.OS_OPEN_FLAGS, 0o666)
                    stream = None
                    try:
                        locks.lock(fd, locks.LOCK_EX)
                        for chunk in content.chunks():
                            if stream is None:
                                # The first chunk decides binary vs. text.
                                if isinstance(chunk, bytes):
                                    mode = "wb"
                                else:
                                    mode = "wt"
                                stream = os.fdopen(fd, mode)
                            stream.write(chunk)
                    finally:
                        locks.unlock(fd)
                        if stream is None:
                            # No chunk arrived, so fd was never wrapped.
                            os.close(fd)
                        else:
                            stream.close()
                else:
                    # The content already sits in a file on disk: move it.
                    file_move_safe(content.temporary_file_path(), full_path)
            except FileExistsError:
                # Somebody else took the name; pick another and try again.
                name = self.get_available_name(name)
                full_path = self.path(name)
                continue
            # Reaching this point means the write succeeded.
            break

        file_mode = self.file_permissions_mode
        if file_mode is not None:
            os.chmod(full_path, file_mode)

        # Report the saved path relative to the storage root.
        name = os.path.relpath(full_path, self.location)
        # Give the new file the same group id as the storage root.
        self._ensure_location_group_id(full_path)
        # Stored names use forward slashes, even on Windows.
        return str(name).replace("\\", "/")

    def _ensure_location_group_id(self, full_path):
        """
        On POSIX, try to align the file's gid with the storage root's gid.

        A PermissionError from chown is ignored on purpose (best effort).
        """
        if os.name != "posix":
            return
        current_gid = os.stat(full_path).st_gid
        wanted_gid = os.stat(self.location).st_gid
        if current_gid == wanted_gid:
            return
        try:
            os.chown(full_path, uid=-1, gid=wanted_gid)
        except PermissionError:
            pass

    def delete(self, name):
        """
        Remove the file or directory stored as ``name``.

        Raises ValueError for an empty name. A target that is already gone is
        not an error.
        """
        if not name:
            raise ValueError("The name must be given to delete().")
        target = self.path(name)
        try:
            remover = os.rmdir if os.path.isdir(target) else os.remove
            remover(target)
        except FileNotFoundError:
            # The file or directory was removed concurrently; nothing to do.
            pass

    def exists(self, name):
        """Return whether ``name`` exists, per ``os.path.lexists``."""
        return os.path.lexists(self.path(name))

    def listdir(self, path):
        """Return ``(directories, files)``: entry names directly in ``path``."""
        directories, files = [], []
        with os.scandir(self.path(path)) as entries:
            for entry in entries:
                bucket = directories if entry.is_dir() else files
                bucket.append(entry.name)
        return directories, files

    def path(self, name):
        """Join ``name`` onto the storage root using ``safe_join``."""
        return safe_join(self.location, name)

    def size(self, name):
        """Return the size reported by ``os.path.getsize`` for ``name``."""
        return os.path.getsize(self.path(name))

    def url(self, name):
        """
        Return the URL for ``name``, built on top of ``base_url``.

        Raises ValueError when no base URL is available.
        """
        if self.base_url is None:
            raise ValueError("This file is not accessible via a URL.")
        relative = filepath_to_uri(name)
        if relative is not None:
            # Drop leading slashes so urljoin() appends to base_url rather
            # than replacing its path.
            relative = relative.lstrip("/")
        return urljoin(self.base_url, relative)

    def _datetime_from_timestamp(self, ts):
        """
        Convert a POSIX timestamp to a datetime: aware and in UTC when
        settings.USE_TZ is true, naive and in local time otherwise.
        """
        if settings.USE_TZ:
            tz = timezone.utc
        else:
            tz = None
        return datetime.fromtimestamp(ts, tz=tz)

    def get_accessed_time(self, name):
        """Return the access time (``os.path.getatime``) of ``name``."""
        stamp = os.path.getatime(self.path(name))
        return self._datetime_from_timestamp(stamp)

    def get_created_time(self, name):
        """Return ``os.path.getctime`` of ``name`` as a datetime."""
        stamp = os.path.getctime(self.path(name))
        return self._datetime_from_timestamp(stamp)

    def get_modified_time(self, name):
        """Return the modification time (``os.path.getmtime``) of ``name``."""
        stamp = os.path.getmtime(self.path(name))
        return self._datetime_from_timestamp(stamp)