Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/datastructures/file_storage.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

96 statements  

1from __future__ import annotations 

2 

3import collections.abc as cabc 

4import mimetypes 

5import os 

6import typing as t 

7from io import BytesIO 

8from os import fsdecode 

9from os import fspath 

10 

11from .._internal import _plain_int 

12from .headers import Headers 

13from .structures import MultiDict 

14 

15 

class FileStorage:
    """The :class:`FileStorage` class is a thin wrapper over incoming files.
    It is used by the request object to represent uploaded files.  All the
    attributes of the wrapper stream are proxied by the file storage so
    it's possible to do ``storage.read()`` instead of the long form
    ``storage.stream.read()``.

    :param stream: the wrapped binary stream. A new empty
        :class:`~io.BytesIO` is used when a falsy value is given.
    :param filename: the name of the file. When omitted, the ``name``
        attribute of ``stream`` is used if it is a usable path.
    :param name: stored unchanged as :attr:`name`.
    :param content_type: if given, stored in the ``Content-Type`` header.
    :param content_length: if given, stored (as a string) in the
        ``Content-Length`` header.
    :param headers: the header container to use. A new ``Headers``
        object is created when omitted.
    """

    def __init__(
        self,
        stream: t.IO[bytes] | None = None,
        filename: str | None = None,
        name: str | None = None,
        content_type: str | None = None,
        content_length: int | None = None,
        headers: Headers | None = None,
    ):
        self.name = name
        self.stream = stream or BytesIO()

        # If no filename is provided, attempt to get the filename from
        # the stream object. Python names special streams like
        # ``<stderr>`` with angular brackets, skip these streams.
        if filename is None:
            filename = getattr(stream, "name", None)

            # Streams opened from a file descriptor expose an ``int`` as
            # ``name``. ``fsdecode`` only accepts str, bytes and path-like
            # objects, so anything else is treated as "no filename"
            # instead of raising ``TypeError``.
            if isinstance(filename, (str, bytes, os.PathLike)):
                filename = fsdecode(filename)
            else:
                filename = None

            if filename and filename[0] == "<" and filename[-1] == ">":
                filename = None
        else:
            filename = fsdecode(filename)

        self.filename = filename

        if headers is None:
            headers = Headers()
        self.headers = headers
        if content_type is not None:
            headers["Content-Type"] = content_type
        if content_length is not None:
            headers["Content-Length"] = str(content_length)

    def _parse_content_type(self) -> None:
        """Parse :attr:`content_type` once and cache the result on the
        instance as ``_parsed_content_type``.
        """
        # ``http`` is imported at the bottom of the module because of a
        # circular dependency.
        if not hasattr(self, "_parsed_content_type"):
            self._parsed_content_type = http.parse_options_header(self.content_type)

    @property
    def content_type(self) -> str | None:
        """The content-type sent in the header.  Usually not available"""
        return self.headers.get("content-type")

    @property
    def content_length(self) -> int:
        """The content-length sent in the header.  Usually not available.

        Returns ``0`` when the header is missing or cannot be parsed as
        an integer.
        """
        if "content-length" in self.headers:
            try:
                return _plain_int(self.headers["content-length"])
            except ValueError:
                pass

        return 0

    @property
    def mimetype(self) -> str:
        """Like :attr:`content_type`, but without parameters (eg, without
        charset, type etc.) and always lowercase.  For example if the content
        type is ``text/HTML; charset=utf-8`` the mimetype would be
        ``'text/html'``.

        .. versionadded:: 0.7
        """
        self._parse_content_type()
        return self._parsed_content_type[0].lower()

    @property
    def mimetype_params(self) -> dict[str, str]:
        """The mimetype parameters as dict.  For example if the content
        type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        .. versionadded:: 0.7
        """
        self._parse_content_type()
        return self._parsed_content_type[1]

    def save(
        self, dst: str | os.PathLike[str] | t.IO[bytes], buffer_size: int = 16384
    ) -> None:
        """Save the file to a destination path or file object.  If the
        destination is a file object you have to close it yourself after the
        call.  The buffer size is the number of bytes held in memory during
        the copy process.  It defaults to 16KB.

        For secure file saving also have a look at :func:`secure_filename`.

        :param dst: a filename, :class:`os.PathLike`, or open file
            object to write to.
        :param buffer_size: Passed as the ``length`` parameter of
            :func:`shutil.copyfileobj`.

        .. versionchanged:: 1.0
            Supports :mod:`pathlib`.
        """
        from shutil import copyfileobj

        # Only a file opened here is closed here; a caller-supplied file
        # object is left open.
        close_dst = False

        if hasattr(dst, "__fspath__"):
            dst = fspath(dst)

        if isinstance(dst, str):
            dst = open(dst, "wb")
            close_dst = True

        try:
            copyfileobj(self.stream, dst, buffer_size)
        finally:
            if close_dst:
                dst.close()

    def close(self) -> None:
        """Close the underlying file if possible."""
        # Best effort: any error raised while closing is ignored.
        try:
            self.stream.close()
        except Exception:
            pass

    def __bool__(self) -> bool:
        """A storage is truthy only when it has a non-empty filename."""
        return bool(self.filename)

    def __getattr__(self, name: str) -> t.Any:
        """Proxy attributes that are not found on the storage itself to
        the wrapped stream.

        :raises AttributeError: if neither the stream nor (when present)
            its ``_file`` backing object has the attribute.
        """
        # ``__getattr__`` only runs after normal lookup failed. If
        # ``stream`` itself is missing (for example on an instance built
        # with ``__new__``), reading ``self.stream`` below would re-enter
        # this method forever, so fail with a plain ``AttributeError``.
        if name == "stream":
            raise AttributeError(name)

        try:
            return getattr(self.stream, name)
        except AttributeError:
            # SpooledTemporaryFile on Python < 3.11 doesn't implement IOBase,
            # get the attribute from its backing file instead.
            if hasattr(self.stream, "_file"):
                return getattr(self.stream._file, name)
            raise

    def __iter__(self) -> cabc.Iterator[bytes]:
        """Iterate over the wrapped stream."""
        return iter(self.stream)

    def __repr__(self) -> str:
        return f"<{type(self).__name__}: {self.filename!r} ({self.content_type!r})>"

163 

164 

class FileMultiDict(MultiDict[str, FileStorage]):
    """A :class:`MultiDict` specialised for files, with a helper for
    adding them.  This is used for :class:`EnvironBuilder` and generally
    useful for unittesting.

    .. versionadded:: 0.5
    """

    def add_file(
        self,
        name: str,
        file: str | os.PathLike[str] | t.IO[bytes] | FileStorage,
        filename: str | None = None,
        content_type: str | None = None,
    ) -> None:
        """Add a file under the field ``name``.

        An existing :class:`FileStorage` is added unchanged and the
        ``filename`` and ``content_type`` arguments are not used for it.
        A path is opened in binary read mode, and any other object is
        used directly as the stream.

        :param name: the name of the field.
        :param file: a filename or :class:`file`-like object
        :param filename: an optional filename
        :param content_type: an optional content type
        """
        # Ready-made storages need no wrapping.
        if isinstance(file, FileStorage):
            self.add(name, file)
            return

        given_path = isinstance(file, (str, os.PathLike))

        # A path doubles as the filename unless one was passed explicitly.
        if given_path and filename is None:
            filename = os.fspath(file)

        stream: t.IO[bytes] = open(file, "rb") if given_path else file  # type: ignore[assignment]

        # Guess the content type from the filename only when the caller
        # did not supply one.
        if filename and content_type is None:
            guessed, _encoding = mimetypes.guess_type(filename)
            content_type = guessed or "application/octet-stream"

        self.add(name, FileStorage(stream, filename, name, content_type))

206 

207 

208# circular dependencies 

209from .. import http