Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/multipart.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import base64
2import binascii
3import json
4import re
5import sys
6import uuid
7import warnings
8from collections import deque
9from collections.abc import Mapping, Sequence
10from types import TracebackType
11from typing import (
12 TYPE_CHECKING,
13 Any,
14 AsyncIterator,
15 Deque,
16 Dict,
17 Iterator,
18 List,
19 Optional,
20 Tuple,
21 Type,
22 Union,
23 cast,
24)
25from urllib.parse import parse_qsl, unquote, urlencode
27from multidict import CIMultiDict, CIMultiDictProxy
29from .abc import AbstractStreamWriter
30from .compression_utils import (
31 DEFAULT_MAX_DECOMPRESS_SIZE,
32 ZLibCompressor,
33 ZLibDecompressor,
34)
35from .hdrs import (
36 CONTENT_DISPOSITION,
37 CONTENT_ENCODING,
38 CONTENT_LENGTH,
39 CONTENT_TRANSFER_ENCODING,
40 CONTENT_TYPE,
41)
42from .helpers import CHAR, TOKEN, parse_mimetype, reify
43from .http import HeadersParser
44from .http_exceptions import BadHttpMessage
45from .log import internal_logger
46from .payload import (
47 JsonPayload,
48 LookupError,
49 Order,
50 Payload,
51 StringPayload,
52 get_payload,
53 payload_type,
54)
55from .streams import StreamReader
# ``typing.Self`` (PEP 673) only exists in the stdlib from Python 3.11;
# on older interpreters emulate it with a bound TypeVar so the
# ``__aiter__(self: Self) -> Self`` annotations below still resolve.
if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing import TypeVar

    Self = TypeVar("Self", bound="BodyPartReader")
# Public multipart API re-exported at the package level.
__all__ = (
    "MultipartReader",
    "MultipartWriter",
    "BodyPartReader",
    "BadContentDispositionHeader",
    "BadContentDispositionParam",
    "parse_content_disposition",
    "content_disposition_filename",
)
75if TYPE_CHECKING:
76 from .client_reqrep import ClientResponse
class BadContentDispositionHeader(RuntimeWarning):
    """Warned when an entire Content-Disposition header value is malformed."""

    pass
class BadContentDispositionParam(RuntimeWarning):
    """Warned when a single Content-Disposition parameter is malformed."""

    pass
def parse_content_disposition(
    header: Optional[str],
) -> Tuple[Optional[str], Dict[str, str]]:
    """Parse a Content-Disposition header value.

    Returns a ``(disposition_type, params)`` tuple.  A malformed header
    emits a :class:`BadContentDispositionHeader` warning and yields
    ``(None, {})``; an individually malformed parameter emits a
    :class:`BadContentDispositionParam` warning and is skipped.
    """

    def is_token(string: str) -> bool:
        # RFC 7230 token: non-empty and composed only of tchar characters.
        return bool(string) and TOKEN >= set(string)

    def is_quoted(string: str) -> bool:
        return string[0] == string[-1] == '"'

    def is_rfc5987(string: str) -> bool:
        # RFC 5987 ext-value has the shape charset'language'value
        # (exactly two single quotes).
        return is_token(string) and string.count("'") == 2

    def is_extended_param(string: str) -> bool:
        return string.endswith("*")

    def is_continuous_param(string: str) -> bool:
        # RFC 2231 continuation: ``name*N`` or ``name*N*`` with numeric N.
        pos = string.find("*") + 1
        if not pos:
            return False
        substring = string[pos:-1] if string.endswith("*") else string[pos:]
        return substring.isdigit()

    def unescape(text: str, *, chars: str = "".join(map(re.escape, CHAR))) -> str:
        # Undo quoted-pair escaping (backslash before any CHAR).
        return re.sub(f"\\\\([{chars}])", "\\1", text)

    if not header:
        return None, {}

    disptype, *parts = header.split(";")
    if not is_token(disptype):
        warnings.warn(BadContentDispositionHeader(header))
        return None, {}

    params: Dict[str, str] = {}
    while parts:
        item = parts.pop(0)

        if not item:  # To handle trailing semicolons
            warnings.warn(BadContentDispositionHeader(header))
            continue

        if "=" not in item:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        key, value = item.split("=", 1)
        key = key.lower().strip()
        value = value.lstrip()

        if key in params:
            # A duplicated parameter makes the whole header ambiguous.
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        if not is_token(key):
            warnings.warn(BadContentDispositionParam(item))
            continue

        elif is_continuous_param(key):
            if is_quoted(value):
                value = unescape(value[1:-1])
            elif not is_token(value):
                warnings.warn(BadContentDispositionParam(item))
                continue

        elif is_extended_param(key):
            if is_rfc5987(value):
                encoding, _, value = value.split("'", 2)
                encoding = encoding or "utf-8"
            else:
                warnings.warn(BadContentDispositionParam(item))
                continue

            try:
                value = unquote(value, encoding, "strict")
            except UnicodeDecodeError:  # pragma: nocover
                warnings.warn(BadContentDispositionParam(item))
                continue

        else:
            failed = True
            if is_quoted(value):
                failed = False
                value = unescape(value[1:-1].lstrip("\\/"))
            elif is_token(value):
                failed = False
            elif parts:
                # maybe just ; in filename, in any case this is just
                # one case fix, for proper fix we need to redesign parser
                _value = f"{value};{parts[0]}"
                if is_quoted(_value):
                    parts.pop(0)
                    value = unescape(_value[1:-1].lstrip("\\/"))
                    failed = False

            if failed:
                warnings.warn(BadContentDispositionHeader(header))
                return None, {}

        params[key] = value

    return disptype.lower(), params
def content_disposition_filename(
    params: Mapping[str, str], name: str = "filename"
) -> Optional[str]:
    """Extract the *name* parameter from parsed Content-Disposition params.

    Lookup order: the RFC 5987 extended parameter (``name*``), the plain
    parameter (``name``), and finally RFC 2231 numbered continuations
    (``name*0``, ``name*1*``, ...).  Returns ``None`` when nothing
    usable is present.
    """
    if not params:
        return None

    extended_key = f"{name}*"
    if extended_key in params:
        return params[extended_key]
    if name in params:
        return params[name]

    # Reassemble RFC 2231 continuation segments in lexicographic key order,
    # stopping at the first gap in the numbering.
    continuations = sorted(
        (key, segment) for key, segment in params.items()
        if key.startswith(extended_key)
    )
    segments = []
    for expected, (key, segment) in enumerate(continuations):
        _, tail = key.split("*", 1)
        if tail.removesuffix("*") != str(expected):
            break
        segments.append(segment)

    if not segments:
        return None

    joined = "".join(segments)
    if "'" not in joined:
        return joined
    # First segment carried an RFC 5987 charset prefix: charset'lang'value.
    encoding, _, quoted = joined.split("'", 2)
    return unquote(quoted, encoding or "utf-8", "strict")
class MultipartResponseWrapper:
    """Couples a MultipartReader with the client response it came from.

    Releases the underlying connection once the multipart stream has
    been fully consumed.
    """

    def __init__(
        self,
        resp: "ClientResponse",
        stream: "MultipartReader",
    ) -> None:
        self.resp = resp
        self.stream = stream

    def __aiter__(self) -> "MultipartResponseWrapper":
        return self

    async def __anext__(
        self,
    ) -> Union["MultipartReader", "BodyPartReader"]:
        next_part = await self.next()
        if next_part is None:
            raise StopAsyncIteration
        return next_part

    def at_eof(self) -> bool:
        """Return True when all response data has been read."""
        return self.resp.content.at_eof()

    async def next(
        self,
    ) -> Optional[Union["MultipartReader", "BodyPartReader"]]:
        """Emit the next multipart reader object."""
        part = await self.stream.next()
        if self.stream.at_eof():
            # Stream exhausted: hand the connection back right away.
            await self.release()
        return part

    async def release(self) -> None:
        """Release the connection gracefully.

        All remaining content is read to the void.
        """
        await self.resp.release()
class BodyPartReader:
    """Multipart reader for single body part."""

    # Default read granularity in bytes.
    chunk_size = 8192

    def __init__(
        self,
        boundary: bytes,
        headers: "CIMultiDictProxy[str]",
        content: StreamReader,
        *,
        subtype: str = "mixed",
        default_charset: Optional[str] = None,
        max_decompress_size: int = DEFAULT_MAX_DECOMPRESS_SIZE,
    ) -> None:
        self.headers = headers
        self._boundary = boundary
        self._boundary_len = len(boundary) + 2  # Boundary + \r\n
        self._content = content
        self._default_charset = default_charset
        self._at_eof = False
        self._is_form_data = subtype == "form-data"
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
        # form-data parts must not rely on a per-part Content-Length.
        length = None if self._is_form_data else self.headers.get(CONTENT_LENGTH, None)
        self._length = int(length) if length is not None else None
        self._read_bytes = 0
        # Lines pushed back by readline() when the boundary is spotted.
        self._unread: Deque[bytes] = deque()
        # Tail of the previous stream chunk kept to detect a boundary
        # that straddles two reads.
        self._prev_chunk: Optional[bytes] = None
        self._content_eof = 0
        self._cache: Dict[str, Any] = {}
        self._max_decompress_size = max_decompress_size

    def __aiter__(self: Self) -> Self:
        return self

    async def __anext__(self) -> bytes:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    async def next(self) -> Optional[bytes]:
        """Read the whole remaining body; None when the part is exhausted."""
        item = await self.read()
        if not item:
            return None
        return item

    async def read(self, *, decode: bool = False) -> bytes:
        """Reads body part data.

        decode: Decodes data following by encoding
        method from Content-Encoding header. If it missed
        data remains untouched
        """
        if self._at_eof:
            return b""
        data = bytearray()
        while not self._at_eof:
            data.extend(await self.read_chunk(self.chunk_size))
        if decode:
            decoded_data = bytearray()
            async for d in self.decode_iter(data):
                decoded_data.extend(d)
            return decoded_data
        return data

    async def read_chunk(self, size: int = chunk_size) -> bytes:
        """Reads body part content chunk of the specified size.

        size: chunk size
        """
        if self._at_eof:
            return b""
        if self._length:
            chunk = await self._read_chunk_from_length(size)
        else:
            chunk = await self._read_chunk_from_stream(size)

        # For the case of base64 data, we must read a fragment of size with a
        # remainder of 0 by dividing by 4 for string without symbols \n or \r
        encoding = self.headers.get(CONTENT_TRANSFER_ENCODING)
        if encoding and encoding.lower() == "base64":
            stripped_chunk = b"".join(chunk.split())
            remainder = len(stripped_chunk) % 4

            # Pull extra bytes until the chunk decodes on a 4-byte base64
            # quantum boundary (ignoring whitespace).
            while remainder != 0 and not self.at_eof():
                over_chunk_size = 4 - remainder
                over_chunk = b""

                if self._prev_chunk:
                    over_chunk = self._prev_chunk[:over_chunk_size]
                    self._prev_chunk = self._prev_chunk[len(over_chunk) :]

                if len(over_chunk) != over_chunk_size:
                    over_chunk += await self._content.read(4 - len(over_chunk))

                if not over_chunk:
                    self._at_eof = True

                stripped_chunk += b"".join(over_chunk.split())
                chunk += over_chunk
                remainder = len(stripped_chunk) % 4

        self._read_bytes += len(chunk)
        if self._read_bytes == self._length:
            self._at_eof = True
        # After the final content byte the stream must continue with CRLF
        # (the separator before the next boundary line).
        if self._at_eof and await self._content.readline() != b"\r\n":
            raise ValueError("Reader did not read all the data or it is malformed")
        return chunk

    async def _read_chunk_from_length(self, size: int) -> bytes:
        # Reads body part content chunk of the specified size.
        # The body part must has Content-Length header with proper value.
        assert self._length is not None, "Content-Length required for chunked read"
        chunk_size = min(size, self._length - self._read_bytes)
        chunk = await self._content.read(chunk_size)
        if self._content.at_eof():
            self._at_eof = True
        return chunk

    async def _read_chunk_from_stream(self, size: int) -> bytes:
        # Reads content chunk of body part with unknown length.
        # The Content-Length header for body part is not necessary.
        assert (
            size >= self._boundary_len
        ), "Chunk size must be greater or equal than boundary length + 2"
        first_chunk = self._prev_chunk is None
        if first_chunk:
            # We need to re-add the CRLF that got removed from headers parsing.
            self._prev_chunk = b"\r\n" + await self._content.read(size)

        chunk = b""
        # content.read() may return less than size, so we need to loop to ensure
        # we have enough data to detect the boundary.
        while len(chunk) < self._boundary_len:
            chunk += await self._content.read(size)
            self._content_eof += int(self._content.at_eof())
            if self._content_eof > 2:
                raise ValueError("Reading after EOF")
            if self._content_eof:
                break
        if len(chunk) > size:
            # Keep the window bounded: push the surplus back to the stream.
            self._content.unread_data(chunk[size:])
            chunk = chunk[:size]

        assert self._prev_chunk is not None
        # Search the boundary marker over the previous-chunk/current-chunk
        # seam so a boundary split across reads is still found.
        window = self._prev_chunk + chunk
        sub = b"\r\n" + self._boundary
        if first_chunk:
            idx = window.find(sub)
        else:
            idx = window.find(sub, max(0, len(self._prev_chunk) - len(sub)))
        if idx >= 0:
            # pushing boundary back to content
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=DeprecationWarning)
                self._content.unread_data(window[idx:])
            self._prev_chunk = self._prev_chunk[:idx]
            chunk = window[len(self._prev_chunk) : idx]
            if not chunk:
                self._at_eof = True
        result = self._prev_chunk[2 if first_chunk else 0 :]  # Strip initial CRLF
        self._prev_chunk = chunk
        return result

    async def readline(self) -> bytes:
        """Reads body part by line by line."""
        if self._at_eof:
            return b""

        if self._unread:
            line = self._unread.popleft()
        else:
            line = await self._content.readline()

        if line.startswith(self._boundary):
            # the very last boundary may not come with \r\n,
            # so set single rules for everyone
            sline = line.rstrip(b"\r\n")
            boundary = self._boundary
            last_boundary = self._boundary + b"--"
            # ensure that we read exactly the boundary, not something alike
            if sline == boundary or sline == last_boundary:
                self._at_eof = True
                self._unread.append(line)
                return b""
        else:
            next_line = await self._content.readline()
            if next_line.startswith(self._boundary):
                line = line[:-2]  # strip CRLF but only once
            self._unread.append(next_line)

        return line

    async def release(self) -> None:
        """Like read(), but reads all the data to the void."""
        if self._at_eof:
            return
        while not self._at_eof:
            await self.read_chunk(self.chunk_size)

    async def text(self, *, encoding: Optional[str] = None) -> str:
        """Like read(), but assumes that body part contains text data."""
        data = await self.read(decode=True)
        # see https://www.w3.org/TR/html5/forms.html#multipart/form-data-encoding-algorithm
        # and https://dvcs.w3.org/hg/xhr/raw-file/tip/Overview.html#dom-xmlhttprequest-send
        encoding = encoding or self.get_charset(default="utf-8")
        return data.decode(encoding)

    async def json(self, *, encoding: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Like read(), but assumes that body parts contains JSON data."""
        data = await self.read(decode=True)
        if not data:
            return None
        encoding = encoding or self.get_charset(default="utf-8")
        return cast(Dict[str, Any], json.loads(data.decode(encoding)))

    async def form(self, *, encoding: Optional[str] = None) -> List[Tuple[str, str]]:
        """Like read(), but assumes that body parts contain form urlencoded data."""
        data = await self.read(decode=True)
        if not data:
            return []
        if encoding is not None:
            real_encoding = encoding
        else:
            real_encoding = self.get_charset(default="utf-8")
        try:
            decoded_data = data.rstrip().decode(real_encoding)
        except UnicodeDecodeError:
            raise ValueError("data cannot be decoded with %s encoding" % real_encoding)

        return parse_qsl(
            decoded_data,
            keep_blank_values=True,
            encoding=real_encoding,
        )

    def at_eof(self) -> bool:
        """Returns True if the boundary was reached or False otherwise."""
        return self._at_eof

    def _apply_content_transfer_decoding(self, data: bytes) -> bytes:
        """Apply Content-Transfer-Encoding decoding if header is present."""
        if CONTENT_TRANSFER_ENCODING in self.headers:
            return self._decode_content_transfer(data)
        return data

    def _needs_content_decoding(self) -> bool:
        """Check if Content-Encoding decoding should be applied."""
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
        return not self._is_form_data and CONTENT_ENCODING in self.headers

    def decode(self, data: bytes) -> bytes:
        """Decodes data synchronously.

        Decodes data according the specified Content-Encoding
        or Content-Transfer-Encoding headers value.

        Note: For large payloads, consider using decode_iter() instead
        to avoid blocking the event loop during decompression.
        """
        data = self._apply_content_transfer_decoding(data)
        if self._needs_content_decoding():
            return self._decode_content(data)
        return data

    async def decode_iter(self, data: bytes) -> AsyncIterator[bytes]:
        """Async generator that yields decoded data chunks.

        Decodes data according the specified Content-Encoding
        or Content-Transfer-Encoding headers value.

        This method offloads decompression to an executor for large payloads
        to avoid blocking the event loop.
        """
        data = self._apply_content_transfer_decoding(data)
        if self._needs_content_decoding():
            async for d in self._decode_content_async(data):
                yield d
        else:
            yield data

    def _decode_content(self, data: bytes) -> bytes:
        # Synchronous Content-Encoding decoding; raises on unknown codings.
        encoding = self.headers.get(CONTENT_ENCODING, "").lower()
        if encoding == "identity":
            return data
        if encoding in {"deflate", "gzip"}:
            return ZLibDecompressor(
                encoding=encoding,
                suppress_deflate_header=True,
            ).decompress_sync(data, max_length=self._max_decompress_size)

        raise RuntimeError(f"unknown content encoding: {encoding}")

    async def _decode_content_async(self, data: bytes) -> AsyncIterator[bytes]:
        # Async counterpart of _decode_content.
        encoding = self.headers.get(CONTENT_ENCODING, "").lower()
        if encoding == "identity":
            yield data
        elif encoding in {"deflate", "gzip"}:
            d = ZLibDecompressor(
                encoding=encoding,
                suppress_deflate_header=True,
            )
            yield await d.decompress(data, max_length=self._max_decompress_size)
        else:
            raise RuntimeError(f"unknown content encoding: {encoding}")

    def _decode_content_transfer(self, data: bytes) -> bytes:
        # Content-Transfer-Encoding decoding (RFC 2045 style).
        encoding = self.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()

        if encoding == "base64":
            return base64.b64decode(data)
        elif encoding == "quoted-printable":
            return binascii.a2b_qp(data)
        elif encoding in ("binary", "8bit", "7bit"):
            return data
        else:
            raise RuntimeError(f"unknown content transfer encoding: {encoding}")

    def get_charset(self, default: str) -> str:
        """Returns charset parameter from Content-Type header or default."""
        ctype = self.headers.get(CONTENT_TYPE, "")
        mimetype = parse_mimetype(ctype)
        return mimetype.parameters.get("charset", self._default_charset or default)

    @reify
    def name(self) -> Optional[str]:
        """Returns name specified in Content-Disposition header.

        If the header is missing or malformed, returns None.
        """
        _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
        return content_disposition_filename(params, "name")

    @reify
    def filename(self) -> Optional[str]:
        """Returns filename specified in Content-Disposition header.

        Returns None if the header is missing or malformed.
        """
        _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
        return content_disposition_filename(params, "filename")
@payload_type(BodyPartReader, order=Order.try_first)
class BodyPartReaderPayload(Payload):
    """Payload adapter that streams a BodyPartReader into a writer."""

    _value: BodyPartReader
    # _autoclose = False (inherited) - Streaming reader that may have resources

    def __init__(self, value: BodyPartReader, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)

        # Propagate the part's own Content-Disposition name/filename.
        params: Dict[str, str] = {}
        if value.name is not None:
            params["name"] = value.name
        if value.filename is not None:
            params["filename"] = value.filename

        if params:
            self.set_content_disposition("attachment", True, **params)

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Always raises: a streaming body part cannot be decoded in memory."""
        raise TypeError("Unable to decode.")

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Raises TypeError as body parts should be consumed via write().

        This is intentional: BodyPartReader payloads are designed for streaming
        large data (potentially gigabytes) and must be consumed only once via
        the write() method to avoid memory exhaustion. They cannot be buffered
        in memory for reuse.
        """
        raise TypeError("Unable to read body part as bytes. Use write() to consume.")

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Stream the body part into *writer* in 256 KiB chunks, decoding lazily."""
        field = self._value
        while chunk := await field.read_chunk(size=2**18):
            async for d in field.decode_iter(chunk):
                await writer.write(d)
class MultipartReader:
    """Multipart body reader."""

    #: Response wrapper, used when multipart readers constructs from response.
    response_wrapper_cls = MultipartResponseWrapper
    #: Multipart reader class, used to handle multipart/* body parts.
    #: None points to type(self)
    multipart_reader_cls: Optional[Type["MultipartReader"]] = None
    #: Body part reader class for non multipart/* content types.
    part_reader_cls = BodyPartReader

    def __init__(
        self,
        headers: Mapping[str, str],
        content: StreamReader,
        *,
        max_field_size: int = 8190,
        max_headers: int = 128,
    ) -> None:
        self._mimetype = parse_mimetype(headers[CONTENT_TYPE])
        assert self._mimetype.type == "multipart", "multipart/* content type expected"
        if "boundary" not in self._mimetype.parameters:
            raise ValueError(
                "boundary missed for Content-Type: %s" % headers[CONTENT_TYPE]
            )

        self.headers = headers
        self._boundary = ("--" + self._get_boundary()).encode()
        self._content = content
        # Charset taken from a leading "_charset_" form field, if any.
        self._default_charset: str | None = None
        self._last_part: MultipartReader | BodyPartReader | None = None
        self._max_field_size = max_field_size
        self._max_headers = max_headers
        self._at_eof = False
        self._at_bof = True
        # Lines read past a nested final boundary, handed back to the parent.
        self._unread: List[bytes] = []

    def __aiter__(self: Self) -> Self:
        return self

    async def __anext__(
        self,
    ) -> Optional[Union["MultipartReader", BodyPartReader]]:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    @classmethod
    def from_response(
        cls,
        response: "ClientResponse",
    ) -> MultipartResponseWrapper:
        """Constructs reader instance from HTTP response.

        :param response: :class:`~aiohttp.client.ClientResponse` instance
        """
        obj = cls.response_wrapper_cls(
            response, cls(response.headers, response.content)
        )
        return obj

    def at_eof(self) -> bool:
        """Returns True if the final boundary was reached, false otherwise."""
        return self._at_eof

    async def next(
        self,
    ) -> Optional[Union["MultipartReader", BodyPartReader]]:
        """Emits the next multipart body part."""
        # So, if we're at BOF, we need to skip till the boundary.
        if self._at_eof:
            return None
        await self._maybe_release_last_part()
        if self._at_bof:
            await self._read_until_first_boundary()
            self._at_bof = False
        else:
            await self._read_boundary()
        if self._at_eof:  # we just read the last boundary, nothing to do there
            return None

        part = await self.fetch_next_part()
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.6
        # A first "_charset_" form field sets the default charset for the
        # remaining parts; it is consumed here and never surfaced to callers.
        if (
            self._last_part is None
            and self._mimetype.subtype == "form-data"
            and isinstance(part, BodyPartReader)
        ):
            _, params = parse_content_disposition(part.headers.get(CONTENT_DISPOSITION))
            if params.get("name") == "_charset_":
                # Longest encoding in https://encoding.spec.whatwg.org/encodings.json
                # is 19 characters, so 32 should be more than enough for any valid encoding.
                charset = await part.read_chunk(32)
                if len(charset) > 31:
                    raise RuntimeError("Invalid default charset")
                self._default_charset = charset.strip().decode()
                part = await self.fetch_next_part()
        self._last_part = part
        return self._last_part

    async def release(self) -> None:
        """Reads all the body parts to the void till the final boundary."""
        while not self._at_eof:
            item = await self.next()
            if item is None:
                break
            await item.release()

    async def fetch_next_part(
        self,
    ) -> Union["MultipartReader", BodyPartReader]:
        """Returns the next body part reader."""
        headers = await self._read_headers()
        return self._get_part_reader(headers)

    def _get_part_reader(
        self,
        headers: "CIMultiDictProxy[str]",
    ) -> Union["MultipartReader", BodyPartReader]:
        """Dispatches the response by the `Content-Type` header.

        Returns a suitable reader instance.

        :param dict headers: Response headers
        """
        ctype = headers.get(CONTENT_TYPE, "")
        mimetype = parse_mimetype(ctype)

        if mimetype.type == "multipart":
            # Nested multipart: recurse with a reader of the same kind.
            if self.multipart_reader_cls is None:
                return type(self)(headers, self._content)
            return self.multipart_reader_cls(
                headers,
                self._content,
                max_field_size=self._max_field_size,
                max_headers=self._max_headers,
            )
        else:
            return self.part_reader_cls(
                self._boundary,
                headers,
                self._content,
                subtype=self._mimetype.subtype,
                default_charset=self._default_charset,
            )

    def _get_boundary(self) -> str:
        # RFC 2046 limits the boundary to 70 characters.
        boundary = self._mimetype.parameters["boundary"]
        if len(boundary) > 70:
            raise ValueError("boundary %r is too long (70 chars max)" % boundary)

        return boundary

    async def _readline(self) -> bytes:
        # Prefer lines pushed back by a nested reader over fresh stream data.
        if self._unread:
            return self._unread.pop()
        return await self._content.readline()

    async def _read_until_first_boundary(self) -> None:
        # Skip the preamble until the opening (or final) boundary line.
        while True:
            chunk = await self._readline()
            if chunk == b"":
                raise ValueError(
                    "Could not find starting boundary %r" % (self._boundary)
                )
            chunk = chunk.rstrip()
            if chunk == self._boundary:
                return
            elif chunk == self._boundary + b"--":
                self._at_eof = True
                return

    async def _read_boundary(self) -> None:
        chunk = (await self._readline()).rstrip()
        if chunk == self._boundary:
            pass
        elif chunk == self._boundary + b"--":
            self._at_eof = True
            epilogue = await self._readline()
            next_line = await self._readline()

            # the epilogue is expected and then either the end of input or the
            # parent multipart boundary, if the parent boundary is found then
            # it should be marked as unread and handed to the parent for
            # processing
            if next_line[:2] == b"--":
                self._unread.append(next_line)
            # otherwise the request is likely missing an epilogue and both
            # lines should be passed to the parent for processing
            # (this handles the old behavior gracefully)
            else:
                self._unread.extend([next_line, epilogue])
        else:
            raise ValueError(f"Invalid boundary {chunk!r}, expected {self._boundary!r}")

    async def _read_headers(self) -> "CIMultiDictProxy[str]":
        # Collect header lines up to the blank line, bounded by _max_headers.
        lines = []
        while True:
            chunk = await self._content.readline(max_line_length=self._max_field_size)
            chunk = chunk.rstrip(b"\r\n")
            lines.append(chunk)
            if not chunk:
                break
            if len(lines) > self._max_headers:
                raise BadHttpMessage("Too many headers received")
        parser = HeadersParser(max_field_size=self._max_field_size)
        headers, raw_headers = parser.parse_headers(lines)
        return headers

    async def _maybe_release_last_part(self) -> None:
        """Ensures that the last read body part is read completely."""
        if self._last_part is not None:
            if not self._last_part.at_eof():
                await self._last_part.release()
            self._unread.extend(self._last_part._unread)
            self._last_part = None
# (payload, content-encoding, content-transfer-encoding) triple kept per
# appended part of a MultipartWriter.
_Part = Tuple[Payload, str, str]
873class MultipartWriter(Payload):
874 """Multipart body writer."""
876 _value: None
877 # _consumed = False (inherited) - Can be encoded multiple times
878 _autoclose = True # No file handles, just collects parts in memory
880 def __init__(self, subtype: str = "mixed", boundary: Optional[str] = None) -> None:
881 boundary = boundary if boundary is not None else uuid.uuid4().hex
882 # The underlying Payload API demands a str (utf-8), not bytes,
883 # so we need to ensure we don't lose anything during conversion.
884 # As a result, require the boundary to be ASCII only.
885 # In both situations.
887 try:
888 self._boundary = boundary.encode("ascii")
889 except UnicodeEncodeError:
890 raise ValueError("boundary should contain ASCII only chars") from None
891 ctype = f"multipart/{subtype}; boundary={self._boundary_value}"
893 super().__init__(None, content_type=ctype)
895 self._parts: List[_Part] = []
896 self._is_form_data = subtype == "form-data"
898 def __enter__(self) -> "MultipartWriter":
899 return self
901 def __exit__(
902 self,
903 exc_type: Optional[Type[BaseException]],
904 exc_val: Optional[BaseException],
905 exc_tb: Optional[TracebackType],
906 ) -> None:
907 pass
909 def __iter__(self) -> Iterator[_Part]:
910 return iter(self._parts)
912 def __len__(self) -> int:
913 return len(self._parts)
915 def __bool__(self) -> bool:
916 return True
918 _valid_tchar_regex = re.compile(rb"\A[!#$%&'*+\-.^_`|~\w]+\Z")
919 _invalid_qdtext_char_regex = re.compile(rb"[\x00-\x08\x0A-\x1F\x7F]")
921 @property
922 def _boundary_value(self) -> str:
923 """Wrap boundary parameter value in quotes, if necessary.
925 Reads self.boundary and returns a unicode string.
926 """
927 # Refer to RFCs 7231, 7230, 5234.
928 #
929 # parameter = token "=" ( token / quoted-string )
930 # token = 1*tchar
931 # quoted-string = DQUOTE *( qdtext / quoted-pair ) DQUOTE
932 # qdtext = HTAB / SP / %x21 / %x23-5B / %x5D-7E / obs-text
933 # obs-text = %x80-FF
934 # quoted-pair = "\" ( HTAB / SP / VCHAR / obs-text )
935 # tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*"
936 # / "+" / "-" / "." / "^" / "_" / "`" / "|" / "~"
937 # / DIGIT / ALPHA
938 # ; any VCHAR, except delimiters
939 # VCHAR = %x21-7E
940 value = self._boundary
941 if re.match(self._valid_tchar_regex, value):
942 return value.decode("ascii") # cannot fail
944 if re.search(self._invalid_qdtext_char_regex, value):
945 raise ValueError("boundary value contains invalid characters")
947 # escape %x5C and %x22
948 quoted_value_content = value.replace(b"\\", b"\\\\")
949 quoted_value_content = quoted_value_content.replace(b'"', b'\\"')
951 return '"' + quoted_value_content.decode("ascii") + '"'
953 @property
954 def boundary(self) -> str:
955 return self._boundary.decode("ascii")
957 def append(self, obj: Any, headers: Optional[Mapping[str, str]] = None) -> Payload:
958 if headers is None:
959 headers = CIMultiDict()
961 if isinstance(obj, Payload):
962 obj.headers.update(headers)
963 return self.append_payload(obj)
964 else:
965 try:
966 payload = get_payload(obj, headers=headers)
967 except LookupError:
968 raise TypeError("Cannot create payload from %r" % obj)
969 else:
970 return self.append_payload(payload)
972 def append_payload(self, payload: Payload) -> Payload:
973 """Adds a new body part to multipart writer."""
974 encoding: Optional[str] = None
975 te_encoding: Optional[str] = None
976 if self._is_form_data:
977 # https://datatracker.ietf.org/doc/html/rfc7578#section-4.7
978 # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
979 assert (
980 not {CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TRANSFER_ENCODING}
981 & payload.headers.keys()
982 )
983 # Set default Content-Disposition in case user doesn't create one
984 if CONTENT_DISPOSITION not in payload.headers:
985 name = f"section-{len(self._parts)}"
986 payload.set_content_disposition("form-data", name=name)
987 else:
988 # compression
989 encoding = payload.headers.get(CONTENT_ENCODING, "").lower()
990 if encoding and encoding not in ("deflate", "gzip", "identity"):
991 raise RuntimeError(f"unknown content encoding: {encoding}")
992 if encoding == "identity":
993 encoding = None
995 # te encoding
996 te_encoding = payload.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()
997 if te_encoding not in ("", "base64", "quoted-printable", "binary"):
998 raise RuntimeError(f"unknown content transfer encoding: {te_encoding}")
999 if te_encoding == "binary":
1000 te_encoding = None
1002 # size
1003 size = payload.size
1004 if size is not None and not (encoding or te_encoding):
1005 payload.headers[CONTENT_LENGTH] = str(size)
1007 self._parts.append((payload, encoding, te_encoding)) # type: ignore[arg-type]
1008 return payload
1010 def append_json(
1011 self, obj: Any, headers: Optional[Mapping[str, str]] = None
1012 ) -> Payload:
1013 """Helper to append JSON part."""
1014 if headers is None:
1015 headers = CIMultiDict()
1017 return self.append_payload(JsonPayload(obj, headers=headers))
def append_form(
    self,
    obj: Union[Sequence[Tuple[str, str]], Mapping[str, str]],
    headers: Optional[Mapping[str, str]] = None,
) -> Payload:
    """Helper to append form urlencoded part."""
    assert isinstance(obj, (Sequence, Mapping))

    # Normalize mappings to key/value pairs before urlencoding.
    pairs = list(obj.items()) if isinstance(obj, Mapping) else obj
    encoded = urlencode(pairs, doseq=True)

    part_headers = CIMultiDict() if headers is None else headers
    return self.append_payload(
        StringPayload(
            encoded,
            headers=part_headers,
            content_type="application/x-www-form-urlencoded",
        )
    )
@property
def size(self) -> Optional[int]:
    """Size of the payload."""
    # A compressed/encoded part, or one of unknown length, makes the
    # total size unknowable ahead of time.
    total = 0
    for part, content_enc, transfer_enc in self._parts:
        part_size = part.size
        if content_enc or transfer_enc or part_size is None:
            return None

        # b'--' + boundary + b'\r\n', then headers, body and b'\r\n'.
        total += 2 + len(self._boundary) + 2
        total += len(part._binary_headers) + part_size + 2

    # Closing delimiter: b'--' + boundary + b'--\r\n'.
    return total + 2 + len(self._boundary) + 4
def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
    """Return string representation of the multipart data.

    WARNING: This method may do blocking I/O if parts contain file payloads.
    It should not be called in the event loop. Use as_bytes().decode() instead.
    """
    chunks = []
    for part, _content_enc, _transfer_enc in self._parts:
        chunks.append("--" + self.boundary + "\r\n")
        chunks.append(part._binary_headers.decode(encoding, errors))
        chunks.append(part.decode())
    return "".join(chunks)
async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
    """Return bytes representation of the multipart data.

    This method is async-safe and calls as_bytes on underlying payloads.
    """
    delimiter = b"--" + self._boundary
    chunks: List[bytes] = []

    for part, _content_enc, _transfer_enc in self._parts:
        # Opening boundary, headers, payload body, trailing CRLF.
        chunks.append(delimiter + b"\r\n")
        chunks.append(part._binary_headers)
        chunks.append(await part.as_bytes(encoding, errors))
        chunks.append(b"\r\n")

    # Closing boundary terminates the whole multipart body.
    chunks.append(delimiter + b"--\r\n")
    return b"".join(chunks)
async def write(
    self, writer: AbstractStreamWriter, close_boundary: bool = True
) -> None:
    """Write body."""
    delimiter = b"--" + self._boundary

    for part, content_enc, transfer_enc in self._parts:
        if self._is_form_data:
            # https://datatracker.ietf.org/doc/html/rfc7578#section-4.2
            assert CONTENT_DISPOSITION in part.headers
            assert "name=" in part.headers[CONTENT_DISPOSITION]

        await writer.write(delimiter + b"\r\n")
        await writer.write(part._binary_headers)

        if not (content_enc or transfer_enc):
            # Fast path: stream the payload straight through.
            await part.write(writer)
        else:
            # Route the payload through a re-encoding wrapper.
            wrapped = MultipartPayloadWriter(writer)
            if content_enc:
                wrapped.enable_compression(content_enc)
            if transfer_enc:
                wrapped.enable_encoding(transfer_enc)
            await part.write(wrapped)  # type: ignore[arg-type]
            await wrapped.write_eof()

        await writer.write(b"\r\n")

    if close_boundary:
        await writer.write(delimiter + b"--\r\n")
async def close(self) -> None:
    """
    Close all part payloads that need explicit closing.

    IMPORTANT: This method must not await anything that might not finish
    immediately, as it may be called during cleanup/cancellation. Schedule
    any long-running operations without awaiting them.
    """
    # Closing is a one-shot operation.
    if self._consumed:
        return
    self._consumed = True

    # Close parts one by one, logging failures so that a broken part
    # cannot prevent the remaining parts from being closed. We avoid
    # asyncio.gather() because we must not suspend during cleanup.
    for idx, (part, _content_enc, _transfer_enc) in enumerate(self._parts):
        if part.autoclose or part.consumed:
            continue
        try:
            await part.close()
        except Exception as exc:
            internal_logger.error(
                "Failed to close multipart part %d: %s", idx, exc, exc_info=True
            )
class MultipartPayloadWriter:
    """Stream-writer wrapper applying optional compression and a
    Content-Transfer-Encoding (base64 or quoted-printable) on the fly."""

    def __init__(self, writer: AbstractStreamWriter) -> None:
        self._writer = writer
        self._encoding: Optional[str] = None
        self._compress: Optional[ZLibCompressor] = None
        # Holds the <3-byte remainder between base64 writes.
        self._encoding_buffer: Optional[bytearray] = None

    def enable_encoding(self, encoding: str) -> None:
        # Only base64 needs buffering, since it encodes 3-byte groups.
        if encoding == "base64":
            self._encoding = "base64"
            self._encoding_buffer = bytearray()
        elif encoding == "quoted-printable":
            self._encoding = "quoted-printable"

    def enable_compression(
        self, encoding: str = "deflate", strategy: Optional[int] = None
    ) -> None:
        self._compress = ZLibCompressor(
            encoding=encoding,
            suppress_deflate_header=True,
            strategy=strategy,
        )

    async def write_eof(self) -> None:
        # Flush the compressor first; the flushed tail is routed back
        # through write() (with the compressor disabled) so it still gets
        # transfer-encoded.
        if self._compress is not None:
            tail = self._compress.flush()
            if tail:
                self._compress = None
                await self.write(tail)

        # Emit any buffered base64 remainder (with padding).
        if self._encoding == "base64" and self._encoding_buffer:
            await self._writer.write(base64.b64encode(self._encoding_buffer))

    async def write(self, chunk: bytes) -> None:
        if self._compress is not None:
            if chunk:
                chunk = await self._compress.compress(chunk)
            # Nothing produced yet — wait for more input or write_eof().
            if not chunk:
                return

        if self._encoding == "base64":
            buf = self._encoding_buffer
            assert buf is not None
            buf.extend(chunk)

            if buf:
                # Encode whole 3-byte groups now; keep the remainder.
                whole = (len(buf) // 3) * 3
                ready, self._encoding_buffer = buf[:whole], buf[whole:]
                if ready:
                    await self._writer.write(base64.b64encode(ready))
        elif self._encoding == "quoted-printable":
            await self._writer.write(binascii.b2a_qp(chunk))
        else:
            await self._writer.write(chunk)