Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/multipart.py: 18%

import base64
import binascii
import json
import re
import sys
import uuid
import warnings
from collections import deque
from collections.abc import Mapping, Sequence
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    Deque,
    Dict,
    Iterator,
    List,
    Optional,
    Tuple,
    Type,
    Union,
    cast,
)
from urllib.parse import parse_qsl, unquote, urlencode

from multidict import CIMultiDict, CIMultiDictProxy

from .abc import AbstractStreamWriter
from .compression_utils import (
    DEFAULT_MAX_DECOMPRESS_SIZE,
    ZLibCompressor,
    ZLibDecompressor,
)
from .hdrs import (
    CONTENT_DISPOSITION,
    CONTENT_ENCODING,
    CONTENT_LENGTH,
    CONTENT_TRANSFER_ENCODING,
    CONTENT_TYPE,
)
from .helpers import CHAR, TOKEN, parse_mimetype, reify
from .http import HeadersParser
from .log import internal_logger
from .payload import (
    JsonPayload,
    LookupError,
    Order,
    Payload,
    StringPayload,
    get_payload,
    payload_type,
)
from .streams import StreamReader

if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing import TypeVar

    Self = TypeVar("Self", bound="BodyPartReader")

__all__ = (
    "MultipartReader",
    "MultipartWriter",
    "BodyPartReader",
    "BadContentDispositionHeader",
    "BadContentDispositionParam",
    "parse_content_disposition",
    "content_disposition_filename",
)


if TYPE_CHECKING:
    from .client_reqrep import ClientResponse


class BadContentDispositionHeader(RuntimeWarning):
    pass


class BadContentDispositionParam(RuntimeWarning):
    pass


def parse_content_disposition(
    header: Optional[str],
) -> Tuple[Optional[str], Dict[str, str]]:
    def is_token(string: str) -> bool:
        return bool(string) and TOKEN >= set(string)

    def is_quoted(string: str) -> bool:
        return string[0] == string[-1] == '"'

    def is_rfc5987(string: str) -> bool:
        return is_token(string) and string.count("'") == 2

    def is_extended_param(string: str) -> bool:
        return string.endswith("*")

    def is_continuous_param(string: str) -> bool:
        pos = string.find("*") + 1
        if not pos:
            return False
        substring = string[pos:-1] if string.endswith("*") else string[pos:]
        return substring.isdigit()

    def unescape(text: str, *, chars: str = "".join(map(re.escape, CHAR))) -> str:
        return re.sub(f"\\\\([{chars}])", "\\1", text)

    if not header:
        return None, {}

    disptype, *parts = header.split(";")
    if not is_token(disptype):
        warnings.warn(BadContentDispositionHeader(header))
        return None, {}

    params: Dict[str, str] = {}
    while parts:
        item = parts.pop(0)

        if not item:  # To handle trailing semicolons
            warnings.warn(BadContentDispositionHeader(header))
            continue

        if "=" not in item:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        key, value = item.split("=", 1)
        key = key.lower().strip()
        value = value.lstrip()

        if key in params:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        if not is_token(key):
            warnings.warn(BadContentDispositionParam(item))
            continue

        elif is_continuous_param(key):
            if is_quoted(value):
                value = unescape(value[1:-1])
            elif not is_token(value):
                warnings.warn(BadContentDispositionParam(item))
                continue

        elif is_extended_param(key):
            if is_rfc5987(value):
                encoding, _, value = value.split("'", 2)
                encoding = encoding or "utf-8"
            else:
                warnings.warn(BadContentDispositionParam(item))
                continue

            try:
                value = unquote(value, encoding, "strict")
            except UnicodeDecodeError:  # pragma: nocover
                warnings.warn(BadContentDispositionParam(item))
                continue

        else:
            failed = True
            if is_quoted(value):
                failed = False
                value = unescape(value[1:-1].lstrip("\\/"))
            elif is_token(value):
                failed = False
            elif parts:
                # maybe there is just a ';' inside the filename; in any case this
                # is only a one-case fix, a proper fix requires redesigning the parser
                _value = f"{value};{parts[0]}"
                if is_quoted(_value):
                    parts.pop(0)
                    value = unescape(_value[1:-1].lstrip("\\/"))
                    failed = False

            if failed:
                warnings.warn(BadContentDispositionHeader(header))
                return None, {}

        params[key] = value

    return disptype.lower(), params


def content_disposition_filename(
    params: Mapping[str, str], name: str = "filename"
) -> Optional[str]:
    name_suf = "%s*" % name
    if not params:
        return None
    elif name_suf in params:
        return params[name_suf]
    elif name in params:
        return params[name]
    else:
        parts = []
        fnparams = sorted(
            (key, value) for key, value in params.items() if key.startswith(name_suf)
        )
        for num, (key, value) in enumerate(fnparams):
            _, tail = key.split("*", 1)
            if tail.endswith("*"):
                tail = tail[:-1]
            if tail == str(num):
                parts.append(value)
            else:
                break
        if not parts:
            return None
        value = "".join(parts)
        if "'" in value:
            encoding, _, value = value.split("'", 2)
            encoding = encoding or "utf-8"
            return unquote(value, encoding, "strict")
        return value
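

# Illustrative sketch (not part of aiohttp's public API): how the two helpers
# above cooperate.  parse_content_disposition() splits the header into a
# disposition type plus a parameter mapping, and content_disposition_filename()
# then resolves the plain, RFC 5987 extended, or RFC 2231 continuation form of
# the requested parameter.
def _example_content_disposition() -> None:
    disptype, params = parse_content_disposition(
        'form-data; name="field2"; filename="example.txt"'
    )
    assert disptype == "form-data"
    assert params == {"name": "field2", "filename": "example.txt"}
    assert content_disposition_filename(params, "filename") == "example.txt"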


class MultipartResponseWrapper:
    """Wrapper around the MultipartReader.

    It takes care of the underlying connection
    and closes it when it is no longer needed.
    """

    def __init__(
        self,
        resp: "ClientResponse",
        stream: "MultipartReader",
    ) -> None:
        self.resp = resp
        self.stream = stream

    def __aiter__(self) -> "MultipartResponseWrapper":
        return self

    async def __anext__(
        self,
    ) -> Union["MultipartReader", "BodyPartReader"]:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    def at_eof(self) -> bool:
        """Returns True when all response data has been read."""
        return self.resp.content.at_eof()

    async def next(
        self,
    ) -> Optional[Union["MultipartReader", "BodyPartReader"]]:
        """Emits next multipart reader object."""
        item = await self.stream.next()
        if self.stream.at_eof():
            await self.release()
        return item

    async def release(self) -> None:
        """Release the connection gracefully.

        All remaining content is read to the void.
        """
        await self.resp.release()
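

# Illustrative sketch (not part of aiohttp's public API): reading a multipart
# response on the client side.  MultipartReader.from_response() wraps the reader
# in a MultipartResponseWrapper, which releases the underlying connection once
# the final boundary has been consumed.
async def _example_read_multipart_response(resp: "ClientResponse") -> None:
    wrapper = MultipartReader.from_response(resp)
    async for part in wrapper:
        if isinstance(part, BodyPartReader):
            data = await part.read(decode=True)  # honours Content-Encoding
            print(part.name, part.filename, len(data))
        else:
            await part.release()  # nested multipart/* section: drain it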


class BodyPartReader:
    """Multipart reader for single body part."""

    chunk_size = 8192

    def __init__(
        self,
        boundary: bytes,
        headers: "CIMultiDictProxy[str]",
        content: StreamReader,
        *,
        subtype: str = "mixed",
        default_charset: Optional[str] = None,
        max_decompress_size: int = DEFAULT_MAX_DECOMPRESS_SIZE,
    ) -> None:
        self.headers = headers
        self._boundary = boundary
        self._boundary_len = len(boundary) + 2  # Boundary + \r\n
        self._content = content
        self._default_charset = default_charset
        self._at_eof = False
        self._is_form_data = subtype == "form-data"
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
        length = None if self._is_form_data else self.headers.get(CONTENT_LENGTH, None)
        self._length = int(length) if length is not None else None
        self._read_bytes = 0
        self._unread: Deque[bytes] = deque()
        self._prev_chunk: Optional[bytes] = None
        self._content_eof = 0
        self._cache: Dict[str, Any] = {}
        self._max_decompress_size = max_decompress_size

    def __aiter__(self: Self) -> Self:
        return self

    async def __anext__(self) -> bytes:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    async def next(self) -> Optional[bytes]:
        item = await self.read()
        if not item:
            return None
        return item

    async def read(self, *, decode: bool = False) -> bytes:
        """Reads body part data.

        decode: Decodes the data according to the encoding method
                from the Content-Encoding header. If the header is
                missing, the data remains untouched.
        """
        if self._at_eof:
            return b""
        data = bytearray()
        while not self._at_eof:
            data.extend(await self.read_chunk(self.chunk_size))
        if decode:
            return await self.decode(data)
        return data

    async def read_chunk(self, size: int = chunk_size) -> bytes:
        """Reads body part content chunk of the specified size.

        size: chunk size
        """
        if self._at_eof:
            return b""
        if self._length:
            chunk = await self._read_chunk_from_length(size)
        else:
            chunk = await self._read_chunk_from_stream(size)

        # For base64 data we must read fragments whose length (ignoring any
        # \r or \n characters) is a multiple of 4.
        encoding = self.headers.get(CONTENT_TRANSFER_ENCODING)
        if encoding and encoding.lower() == "base64":
            stripped_chunk = b"".join(chunk.split())
            remainder = len(stripped_chunk) % 4

            while remainder != 0 and not self.at_eof():
                over_chunk_size = 4 - remainder
                over_chunk = b""

                if self._prev_chunk:
                    over_chunk = self._prev_chunk[:over_chunk_size]
                    self._prev_chunk = self._prev_chunk[len(over_chunk) :]

                if len(over_chunk) != over_chunk_size:
                    over_chunk += await self._content.read(4 - len(over_chunk))

                if not over_chunk:
                    self._at_eof = True

                stripped_chunk += b"".join(over_chunk.split())
                chunk += over_chunk
                remainder = len(stripped_chunk) % 4

        self._read_bytes += len(chunk)
        if self._read_bytes == self._length:
            self._at_eof = True
        if self._at_eof and await self._content.readline() != b"\r\n":
            raise ValueError("Reader did not read all the data or it is malformed")
        return chunk

    async def _read_chunk_from_length(self, size: int) -> bytes:
        # Reads a body part content chunk of the specified size.
        # The body part must have a Content-Length header with a proper value.
        assert self._length is not None, "Content-Length required for chunked read"
        chunk_size = min(size, self._length - self._read_bytes)
        chunk = await self._content.read(chunk_size)
        if self._content.at_eof():
            self._at_eof = True
        return chunk

    async def _read_chunk_from_stream(self, size: int) -> bytes:
        # Reads a content chunk of a body part with unknown length.
        # The Content-Length header for the body part is not necessary.
        assert (
            size >= self._boundary_len
        ), "Chunk size must be greater or equal than boundary length + 2"
        first_chunk = self._prev_chunk is None
        if first_chunk:
            # We need to re-add the CRLF that got removed from headers parsing.
            self._prev_chunk = b"\r\n" + await self._content.read(size)

        chunk = b""
        # content.read() may return less than size, so we need to loop to ensure
        # we have enough data to detect the boundary.
        while len(chunk) < self._boundary_len:
            chunk += await self._content.read(size)
            self._content_eof += int(self._content.at_eof())
            if self._content_eof > 2:
                raise ValueError("Reading after EOF")
            if self._content_eof:
                break
        if len(chunk) > size:
            self._content.unread_data(chunk[size:])
            chunk = chunk[:size]

        assert self._prev_chunk is not None
        window = self._prev_chunk + chunk
        sub = b"\r\n" + self._boundary
        if first_chunk:
            idx = window.find(sub)
        else:
            idx = window.find(sub, max(0, len(self._prev_chunk) - len(sub)))
        if idx >= 0:
            # pushing boundary back to content
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=DeprecationWarning)
                self._content.unread_data(window[idx:])
            self._prev_chunk = self._prev_chunk[:idx]
            chunk = window[len(self._prev_chunk) : idx]
            if not chunk:
                self._at_eof = True
        result = self._prev_chunk[2 if first_chunk else 0 :]  # Strip initial CRLF
        self._prev_chunk = chunk
        return result

    async def readline(self) -> bytes:
        """Reads the body part line by line."""
        if self._at_eof:
            return b""

        if self._unread:
            line = self._unread.popleft()
        else:
            line = await self._content.readline()

        if line.startswith(self._boundary):
            # the very last boundary may not come with \r\n,
            # so apply a single rule to all of them
            sline = line.rstrip(b"\r\n")
            boundary = self._boundary
            last_boundary = self._boundary + b"--"
            # ensure that we read exactly the boundary, not something that merely looks like it
            if sline == boundary or sline == last_boundary:
                self._at_eof = True
                self._unread.append(line)
                return b""
        else:
            next_line = await self._content.readline()
            if next_line.startswith(self._boundary):
                line = line[:-2]  # strip CRLF but only once
                self._unread.append(next_line)

        return line

    async def release(self) -> None:
        """Like read(), but reads all the data to the void."""
        if self._at_eof:
            return
        while not self._at_eof:
            await self.read_chunk(self.chunk_size)

    async def text(self, *, encoding: Optional[str] = None) -> str:
        """Like read(), but assumes that the body part contains text data."""
        data = await self.read(decode=True)
        # see https://www.w3.org/TR/html5/forms.html#multipart/form-data-encoding-algorithm
        # and https://dvcs.w3.org/hg/xhr/raw-file/tip/Overview.html#dom-xmlhttprequest-send
        encoding = encoding or self.get_charset(default="utf-8")
        return data.decode(encoding)

    async def json(self, *, encoding: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Like read(), but assumes that the body part contains JSON data."""
        data = await self.read(decode=True)
        if not data:
            return None
        encoding = encoding or self.get_charset(default="utf-8")
        return cast(Dict[str, Any], json.loads(data.decode(encoding)))

    async def form(self, *, encoding: Optional[str] = None) -> List[Tuple[str, str]]:
        """Like read(), but assumes that the body part contains form-urlencoded data."""
        data = await self.read(decode=True)
        if not data:
            return []
        if encoding is not None:
            real_encoding = encoding
        else:
            real_encoding = self.get_charset(default="utf-8")
        try:
            decoded_data = data.rstrip().decode(real_encoding)
        except UnicodeDecodeError:
            raise ValueError("data cannot be decoded with %s encoding" % real_encoding)

        return parse_qsl(
            decoded_data,
            keep_blank_values=True,
            encoding=real_encoding,
        )

    def at_eof(self) -> bool:
        """Returns True if the boundary was reached or False otherwise."""
        return self._at_eof

    async def decode(self, data: bytes) -> bytes:
        """Decodes data.

        Decoding is done according to the specified Content-Encoding
        or Content-Transfer-Encoding header value.
        """
        if CONTENT_TRANSFER_ENCODING in self.headers:
            data = self._decode_content_transfer(data)
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
        if not self._is_form_data and CONTENT_ENCODING in self.headers:
            return await self._decode_content(data)
        return data

    async def _decode_content(self, data: bytes) -> bytes:
        encoding = self.headers.get(CONTENT_ENCODING, "").lower()
        if encoding == "identity":
            return data
        if encoding in {"deflate", "gzip"}:
            return await ZLibDecompressor(
                encoding=encoding,
                suppress_deflate_header=True,
            ).decompress(data, max_length=self._max_decompress_size)

        raise RuntimeError(f"unknown content encoding: {encoding}")

    def _decode_content_transfer(self, data: bytes) -> bytes:
        encoding = self.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()

        if encoding == "base64":
            return base64.b64decode(data)
        elif encoding == "quoted-printable":
            return binascii.a2b_qp(data)
        elif encoding in ("binary", "8bit", "7bit"):
            return data
        else:
            raise RuntimeError(f"unknown content transfer encoding: {encoding}")

    def get_charset(self, default: str) -> str:
        """Returns charset parameter from Content-Type header or default."""
        ctype = self.headers.get(CONTENT_TYPE, "")
        mimetype = parse_mimetype(ctype)
        return mimetype.parameters.get("charset", self._default_charset or default)

    @reify
    def name(self) -> Optional[str]:
        """Returns name specified in Content-Disposition header.

        If the header is missing or malformed, returns None.
        """
        _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
        return content_disposition_filename(params, "name")

    @reify
    def filename(self) -> Optional[str]:
        """Returns filename specified in Content-Disposition header.

        Returns None if the header is missing or malformed.
        """
        _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
        return content_disposition_filename(params, "filename")
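

# Illustrative sketch (not part of aiohttp's public API): choosing a
# representation for a single BodyPartReader based on its Content-Type, and
# falling back to chunked binary reads so large payloads are not buffered twice.
async def _example_handle_body_part(part: BodyPartReader) -> Any:
    mimetype = parse_mimetype(part.headers.get(CONTENT_TYPE, ""))
    if mimetype.type == "application" and mimetype.subtype == "json":
        return await part.json()
    if mimetype.type == "text":
        return await part.text()
    data = bytearray()
    while not part.at_eof():
        data.extend(await part.read_chunk(BodyPartReader.chunk_size))
    return bytes(data)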


@payload_type(BodyPartReader, order=Order.try_first)
class BodyPartReaderPayload(Payload):
    _value: BodyPartReader
    # _autoclose = False (inherited) - Streaming reader that may have resources

    def __init__(self, value: BodyPartReader, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)

        params: Dict[str, str] = {}
        if value.name is not None:
            params["name"] = value.name
        if value.filename is not None:
            params["filename"] = value.filename

        if params:
            self.set_content_disposition("attachment", True, **params)

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        raise TypeError("Unable to decode.")

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Raises TypeError as body parts should be consumed via write().

        This is intentional: BodyPartReader payloads are designed for streaming
        large data (potentially gigabytes) and must be consumed only once via
        the write() method to avoid memory exhaustion. They cannot be buffered
        in memory for reuse.
        """
        raise TypeError("Unable to read body part as bytes. Use write() to consume.")

    async def write(self, writer: AbstractStreamWriter) -> None:
        field = self._value
        chunk = await field.read_chunk(size=2**16)
        while chunk:
            await writer.write(await field.decode(chunk))
            chunk = await field.read_chunk(size=2**16)


class MultipartReader:
    """Multipart body reader."""

    #: Response wrapper, used when the multipart reader is constructed from a response.
    response_wrapper_cls = MultipartResponseWrapper
    #: Multipart reader class, used to handle multipart/* body parts.
    #: None means type(self) is used.
    multipart_reader_cls: Optional[Type["MultipartReader"]] = None
    #: Body part reader class for non multipart/* content types.
    part_reader_cls = BodyPartReader

    def __init__(self, headers: Mapping[str, str], content: StreamReader) -> None:
        self._mimetype = parse_mimetype(headers[CONTENT_TYPE])
        assert self._mimetype.type == "multipart", "multipart/* content type expected"
        if "boundary" not in self._mimetype.parameters:
            raise ValueError(
                "boundary missed for Content-Type: %s" % headers[CONTENT_TYPE]
            )

        self.headers = headers
        self._boundary = ("--" + self._get_boundary()).encode()
        self._content = content
        self._default_charset: Optional[str] = None
        self._last_part: Optional[Union["MultipartReader", BodyPartReader]] = None
        self._at_eof = False
        self._at_bof = True
        self._unread: List[bytes] = []

    def __aiter__(self: Self) -> Self:
        return self

    async def __anext__(
        self,
    ) -> Optional[Union["MultipartReader", BodyPartReader]]:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    @classmethod
    def from_response(
        cls,
        response: "ClientResponse",
    ) -> MultipartResponseWrapper:
        """Constructs reader instance from HTTP response.

        :param response: :class:`~aiohttp.client.ClientResponse` instance
        """
        obj = cls.response_wrapper_cls(
            response, cls(response.headers, response.content)
        )
        return obj

    def at_eof(self) -> bool:
        """Returns True if the final boundary was reached, False otherwise."""
        return self._at_eof

    async def next(
        self,
    ) -> Optional[Union["MultipartReader", BodyPartReader]]:
        """Emits the next multipart body part."""
        # So, if we're at BOF, we need to skip till the boundary.
        if self._at_eof:
            return None
        await self._maybe_release_last_part()
        if self._at_bof:
            await self._read_until_first_boundary()
            self._at_bof = False
        else:
            await self._read_boundary()
        if self._at_eof:  # we just read the last boundary, nothing to do there
            return None

        part = await self.fetch_next_part()
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.6
        if (
            self._last_part is None
            and self._mimetype.subtype == "form-data"
            and isinstance(part, BodyPartReader)
        ):
            _, params = parse_content_disposition(part.headers.get(CONTENT_DISPOSITION))
            if params.get("name") == "_charset_":
                # Longest encoding in https://encoding.spec.whatwg.org/encodings.json
                # is 19 characters, so 32 should be more than enough for any valid encoding.
                charset = await part.read_chunk(32)
                if len(charset) > 31:
                    raise RuntimeError("Invalid default charset")
                self._default_charset = charset.strip().decode()
                part = await self.fetch_next_part()
        self._last_part = part
        return self._last_part

    async def release(self) -> None:
        """Reads all the body parts to the void till the final boundary."""
        while not self._at_eof:
            item = await self.next()
            if item is None:
                break
            await item.release()

    async def fetch_next_part(
        self,
    ) -> Union["MultipartReader", BodyPartReader]:
        """Returns the next body part reader."""
        headers = await self._read_headers()
        return self._get_part_reader(headers)

    def _get_part_reader(
        self,
        headers: "CIMultiDictProxy[str]",
    ) -> Union["MultipartReader", BodyPartReader]:
        """Dispatches the response by the `Content-Type` header.

        Returns a suitable reader instance.

        :param dict headers: Response headers
        """
        ctype = headers.get(CONTENT_TYPE, "")
        mimetype = parse_mimetype(ctype)

        if mimetype.type == "multipart":
            if self.multipart_reader_cls is None:
                return type(self)(headers, self._content)
            return self.multipart_reader_cls(headers, self._content)
        else:
            return self.part_reader_cls(
                self._boundary,
                headers,
                self._content,
                subtype=self._mimetype.subtype,
                default_charset=self._default_charset,
            )

    def _get_boundary(self) -> str:
        boundary = self._mimetype.parameters["boundary"]
        if len(boundary) > 70:
            raise ValueError("boundary %r is too long (70 chars max)" % boundary)

        return boundary

    async def _readline(self) -> bytes:
        if self._unread:
            return self._unread.pop()
        return await self._content.readline()

    async def _read_until_first_boundary(self) -> None:
        while True:
            chunk = await self._readline()
            if chunk == b"":
                raise ValueError(
                    "Could not find starting boundary %r" % (self._boundary)
                )
            chunk = chunk.rstrip()
            if chunk == self._boundary:
                return
            elif chunk == self._boundary + b"--":
                self._at_eof = True
                return

    async def _read_boundary(self) -> None:
        chunk = (await self._readline()).rstrip()
        if chunk == self._boundary:
            pass
        elif chunk == self._boundary + b"--":
            self._at_eof = True
            epilogue = await self._readline()
            next_line = await self._readline()

            # the epilogue is expected and then either the end of input or the
            # parent multipart boundary, if the parent boundary is found then
            # it should be marked as unread and handed to the parent for
            # processing
            if next_line[:2] == b"--":
                self._unread.append(next_line)
            # otherwise the request is likely missing an epilogue and both
            # lines should be passed to the parent for processing
            # (this handles the old behavior gracefully)
            else:
                self._unread.extend([next_line, epilogue])
        else:
            raise ValueError(f"Invalid boundary {chunk!r}, expected {self._boundary!r}")

    async def _read_headers(self) -> "CIMultiDictProxy[str]":
        lines = []
        while True:
            chunk = await self._content.readline()
            chunk = chunk.rstrip(b"\r\n")
            lines.append(chunk)
            if not chunk:
                break
        parser = HeadersParser()
        headers, raw_headers = parser.parse_headers(lines)
        return headers

    async def _maybe_release_last_part(self) -> None:
        """Ensures that the last read body part is read completely."""
        if self._last_part is not None:
            if not self._last_part.at_eof():
                await self._last_part.release()
            self._unread.extend(self._last_part._unread)
            self._last_part = None
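

# Illustrative sketch (not part of aiohttp's public API): driving a
# MultipartReader directly from a header mapping and a StreamReader (on the
# server, aiohttp normally builds this for you).  Each iteration yields either
# a BodyPartReader or a nested MultipartReader for multipart/* sub-parts.
async def _example_collect_part_names(
    headers: Mapping[str, str], content: StreamReader
) -> List[str]:
    names: List[str] = []
    reader = MultipartReader(headers, content)
    async for part in reader:
        if isinstance(part, BodyPartReader):
            names.append(part.name or "")
            await part.release()
        else:
            await part.release()  # recurse here if nested parts matter
    return names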


_Part = Tuple[Payload, str, str]


class MultipartWriter(Payload):
    """Multipart body writer."""

    _value: None
    # _consumed = False (inherited) - Can be encoded multiple times
    _autoclose = True  # No file handles, just collects parts in memory

    def __init__(self, subtype: str = "mixed", boundary: Optional[str] = None) -> None:
        boundary = boundary if boundary is not None else uuid.uuid4().hex
        # The underlying Payload API demands a str (utf-8), not bytes,
        # so we need to ensure we don't lose anything during conversion.
        # As a result, require the boundary to be ASCII only, whether it was
        # generated here or supplied by the caller.

        try:
            self._boundary = boundary.encode("ascii")
        except UnicodeEncodeError:
            raise ValueError("boundary should contain ASCII only chars") from None
        ctype = f"multipart/{subtype}; boundary={self._boundary_value}"

        super().__init__(None, content_type=ctype)

        self._parts: List[_Part] = []
        self._is_form_data = subtype == "form-data"

    def __enter__(self) -> "MultipartWriter":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        pass

    def __iter__(self) -> Iterator[_Part]:
        return iter(self._parts)

    def __len__(self) -> int:
        return len(self._parts)

    def __bool__(self) -> bool:
        return True

    _valid_tchar_regex = re.compile(rb"\A[!#$%&'*+\-.^_`|~\w]+\Z")
    _invalid_qdtext_char_regex = re.compile(rb"[\x00-\x08\x0A-\x1F\x7F]")

    @property
    def _boundary_value(self) -> str:
        """Wrap boundary parameter value in quotes, if necessary.

        Reads self.boundary and returns a unicode string.
        """
        # Refer to RFCs 7231, 7230, 5234.
        #
        # parameter      = token "=" ( token / quoted-string )
        # token          = 1*tchar
        # quoted-string  = DQUOTE *( qdtext / quoted-pair ) DQUOTE
        # qdtext         = HTAB / SP / %x21 / %x23-5B / %x5D-7E / obs-text
        # obs-text       = %x80-FF
        # quoted-pair    = "\" ( HTAB / SP / VCHAR / obs-text )
        # tchar          = "!" / "#" / "$" / "%" / "&" / "'" / "*"
        #                  / "+" / "-" / "." / "^" / "_" / "`" / "|" / "~"
        #                  / DIGIT / ALPHA
        #                  ; any VCHAR, except delimiters
        # VCHAR          = %x21-7E
        value = self._boundary
        if re.match(self._valid_tchar_regex, value):
            return value.decode("ascii")  # cannot fail

        if re.search(self._invalid_qdtext_char_regex, value):
            raise ValueError("boundary value contains invalid characters")

        # escape %x5C and %x22
        quoted_value_content = value.replace(b"\\", b"\\\\")
        quoted_value_content = quoted_value_content.replace(b'"', b'\\"')

        return '"' + quoted_value_content.decode("ascii") + '"'

    @property
    def boundary(self) -> str:
        return self._boundary.decode("ascii")

    def append(self, obj: Any, headers: Optional[Mapping[str, str]] = None) -> Payload:
        if headers is None:
            headers = CIMultiDict()

        if isinstance(obj, Payload):
            obj.headers.update(headers)
            return self.append_payload(obj)
        else:
            try:
                payload = get_payload(obj, headers=headers)
            except LookupError:
                raise TypeError("Cannot create payload from %r" % obj)
            else:
                return self.append_payload(payload)

    def append_payload(self, payload: Payload) -> Payload:
        """Adds a new body part to multipart writer."""
        encoding: Optional[str] = None
        te_encoding: Optional[str] = None
        if self._is_form_data:
            # https://datatracker.ietf.org/doc/html/rfc7578#section-4.7
            # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
            assert (
                not {CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TRANSFER_ENCODING}
                & payload.headers.keys()
            )
            # Set default Content-Disposition in case user doesn't create one
            if CONTENT_DISPOSITION not in payload.headers:
                name = f"section-{len(self._parts)}"
                payload.set_content_disposition("form-data", name=name)
        else:
            # compression
            encoding = payload.headers.get(CONTENT_ENCODING, "").lower()
            if encoding and encoding not in ("deflate", "gzip", "identity"):
                raise RuntimeError(f"unknown content encoding: {encoding}")
            if encoding == "identity":
                encoding = None

            # te encoding
            te_encoding = payload.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()
            if te_encoding not in ("", "base64", "quoted-printable", "binary"):
                raise RuntimeError(f"unknown content transfer encoding: {te_encoding}")
            if te_encoding == "binary":
                te_encoding = None

            # size
            size = payload.size
            if size is not None and not (encoding or te_encoding):
                payload.headers[CONTENT_LENGTH] = str(size)

        self._parts.append((payload, encoding, te_encoding))  # type: ignore[arg-type]
        return payload

    def append_json(
        self, obj: Any, headers: Optional[Mapping[str, str]] = None
    ) -> Payload:
        """Helper to append JSON part."""
        if headers is None:
            headers = CIMultiDict()

        return self.append_payload(JsonPayload(obj, headers=headers))

    def append_form(
        self,
        obj: Union[Sequence[Tuple[str, str]], Mapping[str, str]],
        headers: Optional[Mapping[str, str]] = None,
    ) -> Payload:
        """Helper to append form urlencoded part."""
        assert isinstance(obj, (Sequence, Mapping))

        if headers is None:
            headers = CIMultiDict()

        if isinstance(obj, Mapping):
            obj = list(obj.items())
        data = urlencode(obj, doseq=True)

        return self.append_payload(
            StringPayload(
                data, headers=headers, content_type="application/x-www-form-urlencoded"
            )
        )

    @property
    def size(self) -> Optional[int]:
        """Size of the payload."""
        total = 0
        for part, encoding, te_encoding in self._parts:
            part_size = part.size
            if encoding or te_encoding or part_size is None:
                return None

            total += int(
                2
                + len(self._boundary)
                + 2
                + part_size  # b'--'+self._boundary+b'\r\n'
                + len(part._binary_headers)
                + 2  # b'\r\n'
            )

        total += 2 + len(self._boundary) + 4  # b'--'+self._boundary+b'--\r\n'
        return total

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Return string representation of the multipart data.

        WARNING: This method may do blocking I/O if parts contain file payloads.
        It should not be called in the event loop. Use as_bytes().decode() instead.
        """
        return "".join(
            "--"
            + self.boundary
            + "\r\n"
            + part._binary_headers.decode(encoding, errors)
            + part.decode()
            for part, _e, _te in self._parts
        )

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Return bytes representation of the multipart data.

        This method is async-safe and calls as_bytes on underlying payloads.
        """
        parts: List[bytes] = []

        # Process each part
        for part, _e, _te in self._parts:
            # Add boundary
            parts.append(b"--" + self._boundary + b"\r\n")

            # Add headers
            parts.append(part._binary_headers)

            # Add payload content using as_bytes for async safety
            part_bytes = await part.as_bytes(encoding, errors)
            parts.append(part_bytes)

            # Add trailing CRLF
            parts.append(b"\r\n")

        # Add closing boundary
        parts.append(b"--" + self._boundary + b"--\r\n")

        return b"".join(parts)

    async def write(
        self, writer: AbstractStreamWriter, close_boundary: bool = True
    ) -> None:
        """Write body."""
        for part, encoding, te_encoding in self._parts:
            if self._is_form_data:
                # https://datatracker.ietf.org/doc/html/rfc7578#section-4.2
                assert CONTENT_DISPOSITION in part.headers
                assert "name=" in part.headers[CONTENT_DISPOSITION]

            await writer.write(b"--" + self._boundary + b"\r\n")
            await writer.write(part._binary_headers)

            if encoding or te_encoding:
                w = MultipartPayloadWriter(writer)
                if encoding:
                    w.enable_compression(encoding)
                if te_encoding:
                    w.enable_encoding(te_encoding)
                await part.write(w)  # type: ignore[arg-type]
                await w.write_eof()
            else:
                await part.write(writer)

            await writer.write(b"\r\n")

        if close_boundary:
            await writer.write(b"--" + self._boundary + b"--\r\n")

    async def close(self) -> None:
        """
        Close all part payloads that need explicit closing.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        if self._consumed:
            return
        self._consumed = True

        # Close all parts that need explicit closing
        # We catch and log exceptions to ensure all parts get a chance to close
        # we do not use asyncio.gather() here because we are not allowed
        # to suspend given we may be called during cleanup
        for idx, (part, _, _) in enumerate(self._parts):
            if not part.autoclose and not part.consumed:
                try:
                    await part.close()
                except Exception as exc:
                    internal_logger.error(
                        "Failed to close multipart part %d: %s", idx, exc, exc_info=True
                    )
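

# Illustrative sketch (not part of aiohttp's public API): assembling a
# multipart/form-data body and streaming it to an AbstractStreamWriter.  For the
# "form-data" subtype, append_payload() assigns a default Content-Disposition
# name ("section-<index>") when none is provided.
async def _example_build_form_data(writer: AbstractStreamWriter) -> None:
    with MultipartWriter("form-data") as mp:
        mp.append("hello world")        # plain text part
        mp.append_json({"answer": 42})  # application/json part
        mp.append_form({"key": "value"})  # x-www-form-urlencoded part
        await mp.write(writer)          # writes every part plus the closing boundary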


class MultipartPayloadWriter:
    def __init__(self, writer: AbstractStreamWriter) -> None:
        self._writer = writer
        self._encoding: Optional[str] = None
        self._compress: Optional[ZLibCompressor] = None
        self._encoding_buffer: Optional[bytearray] = None

    def enable_encoding(self, encoding: str) -> None:
        if encoding == "base64":
            self._encoding = encoding
            self._encoding_buffer = bytearray()
        elif encoding == "quoted-printable":
            self._encoding = "quoted-printable"

    def enable_compression(
        self, encoding: str = "deflate", strategy: Optional[int] = None
    ) -> None:
        self._compress = ZLibCompressor(
            encoding=encoding,
            suppress_deflate_header=True,
            strategy=strategy,
        )

    async def write_eof(self) -> None:
        if self._compress is not None:
            chunk = self._compress.flush()
            if chunk:
                self._compress = None
                await self.write(chunk)

        if self._encoding == "base64":
            if self._encoding_buffer:
                await self._writer.write(base64.b64encode(self._encoding_buffer))

    async def write(self, chunk: bytes) -> None:
        if self._compress is not None:
            if chunk:
                chunk = await self._compress.compress(chunk)
                if not chunk:
                    return

        if self._encoding == "base64":
            buf = self._encoding_buffer
            assert buf is not None
            buf.extend(chunk)

            if buf:
                div, mod = divmod(len(buf), 3)
                enc_chunk, self._encoding_buffer = (buf[: div * 3], buf[div * 3 :])
                if enc_chunk:
                    b64chunk = base64.b64encode(enc_chunk)
                    await self._writer.write(b64chunk)
        elif self._encoding == "quoted-printable":
            await self._writer.write(binascii.b2a_qp(chunk))
        else:
            await self._writer.write(chunk)
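

# Illustrative sketch (not part of aiohttp's public API): the base64 buffering
# rule used by MultipartPayloadWriter.write().  Only whole 3-byte groups are
# encoded eagerly; the remainder stays buffered so a chunk boundary never splits
# a base64 quantum, and write_eof() encodes whatever is left over.
def _example_base64_split(buf: bytes) -> Tuple[bytes, bytes]:
    div, _ = divmod(len(buf), 3)
    encode_now, keep = buf[: div * 3], buf[div * 3 :]
    # e.g. buf=b"abcd" -> encode b"abc" now (4 base64 chars), keep b"d" buffered
    return base64.b64encode(encode_now), keep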