Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/multipart.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import base64
2import binascii
3import json
4import re
5import sys
6import uuid
7import warnings
8from collections import deque
9from collections.abc import AsyncIterator, Iterator, Mapping, Sequence
10from types import TracebackType
11from typing import TYPE_CHECKING, Any, TypeVar, Union, cast
12from urllib.parse import parse_qsl, unquote, urlencode
14from multidict import CIMultiDict, CIMultiDictProxy
16from .abc import AbstractStreamWriter
17from .compression_utils import ZLibCompressor, ZLibDecompressor
18from .hdrs import (
19 CONTENT_DISPOSITION,
20 CONTENT_ENCODING,
21 CONTENT_LENGTH,
22 CONTENT_TRANSFER_ENCODING,
23 CONTENT_TYPE,
24)
25from .helpers import CHAR, DEFAULT_CHUNK_SIZE, TOKEN, parse_mimetype, reify
26from .http import HeadersParser
27from .http_exceptions import BadHttpMessage
28from .log import internal_logger
29from .payload import (
30 JsonPayload,
31 LookupError,
32 Order,
33 Payload,
34 StringPayload,
35 get_payload,
36 payload_type,
37)
38from .streams import StreamReader
40if sys.version_info >= (3, 11):
41 from typing import Self
42else:
43 Self = TypeVar("Self", bound="BodyPartReader")
45if sys.version_info >= (3, 12):
46 from collections.abc import Buffer
47else:
48 Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
50_Buffer = TypeVar("_Buffer", bound=Buffer)
52__all__ = (
53 "MultipartReader",
54 "MultipartWriter",
55 "BodyPartReader",
56 "BadContentDispositionHeader",
57 "BadContentDispositionParam",
58 "parse_content_disposition",
59 "content_disposition_filename",
60)
63if TYPE_CHECKING:
64 from .client_reqrep import ClientResponse
67class BadContentDispositionHeader(RuntimeWarning):
68 pass
71class BadContentDispositionParam(RuntimeWarning):
72 pass
75def parse_content_disposition(
76 header: str | None,
77) -> tuple[str | None, dict[str, str]]:
78 def is_token(string: str) -> bool:
79 return bool(string) and TOKEN >= set(string)
81 def is_quoted(string: str) -> bool:
82 return string[0] == string[-1] == '"'
84 def is_rfc5987(string: str) -> bool:
85 return is_token(string) and string.count("'") == 2
87 def is_extended_param(string: str) -> bool:
88 return string.endswith("*")
90 def is_continuous_param(string: str) -> bool:
91 pos = string.find("*") + 1
92 if not pos:
93 return False
94 substring = string[pos:-1] if string.endswith("*") else string[pos:]
95 return substring.isdigit()
97 def unescape(text: str, *, chars: str = "".join(map(re.escape, CHAR))) -> str:
98 return re.sub(f"\\\\([{chars}])", "\\1", text)
100 if not header:
101 return None, {}
103 disptype, *parts = header.split(";")
104 if not is_token(disptype):
105 warnings.warn(BadContentDispositionHeader(header))
106 return None, {}
108 params: dict[str, str] = {}
109 while parts:
110 item = parts.pop(0)
112 if not item: # To handle trailing semicolons
113 warnings.warn(BadContentDispositionHeader(header))
114 continue
116 if "=" not in item:
117 warnings.warn(BadContentDispositionHeader(header))
118 return None, {}
120 key, value = item.split("=", 1)
121 key = key.lower().strip()
122 value = value.lstrip()
124 if key in params:
125 warnings.warn(BadContentDispositionHeader(header))
126 return None, {}
128 if not is_token(key):
129 warnings.warn(BadContentDispositionParam(item))
130 continue
132 elif is_continuous_param(key):
133 if is_quoted(value):
134 value = unescape(value[1:-1])
135 elif not is_token(value):
136 warnings.warn(BadContentDispositionParam(item))
137 continue
139 elif is_extended_param(key):
140 if is_rfc5987(value):
141 encoding, _, value = value.split("'", 2)
142 encoding = encoding or "utf-8"
143 else:
144 warnings.warn(BadContentDispositionParam(item))
145 continue
147 try:
148 value = unquote(value, encoding, "strict")
149 except UnicodeDecodeError: # pragma: nocover
150 warnings.warn(BadContentDispositionParam(item))
151 continue
153 else:
154 failed = True
155 if is_quoted(value):
156 failed = False
157 value = unescape(value[1:-1].lstrip("\\/"))
158 elif is_token(value):
159 failed = False
160 elif parts:
161 # maybe just ; in filename, in any case this is just
162 # one case fix, for proper fix we need to redesign parser
163 _value = f"{value};{parts[0]}"
164 if is_quoted(_value):
165 parts.pop(0)
166 value = unescape(_value[1:-1].lstrip("\\/"))
167 failed = False
169 if failed:
170 warnings.warn(BadContentDispositionHeader(header))
171 return None, {}
173 params[key] = value
175 return disptype.lower(), params
178def content_disposition_filename(
179 params: Mapping[str, str], name: str = "filename"
180) -> str | None:
181 name_suf = "%s*" % name
182 if not params:
183 return None
184 elif name_suf in params:
185 return params[name_suf]
186 elif name in params:
187 return params[name]
188 else:
189 parts = []
190 fnparams = sorted(
191 (key, value) for key, value in params.items() if key.startswith(name_suf)
192 )
193 for num, (key, value) in enumerate(fnparams):
194 _, tail = key.split("*", 1)
195 if tail.endswith("*"):
196 tail = tail[:-1]
197 if tail == str(num):
198 parts.append(value)
199 else:
200 break
201 if not parts:
202 return None
203 value = "".join(parts)
204 if "'" in value:
205 encoding, _, value = value.split("'", 2)
206 encoding = encoding or "utf-8"
207 return unquote(value, encoding, "strict")
208 return value
211class MultipartResponseWrapper:
212 """Wrapper around the MultipartReader.
214 It takes care about
215 underlying connection and close it when it needs in.
216 """
218 def __init__(
219 self,
220 resp: "ClientResponse",
221 stream: "MultipartReader",
222 ) -> None:
223 self.resp = resp
224 self.stream = stream
226 def __aiter__(self) -> "MultipartResponseWrapper":
227 return self
229 async def __anext__(
230 self,
231 ) -> Union["MultipartReader", "BodyPartReader"]:
232 part = await self.next()
233 if part is None:
234 raise StopAsyncIteration
235 return part
237 def at_eof(self) -> bool:
238 """Returns True when all response data had been read."""
239 return self.resp.content.at_eof()
241 async def next(
242 self,
243 ) -> Union["MultipartReader", "BodyPartReader"] | None:
244 """Emits next multipart reader object."""
245 item = await self.stream.next()
246 if self.stream.at_eof():
247 await self.release()
248 return item
250 async def release(self) -> None:
251 """Release the connection gracefully.
253 All remaining content is read to the void.
254 """
255 await self.resp.release()
258class BodyPartReader:
259 """Multipart reader for single body part."""
261 chunk_size = 8192
263 def __init__(
264 self,
265 boundary: bytes,
266 headers: "CIMultiDictProxy[str]",
267 content: StreamReader,
268 *,
269 subtype: str = "mixed",
270 default_charset: str | None = None,
271 max_decompress_size: int = DEFAULT_CHUNK_SIZE,
272 client_max_size: int = sys.maxsize,
273 max_size_error_cls: type[Exception] = ValueError,
274 ) -> None:
275 self.headers = headers
276 self._boundary = boundary
277 self._boundary_len = len(boundary) + 2 # Boundary + \r\n
278 self._content = content
279 self._default_charset = default_charset
280 self._at_eof = False
281 self._is_form_data = subtype == "form-data"
282 # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
283 length = None if self._is_form_data else self.headers.get(CONTENT_LENGTH, None)
284 self._length = int(length) if length is not None else None
285 self._read_bytes = 0
286 self._unread: deque[bytes] = deque()
287 self._prev_chunk: bytes | None = None
288 self._content_eof = 0
289 self._cache: dict[str, Any] = {}
290 self._max_decompress_size = max_decompress_size
291 self._client_max_size = client_max_size
292 self._max_size_error_cls = max_size_error_cls
294 def __aiter__(self: Self) -> Self:
295 return self
297 async def __anext__(self) -> bytes:
298 part = await self.next()
299 if part is None:
300 raise StopAsyncIteration
301 return part
303 async def next(self) -> bytes | None:
304 item = await self.read()
305 if not item:
306 return None
307 return item
309 async def read(self, *, decode: bool = False) -> bytes:
310 """Reads body part data.
312 decode: Decodes data following by encoding
313 method from Content-Encoding header. If it missed
314 data remains untouched
315 """
316 if self._at_eof:
317 return b""
318 data = bytearray()
319 while not self._at_eof:
320 data.extend(await self.read_chunk(self.chunk_size))
321 if len(data) > self._client_max_size:
322 raise self._max_size_error_cls(self._client_max_size)
323 if decode:
324 decoded_data = bytearray()
325 async for d in self.decode_iter(data):
326 decoded_data.extend(d)
327 if len(decoded_data) > self._client_max_size:
328 raise self._max_size_error_cls(self._client_max_size)
329 return decoded_data
330 return data
332 async def read_chunk(self, size: int = chunk_size) -> bytes:
333 """Reads body part content chunk of the specified size.
335 size: chunk size
336 """
337 if self._at_eof:
338 return b""
339 if self._length:
340 chunk = await self._read_chunk_from_length(size)
341 else:
342 chunk = await self._read_chunk_from_stream(size)
344 # For the case of base64 data, we must read a fragment of size with a
345 # remainder of 0 by dividing by 4 for string without symbols \n or \r
346 encoding = self.headers.get(CONTENT_TRANSFER_ENCODING)
347 if encoding and encoding.lower() == "base64":
348 stripped_chunk = b"".join(chunk.split())
349 remainder = len(stripped_chunk) % 4
351 while remainder != 0 and not self.at_eof():
352 over_chunk_size = 4 - remainder
353 over_chunk = b""
355 if self._prev_chunk:
356 over_chunk = self._prev_chunk[:over_chunk_size]
357 self._prev_chunk = self._prev_chunk[len(over_chunk) :]
359 if len(over_chunk) != over_chunk_size:
360 over_chunk += await self._content.read(4 - len(over_chunk))
362 if not over_chunk:
363 self._at_eof = True
365 stripped_chunk += b"".join(over_chunk.split())
366 chunk += over_chunk
367 remainder = len(stripped_chunk) % 4
369 self._read_bytes += len(chunk)
370 if self._read_bytes == self._length:
371 self._at_eof = True
372 if self._at_eof and await self._content.readline() != b"\r\n":
373 raise ValueError("Reader did not read all the data or it is malformed")
374 return chunk
376 async def _read_chunk_from_length(self, size: int) -> bytes:
377 # Reads body part content chunk of the specified size.
378 # The body part must has Content-Length header with proper value.
379 assert self._length is not None, "Content-Length required for chunked read"
380 chunk_size = min(size, self._length - self._read_bytes)
381 chunk = await self._content.read(chunk_size)
382 if self._content.at_eof():
383 self._at_eof = True
384 return chunk
386 async def _read_chunk_from_stream(self, size: int) -> bytes:
387 # Reads content chunk of body part with unknown length.
388 # The Content-Length header for body part is not necessary.
389 assert (
390 size >= self._boundary_len
391 ), "Chunk size must be greater or equal than boundary length + 2"
392 first_chunk = self._prev_chunk is None
393 if first_chunk:
394 # We need to re-add the CRLF that got removed from headers parsing.
395 self._prev_chunk = b"\r\n" + await self._content.read(size)
397 chunk = b""
398 # content.read() may return less than size, so we need to loop to ensure
399 # we have enough data to detect the boundary.
400 while len(chunk) < self._boundary_len:
401 chunk += await self._content.read(size)
402 self._content_eof += int(self._content.at_eof())
403 if self._content_eof > 2:
404 raise ValueError("Reading after EOF")
405 if self._content_eof:
406 break
407 if len(chunk) > size:
408 self._content.unread_data(chunk[size:])
409 chunk = chunk[:size]
411 assert self._prev_chunk is not None
412 window = self._prev_chunk + chunk
413 sub = b"\r\n" + self._boundary
414 if first_chunk:
415 idx = window.find(sub)
416 else:
417 idx = window.find(sub, max(0, len(self._prev_chunk) - len(sub)))
418 if idx >= 0:
419 # pushing boundary back to content
420 with warnings.catch_warnings():
421 warnings.filterwarnings("ignore", category=DeprecationWarning)
422 self._content.unread_data(window[idx:])
423 self._prev_chunk = self._prev_chunk[:idx]
424 chunk = window[len(self._prev_chunk) : idx]
425 if not chunk:
426 self._at_eof = True
427 result = self._prev_chunk[2 if first_chunk else 0 :] # Strip initial CRLF
428 self._prev_chunk = chunk
429 return result
431 async def readline(self) -> bytes:
432 """Reads body part by line by line."""
433 if self._at_eof:
434 return b""
436 if self._unread:
437 line = self._unread.popleft()
438 else:
439 line = await self._content.readline()
441 if line.startswith(self._boundary):
442 # the very last boundary may not come with \r\n,
443 # so set single rules for everyone
444 sline = line.rstrip(b"\r\n")
445 boundary = self._boundary
446 last_boundary = self._boundary + b"--"
447 # ensure that we read exactly the boundary, not something alike
448 if sline == boundary or sline == last_boundary:
449 self._at_eof = True
450 self._unread.append(line)
451 return b""
452 else:
453 next_line = await self._content.readline()
454 if next_line.startswith(self._boundary):
455 line = line[:-2] # strip CRLF but only once
456 self._unread.append(next_line)
458 return line
460 async def release(self) -> None:
461 """Like read(), but reads all the data to the void."""
462 if self._at_eof:
463 return
464 while not self._at_eof:
465 await self.read_chunk(self.chunk_size)
467 async def text(self, *, encoding: str | None = None) -> str:
468 """Like read(), but assumes that body part contains text data."""
469 data = await self.read(decode=True)
470 # see https://www.w3.org/TR/html5/forms.html#multipart/form-data-encoding-algorithm
471 # and https://dvcs.w3.org/hg/xhr/raw-file/tip/Overview.html#dom-xmlhttprequest-send
472 encoding = encoding or self.get_charset(default="utf-8")
473 return data.decode(encoding)
475 async def json(self, *, encoding: str | None = None) -> dict[str, Any] | None:
476 """Like read(), but assumes that body parts contains JSON data."""
477 data = await self.read(decode=True)
478 if not data:
479 return None
480 encoding = encoding or self.get_charset(default="utf-8")
481 return cast(dict[str, Any], json.loads(data.decode(encoding)))
483 async def form(self, *, encoding: str | None = None) -> list[tuple[str, str]]:
484 """Like read(), but assumes that body parts contain form urlencoded data."""
485 data = await self.read(decode=True)
486 if not data:
487 return []
488 if encoding is not None:
489 real_encoding = encoding
490 else:
491 real_encoding = self.get_charset(default="utf-8")
492 try:
493 decoded_data = data.rstrip().decode(real_encoding)
494 except UnicodeDecodeError:
495 raise ValueError("data cannot be decoded with %s encoding" % real_encoding)
497 return parse_qsl(
498 decoded_data,
499 keep_blank_values=True,
500 encoding=real_encoding,
501 )
503 def at_eof(self) -> bool:
504 """Returns True if the boundary was reached or False otherwise."""
505 return self._at_eof
507 def _apply_content_transfer_decoding(self, data: _Buffer) -> _Buffer | bytes:
508 """Apply Content-Transfer-Encoding decoding if header is present."""
509 if CONTENT_TRANSFER_ENCODING in self.headers:
510 return self._decode_content_transfer(data)
511 return data
513 def _needs_content_decoding(self) -> bool:
514 """Check if Content-Encoding decoding should be applied."""
515 # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
516 return not self._is_form_data and CONTENT_ENCODING in self.headers
518 def decode(self, data: _Buffer) -> _Buffer | bytes:
519 """Decodes data synchronously.
521 Decodes data according the specified Content-Encoding
522 or Content-Transfer-Encoding headers value.
524 Note: For large payloads, consider using decode_iter() instead
525 to avoid blocking the event loop during decompression.
526 """
527 decoded = self._apply_content_transfer_decoding(data)
528 if self._needs_content_decoding():
529 return self._decode_content(decoded)
530 return decoded
532 async def decode_iter(self, data: _Buffer) -> AsyncIterator[_Buffer | bytes]:
533 """Async generator that yields decoded data chunks.
535 Decodes data according the specified Content-Encoding
536 or Content-Transfer-Encoding headers value.
538 This method offloads decompression to an executor for large payloads
539 to avoid blocking the event loop.
540 """
541 decoded = self._apply_content_transfer_decoding(data)
542 if self._needs_content_decoding():
543 async for d in self._decode_content_async(decoded):
544 yield d
545 else:
546 yield decoded
548 def _decode_content(self, data: _Buffer) -> _Buffer | bytes:
549 encoding = self.headers.get(CONTENT_ENCODING, "").lower()
550 if encoding == "identity":
551 return data
552 if encoding in {"deflate", "gzip"}:
553 return ZLibDecompressor(
554 encoding=encoding,
555 suppress_deflate_header=True,
556 ).decompress_sync(data, max_length=self._max_decompress_size)
558 raise RuntimeError(f"unknown content encoding: {encoding}")
560 async def _decode_content_async(
561 self, data: _Buffer
562 ) -> AsyncIterator[_Buffer | bytes]:
563 encoding = self.headers.get(CONTENT_ENCODING, "").lower()
564 if encoding == "identity":
565 yield data
566 elif encoding in {"deflate", "gzip"}:
567 d = ZLibDecompressor(
568 encoding=encoding,
569 suppress_deflate_header=True,
570 )
571 yield await d.decompress(data, max_length=self._max_decompress_size)
572 while d.data_available:
573 yield await d.decompress(b"", max_length=self._max_decompress_size)
574 else:
575 raise RuntimeError(f"unknown content encoding: {encoding}")
577 def _decode_content_transfer(self, data: _Buffer) -> _Buffer | bytes:
578 encoding = self.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()
580 if encoding == "base64":
581 return base64.b64decode(data)
582 elif encoding == "quoted-printable":
583 return binascii.a2b_qp(data)
584 elif encoding in ("binary", "8bit", "7bit"):
585 return data
586 else:
587 raise RuntimeError(f"unknown content transfer encoding: {encoding}")
589 def get_charset(self, default: str) -> str:
590 """Returns charset parameter from Content-Type header or default."""
591 ctype = self.headers.get(CONTENT_TYPE, "")
592 mimetype = parse_mimetype(ctype)
593 return mimetype.parameters.get("charset", self._default_charset or default)
595 @reify
596 def name(self) -> str | None:
597 """Returns name specified in Content-Disposition header.
599 If the header is missing or malformed, returns None.
600 """
601 _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
602 return content_disposition_filename(params, "name")
604 @reify
605 def filename(self) -> str | None:
606 """Returns filename specified in Content-Disposition header.
608 Returns None if the header is missing or malformed.
609 """
610 _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
611 return content_disposition_filename(params, "filename")
614@payload_type(BodyPartReader, order=Order.try_first)
615class BodyPartReaderPayload(Payload):
616 _value: BodyPartReader
617 # _autoclose = False (inherited) - Streaming reader that may have resources
619 def __init__(self, value: BodyPartReader, *args: Any, **kwargs: Any) -> None:
620 super().__init__(value, *args, **kwargs)
622 params: dict[str, str] = {}
623 if value.name is not None:
624 params["name"] = value.name
625 if value.filename is not None:
626 params["filename"] = value.filename
628 if params:
629 self.set_content_disposition("attachment", True, **params)
631 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
632 raise TypeError("Unable to decode.")
634 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
635 """Raises TypeError as body parts should be consumed via write().
637 This is intentional: BodyPartReader payloads are designed for streaming
638 large data (potentially gigabytes) and must be consumed only once via
639 the write() method to avoid memory exhaustion. They cannot be buffered
640 in memory for reuse.
641 """
642 raise TypeError("Unable to read body part as bytes. Use write() to consume.")
644 async def write(self, writer: AbstractStreamWriter) -> None:
645 field = self._value
646 while chunk := await field.read_chunk(size=DEFAULT_CHUNK_SIZE):
647 async for d in field.decode_iter(chunk):
648 await writer.write(d)
651class MultipartReader:
652 """Multipart body reader."""
654 #: Response wrapper, used when multipart readers constructs from response.
655 response_wrapper_cls = MultipartResponseWrapper
656 #: Multipart reader class, used to handle multipart/* body parts.
657 #: None points to type(self)
658 multipart_reader_cls: type["MultipartReader"] | None = None
659 #: Body part reader class for non multipart/* content types.
660 part_reader_cls = BodyPartReader
662 def __init__(
663 self,
664 headers: Mapping[str, str],
665 content: StreamReader,
666 *,
667 client_max_size: int = sys.maxsize,
668 max_field_size: int = 8190,
669 max_headers: int = 128,
670 max_size_error_cls: type[Exception] = ValueError,
671 ) -> None:
672 self._mimetype = parse_mimetype(headers[CONTENT_TYPE])
673 assert self._mimetype.type == "multipart", "multipart/* content type expected"
674 if "boundary" not in self._mimetype.parameters:
675 raise ValueError(
676 "boundary missed for Content-Type: %s" % headers[CONTENT_TYPE]
677 )
679 self.headers = headers
680 self._boundary = ("--" + self._get_boundary()).encode()
681 self._client_max_size = client_max_size
682 self._content = content
683 self._default_charset: str | None = None
684 self._last_part: MultipartReader | BodyPartReader | None = None
685 self._max_field_size = max_field_size
686 self._max_headers = max_headers
687 self._max_size_error_cls = max_size_error_cls
688 self._at_eof = False
689 self._at_bof = True
690 self._unread: list[bytes] = []
692 def __aiter__(self: Self) -> Self:
693 return self
695 async def __anext__(
696 self,
697 ) -> Union["MultipartReader", BodyPartReader] | None:
698 part = await self.next()
699 if part is None:
700 raise StopAsyncIteration
701 return part
703 @classmethod
704 def from_response(
705 cls,
706 response: "ClientResponse",
707 ) -> MultipartResponseWrapper:
708 """Constructs reader instance from HTTP response.
710 :param response: :class:`~aiohttp.client.ClientResponse` instance
711 """
712 obj = cls.response_wrapper_cls(
713 response, cls(response.headers, response.content)
714 )
715 return obj
717 def at_eof(self) -> bool:
718 """Returns True if the final boundary was reached, false otherwise."""
719 return self._at_eof
721 async def next(
722 self,
723 ) -> Union["MultipartReader", BodyPartReader] | None:
724 """Emits the next multipart body part."""
725 # So, if we're at BOF, we need to skip till the boundary.
726 if self._at_eof:
727 return None
728 await self._maybe_release_last_part()
729 if self._at_bof:
730 await self._read_until_first_boundary()
731 self._at_bof = False
732 else:
733 await self._read_boundary()
734 if self._at_eof: # we just read the last boundary, nothing to do there
735 return None
737 part = await self.fetch_next_part()
738 # https://datatracker.ietf.org/doc/html/rfc7578#section-4.6
739 if (
740 self._last_part is None
741 and self._mimetype.subtype == "form-data"
742 and isinstance(part, BodyPartReader)
743 ):
744 _, params = parse_content_disposition(part.headers.get(CONTENT_DISPOSITION))
745 if params.get("name") == "_charset_":
746 # Longest encoding in https://encoding.spec.whatwg.org/encodings.json
747 # is 19 characters, so 32 should be more than enough for any valid encoding.
748 charset = await part.read_chunk(32)
749 if len(charset) > 31:
750 raise RuntimeError("Invalid default charset")
751 self._default_charset = charset.strip().decode()
752 part = await self.fetch_next_part()
753 self._last_part = part
754 return self._last_part
756 async def release(self) -> None:
757 """Reads all the body parts to the void till the final boundary."""
758 while not self._at_eof:
759 item = await self.next()
760 if item is None:
761 break
762 await item.release()
764 async def fetch_next_part(
765 self,
766 ) -> Union["MultipartReader", BodyPartReader]:
767 """Returns the next body part reader."""
768 headers = await self._read_headers()
769 return self._get_part_reader(headers)
771 def _get_part_reader(
772 self,
773 headers: "CIMultiDictProxy[str]",
774 ) -> Union["MultipartReader", BodyPartReader]:
775 """Dispatches the response by the `Content-Type` header.
777 Returns a suitable reader instance.
779 :param dict headers: Response headers
780 """
781 ctype = headers.get(CONTENT_TYPE, "")
782 mimetype = parse_mimetype(ctype)
784 if mimetype.type == "multipart":
785 if self.multipart_reader_cls is None:
786 return type(self)(
787 headers,
788 self._content,
789 client_max_size=self._client_max_size,
790 max_field_size=self._max_field_size,
791 max_headers=self._max_headers,
792 max_size_error_cls=self._max_size_error_cls,
793 )
794 return self.multipart_reader_cls(
795 headers,
796 self._content,
797 client_max_size=self._client_max_size,
798 max_field_size=self._max_field_size,
799 max_headers=self._max_headers,
800 max_size_error_cls=self._max_size_error_cls,
801 )
802 else:
803 return self.part_reader_cls(
804 self._boundary,
805 headers,
806 self._content,
807 subtype=self._mimetype.subtype,
808 default_charset=self._default_charset,
809 client_max_size=self._client_max_size,
810 max_size_error_cls=self._max_size_error_cls,
811 )
813 def _get_boundary(self) -> str:
814 boundary = self._mimetype.parameters["boundary"]
815 if len(boundary) > 70:
816 raise ValueError("boundary %r is too long (70 chars max)" % boundary)
818 return boundary
820 async def _readline(self) -> bytes:
821 if self._unread:
822 return self._unread.pop()
823 return await self._content.readline()
825 async def _read_until_first_boundary(self) -> None:
826 while True:
827 chunk = await self._readline()
828 if chunk == b"":
829 raise ValueError(
830 "Could not find starting boundary %r" % (self._boundary)
831 )
832 chunk = chunk.rstrip()
833 if chunk == self._boundary:
834 return
835 elif chunk == self._boundary + b"--":
836 self._at_eof = True
837 return
839 async def _read_boundary(self) -> None:
840 chunk = (await self._readline()).rstrip()
841 if chunk == self._boundary:
842 pass
843 elif chunk == self._boundary + b"--":
844 self._at_eof = True
845 epilogue = await self._readline()
846 next_line = await self._readline()
848 # the epilogue is expected and then either the end of input or the
849 # parent multipart boundary, if the parent boundary is found then
850 # it should be marked as unread and handed to the parent for
851 # processing
852 if next_line[:2] == b"--":
853 self._unread.append(next_line)
854 # otherwise the request is likely missing an epilogue and both
855 # lines should be passed to the parent for processing
856 # (this handles the old behavior gracefully)
857 else:
858 self._unread.extend([next_line, epilogue])
859 else:
860 raise ValueError(f"Invalid boundary {chunk!r}, expected {self._boundary!r}")
862 async def _read_headers(self) -> "CIMultiDictProxy[str]":
863 lines = []
864 while True:
865 chunk = await self._content.readline(max_line_length=self._max_field_size)
866 chunk = chunk.rstrip(b"\r\n")
867 lines.append(chunk)
868 if not chunk:
869 break
870 if len(lines) > self._max_headers:
871 raise BadHttpMessage("Too many headers received")
872 parser = HeadersParser(max_field_size=self._max_field_size)
873 headers, raw_headers = parser.parse_headers(lines)
874 return headers
876 async def _maybe_release_last_part(self) -> None:
877 """Ensures that the last read body part is read completely."""
878 if self._last_part is not None:
879 if not self._last_part.at_eof():
880 await self._last_part.release()
881 self._unread.extend(self._last_part._unread)
882 self._last_part = None
885_Part = tuple[Payload, str, str]
888class MultipartWriter(Payload):
889 """Multipart body writer."""
891 _value: None
892 # _consumed = False (inherited) - Can be encoded multiple times
893 _autoclose = True # No file handles, just collects parts in memory
895 def __init__(self, subtype: str = "mixed", boundary: str | None = None) -> None:
896 boundary = boundary if boundary is not None else uuid.uuid4().hex
897 # The underlying Payload API demands a str (utf-8), not bytes,
898 # so we need to ensure we don't lose anything during conversion.
899 # As a result, require the boundary to be ASCII only.
900 # In both situations.
902 try:
903 self._boundary = boundary.encode("ascii")
904 except UnicodeEncodeError:
905 raise ValueError("boundary should contain ASCII only chars") from None
906 ctype = f"multipart/{subtype}; boundary={self._boundary_value}"
908 super().__init__(None, content_type=ctype)
910 self._parts: list[_Part] = []
911 self._is_form_data = subtype == "form-data"
913 def __enter__(self) -> "MultipartWriter":
914 return self
916 def __exit__(
917 self,
918 exc_type: type[BaseException] | None,
919 exc_val: BaseException | None,
920 exc_tb: TracebackType | None,
921 ) -> None:
922 pass
924 def __iter__(self) -> Iterator[_Part]:
925 return iter(self._parts)
927 def __len__(self) -> int:
928 return len(self._parts)
930 def __bool__(self) -> bool:
931 return True
933 _valid_tchar_regex = re.compile(rb"\A[!#$%&'*+\-.^_`|~\w]+\Z")
934 _invalid_qdtext_char_regex = re.compile(rb"[\x00-\x08\x0A-\x1F\x7F]")
936 @property
937 def _boundary_value(self) -> str:
938 """Wrap boundary parameter value in quotes, if necessary.
940 Reads self.boundary and returns a unicode string.
941 """
942 # Refer to RFCs 7231, 7230, 5234.
943 #
944 # parameter = token "=" ( token / quoted-string )
945 # token = 1*tchar
946 # quoted-string = DQUOTE *( qdtext / quoted-pair ) DQUOTE
947 # qdtext = HTAB / SP / %x21 / %x23-5B / %x5D-7E / obs-text
948 # obs-text = %x80-FF
949 # quoted-pair = "\" ( HTAB / SP / VCHAR / obs-text )
950 # tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*"
951 # / "+" / "-" / "." / "^" / "_" / "`" / "|" / "~"
952 # / DIGIT / ALPHA
953 # ; any VCHAR, except delimiters
954 # VCHAR = %x21-7E
955 value = self._boundary
956 if re.match(self._valid_tchar_regex, value):
957 return value.decode("ascii") # cannot fail
959 if re.search(self._invalid_qdtext_char_regex, value):
960 raise ValueError("boundary value contains invalid characters")
962 # escape %x5C and %x22
963 quoted_value_content = value.replace(b"\\", b"\\\\")
964 quoted_value_content = quoted_value_content.replace(b'"', b'\\"')
966 return '"' + quoted_value_content.decode("ascii") + '"'
968 @property
969 def boundary(self) -> str:
970 return self._boundary.decode("ascii")
972 def append(self, obj: Any, headers: Mapping[str, str] | None = None) -> Payload:
973 if headers is None:
974 headers = CIMultiDict()
976 if isinstance(obj, Payload):
977 obj.headers.update(headers)
978 return self.append_payload(obj)
979 else:
980 try:
981 payload = get_payload(obj, headers=headers)
982 except LookupError:
983 raise TypeError("Cannot create payload from %r" % obj)
984 else:
985 return self.append_payload(payload)
987 def append_payload(self, payload: Payload) -> Payload:
988 """Adds a new body part to multipart writer."""
989 encoding: str | None = None
990 te_encoding: str | None = None
991 if self._is_form_data:
992 # https://datatracker.ietf.org/doc/html/rfc7578#section-4.7
993 # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
994 assert (
995 not {CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TRANSFER_ENCODING}
996 & payload.headers.keys()
997 )
998 # Set default Content-Disposition in case user doesn't create one
999 if CONTENT_DISPOSITION not in payload.headers:
1000 name = f"section-{len(self._parts)}"
1001 payload.set_content_disposition("form-data", name=name)
1002 else:
1003 # compression
1004 encoding = payload.headers.get(CONTENT_ENCODING, "").lower()
1005 if encoding and encoding not in ("deflate", "gzip", "identity"):
1006 raise RuntimeError(f"unknown content encoding: {encoding}")
1007 if encoding == "identity":
1008 encoding = None
1010 # te encoding
1011 te_encoding = payload.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()
1012 if te_encoding not in ("", "base64", "quoted-printable", "binary"):
1013 raise RuntimeError(f"unknown content transfer encoding: {te_encoding}")
1014 if te_encoding == "binary":
1015 te_encoding = None
1017 # size
1018 size = payload.size
1019 if size is not None and not (encoding or te_encoding):
1020 payload.headers[CONTENT_LENGTH] = str(size)
1022 self._parts.append((payload, encoding, te_encoding)) # type: ignore[arg-type]
1023 return payload
1025 def append_json(
1026 self, obj: Any, headers: Mapping[str, str] | None = None
1027 ) -> Payload:
1028 """Helper to append JSON part."""
1029 if headers is None:
1030 headers = CIMultiDict()
1032 return self.append_payload(JsonPayload(obj, headers=headers))
1034 def append_form(
1035 self,
1036 obj: Sequence[tuple[str, str]] | Mapping[str, str],
1037 headers: Mapping[str, str] | None = None,
1038 ) -> Payload:
1039 """Helper to append form urlencoded part."""
1040 assert isinstance(obj, (Sequence, Mapping))
1042 if headers is None:
1043 headers = CIMultiDict()
1045 if isinstance(obj, Mapping):
1046 obj = list(obj.items())
1047 data = urlencode(obj, doseq=True)
1049 return self.append_payload(
1050 StringPayload(
1051 data, headers=headers, content_type="application/x-www-form-urlencoded"
1052 )
1053 )
1055 @property
1056 def size(self) -> int | None:
1057 """Size of the payload."""
1058 total = 0
1059 for part, encoding, te_encoding in self._parts:
1060 part_size = part.size
1061 if encoding or te_encoding or part_size is None:
1062 return None
1064 total += int(
1065 2
1066 + len(self._boundary)
1067 + 2
1068 + part_size # b'--'+self._boundary+b'\r\n'
1069 + len(part._binary_headers)
1070 + 2 # b'\r\n'
1071 )
1073 total += 2 + len(self._boundary) + 4 # b'--'+self._boundary+b'--\r\n'
1074 return total
1076 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
1077 """Return string representation of the multipart data.
1079 WARNING: This method may do blocking I/O if parts contain file payloads.
1080 It should not be called in the event loop. Use as_bytes().decode() instead.
1081 """
1082 return "".join(
1083 "--"
1084 + self.boundary
1085 + "\r\n"
1086 + part._binary_headers.decode(encoding, errors)
1087 + part.decode()
1088 for part, _e, _te in self._parts
1089 )
1091 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
1092 """Return bytes representation of the multipart data.
1094 This method is async-safe and calls as_bytes on underlying payloads.
1095 """
1096 parts: list[bytes] = []
1098 # Process each part
1099 for part, _e, _te in self._parts:
1100 # Add boundary
1101 parts.append(b"--" + self._boundary + b"\r\n")
1103 # Add headers
1104 parts.append(part._binary_headers)
1106 # Add payload content using as_bytes for async safety
1107 part_bytes = await part.as_bytes(encoding, errors)
1108 parts.append(part_bytes)
1110 # Add trailing CRLF
1111 parts.append(b"\r\n")
1113 # Add closing boundary
1114 parts.append(b"--" + self._boundary + b"--\r\n")
1116 return b"".join(parts)
1118 async def write(
1119 self, writer: AbstractStreamWriter, close_boundary: bool = True
1120 ) -> None:
1121 """Write body."""
1122 for part, encoding, te_encoding in self._parts:
1123 if self._is_form_data:
1124 # https://datatracker.ietf.org/doc/html/rfc7578#section-4.2
1125 assert CONTENT_DISPOSITION in part.headers
1126 assert "name=" in part.headers[CONTENT_DISPOSITION]
1128 await writer.write(b"--" + self._boundary + b"\r\n")
1129 await writer.write(part._binary_headers)
1131 if encoding or te_encoding:
1132 w = MultipartPayloadWriter(writer)
1133 if encoding:
1134 w.enable_compression(encoding)
1135 if te_encoding:
1136 w.enable_encoding(te_encoding)
1137 await part.write(w) # type: ignore[arg-type]
1138 await w.write_eof()
1139 else:
1140 await part.write(writer)
1142 await writer.write(b"\r\n")
1144 if close_boundary:
1145 await writer.write(b"--" + self._boundary + b"--\r\n")
1147 async def close(self) -> None:
1148 """
1149 Close all part payloads that need explicit closing.
1151 IMPORTANT: This method must not await anything that might not finish
1152 immediately, as it may be called during cleanup/cancellation. Schedule
1153 any long-running operations without awaiting them.
1154 """
1155 if self._consumed:
1156 return
1157 self._consumed = True
1159 # Close all parts that need explicit closing
1160 # We catch and log exceptions to ensure all parts get a chance to close
1161 # we do not use asyncio.gather() here because we are not allowed
1162 # to suspend given we may be called during cleanup
1163 for idx, (part, _, _) in enumerate(self._parts):
1164 if not part.autoclose and not part.consumed:
1165 try:
1166 await part.close()
1167 except Exception as exc:
1168 internal_logger.error(
1169 "Failed to close multipart part %d: %s", idx, exc, exc_info=True
1170 )
1173class MultipartPayloadWriter:
1174 def __init__(self, writer: AbstractStreamWriter) -> None:
1175 self._writer = writer
1176 self._encoding: str | None = None
1177 self._compress: ZLibCompressor | None = None
1178 self._encoding_buffer: bytearray | None = None
1180 def enable_encoding(self, encoding: str) -> None:
1181 if encoding == "base64":
1182 self._encoding = encoding
1183 self._encoding_buffer = bytearray()
1184 elif encoding == "quoted-printable":
1185 self._encoding = "quoted-printable"
1187 def enable_compression(
1188 self, encoding: str = "deflate", strategy: int | None = None
1189 ) -> None:
1190 self._compress = ZLibCompressor(
1191 encoding=encoding,
1192 suppress_deflate_header=True,
1193 strategy=strategy,
1194 )
1196 async def write_eof(self) -> None:
1197 if self._compress is not None:
1198 chunk = self._compress.flush()
1199 if chunk:
1200 self._compress = None
1201 await self.write(chunk)
1203 if self._encoding == "base64":
1204 if self._encoding_buffer:
1205 await self._writer.write(base64.b64encode(self._encoding_buffer))
1207 async def write(self, chunk: bytes) -> None:
1208 if self._compress is not None:
1209 if chunk:
1210 chunk = await self._compress.compress(chunk)
1211 if not chunk:
1212 return
1214 if self._encoding == "base64":
1215 buf = self._encoding_buffer
1216 assert buf is not None
1217 buf.extend(chunk)
1219 if buf:
1220 div, mod = divmod(len(buf), 3)
1221 enc_chunk, self._encoding_buffer = (buf[: div * 3], buf[div * 3 :])
1222 if enc_chunk:
1223 b64chunk = base64.b64encode(enc_chunk)
1224 await self._writer.write(b64chunk)
1225 elif self._encoding == "quoted-printable":
1226 await self._writer.write(binascii.b2a_qp(chunk))
1227 else:
1228 await self._writer.write(chunk)