Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/datastructures/file_storage.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

107 statements  

1from __future__ import annotations 

2 

3import collections.abc as cabc 

4import mimetypes 

5import os 

6import typing as t 

7from io import BytesIO 

8from os import fsdecode 

9from os import fspath 

10 

11from .._internal import _plain_int 

12from ..http import parse_options_header 

13from .headers import Headers 

14from .structures import MultiDict 

15 

16 

17class FileStorage: 

18 """The :class:`FileStorage` class is a thin wrapper over incoming files. 

19 It is used by the request object to represent uploaded files. All the 

20 attributes of the wrapper stream are proxied by the file storage so 

21 it's possible to do ``storage.read()`` instead of the long form 

22 ``storage.stream.read()``. 

23 """ 

24 

25 def __init__( 

26 self, 

27 stream: t.IO[bytes] | None = None, 

28 filename: str | None = None, 

29 name: str | None = None, 

30 content_type: str | None = None, 

31 content_length: int | None = None, 

32 headers: Headers | None = None, 

33 ): 

34 self.name = name 

35 self.stream = stream or BytesIO() 

36 self.filename = _guess_filename(self.stream, filename) 

37 

38 if headers is None: 

39 headers = Headers() 

40 self.headers = headers 

41 if content_type is not None: 

42 headers["Content-Type"] = content_type 

43 if content_length is not None: 

44 headers["Content-Length"] = str(content_length) 

45 

46 def _parse_content_type(self) -> None: 

47 if not hasattr(self, "_parsed_content_type"): 

48 self._parsed_content_type = parse_options_header(self.content_type) 

49 

50 @property 

51 def content_type(self) -> str | None: 

52 """The content-type sent in the header. Usually not available""" 

53 return self.headers.get("Content-Type") 

54 

55 @property 

56 def content_length(self) -> int: 

57 """The content-length sent in the header. Usually not available""" 

58 if "Content-Length" in self.headers: 

59 try: 

60 return _plain_int(self.headers["Content-Length"]) 

61 except ValueError: 

62 pass 

63 

64 return 0 

65 

66 @property 

67 def mimetype(self) -> str: 

68 """Like :attr:`content_type`, but without parameters (eg, without 

69 charset, type etc.) and always lowercase. For example if the content 

70 type is ``text/HTML; charset=utf-8`` the mimetype would be 

71 ``'text/html'``. 

72 

73 .. versionadded:: 0.7 

74 """ 

75 self._parse_content_type() 

76 return self._parsed_content_type[0].lower() 

77 

78 @property 

79 def mimetype_params(self) -> dict[str, str]: 

80 """The mimetype parameters as dict. For example if the content 

81 type is ``text/html; charset=utf-8`` the params would be 

82 ``{'charset': 'utf-8'}``. 

83 

84 .. versionadded:: 0.7 

85 """ 

86 self._parse_content_type() 

87 return self._parsed_content_type[1] 

88 

89 def save( 

90 self, dst: str | os.PathLike[str] | t.IO[bytes], buffer_size: int = 16384 

91 ) -> None: 

92 """Save the file to a destination path or file object. If the 

93 destination is a file object you have to close it yourself after the 

94 call. The buffer size is the number of bytes held in memory during 

95 the copy process. It defaults to 16KB. 

96 

97 For secure file saving also have a look at :func:`secure_filename`. 

98 

99 :param dst: a filename, :class:`os.PathLike`, or open file 

100 object to write to. 

101 :param buffer_size: Passed as the ``length`` parameter of 

102 :func:`shutil.copyfileobj`. 

103 

104 .. versionchanged:: 1.0 

105 Supports :mod:`pathlib`. 

106 """ 

107 from shutil import copyfileobj 

108 

109 close_dst = False 

110 

111 if hasattr(dst, "__fspath__"): 

112 dst = fspath(dst) 

113 

114 if isinstance(dst, str): 

115 dst = open(dst, "wb") 

116 close_dst = True 

117 

118 try: 

119 copyfileobj(self.stream, dst, buffer_size) 

120 finally: 

121 if close_dst: 

122 dst.close() 

123 

124 def close(self) -> None: 

125 """Close the underlying file if possible.""" 

126 try: 

127 self.stream.close() 

128 except Exception: 

129 pass 

130 

131 def __bool__(self) -> bool: 

132 return bool(self.filename) 

133 

134 def __getattr__(self, name: str) -> t.Any: 

135 try: 

136 return getattr(self.stream, name) 

137 except AttributeError: 

138 # SpooledTemporaryFile on Python < 3.11 doesn't implement IOBase, 

139 # get the attribute from its backing file instead. 

140 if hasattr(self.stream, "_file"): 

141 return getattr(self.stream._file, name) 

142 raise 

143 

144 def __iter__(self) -> cabc.Iterator[bytes]: 

145 return iter(self.stream) 

146 

147 def __repr__(self) -> str: 

148 return f"<{type(self).__name__}: {self.filename!r} ({self.content_type!r})>" 

149 

150 

151class FileMultiDict(MultiDict[str, FileStorage]): 

152 """A :class:`MultiDict` for managing form data file values. Used by 

153 :class:`.EnvironBuilder` for tests. 

154 

155 .. versionadded:: 0.5 

156 """ 

157 

158 def add_file( 

159 self, 

160 name: str, 

161 file: str | os.PathLike[str] | t.IO[bytes] | FileStorage, 

162 filename: str | None = None, 

163 content_type: str | None = None, 

164 ) -> None: 

165 """Add a file to the given key. Can be passed a filename or IO object, 

166 which will construct a :class:`.FileStorage` object. 

167 

168 :param name: The key to add the file to. 

169 :param file: The file to add. Constructs a :class:`FileStorage` object 

170 if the value is not one. 

171 :param filename: The filename to set for the field. Defaults to ``file`` 

172 if it's a filename or ``file.name`` if it's an IO object. 

173 :param content_type: The content type to set for the field. Defaults to 

174 guessing based on the filename, falling back to 

175 ``application/octet-stream``. 

176 

177 .. versionchanged:: 3.2 

178 The filename is detected from an IO object. 

179 """ 

180 if isinstance(file, FileStorage): 

181 self.add(name, file) 

182 return 

183 

184 if isinstance(file, (str, os.PathLike)): 

185 if filename is None: 

186 filename = os.fspath(file) 

187 

188 file_obj: t.IO[bytes] = open(file, "rb") 

189 else: 

190 file_obj = file # type: ignore[assignment] 

191 filename = _guess_filename(file_obj, filename) 

192 

193 if filename is not None and content_type is None: 

194 content_type = ( 

195 mimetypes.guess_type(filename)[0] or "application/octet-stream" 

196 ) 

197 

198 self.add(name, FileStorage(file_obj, filename, name, content_type)) 

199 

200 def close(self) -> None: 

201 """Call :meth:`~FileStorage.close` on every open file. 

202 

203 .. versionadded:: 3.2 

204 """ 

205 for values in self.listvalues(): 

206 for value in values: 

207 if not value.closed: 

208 value.close() 

209 

210 def clear(self) -> None: 

211 """Call :meth:`close`, then remove all items. 

212 

213 .. versionadded:: 3.2 

214 """ 

215 self.close() 

216 super().clear() 

217 

218 

219def _guess_filename(stream: t.IO[t.Any], filename: str | None) -> str | None: 

220 if filename is not None: 

221 return fsdecode(filename) 

222 

223 filename = getattr(stream, "name", None) 

224 

225 if filename is not None: 

226 filename = fsdecode(filename) 

227 

228 # Python names special streams like `<stderr>`, ignore these. 

229 if filename[:1] == "<" and filename[-1:] == ">": 

230 filename = None 

231 

232 return filename