Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_multipart.py: 20%

147 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import binascii 

2import io 

3import os 

4import typing 

5from pathlib import Path 

6 

7from ._types import ( 

8 AsyncByteStream, 

9 FileContent, 

10 FileTypes, 

11 RequestData, 

12 RequestFiles, 

13 SyncByteStream, 

14) 

15from ._utils import ( 

16 format_form_param, 

17 guess_content_type, 

18 peek_filelike_length, 

19 primitive_value_to_str, 

20 to_bytes, 

21) 

22 

23 

24def get_multipart_boundary_from_content_type( 

25 content_type: typing.Optional[bytes], 

26) -> typing.Optional[bytes]: 

27 if not content_type or not content_type.startswith(b"multipart/form-data"): 

28 return None 

29 # parse boundary according to 

30 # https://www.rfc-editor.org/rfc/rfc2046#section-5.1.1 

31 if b";" in content_type: 

32 for section in content_type.split(b";"): 

33 if section.strip().lower().startswith(b"boundary="): 

34 return section.strip()[len(b"boundary=") :].strip(b'"') 

35 return None 

36 

37 

38class DataField: 

39 """ 

40 A single form field item, within a multipart form field. 

41 """ 

42 

43 def __init__( 

44 self, name: str, value: typing.Union[str, bytes, int, float, None] 

45 ) -> None: 

46 if not isinstance(name, str): 

47 raise TypeError( 

48 f"Invalid type for name. Expected str, got {type(name)}: {name!r}" 

49 ) 

50 if value is not None and not isinstance(value, (str, bytes, int, float)): 

51 raise TypeError( 

52 f"Invalid type for value. Expected primitive type, got {type(value)}: {value!r}" 

53 ) 

54 self.name = name 

55 self.value: typing.Union[str, bytes] = ( 

56 value if isinstance(value, bytes) else primitive_value_to_str(value) 

57 ) 

58 

59 def render_headers(self) -> bytes: 

60 if not hasattr(self, "_headers"): 

61 name = format_form_param("name", self.name) 

62 self._headers = b"".join( 

63 [b"Content-Disposition: form-data; ", name, b"\r\n\r\n"] 

64 ) 

65 

66 return self._headers 

67 

68 def render_data(self) -> bytes: 

69 if not hasattr(self, "_data"): 

70 self._data = to_bytes(self.value) 

71 

72 return self._data 

73 

74 def get_length(self) -> int: 

75 headers = self.render_headers() 

76 data = self.render_data() 

77 return len(headers) + len(data) 

78 

79 def render(self) -> typing.Iterator[bytes]: 

80 yield self.render_headers() 

81 yield self.render_data() 

82 

83 

84class FileField: 

85 """ 

86 A single file field item, within a multipart form field. 

87 """ 

88 

89 CHUNK_SIZE = 64 * 1024 

90 

91 def __init__(self, name: str, value: FileTypes) -> None: 

92 self.name = name 

93 

94 fileobj: FileContent 

95 

96 headers: typing.Dict[str, str] = {} 

97 content_type: typing.Optional[str] = None 

98 

99 # This large tuple based API largely mirror's requests' API 

100 # It would be good to think of better APIs for this that we could include in httpx 2.0 

101 # since variable length tuples (especially of 4 elements) are quite unwieldly 

102 if isinstance(value, tuple): 

103 if len(value) == 2: 

104 # neither the 3rd parameter (content_type) nor the 4th (headers) was included 

105 filename, fileobj = value # type: ignore 

106 elif len(value) == 3: 

107 filename, fileobj, content_type = value # type: ignore 

108 else: 

109 # all 4 parameters included 

110 filename, fileobj, content_type, headers = value # type: ignore 

111 else: 

112 filename = Path(str(getattr(value, "name", "upload"))).name 

113 fileobj = value 

114 

115 if content_type is None: 

116 content_type = guess_content_type(filename) 

117 

118 has_content_type_header = any("content-type" in key.lower() for key in headers) 

119 if content_type is not None and not has_content_type_header: 

120 # note that unlike requests, we ignore the content_type 

121 # provided in the 3rd tuple element if it is also included in the headers 

122 # requests does the opposite (it overwrites the header with the 3rd tuple element) 

123 headers["Content-Type"] = content_type 

124 

125 if "b" not in getattr(fileobj, "mode", "b"): 

126 raise TypeError( 

127 "Multipart file uploads must be opened in binary mode, not text mode." 

128 ) 

129 if isinstance(fileobj, io.StringIO): 

130 raise TypeError( 

131 "Multipart file uploads require 'io.BytesIO', not 'io.StringIO'." 

132 ) 

133 

134 self.filename = filename 

135 self.file = fileobj 

136 self.headers = headers 

137 

138 def get_length(self) -> typing.Optional[int]: 

139 headers = self.render_headers() 

140 

141 if isinstance(self.file, (str, bytes)): 

142 return len(headers) + len(to_bytes(self.file)) 

143 

144 file_length = peek_filelike_length(self.file) 

145 

146 # If we can't determine the filesize without reading it into memory, 

147 # then return `None` here, to indicate an unknown file length. 

148 if file_length is None: 

149 return None 

150 

151 return len(headers) + file_length 

152 

153 def render_headers(self) -> bytes: 

154 if not hasattr(self, "_headers"): 

155 parts = [ 

156 b"Content-Disposition: form-data; ", 

157 format_form_param("name", self.name), 

158 ] 

159 if self.filename: 

160 filename = format_form_param("filename", self.filename) 

161 parts.extend([b"; ", filename]) 

162 for header_name, header_value in self.headers.items(): 

163 key, val = f"\r\n{header_name}: ".encode(), header_value.encode() 

164 parts.extend([key, val]) 

165 parts.append(b"\r\n\r\n") 

166 self._headers = b"".join(parts) 

167 

168 return self._headers 

169 

170 def render_data(self) -> typing.Iterator[bytes]: 

171 if isinstance(self.file, (str, bytes)): 

172 yield to_bytes(self.file) 

173 return 

174 

175 if hasattr(self.file, "seek"): 

176 try: 

177 self.file.seek(0) 

178 except io.UnsupportedOperation: 

179 pass 

180 

181 chunk = self.file.read(self.CHUNK_SIZE) 

182 while chunk: 

183 yield to_bytes(chunk) 

184 chunk = self.file.read(self.CHUNK_SIZE) 

185 

186 def render(self) -> typing.Iterator[bytes]: 

187 yield self.render_headers() 

188 yield from self.render_data() 

189 

190 

191class MultipartStream(SyncByteStream, AsyncByteStream): 

192 """ 

193 Request content as streaming multipart encoded form data. 

194 """ 

195 

196 def __init__( 

197 self, 

198 data: RequestData, 

199 files: RequestFiles, 

200 boundary: typing.Optional[bytes] = None, 

201 ) -> None: 

202 if boundary is None: 

203 boundary = binascii.hexlify(os.urandom(16)) 

204 

205 self.boundary = boundary 

206 self.content_type = "multipart/form-data; boundary=%s" % boundary.decode( 

207 "ascii" 

208 ) 

209 self.fields = list(self._iter_fields(data, files)) 

210 

211 def _iter_fields( 

212 self, data: RequestData, files: RequestFiles 

213 ) -> typing.Iterator[typing.Union[FileField, DataField]]: 

214 for name, value in data.items(): 

215 if isinstance(value, (tuple, list)): 

216 for item in value: 

217 yield DataField(name=name, value=item) 

218 else: 

219 yield DataField(name=name, value=value) 

220 

221 file_items = files.items() if isinstance(files, typing.Mapping) else files 

222 for name, value in file_items: 

223 yield FileField(name=name, value=value) 

224 

225 def iter_chunks(self) -> typing.Iterator[bytes]: 

226 for field in self.fields: 

227 yield b"--%s\r\n" % self.boundary 

228 yield from field.render() 

229 yield b"\r\n" 

230 yield b"--%s--\r\n" % self.boundary 

231 

232 def get_content_length(self) -> typing.Optional[int]: 

233 """ 

234 Return the length of the multipart encoded content, or `None` if 

235 any of the files have a length that cannot be determined upfront. 

236 """ 

237 boundary_length = len(self.boundary) 

238 length = 0 

239 

240 for field in self.fields: 

241 field_length = field.get_length() 

242 if field_length is None: 

243 return None 

244 

245 length += 2 + boundary_length + 2 # b"--{boundary}\r\n" 

246 length += field_length 

247 length += 2 # b"\r\n" 

248 

249 length += 2 + boundary_length + 4 # b"--{boundary}--\r\n" 

250 return length 

251 

252 # Content stream interface. 

253 

254 def get_headers(self) -> typing.Dict[str, str]: 

255 content_length = self.get_content_length() 

256 content_type = self.content_type 

257 if content_length is None: 

258 return {"Transfer-Encoding": "chunked", "Content-Type": content_type} 

259 return {"Content-Length": str(content_length), "Content-Type": content_type} 

260 

261 def __iter__(self) -> typing.Iterator[bytes]: 

262 for chunk in self.iter_chunks(): 

263 yield chunk 

264 

265 async def __aiter__(self) -> typing.AsyncIterator[bytes]: 

266 for chunk in self.iter_chunks(): 

267 yield chunk