Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/datastructures/file_storage.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

92 statements  

1from __future__ import annotations 

2 

3import mimetypes 

4from io import BytesIO 

5from os import fsdecode 

6from os import fspath 

7 

8from .._internal import _plain_int 

9from .structures import MultiDict 

10 

11 

class FileStorage:
    """The :class:`FileStorage` class is a thin wrapper over incoming files.
    It is used by the request object to represent uploaded files.  All the
    attributes of the wrapper stream are proxied by the file storage so
    it's possible to do ``storage.read()`` instead of the long form
    ``storage.stream.read()``.
    """

    def __init__(
        self,
        stream=None,
        filename=None,
        name=None,
        content_type=None,
        content_length=None,
        headers=None,
    ):
        """Wrap ``stream`` together with its upload metadata.

        :param stream: the underlying file-like object. A falsy value is
            replaced by an empty :class:`~io.BytesIO`.
        :param filename: the filename of the upload. If ``None``, the
            ``name`` attribute of ``stream`` is used when it looks like
            a real file name.
        :param name: the name of the form field.
        :param content_type: if given, stored as the ``Content-Type``
            header.
        :param content_length: if given, stored (as a string) as the
            ``Content-Length`` header.
        :param headers: a headers mapping; a new, empty one is created
            when ``None``. The passed object is modified in place when
            ``content_type`` or ``content_length`` are given.
        """
        self.name = name
        self.stream = stream or BytesIO()

        # If no filename is provided, attempt to get the filename from
        # the stream object. Python names special streams like
        # ``<stderr>`` with angular brackets, skip these streams.
        if filename is None:
            filename = getattr(stream, "name", None)

            if filename is not None:
                try:
                    filename = fsdecode(filename)
                except TypeError:
                    # File objects opened from a file descriptor expose
                    # the integer descriptor as ``name``. That is not a
                    # file name, so treat it as "no filename" instead of
                    # failing to construct the storage.
                    filename = None

            if filename and filename[0] == "<" and filename[-1] == ">":
                filename = None
        else:
            filename = fsdecode(filename)

        self.filename = filename

        if headers is None:
            # Imported lazily, only when a default container is needed.
            from .headers import Headers

            headers = Headers()
        self.headers = headers
        if content_type is not None:
            headers["Content-Type"] = content_type
        if content_length is not None:
            headers["Content-Length"] = str(content_length)

    def _parse_content_type(self):
        """Parse the content type header once and cache the result on
        the instance as ``_parsed_content_type``.
        """
        if not hasattr(self, "_parsed_content_type"):
            self._parsed_content_type = http.parse_options_header(self.content_type)

    @property
    def content_type(self):
        """The content-type sent in the header.  Usually not available"""
        return self.headers.get("content-type")

    @property
    def content_length(self):
        """The content-length sent in the header.  Usually not available

        Returns ``0`` when the header is missing or its value is
        rejected with a :exc:`ValueError`.
        """
        if "content-length" in self.headers:
            try:
                return _plain_int(self.headers["content-length"])
            except ValueError:
                pass

        return 0

    @property
    def mimetype(self):
        """Like :attr:`content_type`, but without parameters (eg, without
        charset, type etc.) and always lowercase.  For example if the content
        type is ``text/HTML; charset=utf-8`` the mimetype would be
        ``'text/html'``.

        .. versionadded:: 0.7
        """
        self._parse_content_type()
        return self._parsed_content_type[0].lower()

    @property
    def mimetype_params(self):
        """The mimetype parameters as dict.  For example if the content
        type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        .. versionadded:: 0.7
        """
        self._parse_content_type()
        return self._parsed_content_type[1]

    def save(self, dst, buffer_size=16384):
        """Save the file to a destination path or file object.  If the
        destination is a file object you have to close it yourself after the
        call.  The buffer size is the number of bytes held in memory during
        the copy process.  It defaults to 16KB.

        For secure file saving also have a look at :func:`secure_filename`.

        :param dst: a filename, :class:`os.PathLike`, or open file
            object to write to.
        :param buffer_size: Passed as the ``length`` parameter of
            :func:`shutil.copyfileobj`.

        .. versionchanged:: 1.0
            Supports :mod:`pathlib`.
        """
        from shutil import copyfileobj

        # Only a file opened here is closed here; a file object passed
        # in by the caller stays open.
        close_dst = False

        if hasattr(dst, "__fspath__"):
            dst = fspath(dst)

        if isinstance(dst, str):
            dst = open(dst, "wb")
            close_dst = True

        try:
            copyfileobj(self.stream, dst, buffer_size)
        finally:
            if close_dst:
                dst.close()

    def close(self):
        """Close the underlying file if possible."""
        try:
            self.stream.close()
        except Exception:
            # Deliberately best-effort: a stream that cannot be closed
            # must not make ``close()`` fail.
            pass

    def __bool__(self):
        """A storage is truthy only if it has a non-empty filename."""
        return bool(self.filename)

    def __getattr__(self, name):
        """Proxy attributes that are not found on the storage itself to
        the wrapped stream.

        :raises AttributeError: if neither the stream nor (when present)
            its ``_file`` backing object has the attribute, or if the
            storage has no ``stream`` yet.
        """
        # ``__getattr__`` only runs when normal lookup failed, so being
        # asked for ``stream`` means it was never set (for example on an
        # instance created with ``__new__`` that has not been populated).
        # Reading ``self.stream`` below would then recurse until
        # ``RecursionError``; report a regular missing attribute instead.
        if name == "stream":
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )

        try:
            return getattr(self.stream, name)
        except AttributeError:
            # SpooledTemporaryFile doesn't implement IOBase, get the
            # attribute from its backing file instead.
            # https://github.com/python/cpython/pull/3249
            if hasattr(self.stream, "_file"):
                return getattr(self.stream._file, name)
            raise

    def __iter__(self):
        """Iterate over the wrapped stream."""
        return iter(self.stream)

    def __repr__(self):
        return f"<{type(self).__name__}: {self.filename!r} ({self.content_type!r})>"

160 

161 

class FileMultiDict(MultiDict):
    """A special :class:`MultiDict` that has convenience methods to add
    files to it.  This is used for :class:`EnvironBuilder` and generally
    useful for unittesting.

    .. versionadded:: 0.5
    """

    def add_file(self, name, file, filename=None, content_type=None):
        """Adds a new file to the dict.  `file` can be a file name, an
        :class:`os.PathLike`, a :class:`file`-like or a
        :class:`FileStorage` object.

        A :class:`FileStorage` is added as is; ``filename`` and
        ``content_type`` are ignored in that case. A path is opened in
        binary read mode and the open file is handed to the new
        :class:`FileStorage`; it is not closed here.

        :param name: the name of the field.
        :param file: a filename, :class:`os.PathLike`, or
            :class:`file`-like object
        :param filename: an optional filename
        :param content_type: an optional content type
        """
        if isinstance(file, FileStorage):
            self.add(name, file)
            return

        # Accept path objects the same way ``FileStorage.save`` does.
        if hasattr(file, "__fspath__"):
            file = fspath(file)

        if isinstance(file, str):
            if filename is None:
                filename = file
            file = open(file, "rb")

        if filename and content_type is None:
            # Guess from the filename, falling back to a generic binary
            # type when the extension is unknown.
            content_type = (
                mimetypes.guess_type(filename)[0] or "application/octet-stream"
            )

        self.add(name, FileStorage(file, filename, name, content_type))

193 

194 

195# circular dependencies 

196from .. import http