Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/multipart.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

656 statements  

1import base64 

2import binascii 

3import json 

4import re 

5import sys 

6import uuid 

7import warnings 

8from collections import deque 

9from collections.abc import Mapping, Sequence 

10from types import TracebackType 

11from typing import ( 

12 TYPE_CHECKING, 

13 Any, 

14 Deque, 

15 Dict, 

16 Iterator, 

17 List, 

18 Optional, 

19 Tuple, 

20 Type, 

21 Union, 

22 cast, 

23) 

24from urllib.parse import parse_qsl, unquote, urlencode 

25 

26from multidict import CIMultiDict, CIMultiDictProxy 

27 

28from .abc import AbstractStreamWriter 

29from .compression_utils import ( 

30 DEFAULT_MAX_DECOMPRESS_SIZE, 

31 ZLibCompressor, 

32 ZLibDecompressor, 

33) 

34from .hdrs import ( 

35 CONTENT_DISPOSITION, 

36 CONTENT_ENCODING, 

37 CONTENT_LENGTH, 

38 CONTENT_TRANSFER_ENCODING, 

39 CONTENT_TYPE, 

40) 

41from .helpers import CHAR, TOKEN, parse_mimetype, reify 

42from .http import HeadersParser 

43from .log import internal_logger 

44from .payload import ( 

45 JsonPayload, 

46 LookupError, 

47 Order, 

48 Payload, 

49 StringPayload, 

50 get_payload, 

51 payload_type, 

52) 

53from .streams import StreamReader 

54 

55if sys.version_info >= (3, 11): 

56 from typing import Self 

57else: 

58 from typing import TypeVar 

59 

60 Self = TypeVar("Self", bound="BodyPartReader") 

61 

# Public names exported by this module: the reader/writer classes plus the
# Content-Disposition helper functions.
__all__ = (
    "MultipartReader",
    "MultipartWriter",
    "BodyPartReader",
    "BadContentDispositionHeader",
    "BadContentDispositionParam",
    "parse_content_disposition",
    "content_disposition_filename",
)

71 

72 

73if TYPE_CHECKING: 

74 from .client_reqrep import ClientResponse 

75 

76 

class BadContentDispositionHeader(RuntimeWarning):
    """Warning emitted when a Content-Disposition header is malformed."""

    pass

79 

80 

class BadContentDispositionParam(RuntimeWarning):
    """Warning emitted when a single Content-Disposition parameter is malformed."""

    pass

83 

84 

def parse_content_disposition(
    header: Optional[str],
) -> Tuple[Optional[str], Dict[str, str]]:
    """Parse a Content-Disposition header into ``(disposition_type, params)``.

    Returns ``(None, {})`` when the header is missing or malformed, emitting
    a ``BadContentDispositionHeader`` warning; individual malformed
    parameters are skipped with a ``BadContentDispositionParam`` warning.
    """

    def is_token(string: str) -> bool:
        # Non-empty and composed only of RFC token characters.
        return bool(string) and TOKEN >= set(string)

    def is_quoted(string: str) -> bool:
        return string[0] == string[-1] == '"'

    def is_rfc5987(string: str) -> bool:
        # RFC 5987 ext-value shape: charset'language'value (two apostrophes).
        return is_token(string) and string.count("'") == 2

    def is_extended_param(string: str) -> bool:
        return string.endswith("*")

    def is_continuous_param(string: str) -> bool:
        # RFC 2231 continuation: name*0, name*1[*], ... (digits after '*').
        pos = string.find("*") + 1
        if not pos:
            return False
        substring = string[pos:-1] if string.endswith("*") else string[pos:]
        return substring.isdigit()

    def unescape(text: str, *, chars: str = "".join(map(re.escape, CHAR))) -> str:
        # Undo quoted-pair escaping (backslash before any CHAR).
        return re.sub(f"\\\\([{chars}])", "\\1", text)

    if not header:
        return None, {}

    disptype, *parts = header.split(";")
    if not is_token(disptype):
        warnings.warn(BadContentDispositionHeader(header))
        return None, {}

    params: Dict[str, str] = {}
    while parts:
        item = parts.pop(0)

        if not item:  # To handle trailing semicolons
            warnings.warn(BadContentDispositionHeader(header))
            continue

        if "=" not in item:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        key, value = item.split("=", 1)
        key = key.lower().strip()
        value = value.lstrip()

        if key in params:
            # Duplicate parameter names invalidate the whole header.
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        if not is_token(key):
            warnings.warn(BadContentDispositionParam(item))
            continue

        elif is_continuous_param(key):
            if is_quoted(value):
                value = unescape(value[1:-1])
            elif not is_token(value):
                warnings.warn(BadContentDispositionParam(item))
                continue

        elif is_extended_param(key):
            if is_rfc5987(value):
                encoding, _, value = value.split("'", 2)
                encoding = encoding or "utf-8"
            else:
                warnings.warn(BadContentDispositionParam(item))
                continue

            try:
                value = unquote(value, encoding, "strict")
            except UnicodeDecodeError:  # pragma: nocover
                warnings.warn(BadContentDispositionParam(item))
                continue

        else:
            failed = True
            if is_quoted(value):
                failed = False
                value = unescape(value[1:-1].lstrip("\\/"))
            elif is_token(value):
                failed = False
            elif parts:
                # maybe just ; in filename, in any case this is just
                # one case fix, for proper fix we need to redesign parser
                _value = f"{value};{parts[0]}"
                if is_quoted(_value):
                    parts.pop(0)
                    value = unescape(_value[1:-1].lstrip("\\/"))
                    failed = False

            if failed:
                warnings.warn(BadContentDispositionHeader(header))
                return None, {}

        params[key] = value

    return disptype.lower(), params

186 

187 

def content_disposition_filename(
    params: Mapping[str, str], name: str = "filename"
) -> Optional[str]:
    """Extract a (file)name value from parsed Content-Disposition params.

    Looks up, in order: the extended form (``name*``), the plain form
    (``name``), and finally RFC 2231 continuations (``name*0``, ``name*1``,
    ...), decoding an RFC 5987 charset prefix when present.
    """
    if not params:
        return None

    ext_key = name + "*"
    if ext_key in params:
        return params[ext_key]
    if name in params:
        return params[name]

    # RFC 2231 continuations: collect name*0[*], name*1[*], ... in order,
    # stopping at the first gap in the numbering.
    segments = []
    continuation = sorted(item for item in params.items() if item[0].startswith(ext_key))
    for index, (key, value) in enumerate(continuation):
        _, suffix = key.split("*", 1)
        if suffix.endswith("*"):
            suffix = suffix[:-1]
        if suffix != str(index):
            break
        segments.append(value)

    if not segments:
        return None

    joined = "".join(segments)
    if "'" not in joined:
        return joined
    # charset'language'percent-encoded-value (RFC 5987).
    encoding, _, remainder = joined.split("'", 2)
    return unquote(remainder, encoding or "utf-8", "strict")

219 

220 

class MultipartResponseWrapper:
    """Wrapper around the MultipartReader.

    Ties the reader to its underlying connection and releases the
    connection once the multipart stream has been fully consumed.
    """

    def __init__(
        self,
        resp: "ClientResponse",
        stream: "MultipartReader",
    ) -> None:
        self.resp = resp
        self.stream = stream

    def __aiter__(self) -> "MultipartResponseWrapper":
        return self

    async def __anext__(
        self,
    ) -> Union["MultipartReader", "BodyPartReader"]:
        if (part := await self.next()) is None:
            raise StopAsyncIteration
        return part

    def at_eof(self) -> bool:
        """Returns True when all response data had been read."""
        return self.resp.content.at_eof()

    async def next(
        self,
    ) -> Optional[Union["MultipartReader", "BodyPartReader"]]:
        """Emits next multipart reader object."""
        item = await self.stream.next()
        if self.stream.at_eof():
            # Stream exhausted: free the underlying connection.
            await self.release()
        return item

    async def release(self) -> None:
        """Release the connection gracefully.

        All remaining content is read to the void.
        """
        await self.resp.release()

266 

267 

class BodyPartReader:
    """Multipart reader for single body part."""

    # Default read size (bytes) for chunked reads.
    chunk_size = 8192

    def __init__(
        self,
        boundary: bytes,
        headers: "CIMultiDictProxy[str]",
        content: StreamReader,
        *,
        subtype: str = "mixed",
        default_charset: Optional[str] = None,
        max_decompress_size: int = DEFAULT_MAX_DECOMPRESS_SIZE,
    ) -> None:
        self.headers = headers
        self._boundary = boundary
        self._boundary_len = len(boundary) + 2  # Boundary + \r\n
        self._content = content
        self._default_charset = default_charset
        self._at_eof = False
        self._is_form_data = subtype == "form-data"
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
        # Content-Length is ignored for multipart/form-data parts.
        length = None if self._is_form_data else self.headers.get(CONTENT_LENGTH, None)
        self._length = int(length) if length is not None else None
        self._read_bytes = 0
        self._unread: Deque[bytes] = deque()
        self._prev_chunk: Optional[bytes] = None
        self._content_eof = 0
        self._cache: Dict[str, Any] = {}
        self._max_decompress_size = max_decompress_size

    def __aiter__(self: Self) -> Self:
        return self

    async def __anext__(self) -> bytes:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    async def next(self) -> Optional[bytes]:
        """Read the whole body part; returns None when the part is empty."""
        item = await self.read()
        if not item:
            return None
        return item

    async def read(self, *, decode: bool = False) -> bytes:
        """Reads body part data.

        decode: when True the data is decoded according to the
        Content-Transfer-Encoding / Content-Encoding headers; otherwise
        the data remains untouched.
        """
        if self._at_eof:
            return b""
        # NOTE(review): the undecoded path returns a bytearray, not bytes,
        # despite the annotation — callers appear to rely on bytes-like only.
        data = bytearray()
        while not self._at_eof:
            data.extend(await self.read_chunk(self.chunk_size))
        if decode:
            return await self.decode(data)
        return data

    async def read_chunk(self, size: int = chunk_size) -> bytes:
        """Reads body part content chunk of the specified size.

        size: chunk size
        """
        if self._at_eof:
            return b""
        if self._length:
            # Content-Length is known: read exactly that many bytes.
            chunk = await self._read_chunk_from_length(size)
        else:
            # Unknown length: scan the stream for the boundary.
            chunk = await self._read_chunk_from_stream(size)

        # For the case of base64 data, we must read a fragment of size with a
        # remainder of 0 by dividing by 4 for string without symbols \n or \r
        encoding = self.headers.get(CONTENT_TRANSFER_ENCODING)
        if encoding and encoding.lower() == "base64":
            stripped_chunk = b"".join(chunk.split())
            remainder = len(stripped_chunk) % 4

            # Top up the chunk until the non-whitespace length is a multiple
            # of 4 so each chunk is independently base64-decodable.
            while remainder != 0 and not self.at_eof():
                over_chunk_size = 4 - remainder
                over_chunk = b""

                if self._prev_chunk:
                    # Borrow bytes already buffered for boundary detection.
                    over_chunk = self._prev_chunk[:over_chunk_size]
                    self._prev_chunk = self._prev_chunk[len(over_chunk) :]

                if len(over_chunk) != over_chunk_size:
                    over_chunk += await self._content.read(4 - len(over_chunk))

                if not over_chunk:
                    self._at_eof = True

                stripped_chunk += b"".join(over_chunk.split())
                chunk += over_chunk
                remainder = len(stripped_chunk) % 4

        self._read_bytes += len(chunk)
        if self._read_bytes == self._length:
            self._at_eof = True
        if self._at_eof and await self._content.readline() != b"\r\n":
            # A part must be terminated by CRLF immediately before the boundary.
            raise ValueError("Reader did not read all the data or it is malformed")
        return chunk

    async def _read_chunk_from_length(self, size: int) -> bytes:
        # Reads body part content chunk of the specified size.
        # The body part must have a Content-Length header with a proper value.
        assert self._length is not None, "Content-Length required for chunked read"
        chunk_size = min(size, self._length - self._read_bytes)
        chunk = await self._content.read(chunk_size)
        if self._content.at_eof():
            self._at_eof = True
        return chunk

    async def _read_chunk_from_stream(self, size: int) -> bytes:
        # Reads content chunk of body part with unknown length.
        # The Content-Length header for body part is not necessary.
        assert (
            size >= self._boundary_len
        ), "Chunk size must be greater or equal than boundary length + 2"
        first_chunk = self._prev_chunk is None
        if first_chunk:
            # We need to re-add the CRLF that got removed from headers parsing.
            self._prev_chunk = b"\r\n" + await self._content.read(size)

        chunk = b""
        # content.read() may return less than size, so we need to loop to ensure
        # we have enough data to detect the boundary.
        while len(chunk) < self._boundary_len:
            chunk += await self._content.read(size)
            self._content_eof += int(self._content.at_eof())
            if self._content_eof > 2:
                raise ValueError("Reading after EOF")
            if self._content_eof:
                break
        if len(chunk) > size:
            # Keep at most `size` bytes; push the excess back onto the stream.
            self._content.unread_data(chunk[size:])
            chunk = chunk[:size]

        assert self._prev_chunk is not None
        # Search for the boundary across the seam between the previous and
        # current chunks so a boundary split across reads is still found.
        window = self._prev_chunk + chunk
        sub = b"\r\n" + self._boundary
        if first_chunk:
            idx = window.find(sub)
        else:
            idx = window.find(sub, max(0, len(self._prev_chunk) - len(sub)))
        if idx >= 0:
            # pushing boundary back to content
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=DeprecationWarning)
                self._content.unread_data(window[idx:])
            self._prev_chunk = self._prev_chunk[:idx]
            chunk = window[len(self._prev_chunk) : idx]
            if not chunk:
                self._at_eof = True
        result = self._prev_chunk[2 if first_chunk else 0 :]  # Strip initial CRLF
        self._prev_chunk = chunk
        return result

    async def readline(self) -> bytes:
        """Reads body part by line by line."""
        if self._at_eof:
            return b""

        if self._unread:
            line = self._unread.popleft()
        else:
            line = await self._content.readline()

        if line.startswith(self._boundary):
            # the very last boundary may not come with \r\n,
            # so set single rules for everyone
            sline = line.rstrip(b"\r\n")
            boundary = self._boundary
            last_boundary = self._boundary + b"--"
            # ensure that we read exactly the boundary, not something alike
            if sline == boundary or sline == last_boundary:
                self._at_eof = True
                self._unread.append(line)
                return b""
        else:
            next_line = await self._content.readline()
            if next_line.startswith(self._boundary):
                line = line[:-2]  # strip CRLF but only once
            self._unread.append(next_line)

        return line

    async def release(self) -> None:
        """Like read(), but reads all the data to the void."""
        if self._at_eof:
            return
        while not self._at_eof:
            await self.read_chunk(self.chunk_size)

    async def text(self, *, encoding: Optional[str] = None) -> str:
        """Like read(), but assumes that body part contains text data."""
        data = await self.read(decode=True)
        # see https://www.w3.org/TR/html5/forms.html#multipart/form-data-encoding-algorithm
        # and https://dvcs.w3.org/hg/xhr/raw-file/tip/Overview.html#dom-xmlhttprequest-send
        encoding = encoding or self.get_charset(default="utf-8")
        return data.decode(encoding)

    async def json(self, *, encoding: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Like read(), but assumes that body parts contains JSON data."""
        data = await self.read(decode=True)
        if not data:
            return None
        encoding = encoding or self.get_charset(default="utf-8")
        return cast(Dict[str, Any], json.loads(data.decode(encoding)))

    async def form(self, *, encoding: Optional[str] = None) -> List[Tuple[str, str]]:
        """Like read(), but assumes that body parts contain form urlencoded data."""
        data = await self.read(decode=True)
        if not data:
            return []
        if encoding is not None:
            real_encoding = encoding
        else:
            real_encoding = self.get_charset(default="utf-8")
        try:
            decoded_data = data.rstrip().decode(real_encoding)
        except UnicodeDecodeError:
            raise ValueError("data cannot be decoded with %s encoding" % real_encoding)

        return parse_qsl(
            decoded_data,
            keep_blank_values=True,
            encoding=real_encoding,
        )

    def at_eof(self) -> bool:
        """Returns True if the boundary was reached or False otherwise."""
        return self._at_eof

    async def decode(self, data: bytes) -> bytes:
        """Decodes data.

        Decoding is done according to the specified Content-Encoding
        or Content-Transfer-Encoding headers value.
        """
        if CONTENT_TRANSFER_ENCODING in self.headers:
            data = self._decode_content_transfer(data)
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
        # Content-Encoding is ignored for multipart/form-data parts.
        if not self._is_form_data and CONTENT_ENCODING in self.headers:
            return await self._decode_content(data)
        return data

    async def _decode_content(self, data: bytes) -> bytes:
        encoding = self.headers.get(CONTENT_ENCODING, "").lower()
        if encoding == "identity":
            return data
        if encoding in {"deflate", "gzip"}:
            # Decompression is capped at _max_decompress_size to guard
            # against decompression-bomb payloads.
            return await ZLibDecompressor(
                encoding=encoding,
                suppress_deflate_header=True,
            ).decompress(data, max_length=self._max_decompress_size)

        raise RuntimeError(f"unknown content encoding: {encoding}")

    def _decode_content_transfer(self, data: bytes) -> bytes:
        encoding = self.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()

        if encoding == "base64":
            return base64.b64decode(data)
        elif encoding == "quoted-printable":
            return binascii.a2b_qp(data)
        elif encoding in ("binary", "8bit", "7bit"):
            return data
        else:
            raise RuntimeError(f"unknown content transfer encoding: {encoding}")

    def get_charset(self, default: str) -> str:
        """Returns charset parameter from Content-Type header or default."""
        ctype = self.headers.get(CONTENT_TYPE, "")
        mimetype = parse_mimetype(ctype)
        return mimetype.parameters.get("charset", self._default_charset or default)

    @reify
    def name(self) -> Optional[str]:
        """Returns name specified in Content-Disposition header.

        If the header is missing or malformed, returns None.
        """
        _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
        return content_disposition_filename(params, "name")

    @reify
    def filename(self) -> Optional[str]:
        """Returns filename specified in Content-Disposition header.

        Returns None if the header is missing or malformed.
        """
        _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
        return content_disposition_filename(params, "filename")

566 

567 

@payload_type(BodyPartReader, order=Order.try_first)
class BodyPartReaderPayload(Payload):
    """Payload adapter that streams a BodyPartReader through write()."""

    _value: BodyPartReader
    # _autoclose = False (inherited) - Streaming reader that may have resources

    def __init__(self, value: BodyPartReader, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)

        disposition_params: Dict[str, str] = {}
        part_name = value.name
        if part_name is not None:
            disposition_params["name"] = part_name
        part_filename = value.filename
        if part_filename is not None:
            disposition_params["filename"] = part_filename

        if disposition_params:
            self.set_content_disposition("attachment", True, **disposition_params)

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        raise TypeError("Unable to decode.")

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Raises TypeError as body parts should be consumed via write().

        This is intentional: BodyPartReader payloads are designed for streaming
        large data (potentially gigabytes) and must be consumed only once via
        the write() method to avoid memory exhaustion. They cannot be buffered
        in memory for reuse.
        """
        raise TypeError("Unable to read body part as bytes. Use write() to consume.")

    async def write(self, writer: AbstractStreamWriter) -> None:
        # Pump decoded 64 KiB chunks from the reader into the writer until
        # the part is exhausted.
        field = self._value
        while chunk := await field.read_chunk(size=2**16):
            await writer.write(await field.decode(chunk))

604 

605 

class MultipartReader:
    """Multipart body reader."""

    #: Response wrapper, used when multipart readers constructs from response.
    response_wrapper_cls = MultipartResponseWrapper
    #: Multipart reader class, used to handle multipart/* body parts.
    #: None points to type(self)
    multipart_reader_cls: Optional[Type["MultipartReader"]] = None
    #: Body part reader class for non multipart/* content types.
    part_reader_cls = BodyPartReader

    def __init__(self, headers: Mapping[str, str], content: StreamReader) -> None:
        self._mimetype = parse_mimetype(headers[CONTENT_TYPE])
        assert self._mimetype.type == "multipart", "multipart/* content type expected"
        if "boundary" not in self._mimetype.parameters:
            raise ValueError(
                "boundary missed for Content-Type: %s" % headers[CONTENT_TYPE]
            )

        self.headers = headers
        self._boundary = ("--" + self._get_boundary()).encode()
        self._content = content
        # Default charset for parts, set by a leading "_charset_" form field.
        self._default_charset: Optional[str] = None
        self._last_part: Optional[Union["MultipartReader", BodyPartReader]] = None
        self._at_eof = False
        self._at_bof = True
        # Lines read ahead (e.g. by nested readers) to be consumed first.
        self._unread: List[bytes] = []

    def __aiter__(self: Self) -> Self:
        return self

    async def __anext__(
        self,
    ) -> Optional[Union["MultipartReader", BodyPartReader]]:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    @classmethod
    def from_response(
        cls,
        response: "ClientResponse",
    ) -> MultipartResponseWrapper:
        """Constructs reader instance from HTTP response.

        :param response: :class:`~aiohttp.client.ClientResponse` instance
        """
        obj = cls.response_wrapper_cls(
            response, cls(response.headers, response.content)
        )
        return obj

    def at_eof(self) -> bool:
        """Returns True if the final boundary was reached, false otherwise."""
        return self._at_eof

    async def next(
        self,
    ) -> Optional[Union["MultipartReader", BodyPartReader]]:
        """Emits the next multipart body part."""
        # So, if we're at BOF, we need to skip till the boundary.
        if self._at_eof:
            return None
        await self._maybe_release_last_part()
        if self._at_bof:
            await self._read_until_first_boundary()
            self._at_bof = False
        else:
            await self._read_boundary()
        if self._at_eof:  # we just read the last boundary, nothing to do there
            return None

        part = await self.fetch_next_part()
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.6
        # A first form-data field named "_charset_" sets the default charset
        # for all subsequent parts and is not surfaced to the caller.
        if (
            self._last_part is None
            and self._mimetype.subtype == "form-data"
            and isinstance(part, BodyPartReader)
        ):
            _, params = parse_content_disposition(part.headers.get(CONTENT_DISPOSITION))
            if params.get("name") == "_charset_":
                # Longest encoding in https://encoding.spec.whatwg.org/encodings.json
                # is 19 characters, so 32 should be more than enough for any valid encoding.
                charset = await part.read_chunk(32)
                if len(charset) > 31:
                    raise RuntimeError("Invalid default charset")
                self._default_charset = charset.strip().decode()
                part = await self.fetch_next_part()
        self._last_part = part
        return self._last_part

    async def release(self) -> None:
        """Reads all the body parts to the void till the final boundary."""
        while not self._at_eof:
            item = await self.next()
            if item is None:
                break
            await item.release()

    async def fetch_next_part(
        self,
    ) -> Union["MultipartReader", BodyPartReader]:
        """Returns the next body part reader."""
        headers = await self._read_headers()
        return self._get_part_reader(headers)

    def _get_part_reader(
        self,
        headers: "CIMultiDictProxy[str]",
    ) -> Union["MultipartReader", BodyPartReader]:
        """Dispatches the response by the `Content-Type` header.

        Returns a suitable reader instance.

        :param dict headers: Response headers
        """
        ctype = headers.get(CONTENT_TYPE, "")
        mimetype = parse_mimetype(ctype)

        if mimetype.type == "multipart":
            # Nested multipart: delegate to the configured reader class.
            if self.multipart_reader_cls is None:
                return type(self)(headers, self._content)
            return self.multipart_reader_cls(headers, self._content)
        else:
            return self.part_reader_cls(
                self._boundary,
                headers,
                self._content,
                subtype=self._mimetype.subtype,
                default_charset=self._default_charset,
            )

    def _get_boundary(self) -> str:
        # RFC 2046 limits the boundary to 70 characters.
        boundary = self._mimetype.parameters["boundary"]
        if len(boundary) > 70:
            raise ValueError("boundary %r is too long (70 chars max)" % boundary)

        return boundary

    async def _readline(self) -> bytes:
        # Pushed-back lines take priority over fresh reads from the stream.
        if self._unread:
            return self._unread.pop()
        return await self._content.readline()

    async def _read_until_first_boundary(self) -> None:
        while True:
            chunk = await self._readline()
            if chunk == b"":
                raise ValueError(
                    "Could not find starting boundary %r" % (self._boundary)
                )
            chunk = chunk.rstrip()
            if chunk == self._boundary:
                return
            elif chunk == self._boundary + b"--":
                # Final boundary encountered immediately: empty multipart.
                self._at_eof = True
                return

    async def _read_boundary(self) -> None:
        chunk = (await self._readline()).rstrip()
        if chunk == self._boundary:
            pass
        elif chunk == self._boundary + b"--":
            self._at_eof = True
            epilogue = await self._readline()
            next_line = await self._readline()

            # the epilogue is expected and then either the end of input or the
            # parent multipart boundary, if the parent boundary is found then
            # it should be marked as unread and handed to the parent for
            # processing
            if next_line[:2] == b"--":
                self._unread.append(next_line)
            # otherwise the request is likely missing an epilogue and both
            # lines should be passed to the parent for processing
            # (this handles the old behavior gracefully)
            else:
                self._unread.extend([next_line, epilogue])
        else:
            raise ValueError(f"Invalid boundary {chunk!r}, expected {self._boundary!r}")

    async def _read_headers(self) -> "CIMultiDictProxy[str]":
        lines: List[bytes] = []
        while True:
            chunk = await self._content.readline()
            chunk = chunk.rstrip(b"\r\n")
            lines.append(chunk)
            if not chunk:
                # Blank line terminates the header block.
                break
        parser = HeadersParser()
        headers, raw_headers = parser.parse_headers(lines)
        return headers

    async def _maybe_release_last_part(self) -> None:
        """Ensures that the last read body part is read completely."""
        if self._last_part is not None:
            if not self._last_part.at_eof():
                await self._last_part.release()
            # Hand any lines the part read ahead back to this reader.
            self._unread.extend(self._last_part._unread)
            self._last_part = None

807 

808 

# Internal part record: (payload, content-encoding, content-transfer-encoding).
_Part = Tuple[Payload, str, str]

810 

811 

class MultipartWriter(Payload):
    """Multipart body writer.

    Collects (payload, encoding, te_encoding) parts and serializes them
    separated by the configured boundary.
    """

    _value: None
    # _consumed = False (inherited) - Can be encoded multiple times
    _autoclose = True  # No file handles, just collects parts in memory

818 

819 def __init__(self, subtype: str = "mixed", boundary: Optional[str] = None) -> None: 

820 boundary = boundary if boundary is not None else uuid.uuid4().hex 

821 # The underlying Payload API demands a str (utf-8), not bytes, 

822 # so we need to ensure we don't lose anything during conversion. 

823 # As a result, require the boundary to be ASCII only. 

824 # In both situations. 

825 

826 try: 

827 self._boundary = boundary.encode("ascii") 

828 except UnicodeEncodeError: 

829 raise ValueError("boundary should contain ASCII only chars") from None 

830 ctype = f"multipart/{subtype}; boundary={self._boundary_value}" 

831 

832 super().__init__(None, content_type=ctype) 

833 

834 self._parts: List[_Part] = [] 

835 self._is_form_data = subtype == "form-data" 

836 

    def __enter__(self) -> "MultipartWriter":
        # Context-manager support: nothing to acquire, returns the writer itself.
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Nothing to release; exceptions propagate (implicitly returns None).
        pass

    def __iter__(self) -> Iterator[_Part]:
        # Iterate the appended (payload, encoding, te_encoding) triples.
        return iter(self._parts)

    def __len__(self) -> int:
        return len(self._parts)

    def __bool__(self) -> bool:
        # Always truthy, even with zero parts, so an empty writer is not
        # mistaken for "no body" (len() == 0 would otherwise make it falsy).
        return True

856 

    # RFC 7230 token characters (tchar): boundary made only of these can be
    # emitted bare, without quoting.
    _valid_tchar_regex = re.compile(rb"\A[!#$%&'*+\-.^_`|~\w]+\Z")
    # Control characters that may not appear even inside a quoted-string.
    _invalid_qdtext_char_regex = re.compile(rb"[\x00-\x08\x0A-\x1F\x7F]")

859 

860 @property 

861 def _boundary_value(self) -> str: 

862 """Wrap boundary parameter value in quotes, if necessary. 

863 

864 Reads self.boundary and returns a unicode string. 

865 """ 

866 # Refer to RFCs 7231, 7230, 5234. 

867 # 

868 # parameter = token "=" ( token / quoted-string ) 

869 # token = 1*tchar 

870 # quoted-string = DQUOTE *( qdtext / quoted-pair ) DQUOTE 

871 # qdtext = HTAB / SP / %x21 / %x23-5B / %x5D-7E / obs-text 

872 # obs-text = %x80-FF 

873 # quoted-pair = "\" ( HTAB / SP / VCHAR / obs-text ) 

874 # tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" 

875 # / "+" / "-" / "." / "^" / "_" / "`" / "|" / "~" 

876 # / DIGIT / ALPHA 

877 # ; any VCHAR, except delimiters 

878 # VCHAR = %x21-7E 

879 value = self._boundary 

880 if re.match(self._valid_tchar_regex, value): 

881 return value.decode("ascii") # cannot fail 

882 

883 if re.search(self._invalid_qdtext_char_regex, value): 

884 raise ValueError("boundary value contains invalid characters") 

885 

886 # escape %x5C and %x22 

887 quoted_value_content = value.replace(b"\\", b"\\\\") 

888 quoted_value_content = quoted_value_content.replace(b'"', b'\\"') 

889 

890 return '"' + quoted_value_content.decode("ascii") + '"' 

891 

    @property
    def boundary(self) -> str:
        """The boundary string (ASCII) separating the parts."""
        return self._boundary.decode("ascii")

895 

896 def append(self, obj: Any, headers: Optional[Mapping[str, str]] = None) -> Payload: 

897 if headers is None: 

898 headers = CIMultiDict() 

899 

900 if isinstance(obj, Payload): 

901 obj.headers.update(headers) 

902 return self.append_payload(obj) 

903 else: 

904 try: 

905 payload = get_payload(obj, headers=headers) 

906 except LookupError: 

907 raise TypeError("Cannot create payload from %r" % obj) 

908 else: 

909 return self.append_payload(payload) 

910 

    def append_payload(self, payload: Payload) -> Payload:
        """Adds a new body part to multipart writer.

        Validates (and, where needed, records) the part's Content-Encoding
        and Content-Transfer-Encoding, and sets Content-Length/-Disposition
        defaults depending on whether this is a form-data writer.
        """
        encoding: Optional[str] = None
        te_encoding: Optional[str] = None
        if self._is_form_data:
            # https://datatracker.ietf.org/doc/html/rfc7578#section-4.7
            # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
            # form-data parts must not carry these headers.
            assert (
                not {CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TRANSFER_ENCODING}
                & payload.headers.keys()
            )
            # Set default Content-Disposition in case user doesn't create one
            if CONTENT_DISPOSITION not in payload.headers:
                name = f"section-{len(self._parts)}"
                payload.set_content_disposition("form-data", name=name)
        else:
            # compression
            encoding = payload.headers.get(CONTENT_ENCODING, "").lower()
            if encoding and encoding not in ("deflate", "gzip", "identity"):
                raise RuntimeError(f"unknown content encoding: {encoding}")
            if encoding == "identity":
                # "identity" means no transformation: treat as unset.
                encoding = None

            # te encoding
            te_encoding = payload.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()
            if te_encoding not in ("", "base64", "quoted-printable", "binary"):
                raise RuntimeError(f"unknown content transfer encoding: {te_encoding}")
            if te_encoding == "binary":
                # "binary" means no transformation: treat as unset.
                te_encoding = None

            # size
            # Only announce Content-Length when the size won't change in transit.
            size = payload.size
            if size is not None and not (encoding or te_encoding):
                payload.headers[CONTENT_LENGTH] = str(size)

        self._parts.append((payload, encoding, te_encoding))  # type: ignore[arg-type]
        return payload

948 

949 def append_json( 

950 self, obj: Any, headers: Optional[Mapping[str, str]] = None 

951 ) -> Payload: 

952 """Helper to append JSON part.""" 

953 if headers is None: 

954 headers = CIMultiDict() 

955 

956 return self.append_payload(JsonPayload(obj, headers=headers)) 

957 

958 def append_form( 

959 self, 

960 obj: Union[Sequence[Tuple[str, str]], Mapping[str, str]], 

961 headers: Optional[Mapping[str, str]] = None, 

962 ) -> Payload: 

963 """Helper to append form urlencoded part.""" 

964 assert isinstance(obj, (Sequence, Mapping)) 

965 

966 if headers is None: 

967 headers = CIMultiDict() 

968 

969 if isinstance(obj, Mapping): 

970 obj = list(obj.items()) 

971 data = urlencode(obj, doseq=True) 

972 

973 return self.append_payload( 

974 StringPayload( 

975 data, headers=headers, content_type="application/x-www-form-urlencoded" 

976 ) 

977 ) 

978 

979 @property 

980 def size(self) -> Optional[int]: 

981 """Size of the payload.""" 

982 total = 0 

983 for part, encoding, te_encoding in self._parts: 

984 part_size = part.size 

985 if encoding or te_encoding or part_size is None: 

986 return None 

987 

988 total += int( 

989 2 

990 + len(self._boundary) 

991 + 2 

992 + part_size # b'--'+self._boundary+b'\r\n' 

993 + len(part._binary_headers) 

994 + 2 # b'\r\n' 

995 ) 

996 

997 total += 2 + len(self._boundary) + 4 # b'--'+self._boundary+b'--\r\n' 

998 return total 

999 

1000 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: 

1001 """Return string representation of the multipart data. 

1002 

1003 WARNING: This method may do blocking I/O if parts contain file payloads. 

1004 It should not be called in the event loop. Use as_bytes().decode() instead. 

1005 """ 

1006 return "".join( 

1007 "--" 

1008 + self.boundary 

1009 + "\r\n" 

1010 + part._binary_headers.decode(encoding, errors) 

1011 + part.decode() 

1012 for part, _e, _te in self._parts 

1013 ) 

1014 

1015 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes: 

1016 """Return bytes representation of the multipart data. 

1017 

1018 This method is async-safe and calls as_bytes on underlying payloads. 

1019 """ 

1020 parts: List[bytes] = [] 

1021 

1022 # Process each part 

1023 for part, _e, _te in self._parts: 

1024 # Add boundary 

1025 parts.append(b"--" + self._boundary + b"\r\n") 

1026 

1027 # Add headers 

1028 parts.append(part._binary_headers) 

1029 

1030 # Add payload content using as_bytes for async safety 

1031 part_bytes = await part.as_bytes(encoding, errors) 

1032 parts.append(part_bytes) 

1033 

1034 # Add trailing CRLF 

1035 parts.append(b"\r\n") 

1036 

1037 # Add closing boundary 

1038 parts.append(b"--" + self._boundary + b"--\r\n") 

1039 

1040 return b"".join(parts) 

1041 

1042 async def write( 

1043 self, writer: AbstractStreamWriter, close_boundary: bool = True 

1044 ) -> None: 

1045 """Write body.""" 

1046 for part, encoding, te_encoding in self._parts: 

1047 if self._is_form_data: 

1048 # https://datatracker.ietf.org/doc/html/rfc7578#section-4.2 

1049 assert CONTENT_DISPOSITION in part.headers 

1050 assert "name=" in part.headers[CONTENT_DISPOSITION] 

1051 

1052 await writer.write(b"--" + self._boundary + b"\r\n") 

1053 await writer.write(part._binary_headers) 

1054 

1055 if encoding or te_encoding: 

1056 w = MultipartPayloadWriter(writer) 

1057 if encoding: 

1058 w.enable_compression(encoding) 

1059 if te_encoding: 

1060 w.enable_encoding(te_encoding) 

1061 await part.write(w) # type: ignore[arg-type] 

1062 await w.write_eof() 

1063 else: 

1064 await part.write(writer) 

1065 

1066 await writer.write(b"\r\n") 

1067 

1068 if close_boundary: 

1069 await writer.write(b"--" + self._boundary + b"--\r\n") 

1070 

1071 async def close(self) -> None: 

1072 """ 

1073 Close all part payloads that need explicit closing. 

1074 

1075 IMPORTANT: This method must not await anything that might not finish 

1076 immediately, as it may be called during cleanup/cancellation. Schedule 

1077 any long-running operations without awaiting them. 

1078 """ 

1079 if self._consumed: 

1080 return 

1081 self._consumed = True 

1082 

1083 # Close all parts that need explicit closing 

1084 # We catch and log exceptions to ensure all parts get a chance to close 

1085 # we do not use asyncio.gather() here because we are not allowed 

1086 # to suspend given we may be called during cleanup 

1087 for idx, (part, _, _) in enumerate(self._parts): 

1088 if not part.autoclose and not part.consumed: 

1089 try: 

1090 await part.close() 

1091 except Exception as exc: 

1092 internal_logger.error( 

1093 "Failed to close multipart part %d: %s", idx, exc, exc_info=True 

1094 ) 

1095 

1096 

class MultipartPayloadWriter:
    """Stream-writer adapter that compresses and/or transfer-encodes chunks.

    Wraps a stream writer and, depending on what was enabled, runs every
    chunk through zlib compression and/or a content-transfer-encoding
    (base64 or quoted-printable) before forwarding it downstream.
    """

    def __init__(self, writer: "AbstractStreamWriter") -> None:
        self._writer = writer
        self._encoding: Optional[str] = None
        self._compress: Optional["ZLibCompressor"] = None
        self._encoding_buffer: Optional[bytearray] = None

    def enable_encoding(self, encoding: str) -> None:
        # base64 needs a carry-over buffer so output is produced in whole
        # 3-byte groups; quoted-printable encodes each chunk independently.
        if encoding == "base64":
            self._encoding = "base64"
            self._encoding_buffer = bytearray()
        elif encoding == "quoted-printable":
            self._encoding = "quoted-printable"

    def enable_compression(
        self, encoding: str = "deflate", strategy: Optional[int] = None
    ) -> None:
        self._compress = ZLibCompressor(
            encoding=encoding,
            suppress_deflate_header=True,
            strategy=strategy,
        )

    async def write_eof(self) -> None:
        compressor = self._compress
        if compressor is not None:
            tail = compressor.flush()
            if tail:
                # Drop the compressor first so the flushed tail is routed
                # straight through the encoding stage via write().
                self._compress = None
                await self.write(tail)

        if self._encoding == "base64" and self._encoding_buffer:
            # Emit the final (possibly padded) base64 group.
            await self._writer.write(base64.b64encode(self._encoding_buffer))

    async def write(self, chunk: bytes) -> None:
        if self._compress is not None and chunk:
            chunk = await self._compress.compress(chunk)
            if not chunk:
                return

        if self._encoding == "base64":
            buf = self._encoding_buffer
            assert buf is not None
            buf.extend(chunk)

            if buf:
                # Encode only complete 3-byte groups; keep the remainder.
                whole = (len(buf) // 3) * 3
                ready, self._encoding_buffer = buf[:whole], buf[whole:]
                if ready:
                    await self._writer.write(base64.b64encode(ready))
        elif self._encoding == "quoted-printable":
            await self._writer.write(binascii.b2a_qp(chunk))
        else:
            await self._writer.write(chunk)