Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_multipart.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import io
4import os
5import typing
6from pathlib import Path
8from ._types import (
9 AsyncByteStream,
10 FileContent,
11 FileTypes,
12 RequestData,
13 RequestFiles,
14 SyncByteStream,
15)
16from ._utils import (
17 format_form_param,
18 guess_content_type,
19 peek_filelike_length,
20 primitive_value_to_str,
21 to_bytes,
22)
25def get_multipart_boundary_from_content_type(
26 content_type: bytes | None,
27) -> bytes | None:
28 if not content_type or not content_type.startswith(b"multipart/form-data"):
29 return None
30 # parse boundary according to
31 # https://www.rfc-editor.org/rfc/rfc2046#section-5.1.1
32 if b";" in content_type:
33 for section in content_type.split(b";"):
34 if section.strip().lower().startswith(b"boundary="):
35 return section.strip()[len(b"boundary=") :].strip(b'"')
36 return None
39class DataField:
40 """
41 A single form field item, within a multipart form field.
42 """
44 def __init__(self, name: str, value: str | bytes | int | float | None) -> None:
45 if not isinstance(name, str):
46 raise TypeError(
47 f"Invalid type for name. Expected str, got {type(name)}: {name!r}"
48 )
49 if value is not None and not isinstance(value, (str, bytes, int, float)):
50 raise TypeError(
51 "Invalid type for value. Expected primitive type,"
52 f" got {type(value)}: {value!r}"
53 )
54 self.name = name
55 self.value: str | bytes = (
56 value if isinstance(value, bytes) else primitive_value_to_str(value)
57 )
59 def render_headers(self) -> bytes:
60 if not hasattr(self, "_headers"):
61 name = format_form_param("name", self.name)
62 self._headers = b"".join(
63 [b"Content-Disposition: form-data; ", name, b"\r\n\r\n"]
64 )
66 return self._headers
68 def render_data(self) -> bytes:
69 if not hasattr(self, "_data"):
70 self._data = to_bytes(self.value)
72 return self._data
74 def get_length(self) -> int:
75 headers = self.render_headers()
76 data = self.render_data()
77 return len(headers) + len(data)
79 def render(self) -> typing.Iterator[bytes]:
80 yield self.render_headers()
81 yield self.render_data()
84class FileField:
85 """
86 A single file field item, within a multipart form field.
87 """
89 CHUNK_SIZE = 64 * 1024
91 def __init__(self, name: str, value: FileTypes) -> None:
92 self.name = name
94 fileobj: FileContent
96 headers: dict[str, str] = {}
97 content_type: str | None = None
99 # This large tuple based API largely mirror's requests' API
100 # It would be good to think of better APIs for this that we could
101 # include in httpx 2.0 since variable length tuples(especially of 4 elements)
102 # are quite unwieldly
103 if isinstance(value, tuple):
104 if len(value) == 2:
105 # neither the 3rd parameter (content_type) nor the 4th (headers)
106 # was included
107 filename, fileobj = value
108 elif len(value) == 3:
109 filename, fileobj, content_type = value
110 else:
111 # all 4 parameters included
112 filename, fileobj, content_type, headers = value # type: ignore
113 else:
114 filename = Path(str(getattr(value, "name", "upload"))).name
115 fileobj = value
117 if content_type is None:
118 content_type = guess_content_type(filename)
120 has_content_type_header = any("content-type" in key.lower() for key in headers)
121 if content_type is not None and not has_content_type_header:
122 # note that unlike requests, we ignore the content_type provided in the 3rd
123 # tuple element if it is also included in the headers requests does
124 # the opposite (it overwrites the headerwith the 3rd tuple element)
125 headers["Content-Type"] = content_type
127 if isinstance(fileobj, io.StringIO):
128 raise TypeError(
129 "Multipart file uploads require 'io.BytesIO', not 'io.StringIO'."
130 )
131 if isinstance(fileobj, io.TextIOBase):
132 raise TypeError(
133 "Multipart file uploads must be opened in binary mode, not text mode."
134 )
136 self.filename = filename
137 self.file = fileobj
138 self.headers = headers
140 def get_length(self) -> int | None:
141 headers = self.render_headers()
143 if isinstance(self.file, (str, bytes)):
144 return len(headers) + len(to_bytes(self.file))
146 file_length = peek_filelike_length(self.file)
148 # If we can't determine the filesize without reading it into memory,
149 # then return `None` here, to indicate an unknown file length.
150 if file_length is None:
151 return None
153 return len(headers) + file_length
155 def render_headers(self) -> bytes:
156 if not hasattr(self, "_headers"):
157 parts = [
158 b"Content-Disposition: form-data; ",
159 format_form_param("name", self.name),
160 ]
161 if self.filename:
162 filename = format_form_param("filename", self.filename)
163 parts.extend([b"; ", filename])
164 for header_name, header_value in self.headers.items():
165 key, val = f"\r\n{header_name}: ".encode(), header_value.encode()
166 parts.extend([key, val])
167 parts.append(b"\r\n\r\n")
168 self._headers = b"".join(parts)
170 return self._headers
172 def render_data(self) -> typing.Iterator[bytes]:
173 if isinstance(self.file, (str, bytes)):
174 yield to_bytes(self.file)
175 return
177 if hasattr(self.file, "seek"):
178 try:
179 self.file.seek(0)
180 except io.UnsupportedOperation:
181 pass
183 chunk = self.file.read(self.CHUNK_SIZE)
184 while chunk:
185 yield to_bytes(chunk)
186 chunk = self.file.read(self.CHUNK_SIZE)
188 def render(self) -> typing.Iterator[bytes]:
189 yield self.render_headers()
190 yield from self.render_data()
193class MultipartStream(SyncByteStream, AsyncByteStream):
194 """
195 Request content as streaming multipart encoded form data.
196 """
198 def __init__(
199 self,
200 data: RequestData,
201 files: RequestFiles,
202 boundary: bytes | None = None,
203 ) -> None:
204 if boundary is None:
205 boundary = os.urandom(16).hex().encode("ascii")
207 self.boundary = boundary
208 self.content_type = "multipart/form-data; boundary=%s" % boundary.decode(
209 "ascii"
210 )
211 self.fields = list(self._iter_fields(data, files))
213 def _iter_fields(
214 self, data: RequestData, files: RequestFiles
215 ) -> typing.Iterator[FileField | DataField]:
216 for name, value in data.items():
217 if isinstance(value, (tuple, list)):
218 for item in value:
219 yield DataField(name=name, value=item)
220 else:
221 yield DataField(name=name, value=value)
223 file_items = files.items() if isinstance(files, typing.Mapping) else files
224 for name, value in file_items:
225 yield FileField(name=name, value=value)
227 def iter_chunks(self) -> typing.Iterator[bytes]:
228 for field in self.fields:
229 yield b"--%s\r\n" % self.boundary
230 yield from field.render()
231 yield b"\r\n"
232 yield b"--%s--\r\n" % self.boundary
234 def get_content_length(self) -> int | None:
235 """
236 Return the length of the multipart encoded content, or `None` if
237 any of the files have a length that cannot be determined upfront.
238 """
239 boundary_length = len(self.boundary)
240 length = 0
242 for field in self.fields:
243 field_length = field.get_length()
244 if field_length is None:
245 return None
247 length += 2 + boundary_length + 2 # b"--{boundary}\r\n"
248 length += field_length
249 length += 2 # b"\r\n"
251 length += 2 + boundary_length + 4 # b"--{boundary}--\r\n"
252 return length
254 # Content stream interface.
256 def get_headers(self) -> dict[str, str]:
257 content_length = self.get_content_length()
258 content_type = self.content_type
259 if content_length is None:
260 return {"Transfer-Encoding": "chunked", "Content-Type": content_type}
261 return {"Content-Length": str(content_length), "Content-Type": content_type}
263 def __iter__(self) -> typing.Iterator[bytes]:
264 for chunk in self.iter_chunks():
265 yield chunk
267 async def __aiter__(self) -> typing.AsyncIterator[bytes]:
268 for chunk in self.iter_chunks():
269 yield chunk