Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_multipart.py: 20%
147 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import binascii
2import io
3import os
4import typing
5from pathlib import Path
7from ._types import (
8 AsyncByteStream,
9 FileContent,
10 FileTypes,
11 RequestData,
12 RequestFiles,
13 SyncByteStream,
14)
15from ._utils import (
16 format_form_param,
17 guess_content_type,
18 peek_filelike_length,
19 primitive_value_to_str,
20 to_bytes,
21)
def get_multipart_boundary_from_content_type(
    content_type: typing.Optional[bytes],
) -> typing.Optional[bytes]:
    """
    Extract the `boundary` parameter from a `multipart/form-data` content type.

    Returns the boundary with any surrounding double quotes stripped, or
    `None` when `content_type` is empty, is not `multipart/form-data`, or
    carries no `boundary=` parameter.
    """
    # Media type names are case-insensitive, so compare against a lowercased
    # copy, the same way the parameter name is matched below.
    if not content_type or not content_type.lower().startswith(
        b"multipart/form-data"
    ):
        return None
    # parse boundary according to
    # https://www.rfc-editor.org/rfc/rfc2046#section-5.1.1
    prefix = b"boundary="
    # The first `;`-separated section is the media type itself; only the
    # sections after it can be parameters.
    for section in content_type.split(b";")[1:]:
        param = section.strip()
        if param.lower().startswith(prefix):
            return param[len(prefix) :].strip(b'"')
    return None
class DataField:
    """
    One plain (non-file) field of a multipart form body.
    """

    def __init__(
        self, name: str, value: typing.Union[str, bytes, int, float, None]
    ) -> None:
        """
        Validate and store the field name and value.

        Raises `TypeError` when `name` is not a `str`, or when `value` is
        neither `None` nor one of `str`, `bytes`, `int`, `float`.
        """
        name_is_str = isinstance(name, str)
        if not name_is_str:
            raise TypeError(
                f"Invalid type for name. Expected str, got {type(name)}: {name!r}"
            )

        value_is_primitive = value is None or isinstance(
            value, (str, bytes, int, float)
        )
        if not value_is_primitive:
            raise TypeError(
                f"Invalid type for value. Expected primitive type, got {type(value)}: {value!r}"
            )

        self.name = name
        # Bytes are stored untouched; every other accepted value goes through
        # `primitive_value_to_str`.
        if isinstance(value, bytes):
            self.value: typing.Union[str, bytes] = value
        else:
            self.value = primitive_value_to_str(value)

    def render_headers(self) -> bytes:
        """
        Return the header bytes for this field, building them on first use
        and reusing the cached `_headers` attribute afterwards.
        """
        if hasattr(self, "_headers"):
            return self._headers

        disposition_param = format_form_param("name", self.name)
        pieces = (
            b"Content-Disposition: form-data; ",
            disposition_param,
            b"\r\n\r\n",
        )
        self._headers = b"".join(pieces)
        return self._headers

    def render_data(self) -> bytes:
        """
        Return the field value converted with `to_bytes`, cached in `_data`
        after the first call.
        """
        if hasattr(self, "_data"):
            return self._data

        self._data = to_bytes(self.value)
        return self._data

    def get_length(self) -> int:
        """
        Return the combined size of the rendered headers and data.
        """
        header_size = len(self.render_headers())
        data_size = len(self.render_data())
        return header_size + data_size

    def render(self) -> typing.Iterator[bytes]:
        """
        Yield the rendered headers, then the rendered data.
        """
        header_bytes = self.render_headers()
        yield header_bytes
        body_bytes = self.render_data()
        yield body_bytes
class FileField:
    """
    A single file field item, within a multipart form field.

    `value` is either the file content itself (a file-like object, `str` or
    `bytes`), or a tuple of `(filename, file)`,
    `(filename, file, content_type)` or
    `(filename, file, content_type, headers)`.
    """

    # Number of bytes requested from a file-like object per `read()` call.
    CHUNK_SIZE = 64 * 1024

    def __init__(self, name: str, value: FileTypes) -> None:
        """
        Unpack `value` into filename, file object and per-part headers.

        Raises `TypeError` if the file object reports a `mode` without "b",
        or is an `io.StringIO`.
        """
        self.name = name

        fileobj: FileContent

        headers: typing.Dict[str, str] = {}
        content_type: typing.Optional[str] = None

        # This large tuple based API largely mirror's requests' API
        # It would be good to think of better APIs for this that we could include in httpx 2.0
        # since variable length tuples (especially of 4 elements) are quite unwieldly
        if isinstance(value, tuple):
            if len(value) == 2:
                # neither the 3rd parameter (content_type) nor the 4th (headers) was included
                filename, fileobj = value  # type: ignore
            elif len(value) == 3:
                filename, fileobj, content_type = value  # type: ignore
            else:
                # all 4 parameters included
                filename, fileobj, content_type, headers = value  # type: ignore
        else:
            # Use the base name of the object's `name` attribute, falling
            # back to "upload" when it has none.
            filename = Path(str(getattr(value, "name", "upload"))).name
            fileobj = value

        # Work on a private copy so that adding "Content-Type" below never
        # mutates the dict the caller supplied in the 4-tuple form.
        headers = dict(headers)

        if content_type is None:
            content_type = guess_content_type(filename)

        # Compare the whole header name: a substring test would also treat
        # unrelated headers such as "X-Content-Type-Options" as a content type.
        has_content_type_header = any(
            key.lower() == "content-type" for key in headers
        )
        if content_type is not None and not has_content_type_header:
            # note that unlike requests, we ignore the content_type
            # provided in the 3rd tuple element if it is also included in the headers
            # requests does the opposite (it overwrites the header with the 3rd tuple element)
            headers["Content-Type"] = content_type

        # Objects without a `mode` attribute are assumed to be binary.
        if "b" not in getattr(fileobj, "mode", "b"):
            raise TypeError(
                "Multipart file uploads must be opened in binary mode, not text mode."
            )
        if isinstance(fileobj, io.StringIO):
            raise TypeError(
                "Multipart file uploads require 'io.BytesIO', not 'io.StringIO'."
            )

        self.filename = filename
        self.file = fileobj
        self.headers = headers

    def get_length(self) -> typing.Optional[int]:
        """
        Return the size of the rendered headers plus the file content, or
        `None` when the content length cannot be determined upfront.
        """
        headers = self.render_headers()

        if isinstance(self.file, (str, bytes)):
            return len(headers) + len(to_bytes(self.file))

        file_length = peek_filelike_length(self.file)

        # If we can't determine the filesize without reading it into memory,
        # then return `None` here, to indicate an unknown file length.
        if file_length is None:
            return None

        return len(headers) + file_length

    def render_headers(self) -> bytes:
        """
        Return the header bytes for this part: the Content-Disposition line
        (with `filename` when one is set) followed by each entry of
        `self.headers`.

        The result is cached in `_headers` on first use, so later changes to
        `self.headers` are not reflected.
        """
        if not hasattr(self, "_headers"):
            parts = [
                b"Content-Disposition: form-data; ",
                format_form_param("name", self.name),
            ]
            if self.filename:
                filename = format_form_param("filename", self.filename)
                parts.extend([b"; ", filename])
            for header_name, header_value in self.headers.items():
                key, val = f"\r\n{header_name}: ".encode(), header_value.encode()
                parts.extend([key, val])
            parts.append(b"\r\n\r\n")
            self._headers = b"".join(parts)

        return self._headers

    def render_data(self) -> typing.Iterator[bytes]:
        """
        Yield the file content as bytes.

        `str` / `bytes` content is yielded in one piece; anything else is
        read in `CHUNK_SIZE` pieces until `read()` returns an empty chunk.
        """
        if isinstance(self.file, (str, bytes)):
            yield to_bytes(self.file)
            return

        if hasattr(self.file, "seek"):
            # Best-effort rewind so a repeated render starts from the
            # beginning; streams that cannot seek are read from their
            # current position.
            try:
                self.file.seek(0)
            except io.UnsupportedOperation:
                pass

        chunk = self.file.read(self.CHUNK_SIZE)
        while chunk:
            yield to_bytes(chunk)
            chunk = self.file.read(self.CHUNK_SIZE)

    def render(self) -> typing.Iterator[bytes]:
        """
        Yield the rendered headers followed by the file content chunks.
        """
        yield self.render_headers()
        yield from self.render_data()
class MultipartStream(SyncByteStream, AsyncByteStream):
    """
    Streams a request body encoded as multipart form data.
    """

    def __init__(
        self,
        data: RequestData,
        files: RequestFiles,
        boundary: typing.Optional[bytes] = None,
    ) -> None:
        """
        Store the boundary, derive the content type, and build the field list.

        When `boundary` is omitted, 16 random bytes are hex-encoded and used
        as the boundary.
        """
        self.boundary = (
            binascii.hexlify(os.urandom(16)) if boundary is None else boundary
        )
        boundary_text = self.boundary.decode("ascii")
        self.content_type = "multipart/form-data; boundary=%s" % boundary_text
        self.fields = list(self._iter_fields(data, files))

    def _iter_fields(
        self, data: RequestData, files: RequestFiles
    ) -> typing.Iterator[typing.Union[FileField, DataField]]:
        """
        Yield a `DataField` per data value (one per item for tuple/list
        values), then a `FileField` per file entry.
        """
        for key, raw in data.items():
            entries = raw if isinstance(raw, (tuple, list)) else [raw]
            for entry in entries:
                yield DataField(name=key, value=entry)

        # `files` is either a mapping or an iterable of (name, value) pairs.
        if isinstance(files, typing.Mapping):
            pairs = files.items()
        else:
            pairs = files
        for key, upload in pairs:
            yield FileField(name=key, value=upload)

    def iter_chunks(self) -> typing.Iterator[bytes]:
        """
        Yield the encoded body: each field wrapped by an opening boundary
        line and a trailing CRLF, followed by the closing boundary line.
        """
        for part in self.fields:
            yield b"--%s\r\n" % self.boundary
            for piece in part.render():
                yield piece
            yield b"\r\n"
        yield b"--%s--\r\n" % self.boundary

    def get_content_length(self) -> typing.Optional[int]:
        """
        Return the total size of the encoded body, or `None` as soon as a
        field reports that its length is unknown.
        """
        boundary_size = len(self.boundary)
        # b"--{boundary}\r\n" before each field plus b"\r\n" after it.
        per_field_framing = (2 + boundary_size + 2) + 2
        # b"--{boundary}--\r\n" at the very end.
        closing_size = 2 + boundary_size + 4

        total = 0
        for part in self.fields:
            part_size = part.get_length()
            if part_size is None:
                return None
            total += per_field_framing + part_size

        return total + closing_size

    # Content stream interface.

    def get_headers(self) -> typing.Dict[str, str]:
        """
        Return the body headers: `Content-Length` when the size is known,
        otherwise `Transfer-Encoding: chunked`, plus `Content-Type`.
        """
        size = self.get_content_length()
        headers: typing.Dict[str, str] = {}
        if size is None:
            headers["Transfer-Encoding"] = "chunked"
        else:
            headers["Content-Length"] = str(size)
        headers["Content-Type"] = self.content_type
        return headers

    def __iter__(self) -> typing.Iterator[bytes]:
        """Iterate synchronously over the encoded body chunks."""
        yield from self.iter_chunks()

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        """Iterate asynchronously over the same encoded body chunks."""
        for piece in self.iter_chunks():
            yield piece