Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_multipart.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

147 statements  

1from __future__ import annotations 

2 

3import io 

4import os 

5import typing 

6from pathlib import Path 

7 

8from ._types import ( 

9 AsyncByteStream, 

10 FileContent, 

11 FileTypes, 

12 RequestData, 

13 RequestFiles, 

14 SyncByteStream, 

15) 

16from ._utils import ( 

17 format_form_param, 

18 guess_content_type, 

19 peek_filelike_length, 

20 primitive_value_to_str, 

21 to_bytes, 

22) 

23 

24 

def get_multipart_boundary_from_content_type(
    content_type: bytes | None,
) -> bytes | None:
    """
    Extract the `boundary` parameter from a `multipart/form-data` content type.

    Returns the boundary value with surrounding whitespace and double quotes
    removed, or `None` when `content_type` is empty, is not
    `multipart/form-data`, or carries no `boundary` parameter.
    """
    # Media type names are case-insensitive (RFC 2045, section 5.1), so the
    # type check runs against a lowercased copy. The boundary value itself is
    # always sliced from the original bytes, so its case is preserved.
    if not content_type or not content_type.lower().startswith(
        b"multipart/form-data"
    ):
        return None
    # parse boundary according to
    # https://www.rfc-editor.org/rfc/rfc2046#section-5.1.1
    prefix = b"boundary="
    for section in content_type.split(b";"):
        section = section.strip()
        if section.lower().startswith(prefix):
            return section[len(prefix) :].strip(b'"')
    return None

37 

38 

class DataField:
    """
    One plain (non-file) form field inside a multipart form body.
    """

    def __init__(self, name: str, value: str | bytes | int | float | None) -> None:
        """
        Validate and store the field name and value.

        Raises `TypeError` when `name` is not a `str`, or when `value` is
        neither `None` nor one of `str`, `bytes`, `int`, `float`.
        """
        if not isinstance(name, str):
            raise TypeError(
                f"Invalid type for name. Expected str, got {type(name)}: {name!r}"
            )

        value_is_primitive = value is None or isinstance(
            value, (str, bytes, int, float)
        )
        if not value_is_primitive:
            raise TypeError(
                "Invalid type for value. Expected primitive type,"
                f" got {type(value)}: {value!r}"
            )

        self.name = name
        # Bytes are stored untouched; every other accepted value goes through
        # `primitive_value_to_str`.
        self.value: str | bytes
        if isinstance(value, bytes):
            self.value = value
        else:
            self.value = primitive_value_to_str(value)

    def render_headers(self) -> bytes:
        """Return the part headers, building them on first use and caching."""
        cached = getattr(self, "_headers", None)
        if cached is None:
            name_param = format_form_param("name", self.name)
            cached = b"".join(
                (b"Content-Disposition: form-data; ", name_param, b"\r\n\r\n")
            )
            self._headers = cached
        return cached

    def render_data(self) -> bytes:
        """Return the field value as bytes, converting on first use and caching."""
        cached = getattr(self, "_data", None)
        if cached is None:
            cached = to_bytes(self.value)
            self._data = cached
        return cached

    def get_length(self) -> int:
        """Return the combined byte length of the rendered headers and data."""
        return len(self.render_headers()) + len(self.render_data())

    def render(self) -> typing.Iterator[bytes]:
        """Yield the rendered headers, then the rendered data."""
        yield self.render_headers()
        yield self.render_data()

82 

83 

class FileField:
    """
    A single file field item, within a multipart form field.

    `value` is either a file object / content on its own, or a tuple of
    `(filename, file)`, `(filename, file, content_type)` or
    `(filename, file, content_type, headers)`.
    """

    # Number of bytes requested per `read()` call when streaming file content.
    CHUNK_SIZE = 64 * 1024

    def __init__(self, name: str, value: FileTypes) -> None:
        """
        Unpack `value` into filename, file content and per-part headers.

        Raises `TypeError` when the file is an `io.StringIO` or any other
        text-mode file object.
        """
        self.name = name

        fileobj: FileContent

        headers: dict[str, str] = {}
        content_type: str | None = None

        # This large tuple based API largely mirrors requests' API.
        # It would be good to think of better APIs for this that we could
        # include in httpx 2.0, since variable length tuples (especially of
        # 4 elements) are quite unwieldy.
        if isinstance(value, tuple):
            if len(value) == 2:
                # neither the 3rd parameter (content_type) nor the 4th (headers)
                # was included
                filename, fileobj = value
            elif len(value) == 3:
                filename, fileobj, content_type = value
            else:
                # all 4 parameters included
                filename, fileobj, content_type, given_headers = value  # type: ignore
                # Work on a copy: a Content-Type entry may be added below, and
                # that must not leak into the caller's mapping (which may be
                # reused for other files).
                headers = dict(given_headers)
        else:
            # Bare file object: use the basename of its `name` attribute, or
            # "upload" when it has none.
            filename = Path(str(getattr(value, "name", "upload"))).name
            fileobj = value

        if content_type is None:
            content_type = guess_content_type(filename)

        # Header names are compared case-insensitively and by exact name, so
        # that unrelated headers which merely contain "content-type" (e.g.
        # "X-Content-Type-Options") do not suppress the Content-Type header.
        has_content_type_header = any(
            key.lower() == "content-type" for key in headers
        )
        if content_type is not None and not has_content_type_header:
            # Note that unlike requests, we ignore the content_type provided in
            # the 3rd tuple element if it is also included in the headers.
            # requests does the opposite (it overwrites the header with the
            # 3rd tuple element).
            headers["Content-Type"] = content_type

        if isinstance(fileobj, io.StringIO):
            raise TypeError(
                "Multipart file uploads require 'io.BytesIO', not 'io.StringIO'."
            )
        if isinstance(fileobj, io.TextIOBase):
            raise TypeError(
                "Multipart file uploads must be opened in binary mode, not text mode."
            )

        self.filename = filename
        self.file = fileobj
        self.headers = headers

    def get_length(self) -> int | None:
        """
        Return the byte length of the rendered headers plus the file content,
        or `None` when the content length cannot be determined upfront.
        """
        headers = self.render_headers()

        if isinstance(self.file, (str, bytes)):
            return len(headers) + len(to_bytes(self.file))

        file_length = peek_filelike_length(self.file)

        # If we can't determine the filesize without reading it into memory,
        # then return `None` here, to indicate an unknown file length.
        if file_length is None:
            return None

        return len(headers) + file_length

    def render_headers(self) -> bytes:
        """Return the part headers, building them on first use and caching."""
        if not hasattr(self, "_headers"):
            parts = [
                b"Content-Disposition: form-data; ",
                format_form_param("name", self.name),
            ]
            # The filename parameter is omitted when the filename is falsy.
            if self.filename:
                filename = format_form_param("filename", self.filename)
                parts.extend([b"; ", filename])
            for header_name, header_value in self.headers.items():
                key, val = f"\r\n{header_name}: ".encode(), header_value.encode()
                parts.extend([key, val])
            parts.append(b"\r\n\r\n")
            self._headers = b"".join(parts)

        return self._headers

    def render_data(self) -> typing.Iterator[bytes]:
        """
        Yield the file content as bytes.

        `str` / `bytes` content is yielded as a single chunk. File objects are
        rewound to the start when possible and then read in `CHUNK_SIZE`
        pieces until a read returns an empty result.
        """
        if isinstance(self.file, (str, bytes)):
            yield to_bytes(self.file)
            return

        if hasattr(self.file, "seek"):
            try:
                self.file.seek(0)
            except io.UnsupportedOperation:
                # Non-seekable stream: read from the current position.
                pass

        chunk = self.file.read(self.CHUNK_SIZE)
        while chunk:
            yield to_bytes(chunk)
            chunk = self.file.read(self.CHUNK_SIZE)

    def render(self) -> typing.Iterator[bytes]:
        """Yield the rendered headers followed by the file content chunks."""
        yield self.render_headers()
        yield from self.render_data()

191 

192 

class MultipartStream(SyncByteStream, AsyncByteStream):
    """
    Multipart encoded form data, exposed as a stream of byte chunks that can
    be iterated synchronously or asynchronously.
    """

    def __init__(
        self,
        data: RequestData,
        files: RequestFiles,
        boundary: bytes | None = None,
    ) -> None:
        """
        Build the list of form fields from `data` and `files`.

        When `boundary` is omitted, a random 32-character hexadecimal
        boundary is generated.
        """
        if boundary is None:
            boundary = os.urandom(16).hex().encode("ascii")

        self.boundary = boundary
        boundary_text = boundary.decode("ascii")
        self.content_type = f"multipart/form-data; boundary={boundary_text}"
        self.fields = [*self._iter_fields(data, files)]

    def _iter_fields(
        self, data: RequestData, files: RequestFiles
    ) -> typing.Iterator[FileField | DataField]:
        """Yield a `DataField` per data value, then a `FileField` per file."""
        for field_name, field_value in data.items():
            # A tuple or list value expands into one field per element.
            if isinstance(field_value, (tuple, list)):
                entries = field_value
            else:
                entries = (field_value,)
            for entry in entries:
                yield DataField(name=field_name, value=entry)

        # `files` may be a mapping or an iterable of (name, value) pairs.
        if isinstance(files, typing.Mapping):
            file_pairs = files.items()
        else:
            file_pairs = files
        for file_name, file_value in file_pairs:
            yield FileField(name=file_name, value=file_value)

    def iter_chunks(self) -> typing.Iterator[bytes]:
        """Yield the encoded body: each field wrapped in boundary lines."""
        for part in self.fields:
            opening_line = b"--%s\r\n" % self.boundary
            yield opening_line
            yield from part.render()
            yield b"\r\n"
        closing_line = b"--%s--\r\n" % self.boundary
        yield closing_line

    def get_content_length(self) -> int | None:
        """
        Return the total byte length of the encoded body, or `None` as soon
        as any field reports an unknown length.
        """
        boundary_size = len(self.boundary)
        # b"--{boundary}\r\n" before each field plus b"\r\n" after it.
        per_field_framing = (2 + boundary_size + 2) + 2
        total = 0

        for part in self.fields:
            part_size = part.get_length()
            if part_size is None:
                return None
            total += per_field_framing + part_size

        # Closing delimiter: b"--{boundary}--\r\n"
        return total + 2 + boundary_size + 4

    # Content stream interface.

    def get_headers(self) -> dict[str, str]:
        """
        Return the framing header (`Content-Length`, or chunked
        `Transfer-Encoding` when the length is unknown) and `Content-Type`.
        """
        known_length = self.get_content_length()
        if known_length is None:
            framing = ("Transfer-Encoding", "chunked")
        else:
            framing = ("Content-Length", str(known_length))
        return dict([framing, ("Content-Type", self.content_type)])

    def __iter__(self) -> typing.Iterator[bytes]:
        yield from self.iter_chunks()

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        for piece in self.iter_chunks():
            yield piece