Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/multipart.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

695 statements  

1import base64 

2import binascii 

3import json 

4import re 

5import sys 

6import uuid 

7import warnings 

8from collections import deque 

9from collections.abc import AsyncIterator, Iterator, Mapping, Sequence 

10from types import TracebackType 

11from typing import TYPE_CHECKING, Any, TypeVar, Union, cast 

12from urllib.parse import parse_qsl, unquote, urlencode 

13 

14from multidict import CIMultiDict, CIMultiDictProxy 

15 

16from .abc import AbstractStreamWriter 

17from .compression_utils import ZLibCompressor, ZLibDecompressor 

18from .hdrs import ( 

19 CONTENT_DISPOSITION, 

20 CONTENT_ENCODING, 

21 CONTENT_LENGTH, 

22 CONTENT_TRANSFER_ENCODING, 

23 CONTENT_TYPE, 

24) 

25from .helpers import CHAR, DEFAULT_CHUNK_SIZE, TOKEN, parse_mimetype, reify 

26from .http import HeadersParser 

27from .http_exceptions import BadHttpMessage 

28from .log import internal_logger 

29from .payload import ( 

30 JsonPayload, 

31 LookupError, 

32 Order, 

33 Payload, 

34 StringPayload, 

35 get_payload, 

36 payload_type, 

37) 

38from .streams import StreamReader 

39 

# typing.Self is available from Python 3.11; older interpreters get a TypeVar
# bound to BodyPartReader, used by the ``__aiter__(self: Self) -> Self``
# annotations in this module.  The literal ``sys.version_info >= (...)`` form
# is kept because type checkers special-case exactly that comparison.
if sys.version_info >= (3, 11):
    from typing import Self
else:
    Self = TypeVar("Self", bound="BodyPartReader")

# collections.abc.Buffer exists from Python 3.12; before that, spell out the
# buffer types explicitly.
if sys.version_info >= (3, 12):
    from collections.abc import Buffer
else:
    Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]

# TypeVar over buffer types, used in the signatures of the decode helpers.
_Buffer = TypeVar("_Buffer", bound=Buffer)

# Public names exported by ``from aiohttp.multipart import *``.
__all__ = (
    "MultipartReader",
    "MultipartWriter",
    "BodyPartReader",
    "BadContentDispositionHeader",
    "BadContentDispositionParam",
    "parse_content_disposition",
    "content_disposition_filename",
)

61 

62 

63if TYPE_CHECKING: 

64 from .client_reqrep import ClientResponse 

65 

66 

class BadContentDispositionHeader(RuntimeWarning):
    """Warning issued when a whole Content-Disposition header is rejected."""


class BadContentDispositionParam(RuntimeWarning):
    """Warning issued when a single Content-Disposition parameter is dropped."""

73 

74 

def parse_content_disposition(
    header: str | None,
) -> tuple[str | None, dict[str, str]]:
    """Parse a Content-Disposition header value.

    Handles plain ``key=value`` parameters, quoted-string values, extended
    parameters (``key*=charset'lang'value``, percent-decoded here) and
    numbered continuation parameters (``key*0=...``; stored as-is, see
    :func:`content_disposition_filename`).

    :param header: raw header value, or ``None``.
    :returns: ``(disposition_type, params)`` with the type and parameter
        names lower-cased.  If the header as a whole is malformed, a
        :class:`BadContentDispositionHeader` warning is issued and
        ``(None, {})`` is returned.  A single unusable parameter is dropped
        with a :class:`BadContentDispositionParam` warning.
    """

    def is_token(string: str) -> bool:
        return bool(string) and TOKEN >= set(string)

    def is_quoted(string: str) -> bool:
        # A quoted-string needs both surrounding quotes.  The length check also
        # keeps an empty value (e.g. ``filename=``) from raising IndexError.
        return len(string) >= 2 and string[0] == string[-1] == '"'

    def is_rfc5987(string: str) -> bool:
        return is_token(string) and string.count("'") == 2

    def is_extended_param(string: str) -> bool:
        return string.endswith("*")

    def is_continuous_param(string: str) -> bool:
        # ``name*<digits>`` or ``name*<digits>*``
        pos = string.find("*") + 1
        if not pos:
            return False
        substring = string[pos:-1] if string.endswith("*") else string[pos:]
        return substring.isdigit()

    def unescape(text: str, *, chars: str = "".join(map(re.escape, CHAR))) -> str:
        # Undo quoted-pair escaping (backslash followed by a CHAR).
        return re.sub(f"\\\\([{chars}])", "\\1", text)

    if not header:
        return None, {}

    disptype, *parts = header.split(";")
    if not is_token(disptype):
        warnings.warn(BadContentDispositionHeader(header))
        return None, {}

    params: dict[str, str] = {}
    while parts:
        item = parts.pop(0)

        if not item:  # To handle trailing semicolons
            warnings.warn(BadContentDispositionHeader(header))
            continue

        if "=" not in item:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        key, value = item.split("=", 1)
        key = key.lower().strip()
        value = value.lstrip()

        if key in params:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        if not is_token(key):
            warnings.warn(BadContentDispositionParam(item))
            continue

        elif is_continuous_param(key):
            if is_quoted(value):
                value = unescape(value[1:-1])
            elif not is_token(value):
                warnings.warn(BadContentDispositionParam(item))
                continue

        elif is_extended_param(key):
            if is_rfc5987(value):
                encoding, _, value = value.split("'", 2)
                encoding = encoding or "utf-8"
            else:
                warnings.warn(BadContentDispositionParam(item))
                continue

            try:
                value = unquote(value, encoding, "strict")
            except UnicodeDecodeError:  # pragma: nocover
                warnings.warn(BadContentDispositionParam(item))
                continue

        else:
            failed = True
            if is_quoted(value):
                failed = False
                value = unescape(value[1:-1].lstrip("\\/"))
            elif is_token(value):
                failed = False
            elif parts:
                # maybe just ; in filename, in any case this is just
                # one case fix, for proper fix we need to redesign parser
                _value = f"{value};{parts[0]}"
                if is_quoted(_value):
                    parts.pop(0)
                    value = unescape(_value[1:-1].lstrip("\\/"))
                    failed = False

            if failed:
                warnings.warn(BadContentDispositionHeader(header))
                return None, {}

        params[key] = value

    return disptype.lower(), params

176 

177 

178def content_disposition_filename( 

179 params: Mapping[str, str], name: str = "filename" 

180) -> str | None: 

181 name_suf = "%s*" % name 

182 if not params: 

183 return None 

184 elif name_suf in params: 

185 return params[name_suf] 

186 elif name in params: 

187 return params[name] 

188 else: 

189 parts = [] 

190 fnparams = sorted( 

191 (key, value) for key, value in params.items() if key.startswith(name_suf) 

192 ) 

193 for num, (key, value) in enumerate(fnparams): 

194 _, tail = key.split("*", 1) 

195 if tail.endswith("*"): 

196 tail = tail[:-1] 

197 if tail == str(num): 

198 parts.append(value) 

199 else: 

200 break 

201 if not parts: 

202 return None 

203 value = "".join(parts) 

204 if "'" in value: 

205 encoding, _, value = value.split("'", 2) 

206 encoding = encoding or "utf-8" 

207 return unquote(value, encoding, "strict") 

208 return value 

209 

210 

211class MultipartResponseWrapper: 

212 """Wrapper around the MultipartReader. 

213 

214 It takes care about 

215 underlying connection and close it when it needs in. 

216 """ 

217 

218 def __init__( 

219 self, 

220 resp: "ClientResponse", 

221 stream: "MultipartReader", 

222 ) -> None: 

223 self.resp = resp 

224 self.stream = stream 

225 

226 def __aiter__(self) -> "MultipartResponseWrapper": 

227 return self 

228 

229 async def __anext__( 

230 self, 

231 ) -> Union["MultipartReader", "BodyPartReader"]: 

232 part = await self.next() 

233 if part is None: 

234 raise StopAsyncIteration 

235 return part 

236 

237 def at_eof(self) -> bool: 

238 """Returns True when all response data had been read.""" 

239 return self.resp.content.at_eof() 

240 

241 async def next( 

242 self, 

243 ) -> Union["MultipartReader", "BodyPartReader"] | None: 

244 """Emits next multipart reader object.""" 

245 item = await self.stream.next() 

246 if self.stream.at_eof(): 

247 await self.release() 

248 return item 

249 

250 async def release(self) -> None: 

251 """Release the connection gracefully. 

252 

253 All remaining content is read to the void. 

254 """ 

255 await self.resp.release() 

256 

257 

class BodyPartReader:
    """Multipart reader for single body part.

    Reads the body of one part from ``content`` (typically the stream of the
    enclosing :class:`MultipartReader`) up to the next ``boundary``.
    """

    chunk_size = 8192

    def __init__(
        self,
        boundary: bytes,
        headers: "CIMultiDictProxy[str]",
        content: StreamReader,
        *,
        subtype: str = "mixed",
        default_charset: str | None = None,
        max_decompress_size: int = DEFAULT_CHUNK_SIZE,
        client_max_size: int = sys.maxsize,
        max_size_error_cls: type[Exception] = ValueError,
    ) -> None:
        """Create a reader for one body part.

        :param boundary: boundary delimiter that terminates this part.
        :param headers: headers of this part.
        :param content: stream the part body is read from.
        :param subtype: subtype of the enclosing multipart message.
        :param default_charset: charset used when Content-Type has none.
        :param max_decompress_size: ``max_length`` for each decompression call.
        :param client_max_size: maximum size accepted by :meth:`read`.
        :param max_size_error_cls: exception raised when that size is exceeded.
        """
        self.headers = headers
        self._boundary = boundary
        self._boundary_len = len(boundary) + 2  # Boundary + \r\n
        self._content = content
        self._default_charset = default_charset
        self._at_eof = False
        self._is_form_data = subtype == "form-data"
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
        # Content-Length is ignored for form-data parts, so they are always read
        # by scanning for the boundary.
        length = None if self._is_form_data else self.headers.get(CONTENT_LENGTH, None)
        self._length = int(length) if length is not None else None
        self._read_bytes = 0
        # Lines pushed back by readline(), e.g. a boundary line it peeked at.
        self._unread: deque[bytes] = deque()
        # Look-ahead buffer of _read_chunk_from_stream(); None before the first read.
        self._prev_chunk: bytes | None = None
        # Number of reads that observed the underlying stream at EOF.
        self._content_eof = 0
        # NOTE(review): presumably the storage used by @reify (name/filename).
        self._cache: dict[str, Any] = {}
        self._max_decompress_size = max_decompress_size
        self._client_max_size = client_max_size
        self._max_size_error_cls = max_size_error_cls

    def __aiter__(self: Self) -> Self:
        return self

    async def __anext__(self) -> bytes:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    async def next(self) -> bytes | None:
        """Return the rest of the part body, or None if nothing is left."""
        item = await self.read()
        if not item:
            return None
        return item

    async def read(self, *, decode: bool = False) -> bytes:
        """Reads body part data.

        decode: Decodes data following by encoding
        method from Content-Encoding header. If it missed
        data remains untouched

        Raises ``max_size_error_cls`` if the raw or decoded data grows
        beyond ``client_max_size``.
        """
        if self._at_eof:
            return b""
        data = bytearray()
        while not self._at_eof:
            data.extend(await self.read_chunk(self.chunk_size))
            if len(data) > self._client_max_size:
                raise self._max_size_error_cls(self._client_max_size)
        if decode:
            decoded_data = bytearray()
            async for d in self.decode_iter(data):
                decoded_data.extend(d)
                if len(decoded_data) > self._client_max_size:
                    raise self._max_size_error_cls(self._client_max_size)
            return decoded_data
        return data

    async def read_chunk(self, size: int = chunk_size) -> bytes:
        """Reads body part content chunk of the specified size.

        size: chunk size.  For base64 parts a few extra bytes may be returned
        so that the non-whitespace length stays a multiple of 4.

        Returns b"" once the part is exhausted.  Raises ValueError if the
        part data is not followed by CRLF when its end is reached.
        """
        if self._at_eof:
            return b""
        if self._length:
            chunk = await self._read_chunk_from_length(size)
        else:
            chunk = await self._read_chunk_from_stream(size)

        # For the case of base64 data, we must read a fragment of size with a
        # remainder of 0 by dividing by 4 for string without symbols \n or \r
        encoding = self.headers.get(CONTENT_TRANSFER_ENCODING)
        if encoding and encoding.lower() == "base64":
            stripped_chunk = b"".join(chunk.split())
            remainder = len(stripped_chunk) % 4

            while remainder != 0 and not self.at_eof():
                over_chunk_size = 4 - remainder
                over_chunk = b""

                # Prefer bytes already buffered as look-ahead.
                if self._prev_chunk:
                    over_chunk = self._prev_chunk[:over_chunk_size]
                    self._prev_chunk = self._prev_chunk[len(over_chunk) :]

                # Read only the bytes that are still missing.  Requesting a
                # fixed ``4 - len(over_chunk)`` over-read by ``remainder``
                # bytes, which left the data misaligned and kept the loop
                # consuming the stream.
                if len(over_chunk) != over_chunk_size:
                    over_chunk += await self._content.read(
                        over_chunk_size - len(over_chunk)
                    )

                if not over_chunk:
                    self._at_eof = True

                stripped_chunk += b"".join(over_chunk.split())
                chunk += over_chunk
                remainder = len(stripped_chunk) % 4

        self._read_bytes += len(chunk)
        if self._read_bytes == self._length:
            self._at_eof = True
        # The part data must be terminated by CRLF.
        if self._at_eof and await self._content.readline() != b"\r\n":
            raise ValueError("Reader did not read all the data or it is malformed")
        return chunk

    async def _read_chunk_from_length(self, size: int) -> bytes:
        # Reads body part content chunk of the specified size.
        # The body part must has Content-Length header with proper value.
        assert self._length is not None, "Content-Length required for chunked read"
        chunk_size = min(size, self._length - self._read_bytes)
        chunk = await self._content.read(chunk_size)
        if self._content.at_eof():
            self._at_eof = True
        return chunk

    async def _read_chunk_from_stream(self, size: int) -> bytes:
        # Reads content chunk of body part with unknown length.
        # The Content-Length header for body part is not necessary.
        #
        # One chunk of look-ahead is kept in self._prev_chunk: each call
        # returns the previously buffered data and buffers the newly read
        # chunk, so the boundary is searched in the combined window.
        assert (
            size >= self._boundary_len
        ), "Chunk size must be greater or equal than boundary length + 2"
        first_chunk = self._prev_chunk is None
        if first_chunk:
            # We need to re-add the CRLF that got removed from headers parsing.
            self._prev_chunk = b"\r\n" + await self._content.read(size)

        chunk = b""
        # content.read() may return less than size, so we need to loop to ensure
        # we have enough data to detect the boundary.
        while len(chunk) < self._boundary_len:
            chunk += await self._content.read(size)
            self._content_eof += int(self._content.at_eof())
            if self._content_eof > 2:
                raise ValueError("Reading after EOF")
            if self._content_eof:
                break
        if len(chunk) > size:
            self._content.unread_data(chunk[size:])
            chunk = chunk[:size]

        assert self._prev_chunk is not None
        window = self._prev_chunk + chunk
        sub = b"\r\n" + self._boundary
        if first_chunk:
            idx = window.find(sub)
        else:
            # Start near the end of the previous chunk so a boundary split
            # across the two reads is still found.
            idx = window.find(sub, max(0, len(self._prev_chunk) - len(sub)))
        if idx >= 0:
            # pushing boundary back to content
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=DeprecationWarning)
                self._content.unread_data(window[idx:])
            self._prev_chunk = self._prev_chunk[:idx]
            chunk = window[len(self._prev_chunk) : idx]
            if not chunk:
                self._at_eof = True
        result = self._prev_chunk[2 if first_chunk else 0 :]  # Strip initial CRLF
        self._prev_chunk = chunk
        return result

    async def readline(self) -> bytes:
        """Reads body part by line by line.

        Returns b"" once a boundary line is reached.  That line is kept in
        ``self._unread`` for the owner to consume.
        """
        if self._at_eof:
            return b""

        if self._unread:
            line = self._unread.popleft()
        else:
            line = await self._content.readline()

        if line.startswith(self._boundary):
            # the very last boundary may not come with \r\n,
            # so set single rules for everyone
            sline = line.rstrip(b"\r\n")
            boundary = self._boundary
            last_boundary = self._boundary + b"--"
            # ensure that we read exactly the boundary, not something alike
            if sline == boundary or sline == last_boundary:
                self._at_eof = True
                self._unread.append(line)
                return b""
        else:
            # Peek at the next line: the CRLF before a boundary belongs to the
            # delimiter, not to the data.
            next_line = await self._content.readline()
            if next_line.startswith(self._boundary):
                line = line[:-2]  # strip CRLF but only once
            self._unread.append(next_line)

        return line

    async def release(self) -> None:
        """Like read(), but reads all the data to the void."""
        if self._at_eof:
            return
        while not self._at_eof:
            await self.read_chunk(self.chunk_size)

    async def text(self, *, encoding: str | None = None) -> str:
        """Like read(), but assumes that body part contains text data."""
        data = await self.read(decode=True)
        # see https://www.w3.org/TR/html5/forms.html#multipart/form-data-encoding-algorithm
        # and https://dvcs.w3.org/hg/xhr/raw-file/tip/Overview.html#dom-xmlhttprequest-send
        encoding = encoding or self.get_charset(default="utf-8")
        return data.decode(encoding)

    async def json(self, *, encoding: str | None = None) -> dict[str, Any] | None:
        """Like read(), but assumes that body parts contains JSON data.

        Returns None for an empty body.
        """
        data = await self.read(decode=True)
        if not data:
            return None
        encoding = encoding or self.get_charset(default="utf-8")
        return cast(dict[str, Any], json.loads(data.decode(encoding)))

    async def form(self, *, encoding: str | None = None) -> list[tuple[str, str]]:
        """Like read(), but assumes that body parts contain form urlencoded data.

        Raises ValueError if the data cannot be decoded with the charset.
        """
        data = await self.read(decode=True)
        if not data:
            return []
        if encoding is not None:
            real_encoding = encoding
        else:
            real_encoding = self.get_charset(default="utf-8")
        try:
            decoded_data = data.rstrip().decode(real_encoding)
        except UnicodeDecodeError as exc:
            raise ValueError(
                "data cannot be decoded with %s encoding" % real_encoding
            ) from exc

        return parse_qsl(
            decoded_data,
            keep_blank_values=True,
            encoding=real_encoding,
        )

    def at_eof(self) -> bool:
        """Returns True if the boundary was reached or False otherwise."""
        return self._at_eof

    def _apply_content_transfer_decoding(self, data: _Buffer) -> _Buffer | bytes:
        """Apply Content-Transfer-Encoding decoding if header is present."""
        if CONTENT_TRANSFER_ENCODING in self.headers:
            return self._decode_content_transfer(data)
        return data

    def _needs_content_decoding(self) -> bool:
        """Check if Content-Encoding decoding should be applied."""
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
        return not self._is_form_data and CONTENT_ENCODING in self.headers

    def decode(self, data: _Buffer) -> _Buffer | bytes:
        """Decodes data synchronously.

        Decodes data according the specified Content-Encoding
        or Content-Transfer-Encoding headers value.

        Note: For large payloads, consider using decode_iter() instead
        to avoid blocking the event loop during decompression.
        """
        decoded = self._apply_content_transfer_decoding(data)
        if self._needs_content_decoding():
            return self._decode_content(decoded)
        return decoded

    async def decode_iter(self, data: _Buffer) -> AsyncIterator[_Buffer | bytes]:
        """Async generator that yields decoded data chunks.

        Decodes data according the specified Content-Encoding
        or Content-Transfer-Encoding headers value.

        This method offloads decompression to an executor for large payloads
        to avoid blocking the event loop.
        """
        decoded = self._apply_content_transfer_decoding(data)
        if self._needs_content_decoding():
            async for d in self._decode_content_async(decoded):
                yield d
        else:
            yield decoded

    def _decode_content(self, data: _Buffer) -> _Buffer | bytes:
        # Raises RuntimeError for encodings other than identity/deflate/gzip.
        encoding = self.headers.get(CONTENT_ENCODING, "").lower()
        if encoding == "identity":
            return data
        if encoding in {"deflate", "gzip"}:
            return ZLibDecompressor(
                encoding=encoding,
                suppress_deflate_header=True,
            ).decompress_sync(data, max_length=self._max_decompress_size)

        raise RuntimeError(f"unknown content encoding: {encoding}")

    async def _decode_content_async(
        self, data: _Buffer
    ) -> AsyncIterator[_Buffer | bytes]:
        # Each decompress() call is capped at max_decompress_size; keep
        # draining while the decompressor reports buffered output.
        encoding = self.headers.get(CONTENT_ENCODING, "").lower()
        if encoding == "identity":
            yield data
        elif encoding in {"deflate", "gzip"}:
            d = ZLibDecompressor(
                encoding=encoding,
                suppress_deflate_header=True,
            )
            yield await d.decompress(data, max_length=self._max_decompress_size)
            while d.data_available:
                yield await d.decompress(b"", max_length=self._max_decompress_size)
        else:
            raise RuntimeError(f"unknown content encoding: {encoding}")

    def _decode_content_transfer(self, data: _Buffer) -> _Buffer | bytes:
        # Raises RuntimeError for unsupported Content-Transfer-Encoding values.
        encoding = self.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()

        if encoding == "base64":
            return base64.b64decode(data)
        elif encoding == "quoted-printable":
            return binascii.a2b_qp(data)
        elif encoding in ("binary", "8bit", "7bit"):
            return data
        else:
            raise RuntimeError(f"unknown content transfer encoding: {encoding}")

    def get_charset(self, default: str) -> str:
        """Returns charset parameter from Content-Type header or default.

        The reader's default charset, if any, takes precedence over *default*.
        """
        ctype = self.headers.get(CONTENT_TYPE, "")
        mimetype = parse_mimetype(ctype)
        return mimetype.parameters.get("charset", self._default_charset or default)

    @reify
    def name(self) -> str | None:
        """Returns name specified in Content-Disposition header.

        If the header is missing or malformed, returns None.
        """
        _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
        return content_disposition_filename(params, "name")

    @reify
    def filename(self) -> str | None:
        """Returns filename specified in Content-Disposition header.

        Returns None if the header is missing or malformed.
        """
        _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
        return content_disposition_filename(params, "filename")

612 

613 

@payload_type(BodyPartReader, order=Order.try_first)
class BodyPartReaderPayload(Payload):
    """Payload that streams the body of a :class:`BodyPartReader`.

    A part name/filename, when present, is carried over into an
    ``attachment`` Content-Disposition.
    """

    _value: BodyPartReader
    # _autoclose = False (inherited) - Streaming reader that may have resources

    def __init__(self, value: BodyPartReader, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)

        candidates = (("name", value.name), ("filename", value.filename))
        disposition: dict[str, str] = {}
        for param_name, param_value in candidates:
            if param_value is not None:
                disposition[param_name] = param_value

        if disposition:
            self.set_content_disposition("attachment", True, **disposition)

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Always fails: a streamed body part cannot be decoded up front."""
        raise TypeError("Unable to decode.")

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Always fails; body parts must be consumed through write().

        These payloads stream possibly very large data exactly once via
        write(), so buffering them in memory for reuse is deliberately
        unsupported.
        """
        raise TypeError("Unable to read body part as bytes. Use write() to consume.")

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Copy the decoded part body to *writer*, chunk by chunk."""
        reader = self._value
        while True:
            raw = await reader.read_chunk(size=DEFAULT_CHUNK_SIZE)
            if not raw:
                break
            async for piece in reader.decode_iter(raw):
                await writer.write(piece)

649 

650 

class MultipartReader:
    """Multipart body reader."""

    #: Response wrapper, used when multipart readers constructs from response.
    response_wrapper_cls = MultipartResponseWrapper
    #: Multipart reader class, used to handle multipart/* body parts.
    #: None points to type(self)
    multipart_reader_cls: type["MultipartReader"] | None = None
    #: Body part reader class for non multipart/* content types.
    part_reader_cls = BodyPartReader

    def __init__(
        self,
        headers: Mapping[str, str],
        content: StreamReader,
        *,
        client_max_size: int = sys.maxsize,
        max_field_size: int = 8190,
        max_headers: int = 128,
        max_size_error_cls: type[Exception] = ValueError,
    ) -> None:
        """Create a reader for a multipart body.

        :param headers: headers of the multipart message; Content-Type must be
            multipart/* (asserted) and carry a ``boundary`` parameter.
        :param content: stream the body is read from.
        :param client_max_size: size limit forwarded to part readers.
        :param max_field_size: maximum length of one part header line.
        :param max_headers: maximum number of header lines per part.
        :param max_size_error_cls: exception raised when a limit is exceeded.
        :raises ValueError: if the boundary is missing or longer than 70 chars.
        """
        self._mimetype = parse_mimetype(headers[CONTENT_TYPE])
        assert self._mimetype.type == "multipart", "multipart/* content type expected"
        if "boundary" not in self._mimetype.parameters:
            raise ValueError(
                "boundary missed for Content-Type: %s" % headers[CONTENT_TYPE]
            )

        self.headers = headers
        self._boundary = ("--" + self._get_boundary()).encode()
        self._client_max_size = client_max_size
        self._content = content
        self._default_charset: str | None = None
        self._last_part: MultipartReader | BodyPartReader | None = None
        self._max_field_size = max_field_size
        self._max_headers = max_headers
        self._max_size_error_cls = max_size_error_cls
        self._at_eof = False
        self._at_bof = True
        # Lines read ahead (e.g. by a nested reader) and pushed back.
        self._unread: list[bytes] = []

    def __aiter__(self: Self) -> Self:
        return self

    async def __anext__(
        self,
    ) -> Union["MultipartReader", BodyPartReader]:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    @classmethod
    def from_response(
        cls,
        response: "ClientResponse",
    ) -> MultipartResponseWrapper:
        """Constructs reader instance from HTTP response.

        :param response: :class:`~aiohttp.client.ClientResponse` instance
        """
        obj = cls.response_wrapper_cls(
            response, cls(response.headers, response.content)
        )
        return obj

    def at_eof(self) -> bool:
        """Returns True if the final boundary was reached, false otherwise."""
        return self._at_eof

    async def next(
        self,
    ) -> Union["MultipartReader", BodyPartReader] | None:
        """Emits the next multipart body part.

        Any unread remainder of the previous part is released first.
        Returns None once the final boundary has been read.
        """
        if self._at_eof:
            return None
        await self._maybe_release_last_part()
        if self._at_bof:
            # So, if we're at BOF, we need to skip till the boundary.
            await self._read_until_first_boundary()
            self._at_bof = False
        else:
            await self._read_boundary()
        if self._at_eof:  # we just read the last boundary, nothing to do there
            return None

        part = await self.fetch_next_part()
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.6
        # NOTE(review): _maybe_release_last_part() above always resets
        # _last_part to None, so this first condition holds for every part,
        # not only the first one; confirm which behavior is intended.
        if (
            self._last_part is None
            and self._mimetype.subtype == "form-data"
            and isinstance(part, BodyPartReader)
        ):
            _, params = parse_content_disposition(part.headers.get(CONTENT_DISPOSITION))
            if params.get("name") == "_charset_":
                self._default_charset = await self._read_default_charset(part)
                # Reading the _charset_ part to its end leaves the stream at
                # the boundary line that follows it.  Consume that boundary
                # (as the regular path above does) before reading the next
                # part's headers, instead of parsing it as a header line.
                await self._read_boundary()
                if self._at_eof:
                    return None
                part = await self.fetch_next_part()
        self._last_part = part
        return self._last_part

    async def _read_default_charset(self, part: BodyPartReader) -> str:
        """Read the complete value of a ``_charset_`` form field.

        :raises RuntimeError: if the value is longer than 31 bytes.
        """
        # Longest encoding in https://encoding.spec.whatwg.org/encodings.json
        # is 19 characters, so 32 should be more than enough for any valid encoding.
        # BodyPartReader.read_chunk() asserts that the requested size covers the
        # boundary plus CRLF, so never ask for less than that; boundaries
        # longer than 28 characters would otherwise trip the assertion.
        chunk_size = max(32, len(self._boundary) + 2)
        charset = bytearray()
        while not part.at_eof():
            charset.extend(await part.read_chunk(chunk_size))
            if len(charset) > 31:
                raise RuntimeError("Invalid default charset")
        return charset.strip().decode()

    async def release(self) -> None:
        """Reads all the body parts to the void till the final boundary."""
        while not self._at_eof:
            item = await self.next()
            if item is None:
                break
            await item.release()

    async def fetch_next_part(
        self,
    ) -> Union["MultipartReader", BodyPartReader]:
        """Returns the next body part reader."""
        headers = await self._read_headers()
        return self._get_part_reader(headers)

    def _get_part_reader(
        self,
        headers: "CIMultiDictProxy[str]",
    ) -> Union["MultipartReader", BodyPartReader]:
        """Dispatches the response by the `Content-Type` header.

        Returns a suitable reader instance: a nested multipart reader for
        multipart/* parts, a body part reader otherwise.

        :param dict headers: Response headers
        """
        ctype = headers.get(CONTENT_TYPE, "")
        mimetype = parse_mimetype(ctype)

        if mimetype.type == "multipart":
            reader_cls = (
                type(self)
                if self.multipart_reader_cls is None
                else self.multipart_reader_cls
            )
            return reader_cls(
                headers,
                self._content,
                client_max_size=self._client_max_size,
                max_field_size=self._max_field_size,
                max_headers=self._max_headers,
                max_size_error_cls=self._max_size_error_cls,
            )
        return self.part_reader_cls(
            self._boundary,
            headers,
            self._content,
            subtype=self._mimetype.subtype,
            default_charset=self._default_charset,
            client_max_size=self._client_max_size,
            max_size_error_cls=self._max_size_error_cls,
        )

    def _get_boundary(self) -> str:
        boundary = self._mimetype.parameters["boundary"]
        if len(boundary) > 70:
            raise ValueError("boundary %r is too long (70 chars max)" % boundary)

        return boundary

    async def _readline(self) -> bytes:
        # Pushed-back lines take priority over the stream.
        if self._unread:
            return self._unread.pop()
        return await self._content.readline()

    async def _read_until_first_boundary(self) -> None:
        # Skip the preamble up to the first boundary line.
        while True:
            chunk = await self._readline()
            if chunk == b"":
                raise ValueError(
                    "Could not find starting boundary %r" % (self._boundary)
                )
            chunk = chunk.rstrip()
            if chunk == self._boundary:
                return
            elif chunk == self._boundary + b"--":
                self._at_eof = True
                return

    async def _read_boundary(self) -> None:
        chunk = (await self._readline()).rstrip()
        if chunk == self._boundary:
            pass
        elif chunk == self._boundary + b"--":
            self._at_eof = True
            epilogue = await self._readline()
            next_line = await self._readline()

            # the epilogue is expected and then either the end of input or the
            # parent multipart boundary, if the parent boundary is found then
            # it should be marked as unread and handed to the parent for
            # processing
            if next_line[:2] == b"--":
                self._unread.append(next_line)
            # otherwise the request is likely missing an epilogue and both
            # lines should be passed to the parent for processing
            # (this handles the old behavior gracefully)
            else:
                self._unread.extend([next_line, epilogue])
        else:
            raise ValueError(f"Invalid boundary {chunk!r}, expected {self._boundary!r}")

    async def _read_headers(self) -> "CIMultiDictProxy[str]":
        # Read header lines up to the empty line that ends the part headers.
        lines = []
        while True:
            chunk = await self._content.readline(max_line_length=self._max_field_size)
            chunk = chunk.rstrip(b"\r\n")
            lines.append(chunk)
            if not chunk:
                break
            if len(lines) > self._max_headers:
                raise BadHttpMessage("Too many headers received")
        parser = HeadersParser(max_field_size=self._max_field_size)
        headers, _ = parser.parse_headers(lines)
        return headers

    async def _maybe_release_last_part(self) -> None:
        """Ensures that the last read body part is read completely."""
        if self._last_part is not None:
            if not self._last_part.at_eof():
                await self._last_part.release()
            # Take over any lines the part pushed back (e.g. a boundary line).
            self._unread.extend(self._last_part._unread)
            self._last_part = None

883 

884 

885_Part = tuple[Payload, str, str] 

886 

887 

class MultipartWriter(Payload):
    """Multipart body writer.

    Parts are collected in memory as ``(payload, content_encoding,
    transfer_encoding)`` tuples and serialized, delimited by the boundary,
    by :meth:`write`, :meth:`as_bytes` or :meth:`decode`.
    """

    _value: None
    # _consumed = False (inherited) - Can be encoded multiple times
    _autoclose = True  # No file handles, just collects parts in memory

    def __init__(self, subtype: str = "mixed", boundary: str | None = None) -> None:
        """Create a writer producing ``multipart/<subtype>`` content.

        :param subtype: multipart subtype, e.g. ``"mixed"`` or ``"form-data"``.
        :param boundary: boundary string; ``uuid.uuid4().hex`` is used when
            omitted.
        :raises ValueError: if the boundary is not ASCII, or contains
            characters that cannot appear in a quoted-string parameter.
        """
        boundary = boundary if boundary is not None else uuid.uuid4().hex
        # The underlying Payload API demands a str (utf-8), not bytes,
        # so we need to ensure we don't lose anything during conversion.
        # As a result, require the boundary to be ASCII only.
        try:
            self._boundary = boundary.encode("ascii")
        except UnicodeEncodeError:
            raise ValueError("boundary should contain ASCII only chars") from None
        # _boundary_value quotes the boundary if needed (and may raise ValueError).
        ctype = f"multipart/{subtype}; boundary={self._boundary_value}"

        super().__init__(None, content_type=ctype)

        self._parts: list[_Part] = []
        self._is_form_data = subtype == "form-data"

    def __enter__(self) -> "MultipartWriter":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        pass

    def __iter__(self) -> Iterator[_Part]:
        """Iterate over the ``(payload, encoding, te_encoding)`` part tuples."""
        return iter(self._parts)

    def __len__(self) -> int:
        """Return the number of appended parts."""
        return len(self._parts)

    def __bool__(self) -> bool:
        # Always truthy: without this, __len__ would make a writer that has
        # no parts yet evaluate as False.
        return True

    _valid_tchar_regex = re.compile(rb"\A[!#$%&'*+\-.^_`|~\w]+\Z")
    _invalid_qdtext_char_regex = re.compile(rb"[\x00-\x08\x0A-\x1F\x7F]")

    @property
    def _boundary_value(self) -> str:
        """Wrap boundary parameter value in quotes, if necessary.

        Reads self.boundary and returns a unicode string.

        :raises ValueError: if the boundary contains control characters that
            are not allowed even inside a quoted-string.
        """
        # Refer to RFCs 7231, 7230, 5234.
        #
        # parameter      = token "=" ( token / quoted-string )
        # token          = 1*tchar
        # quoted-string  = DQUOTE *( qdtext / quoted-pair ) DQUOTE
        # qdtext         = HTAB / SP / %x21 / %x23-5B / %x5D-7E / obs-text
        # obs-text       = %x80-FF
        # quoted-pair    = "\" ( HTAB / SP / VCHAR / obs-text )
        # tchar          = "!" / "#" / "$" / "%" / "&" / "'" / "*"
        #                  / "+" / "-" / "." / "^" / "_" / "`" / "|" / "~"
        #                  / DIGIT / ALPHA
        #                  ; any VCHAR, except delimiters
        # VCHAR           = %x21-7E
        value = self._boundary
        if self._valid_tchar_regex.match(value):
            return value.decode("ascii")  # cannot fail

        if self._invalid_qdtext_char_regex.search(value):
            raise ValueError("boundary value contains invalid characters")

        # escape %x5C and %x22
        quoted_value_content = value.replace(b"\\", b"\\\\")
        quoted_value_content = quoted_value_content.replace(b'"', b'\\"')

        return '"' + quoted_value_content.decode("ascii") + '"'

    @property
    def boundary(self) -> str:
        """The raw (unquoted) boundary as a str."""
        return self._boundary.decode("ascii")

    def append(self, obj: Any, headers: Mapping[str, str] | None = None) -> Payload:
        """Append *obj* as a new body part and return the resulting payload.

        A :class:`Payload` instance is used as-is, with *headers* merged into
        its own headers; any other object is converted via ``get_payload``.

        :raises TypeError: if no payload type can be created for *obj*.
        """
        if headers is None:
            headers = CIMultiDict()

        if isinstance(obj, Payload):
            obj.headers.update(headers)
            return self.append_payload(obj)

        try:
            payload = get_payload(obj, headers=headers)
        except LookupError:
            # f-string + !r: "%r" % obj misbehaves when obj is a tuple.
            raise TypeError(f"Cannot create payload from {obj!r}") from None
        return self.append_payload(payload)

    def append_payload(self, payload: Payload) -> Payload:
        """Adds a new body part to multipart writer.

        For ``form-data`` writers, the part must not carry Content-Encoding,
        Content-Length or Content-Transfer-Encoding headers (checked with an
        ``assert``), and a default ``section-N`` Content-Disposition is added
        when missing. For other subtypes, the part's Content-Encoding and
        Content-Transfer-Encoding are validated and recorded so that
        :meth:`write` can apply them. Content-Length is set when the payload
        size is known and no encoding applies.

        :raises RuntimeError: for an unsupported content or transfer encoding.
        """
        encoding: str | None = None
        te_encoding: str | None = None
        if self._is_form_data:
            # https://datatracker.ietf.org/doc/html/rfc7578#section-4.7
            # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
            assert (
                not {CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TRANSFER_ENCODING}
                & payload.headers.keys()
            )
            # Set default Content-Disposition in case user doesn't create one
            if CONTENT_DISPOSITION not in payload.headers:
                name = f"section-{len(self._parts)}"
                payload.set_content_disposition("form-data", name=name)
        else:
            # compression
            encoding = payload.headers.get(CONTENT_ENCODING, "").lower()
            if encoding and encoding not in ("deflate", "gzip", "identity"):
                raise RuntimeError(f"unknown content encoding: {encoding}")
            if encoding == "identity":
                encoding = None

            # te encoding
            te_encoding = payload.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()
            if te_encoding not in ("", "base64", "quoted-printable", "binary"):
                raise RuntimeError(f"unknown content transfer encoding: {te_encoding}")
            if te_encoding == "binary":
                te_encoding = None

            # size: only exact when the body is written through unchanged
            size = payload.size
            if size is not None and not (encoding or te_encoding):
                payload.headers[CONTENT_LENGTH] = str(size)

        self._parts.append((payload, encoding, te_encoding))  # type: ignore[arg-type]
        return payload

    def append_json(
        self, obj: Any, headers: Mapping[str, str] | None = None
    ) -> Payload:
        """Helper to append JSON part."""
        if headers is None:
            headers = CIMultiDict()

        return self.append_payload(JsonPayload(obj, headers=headers))

    def append_form(
        self,
        obj: Sequence[tuple[str, str]] | Mapping[str, str],
        headers: Mapping[str, str] | None = None,
    ) -> Payload:
        """Helper to append form urlencoded part."""
        assert isinstance(obj, (Sequence, Mapping))

        if headers is None:
            headers = CIMultiDict()

        if isinstance(obj, Mapping):
            obj = list(obj.items())
        data = urlencode(obj, doseq=True)

        return self.append_payload(
            StringPayload(
                data, headers=headers, content_type="application/x-www-form-urlencoded"
            )
        )

    @property
    def size(self) -> int | None:
        """Size of the payload.

        Returns ``None`` when any part has an unknown size or will be
        compressed / transfer-encoded, since its encoded length is unknown.
        """
        total = 0
        for part, encoding, te_encoding in self._parts:
            part_size = part.size
            if encoding or te_encoding or part_size is None:
                return None

            total += int(
                2  # b'--'
                + len(self._boundary)
                + 2  # b'\r\n' terminating the boundary line
                + len(part._binary_headers)
                + part_size
                + 2  # b'\r\n' after the part body
            )

        total += 2 + len(self._boundary) + 4  # b'--'+self._boundary+b'--\r\n'
        return total

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Return string representation of the multipart data.

        The framing mirrors :meth:`as_bytes`: every part is emitted as a
        boundary line, its headers, its content and a trailing CRLF, followed
        by the closing ``--boundary--`` delimiter.

        WARNING: This method may do blocking I/O if parts contain file payloads.
        It should not be called in the event loop. Use as_bytes().decode() instead.
        """
        chunks: list[str] = []
        for part, _e, _te in self._parts:
            chunks.append("--" + self.boundary + "\r\n")
            chunks.append(part._binary_headers.decode(encoding, errors))
            chunks.append(part.decode(encoding, errors))
            chunks.append("\r\n")
        chunks.append("--" + self.boundary + "--\r\n")
        return "".join(chunks)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Return bytes representation of the multipart data.

        This method is async-safe and calls as_bytes on underlying payloads.
        """
        parts: list[bytes] = []

        # Process each part
        for part, _e, _te in self._parts:
            # Add boundary
            parts.append(b"--" + self._boundary + b"\r\n")

            # Add headers
            parts.append(part._binary_headers)

            # Add payload content using as_bytes for async safety
            part_bytes = await part.as_bytes(encoding, errors)
            parts.append(part_bytes)

            # Add trailing CRLF
            parts.append(b"\r\n")

        # Add closing boundary
        parts.append(b"--" + self._boundary + b"--\r\n")

        return b"".join(parts)

    async def write(
        self, writer: AbstractStreamWriter, close_boundary: bool = True
    ) -> None:
        """Write body.

        Each part is emitted as a boundary line, its serialized headers, its
        content (compressed and/or transfer-encoded through a
        MultipartPayloadWriter when recorded by append_payload) and a CRLF.

        :param close_boundary: when true, finish with the closing
            ``--boundary--`` delimiter.
        """
        for part, encoding, te_encoding in self._parts:
            if self._is_form_data:
                # https://datatracker.ietf.org/doc/html/rfc7578#section-4.2
                assert CONTENT_DISPOSITION in part.headers
                assert "name=" in part.headers[CONTENT_DISPOSITION]

            await writer.write(b"--" + self._boundary + b"\r\n")
            await writer.write(part._binary_headers)

            if encoding or te_encoding:
                w = MultipartPayloadWriter(writer)
                if encoding:
                    w.enable_compression(encoding)
                if te_encoding:
                    w.enable_encoding(te_encoding)
                await part.write(w)  # type: ignore[arg-type]
                await w.write_eof()
            else:
                await part.write(writer)

            await writer.write(b"\r\n")

        if close_boundary:
            await writer.write(b"--" + self._boundary + b"--\r\n")

    async def close(self) -> None:
        """
        Close all part payloads that need explicit closing.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        if self._consumed:
            return
        self._consumed = True

        # Close all parts that need explicit closing
        # We catch and log exceptions to ensure all parts get a chance to close
        # we do not use asyncio.gather() here because we are not allowed
        # to suspend given we may be called during cleanup
        for idx, (part, _, _) in enumerate(self._parts):
            if not part.autoclose and not part.consumed:
                try:
                    await part.close()
                except Exception as exc:
                    internal_logger.error(
                        "Failed to close multipart part %d: %s", idx, exc, exc_info=True
                    )

1171 

1172 

1173class MultipartPayloadWriter: 

1174 def __init__(self, writer: AbstractStreamWriter) -> None: 

1175 self._writer = writer 

1176 self._encoding: str | None = None 

1177 self._compress: ZLibCompressor | None = None 

1178 self._encoding_buffer: bytearray | None = None 

1179 

1180 def enable_encoding(self, encoding: str) -> None: 

1181 if encoding == "base64": 

1182 self._encoding = encoding 

1183 self._encoding_buffer = bytearray() 

1184 elif encoding == "quoted-printable": 

1185 self._encoding = "quoted-printable" 

1186 

1187 def enable_compression( 

1188 self, encoding: str = "deflate", strategy: int | None = None 

1189 ) -> None: 

1190 self._compress = ZLibCompressor( 

1191 encoding=encoding, 

1192 suppress_deflate_header=True, 

1193 strategy=strategy, 

1194 ) 

1195 

1196 async def write_eof(self) -> None: 

1197 if self._compress is not None: 

1198 chunk = self._compress.flush() 

1199 if chunk: 

1200 self._compress = None 

1201 await self.write(chunk) 

1202 

1203 if self._encoding == "base64": 

1204 if self._encoding_buffer: 

1205 await self._writer.write(base64.b64encode(self._encoding_buffer)) 

1206 

1207 async def write(self, chunk: bytes) -> None: 

1208 if self._compress is not None: 

1209 if chunk: 

1210 chunk = await self._compress.compress(chunk) 

1211 if not chunk: 

1212 return 

1213 

1214 if self._encoding == "base64": 

1215 buf = self._encoding_buffer 

1216 assert buf is not None 

1217 buf.extend(chunk) 

1218 

1219 if buf: 

1220 div, mod = divmod(len(buf), 3) 

1221 enc_chunk, self._encoding_buffer = (buf[: div * 3], buf[div * 3 :]) 

1222 if enc_chunk: 

1223 b64chunk = base64.b64encode(enc_chunk) 

1224 await self._writer.write(b64chunk) 

1225 elif self._encoding == "quoted-printable": 

1226 await self._writer.write(binascii.b2a_qp(chunk)) 

1227 else: 

1228 await self._writer.write(chunk)