Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/multipart.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

682 statements  

1import base64 

2import binascii 

3import json 

4import re 

5import sys 

6import uuid 

7import warnings 

8from collections import deque 

9from collections.abc import Mapping, Sequence 

10from types import TracebackType 

11from typing import ( 

12 TYPE_CHECKING, 

13 Any, 

14 AsyncIterator, 

15 Deque, 

16 Dict, 

17 Iterator, 

18 List, 

19 Optional, 

20 Tuple, 

21 Type, 

22 Union, 

23 cast, 

24) 

25from urllib.parse import parse_qsl, unquote, urlencode 

26 

27from multidict import CIMultiDict, CIMultiDictProxy 

28 

29from .abc import AbstractStreamWriter 

30from .compression_utils import ( 

31 DEFAULT_MAX_DECOMPRESS_SIZE, 

32 ZLibCompressor, 

33 ZLibDecompressor, 

34) 

35from .hdrs import ( 

36 CONTENT_DISPOSITION, 

37 CONTENT_ENCODING, 

38 CONTENT_LENGTH, 

39 CONTENT_TRANSFER_ENCODING, 

40 CONTENT_TYPE, 

41) 

42from .helpers import CHAR, TOKEN, parse_mimetype, reify 

43from .http import HeadersParser 

44from .http_exceptions import BadHttpMessage 

45from .log import internal_logger 

46from .payload import ( 

47 JsonPayload, 

48 LookupError, 

49 Order, 

50 Payload, 

51 StringPayload, 

52 get_payload, 

53 payload_type, 

54) 

55from .streams import StreamReader 

56 

57if sys.version_info >= (3, 11): 

58 from typing import Self 

59else: 

60 from typing import TypeVar 

61 

62 Self = TypeVar("Self", bound="BodyPartReader") 

63 

64__all__ = ( 

65 "MultipartReader", 

66 "MultipartWriter", 

67 "BodyPartReader", 

68 "BadContentDispositionHeader", 

69 "BadContentDispositionParam", 

70 "parse_content_disposition", 

71 "content_disposition_filename", 

72) 

73 

74 

75if TYPE_CHECKING: 

76 from .client_reqrep import ClientResponse 

77 

78 

class BadContentDispositionHeader(RuntimeWarning):
    """Warning emitted when an entire Content-Disposition header is malformed."""

    pass

81 

82 

class BadContentDispositionParam(RuntimeWarning):
    """Warning emitted when a single Content-Disposition parameter is malformed."""

    pass

85 

86 

def parse_content_disposition(
    header: Optional[str],
) -> Tuple[Optional[str], Dict[str, str]]:
    """Parse a Content-Disposition header into (disposition-type, params).

    Supports plain token/quoted-string parameters, RFC 5987 extended
    parameters (``key*=charset'lang'value``) and RFC 2231 continuations
    (``key*0``, ``key*1*``, ...).  On a malformed header the whole parse is
    abandoned and ``(None, {})`` is returned (with a
    BadContentDispositionHeader warning); a single malformed parameter is
    skipped with a BadContentDispositionParam warning.
    """

    # A token is a non-empty string containing only RFC 7230 tchar-like
    # characters (TOKEN is defined in .helpers).
    def is_token(string: str) -> bool:
        return bool(string) and TOKEN >= set(string)

    def is_quoted(string: str) -> bool:
        return string[0] == string[-1] == '"'

    # RFC 5987 ext-value: charset'language'value-chars (exactly two quotes).
    def is_rfc5987(string: str) -> bool:
        return is_token(string) and string.count("'") == 2

    def is_extended_param(string: str) -> bool:
        return string.endswith("*")

    # RFC 2231 continuation: key contains "*<digits>" (optionally followed
    # by a trailing "*" marking an encoded segment).
    def is_continuous_param(string: str) -> bool:
        pos = string.find("*") + 1
        if not pos:
            return False
        substring = string[pos:-1] if string.endswith("*") else string[pos:]
        return substring.isdigit()

    # Undo backslash escaping inside a quoted-string ("\x" -> "x").
    def unescape(text: str, *, chars: str = "".join(map(re.escape, CHAR))) -> str:
        return re.sub(f"\\\\([{chars}])", "\\1", text)

    if not header:
        return None, {}

    disptype, *parts = header.split(";")
    if not is_token(disptype):
        warnings.warn(BadContentDispositionHeader(header))
        return None, {}

    params: Dict[str, str] = {}
    while parts:
        item = parts.pop(0)

        if not item:  # To handle trailing semicolons
            warnings.warn(BadContentDispositionHeader(header))
            continue

        if "=" not in item:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        key, value = item.split("=", 1)
        key = key.lower().strip()
        value = value.lstrip()

        # Duplicate parameter keys make the header ambiguous — reject all.
        if key in params:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        if not is_token(key):
            warnings.warn(BadContentDispositionParam(item))
            continue

        elif is_continuous_param(key):
            # Continuation segments are stored raw; they are reassembled
            # later by content_disposition_filename().
            if is_quoted(value):
                value = unescape(value[1:-1])
            elif not is_token(value):
                warnings.warn(BadContentDispositionParam(item))
                continue

        elif is_extended_param(key):
            if is_rfc5987(value):
                encoding, _, value = value.split("'", 2)
                encoding = encoding or "utf-8"
            else:
                warnings.warn(BadContentDispositionParam(item))
                continue

            try:
                value = unquote(value, encoding, "strict")
            except UnicodeDecodeError:  # pragma: nocover
                warnings.warn(BadContentDispositionParam(item))
                continue

        else:
            failed = True
            if is_quoted(value):
                failed = False
                value = unescape(value[1:-1].lstrip("\\/"))
            elif is_token(value):
                failed = False
            elif parts:
                # maybe just ; in filename, in any case this is just
                # one case fix, for proper fix we need to redesign parser
                _value = f"{value};{parts[0]}"
                if is_quoted(_value):
                    parts.pop(0)
                    value = unescape(_value[1:-1].lstrip("\\/"))
                    failed = False

            if failed:
                warnings.warn(BadContentDispositionHeader(header))
                return None, {}

        params[key] = value

    return disptype.lower(), params

188 

189 

def content_disposition_filename(
    params: Mapping[str, str], name: str = "filename"
) -> Optional[str]:
    """Extract *name* from parsed Content-Disposition parameters.

    Resolution order: the RFC 5987 extended form (``filename*``), the plain
    parameter (``filename``), then RFC 2231 continuations (``filename*0``,
    ``filename*1*``, ...).  Returns None when the value cannot be found.
    """
    extended_key = "%s*" % name
    if not params:
        return None
    if extended_key in params:
        return params[extended_key]
    if name in params:
        return params[name]

    # RFC 2231 continuations: collect the numbered segments in order and
    # stop at the first gap in the numbering.
    continuations = sorted(
        item for item in params.items() if item[0].startswith(extended_key)
    )
    pieces = []
    for index, (key, fragment) in enumerate(continuations):
        _, suffix = key.split("*", 1)
        if suffix.endswith("*"):
            suffix = suffix[:-1]
        if suffix != str(index):
            break
        pieces.append(fragment)
    if not pieces:
        return None

    joined = "".join(pieces)
    if "'" not in joined:
        return joined
    # charset'language'percent-encoded-value
    charset, _, encoded = joined.split("'", 2)
    return unquote(encoded, charset or "utf-8", "strict")

221 

222 

class MultipartResponseWrapper:
    """Couples a MultipartReader with the client response it came from.

    Releases the underlying connection automatically once the multipart
    stream has been fully consumed.
    """

    def __init__(
        self,
        resp: "ClientResponse",
        stream: "MultipartReader",
    ) -> None:
        self.resp = resp
        self.stream = stream

    def __aiter__(self) -> "MultipartResponseWrapper":
        return self

    async def __anext__(
        self,
    ) -> Union["MultipartReader", "BodyPartReader"]:
        item = await self.next()
        if item is None:
            raise StopAsyncIteration
        return item

    def at_eof(self) -> bool:
        """Returns True when all response data had been read."""
        return self.resp.content.at_eof()

    async def next(
        self,
    ) -> Optional[Union["MultipartReader", "BodyPartReader"]]:
        """Emits next multipart reader object."""
        part = await self.stream.next()
        # Once the wrapped stream is exhausted, let go of the connection.
        if self.stream.at_eof():
            await self.release()
        return part

    async def release(self) -> None:
        """Release the connection gracefully.

        All remaining content is read to the void.
        """
        await self.resp.release()

268 

269 

class BodyPartReader:
    """Multipart reader for single body part."""

    # Default read size; must be >= boundary length + 2 for the
    # stream-based chunk reader (see _read_chunk_from_stream).
    chunk_size = 8192

    def __init__(
        self,
        boundary: bytes,
        headers: "CIMultiDictProxy[str]",
        content: StreamReader,
        *,
        subtype: str = "mixed",
        default_charset: Optional[str] = None,
        max_decompress_size: int = DEFAULT_MAX_DECOMPRESS_SIZE,
    ) -> None:
        self.headers = headers
        self._boundary = boundary
        self._boundary_len = len(boundary) + 2  # Boundary + \r\n
        self._content = content
        self._default_charset = default_charset
        self._at_eof = False
        self._is_form_data = subtype == "form-data"
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
        # (form-data parts must ignore Content-Length).
        length = None if self._is_form_data else self.headers.get(CONTENT_LENGTH, None)
        self._length = int(length) if length is not None else None
        self._read_bytes = 0
        # Lines read ahead by readline() and not yet handed out.
        self._unread: Deque[bytes] = deque()
        # Look-behind buffer used to detect a boundary split across reads.
        self._prev_chunk: Optional[bytes] = None
        # Counts consecutive reads that hit stream EOF; guards endless loops.
        self._content_eof = 0
        self._cache: Dict[str, Any] = {}
        self._max_decompress_size = max_decompress_size

    def __aiter__(self: Self) -> Self:
        return self

    async def __anext__(self) -> bytes:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    async def next(self) -> Optional[bytes]:
        # Returns the whole remaining body, or None when already at EOF.
        item = await self.read()
        if not item:
            return None
        return item

    async def read(self, *, decode: bool = False) -> bytes:
        """Reads body part data.

        decode: Decodes data following by encoding
        method from Content-Encoding header. If it missed
        data remains untouched
        """
        if self._at_eof:
            return b""
        data = bytearray()
        while not self._at_eof:
            data.extend(await self.read_chunk(self.chunk_size))
        if decode:
            decoded_data = bytearray()
            async for d in self.decode_iter(data):
                decoded_data.extend(d)
            # NOTE(review): a bytearray is returned here (and below), not
            # bytes as annotated — callers treat it as a bytes-like object.
            return decoded_data
        return data

    async def read_chunk(self, size: int = chunk_size) -> bytes:
        """Reads body part content chunk of the specified size.

        size: chunk size
        """
        if self._at_eof:
            return b""
        if self._length:
            chunk = await self._read_chunk_from_length(size)
        else:
            chunk = await self._read_chunk_from_stream(size)

        # For the case of base64 data, we must read a fragment of size with a
        # remainder of 0 by dividing by 4 for string without symbols \n or \r
        encoding = self.headers.get(CONTENT_TRANSFER_ENCODING)
        if encoding and encoding.lower() == "base64":
            stripped_chunk = b"".join(chunk.split())
            remainder = len(stripped_chunk) % 4

            while remainder != 0 and not self.at_eof():
                over_chunk_size = 4 - remainder
                over_chunk = b""

                # Prefer bytes already buffered in the look-behind window.
                if self._prev_chunk:
                    over_chunk = self._prev_chunk[:over_chunk_size]
                    self._prev_chunk = self._prev_chunk[len(over_chunk) :]

                if len(over_chunk) != over_chunk_size:
                    over_chunk += await self._content.read(4 - len(over_chunk))

                if not over_chunk:
                    self._at_eof = True

                stripped_chunk += b"".join(over_chunk.split())
                chunk += over_chunk
                remainder = len(stripped_chunk) % 4

        self._read_bytes += len(chunk)
        if self._read_bytes == self._length:
            self._at_eof = True
        # After the part body, a CRLF must precede the (unread) boundary.
        if self._at_eof and await self._content.readline() != b"\r\n":
            raise ValueError("Reader did not read all the data or it is malformed")
        return chunk

    async def _read_chunk_from_length(self, size: int) -> bytes:
        # Reads body part content chunk of the specified size.
        # The body part must has Content-Length header with proper value.
        assert self._length is not None, "Content-Length required for chunked read"
        chunk_size = min(size, self._length - self._read_bytes)
        chunk = await self._content.read(chunk_size)
        if self._content.at_eof():
            self._at_eof = True
        return chunk

    async def _read_chunk_from_stream(self, size: int) -> bytes:
        # Reads content chunk of body part with unknown length.
        # The Content-Length header for body part is not necessary.
        assert (
            size >= self._boundary_len
        ), "Chunk size must be greater or equal than boundary length + 2"
        first_chunk = self._prev_chunk is None
        if first_chunk:
            # We need to re-add the CRLF that got removed from headers parsing.
            self._prev_chunk = b"\r\n" + await self._content.read(size)

        chunk = b""
        # content.read() may return less than size, so we need to loop to ensure
        # we have enough data to detect the boundary.
        while len(chunk) < self._boundary_len:
            chunk += await self._content.read(size)
            self._content_eof += int(self._content.at_eof())
            if self._content_eof > 2:
                raise ValueError("Reading after EOF")
            if self._content_eof:
                break
        if len(chunk) > size:
            # Keep only `size` bytes; push the surplus back onto the stream.
            self._content.unread_data(chunk[size:])
            chunk = chunk[:size]

        assert self._prev_chunk is not None
        # Scan the previous chunk + current chunk together so a boundary
        # straddling the two reads is still found.
        window = self._prev_chunk + chunk
        sub = b"\r\n" + self._boundary
        if first_chunk:
            idx = window.find(sub)
        else:
            idx = window.find(sub, max(0, len(self._prev_chunk) - len(sub)))
        if idx >= 0:
            # pushing boundary back to content
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=DeprecationWarning)
                self._content.unread_data(window[idx:])
            self._prev_chunk = self._prev_chunk[:idx]
            chunk = window[len(self._prev_chunk) : idx]
            if not chunk:
                self._at_eof = True
        result = self._prev_chunk[2 if first_chunk else 0 :]  # Strip initial CRLF
        self._prev_chunk = chunk
        return result

    async def readline(self) -> bytes:
        """Reads body part by line by line."""
        if self._at_eof:
            return b""

        if self._unread:
            line = self._unread.popleft()
        else:
            line = await self._content.readline()

        if line.startswith(self._boundary):
            # the very last boundary may not come with \r\n,
            # so set single rules for everyone
            sline = line.rstrip(b"\r\n")
            boundary = self._boundary
            last_boundary = self._boundary + b"--"
            # ensure that we read exactly the boundary, not something alike
            if sline == boundary or sline == last_boundary:
                self._at_eof = True
                self._unread.append(line)
                return b""
        else:
            # Peek one line ahead: if the boundary comes next, this line's
            # trailing CRLF belongs to the boundary delimiter, not the body.
            next_line = await self._content.readline()
            if next_line.startswith(self._boundary):
                line = line[:-2]  # strip CRLF but only once
            self._unread.append(next_line)

        return line

    async def release(self) -> None:
        """Like read(), but reads all the data to the void."""
        if self._at_eof:
            return
        while not self._at_eof:
            await self.read_chunk(self.chunk_size)

    async def text(self, *, encoding: Optional[str] = None) -> str:
        """Like read(), but assumes that body part contains text data."""
        data = await self.read(decode=True)
        # see https://www.w3.org/TR/html5/forms.html#multipart/form-data-encoding-algorithm
        # and https://dvcs.w3.org/hg/xhr/raw-file/tip/Overview.html#dom-xmlhttprequest-send
        encoding = encoding or self.get_charset(default="utf-8")
        return data.decode(encoding)

    async def json(self, *, encoding: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Like read(), but assumes that body parts contains JSON data."""
        data = await self.read(decode=True)
        if not data:
            return None
        encoding = encoding or self.get_charset(default="utf-8")
        return cast(Dict[str, Any], json.loads(data.decode(encoding)))

    async def form(self, *, encoding: Optional[str] = None) -> List[Tuple[str, str]]:
        """Like read(), but assumes that body parts contain form urlencoded data."""
        data = await self.read(decode=True)
        if not data:
            return []
        if encoding is not None:
            real_encoding = encoding
        else:
            real_encoding = self.get_charset(default="utf-8")
        try:
            decoded_data = data.rstrip().decode(real_encoding)
        except UnicodeDecodeError:
            raise ValueError("data cannot be decoded with %s encoding" % real_encoding)

        return parse_qsl(
            decoded_data,
            keep_blank_values=True,
            encoding=real_encoding,
        )

    def at_eof(self) -> bool:
        """Returns True if the boundary was reached or False otherwise."""
        return self._at_eof

    def _apply_content_transfer_decoding(self, data: bytes) -> bytes:
        """Apply Content-Transfer-Encoding decoding if header is present."""
        if CONTENT_TRANSFER_ENCODING in self.headers:
            return self._decode_content_transfer(data)
        return data

    def _needs_content_decoding(self) -> bool:
        """Check if Content-Encoding decoding should be applied."""
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
        return not self._is_form_data and CONTENT_ENCODING in self.headers

    def decode(self, data: bytes) -> bytes:
        """Decodes data synchronously.

        Decodes data according the specified Content-Encoding
        or Content-Transfer-Encoding headers value.

        Note: For large payloads, consider using decode_iter() instead
        to avoid blocking the event loop during decompression.
        """
        data = self._apply_content_transfer_decoding(data)
        if self._needs_content_decoding():
            return self._decode_content(data)
        return data

    async def decode_iter(self, data: bytes) -> AsyncIterator[bytes]:
        """Async generator that yields decoded data chunks.

        Decodes data according the specified Content-Encoding
        or Content-Transfer-Encoding headers value.

        This method offloads decompression to an executor for large payloads
        to avoid blocking the event loop.
        """
        data = self._apply_content_transfer_decoding(data)
        if self._needs_content_decoding():
            async for d in self._decode_content_async(data):
                yield d
        else:
            yield data

    def _decode_content(self, data: bytes) -> bytes:
        # Synchronous Content-Encoding decoding (identity/deflate/gzip only).
        encoding = self.headers.get(CONTENT_ENCODING, "").lower()
        if encoding == "identity":
            return data
        if encoding in {"deflate", "gzip"}:
            return ZLibDecompressor(
                encoding=encoding,
                suppress_deflate_header=True,
            ).decompress_sync(data, max_length=self._max_decompress_size)

        raise RuntimeError(f"unknown content encoding: {encoding}")

    async def _decode_content_async(self, data: bytes) -> AsyncIterator[bytes]:
        # Async counterpart of _decode_content; yields a single decoded chunk.
        encoding = self.headers.get(CONTENT_ENCODING, "").lower()
        if encoding == "identity":
            yield data
        elif encoding in {"deflate", "gzip"}:
            d = ZLibDecompressor(
                encoding=encoding,
                suppress_deflate_header=True,
            )
            yield await d.decompress(data, max_length=self._max_decompress_size)
        else:
            raise RuntimeError(f"unknown content encoding: {encoding}")

    def _decode_content_transfer(self, data: bytes) -> bytes:
        # Content-Transfer-Encoding decoding (base64/quoted-printable pass
        # through a codec; binary/8bit/7bit are returned untouched).
        encoding = self.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()

        if encoding == "base64":
            return base64.b64decode(data)
        elif encoding == "quoted-printable":
            return binascii.a2b_qp(data)
        elif encoding in ("binary", "8bit", "7bit"):
            return data
        else:
            raise RuntimeError(f"unknown content transfer encoding: {encoding}")

    def get_charset(self, default: str) -> str:
        """Returns charset parameter from Content-Type header or default."""
        ctype = self.headers.get(CONTENT_TYPE, "")
        mimetype = parse_mimetype(ctype)
        return mimetype.parameters.get("charset", self._default_charset or default)

    @reify
    def name(self) -> Optional[str]:
        """Returns name specified in Content-Disposition header.

        If the header is missing or malformed, returns None.
        """
        _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
        return content_disposition_filename(params, "name")

    @reify
    def filename(self) -> Optional[str]:
        """Returns filename specified in Content-Disposition header.

        Returns None if the header is missing or malformed.
        """
        _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
        return content_disposition_filename(params, "filename")

612 

613 

@payload_type(BodyPartReader, order=Order.try_first)
class BodyPartReaderPayload(Payload):
    """Payload adapter that streams a BodyPartReader through write()."""

    _value: BodyPartReader
    # _autoclose = False (inherited) - Streaming reader that may have resources

    def __init__(self, value: BodyPartReader, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)

        # Propagate the source part's Content-Disposition name/filename.
        params: Dict[str, str] = {}
        if value.name is not None:
            params["name"] = value.name
        if value.filename is not None:
            params["filename"] = value.filename

        if params:
            self.set_content_disposition("attachment", True, **params)

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        # Deliberately unsupported: the reader can only be consumed once.
        raise TypeError("Unable to decode.")

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Raises TypeError as body parts should be consumed via write().

        This is intentional: BodyPartReader payloads are designed for streaming
        large data (potentially gigabytes) and must be consumed only once via
        the write() method to avoid memory exhaustion. They cannot be buffered
        in memory for reuse.
        """
        raise TypeError("Unable to read body part as bytes. Use write() to consume.")

    async def write(self, writer: AbstractStreamWriter) -> None:
        # Stream in 256 KiB chunks, decoding transfer/content encodings
        # incrementally via decode_iter().
        field = self._value
        while chunk := await field.read_chunk(size=2**18):
            async for d in field.decode_iter(chunk):
                await writer.write(d)

649 

650 

class MultipartReader:
    """Multipart body reader."""

    #: Response wrapper, used when multipart readers constructs from response.
    response_wrapper_cls = MultipartResponseWrapper
    #: Multipart reader class, used to handle multipart/* body parts.
    #: None points to type(self)
    multipart_reader_cls: Optional[Type["MultipartReader"]] = None
    #: Body part reader class for non multipart/* content types.
    part_reader_cls = BodyPartReader

    def __init__(
        self,
        headers: Mapping[str, str],
        content: StreamReader,
        *,
        max_field_size: int = 8190,
        max_headers: int = 128,
    ) -> None:
        self._mimetype = parse_mimetype(headers[CONTENT_TYPE])
        assert self._mimetype.type == "multipart", "multipart/* content type expected"
        if "boundary" not in self._mimetype.parameters:
            raise ValueError(
                "boundary missed for Content-Type: %s" % headers[CONTENT_TYPE]
            )

        self.headers = headers
        # Boundary delimiter line as it appears on the wire ("--" + value).
        self._boundary = ("--" + self._get_boundary()).encode()
        self._content = content
        # Charset taken from a leading "_charset_" form field, if any.
        self._default_charset: str | None = None
        self._last_part: MultipartReader | BodyPartReader | None = None
        self._max_field_size = max_field_size
        self._max_headers = max_headers
        self._at_eof = False
        # True until the preamble before the first boundary has been skipped.
        self._at_bof = True
        # Lines read ahead (e.g. a parent boundary seen past our epilogue).
        self._unread: List[bytes] = []

    def __aiter__(self: Self) -> Self:
        return self

    async def __anext__(
        self,
    ) -> Optional[Union["MultipartReader", BodyPartReader]]:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    @classmethod
    def from_response(
        cls,
        response: "ClientResponse",
    ) -> MultipartResponseWrapper:
        """Constructs reader instance from HTTP response.

        :param response: :class:`~aiohttp.client.ClientResponse` instance
        """
        obj = cls.response_wrapper_cls(
            response, cls(response.headers, response.content)
        )
        return obj

    def at_eof(self) -> bool:
        """Returns True if the final boundary was reached, false otherwise."""
        return self._at_eof

    async def next(
        self,
    ) -> Optional[Union["MultipartReader", BodyPartReader]]:
        """Emits the next multipart body part."""
        # So, if we're at BOF, we need to skip till the boundary.
        if self._at_eof:
            return None
        # Make sure any previously returned part is fully consumed first.
        await self._maybe_release_last_part()
        if self._at_bof:
            await self._read_until_first_boundary()
            self._at_bof = False
        else:
            await self._read_boundary()
        if self._at_eof:  # we just read the last boundary, nothing to do there
            return None

        part = await self.fetch_next_part()
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.6
        # A first form-data field named "_charset_" sets the default charset
        # for the remaining parts and is itself swallowed.
        if (
            self._last_part is None
            and self._mimetype.subtype == "form-data"
            and isinstance(part, BodyPartReader)
        ):
            _, params = parse_content_disposition(part.headers.get(CONTENT_DISPOSITION))
            if params.get("name") == "_charset_":
                # Longest encoding in https://encoding.spec.whatwg.org/encodings.json
                # is 19 characters, so 32 should be more than enough for any valid encoding.
                charset = await part.read_chunk(32)
                if len(charset) > 31:
                    raise RuntimeError("Invalid default charset")
                self._default_charset = charset.strip().decode()
                part = await self.fetch_next_part()
        self._last_part = part
        return self._last_part

    async def release(self) -> None:
        """Reads all the body parts to the void till the final boundary."""
        while not self._at_eof:
            item = await self.next()
            if item is None:
                break
            await item.release()

    async def fetch_next_part(
        self,
    ) -> Union["MultipartReader", BodyPartReader]:
        """Returns the next body part reader."""
        headers = await self._read_headers()
        return self._get_part_reader(headers)

    def _get_part_reader(
        self,
        headers: "CIMultiDictProxy[str]",
    ) -> Union["MultipartReader", BodyPartReader]:
        """Dispatches the response by the `Content-Type` header.

        Returns a suitable reader instance.

        :param dict headers: Response headers
        """
        ctype = headers.get(CONTENT_TYPE, "")
        mimetype = parse_mimetype(ctype)

        if mimetype.type == "multipart":
            # Nested multipart: recurse with a (sub)reader of the same kind.
            if self.multipart_reader_cls is None:
                return type(self)(headers, self._content)
            return self.multipart_reader_cls(
                headers,
                self._content,
                max_field_size=self._max_field_size,
                max_headers=self._max_headers,
            )
        else:
            return self.part_reader_cls(
                self._boundary,
                headers,
                self._content,
                subtype=self._mimetype.subtype,
                default_charset=self._default_charset,
            )

    def _get_boundary(self) -> str:
        # RFC 2046 section 5.1.1 limits boundary values to 70 characters.
        boundary = self._mimetype.parameters["boundary"]
        if len(boundary) > 70:
            raise ValueError("boundary %r is too long (70 chars max)" % boundary)

        return boundary

    async def _readline(self) -> bytes:
        # Serve buffered look-ahead lines before touching the stream.
        if self._unread:
            return self._unread.pop()
        return await self._content.readline()

    async def _read_until_first_boundary(self) -> None:
        # Skip the preamble until the opening (or closing) boundary line.
        while True:
            chunk = await self._readline()
            if chunk == b"":
                raise ValueError(
                    "Could not find starting boundary %r" % (self._boundary)
                )
            chunk = chunk.rstrip()
            if chunk == self._boundary:
                return
            elif chunk == self._boundary + b"--":
                self._at_eof = True
                return

    async def _read_boundary(self) -> None:
        chunk = (await self._readline()).rstrip()
        if chunk == self._boundary:
            pass
        elif chunk == self._boundary + b"--":
            self._at_eof = True
            epilogue = await self._readline()
            next_line = await self._readline()

            # the epilogue is expected and then either the end of input or the
            # parent multipart boundary, if the parent boundary is found then
            # it should be marked as unread and handed to the parent for
            # processing
            if next_line[:2] == b"--":
                self._unread.append(next_line)
            # otherwise the request is likely missing an epilogue and both
            # lines should be passed to the parent for processing
            # (this handles the old behavior gracefully)
            else:
                self._unread.extend([next_line, epilogue])
        else:
            raise ValueError(f"Invalid boundary {chunk!r}, expected {self._boundary!r}")

    async def _read_headers(self) -> "CIMultiDictProxy[str]":
        # Collect raw header lines up to the blank separator, enforcing
        # per-line and header-count limits, then delegate parsing.
        lines = []
        while True:
            chunk = await self._content.readline(max_line_length=self._max_field_size)
            chunk = chunk.rstrip(b"\r\n")
            lines.append(chunk)
            if not chunk:
                break
            if len(lines) > self._max_headers:
                raise BadHttpMessage("Too many headers received")
        parser = HeadersParser(max_field_size=self._max_field_size)
        headers, raw_headers = parser.parse_headers(lines)
        return headers

    async def _maybe_release_last_part(self) -> None:
        """Ensures that the last read body part is read completely."""
        if self._last_part is not None:
            if not self._last_part.at_eof():
                await self._last_part.release()
            # Adopt any lines the part read past its own end (e.g. our boundary).
            self._unread.extend(self._last_part._unread)
            self._last_part = None

868 

869 

870_Part = Tuple[Payload, str, str] 

871 

872 

873class MultipartWriter(Payload): 

874 """Multipart body writer.""" 

875 

876 _value: None 

877 # _consumed = False (inherited) - Can be encoded multiple times 

878 _autoclose = True # No file handles, just collects parts in memory 

879 

880 def __init__(self, subtype: str = "mixed", boundary: Optional[str] = None) -> None: 

881 boundary = boundary if boundary is not None else uuid.uuid4().hex 

882 # The underlying Payload API demands a str (utf-8), not bytes, 

883 # so we need to ensure we don't lose anything during conversion. 

884 # As a result, require the boundary to be ASCII only. 

885 # In both situations. 

886 

887 try: 

888 self._boundary = boundary.encode("ascii") 

889 except UnicodeEncodeError: 

890 raise ValueError("boundary should contain ASCII only chars") from None 

891 ctype = f"multipart/{subtype}; boundary={self._boundary_value}" 

892 

893 super().__init__(None, content_type=ctype) 

894 

895 self._parts: List[_Part] = [] 

896 self._is_form_data = subtype == "form-data" 

897 

898 def __enter__(self) -> "MultipartWriter": 

899 return self 

900 

901 def __exit__( 

902 self, 

903 exc_type: Optional[Type[BaseException]], 

904 exc_val: Optional[BaseException], 

905 exc_tb: Optional[TracebackType], 

906 ) -> None: 

907 pass 

908 

909 def __iter__(self) -> Iterator[_Part]: 

910 return iter(self._parts) 

911 

912 def __len__(self) -> int: 

913 return len(self._parts) 

914 

915 def __bool__(self) -> bool: 

916 return True 

917 

918 _valid_tchar_regex = re.compile(rb"\A[!#$%&'*+\-.^_`|~\w]+\Z") 

919 _invalid_qdtext_char_regex = re.compile(rb"[\x00-\x08\x0A-\x1F\x7F]") 

920 

921 @property 

922 def _boundary_value(self) -> str: 

923 """Wrap boundary parameter value in quotes, if necessary. 

924 

925 Reads self.boundary and returns a unicode string. 

926 """ 

927 # Refer to RFCs 7231, 7230, 5234. 

928 # 

929 # parameter = token "=" ( token / quoted-string ) 

930 # token = 1*tchar 

931 # quoted-string = DQUOTE *( qdtext / quoted-pair ) DQUOTE 

932 # qdtext = HTAB / SP / %x21 / %x23-5B / %x5D-7E / obs-text 

933 # obs-text = %x80-FF 

934 # quoted-pair = "\" ( HTAB / SP / VCHAR / obs-text ) 

935 # tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*" 

936 # / "+" / "-" / "." / "^" / "_" / "`" / "|" / "~" 

937 # / DIGIT / ALPHA 

938 # ; any VCHAR, except delimiters 

939 # VCHAR = %x21-7E 

940 value = self._boundary 

941 if re.match(self._valid_tchar_regex, value): 

942 return value.decode("ascii") # cannot fail 

943 

944 if re.search(self._invalid_qdtext_char_regex, value): 

945 raise ValueError("boundary value contains invalid characters") 

946 

947 # escape %x5C and %x22 

948 quoted_value_content = value.replace(b"\\", b"\\\\") 

949 quoted_value_content = quoted_value_content.replace(b'"', b'\\"') 

950 

951 return '"' + quoted_value_content.decode("ascii") + '"' 

952 

    @property
    def boundary(self) -> str:
        """The boundary string decoded as ASCII (unquoted form)."""
        return self._boundary.decode("ascii")

956 

957 def append(self, obj: Any, headers: Optional[Mapping[str, str]] = None) -> Payload: 

958 if headers is None: 

959 headers = CIMultiDict() 

960 

961 if isinstance(obj, Payload): 

962 obj.headers.update(headers) 

963 return self.append_payload(obj) 

964 else: 

965 try: 

966 payload = get_payload(obj, headers=headers) 

967 except LookupError: 

968 raise TypeError("Cannot create payload from %r" % obj) 

969 else: 

970 return self.append_payload(payload) 

971 

972 def append_payload(self, payload: Payload) -> Payload: 

973 """Adds a new body part to multipart writer.""" 

974 encoding: Optional[str] = None 

975 te_encoding: Optional[str] = None 

976 if self._is_form_data: 

977 # https://datatracker.ietf.org/doc/html/rfc7578#section-4.7 

978 # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8 

979 assert ( 

980 not {CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TRANSFER_ENCODING} 

981 & payload.headers.keys() 

982 ) 

983 # Set default Content-Disposition in case user doesn't create one 

984 if CONTENT_DISPOSITION not in payload.headers: 

985 name = f"section-{len(self._parts)}" 

986 payload.set_content_disposition("form-data", name=name) 

987 else: 

988 # compression 

989 encoding = payload.headers.get(CONTENT_ENCODING, "").lower() 

990 if encoding and encoding not in ("deflate", "gzip", "identity"): 

991 raise RuntimeError(f"unknown content encoding: {encoding}") 

992 if encoding == "identity": 

993 encoding = None 

994 

995 # te encoding 

996 te_encoding = payload.headers.get(CONTENT_TRANSFER_ENCODING, "").lower() 

997 if te_encoding not in ("", "base64", "quoted-printable", "binary"): 

998 raise RuntimeError(f"unknown content transfer encoding: {te_encoding}") 

999 if te_encoding == "binary": 

1000 te_encoding = None 

1001 

1002 # size 

1003 size = payload.size 

1004 if size is not None and not (encoding or te_encoding): 

1005 payload.headers[CONTENT_LENGTH] = str(size) 

1006 

1007 self._parts.append((payload, encoding, te_encoding)) # type: ignore[arg-type] 

1008 return payload 

1009 

1010 def append_json( 

1011 self, obj: Any, headers: Optional[Mapping[str, str]] = None 

1012 ) -> Payload: 

1013 """Helper to append JSON part.""" 

1014 if headers is None: 

1015 headers = CIMultiDict() 

1016 

1017 return self.append_payload(JsonPayload(obj, headers=headers)) 

1018 

1019 def append_form( 

1020 self, 

1021 obj: Union[Sequence[Tuple[str, str]], Mapping[str, str]], 

1022 headers: Optional[Mapping[str, str]] = None, 

1023 ) -> Payload: 

1024 """Helper to append form urlencoded part.""" 

1025 assert isinstance(obj, (Sequence, Mapping)) 

1026 

1027 if headers is None: 

1028 headers = CIMultiDict() 

1029 

1030 if isinstance(obj, Mapping): 

1031 obj = list(obj.items()) 

1032 data = urlencode(obj, doseq=True) 

1033 

1034 return self.append_payload( 

1035 StringPayload( 

1036 data, headers=headers, content_type="application/x-www-form-urlencoded" 

1037 ) 

1038 ) 

1039 

1040 @property 

1041 def size(self) -> Optional[int]: 

1042 """Size of the payload.""" 

1043 total = 0 

1044 for part, encoding, te_encoding in self._parts: 

1045 part_size = part.size 

1046 if encoding or te_encoding or part_size is None: 

1047 return None 

1048 

1049 total += int( 

1050 2 

1051 + len(self._boundary) 

1052 + 2 

1053 + part_size # b'--'+self._boundary+b'\r\n' 

1054 + len(part._binary_headers) 

1055 + 2 # b'\r\n' 

1056 ) 

1057 

1058 total += 2 + len(self._boundary) + 4 # b'--'+self._boundary+b'--\r\n' 

1059 return total 

1060 

1061 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: 

1062 """Return string representation of the multipart data. 

1063 

1064 WARNING: This method may do blocking I/O if parts contain file payloads. 

1065 It should not be called in the event loop. Use as_bytes().decode() instead. 

1066 """ 

1067 return "".join( 

1068 "--" 

1069 + self.boundary 

1070 + "\r\n" 

1071 + part._binary_headers.decode(encoding, errors) 

1072 + part.decode() 

1073 for part, _e, _te in self._parts 

1074 ) 

1075 

1076 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes: 

1077 """Return bytes representation of the multipart data. 

1078 

1079 This method is async-safe and calls as_bytes on underlying payloads. 

1080 """ 

1081 parts: List[bytes] = [] 

1082 

1083 # Process each part 

1084 for part, _e, _te in self._parts: 

1085 # Add boundary 

1086 parts.append(b"--" + self._boundary + b"\r\n") 

1087 

1088 # Add headers 

1089 parts.append(part._binary_headers) 

1090 

1091 # Add payload content using as_bytes for async safety 

1092 part_bytes = await part.as_bytes(encoding, errors) 

1093 parts.append(part_bytes) 

1094 

1095 # Add trailing CRLF 

1096 parts.append(b"\r\n") 

1097 

1098 # Add closing boundary 

1099 parts.append(b"--" + self._boundary + b"--\r\n") 

1100 

1101 return b"".join(parts) 

1102 

    async def write(
        self, writer: AbstractStreamWriter, close_boundary: bool = True
    ) -> None:
        """Write body.

        Serializes every part to ``writer`` as an opening boundary line,
        the part's headers, its (possibly compressed/transfer-encoded)
        body, and a trailing CRLF.  When ``close_boundary`` is true the
        terminating closing-boundary line is appended as well (pass False
        when this body is nested inside another multipart body).
        """
        for part, encoding, te_encoding in self._parts:
            if self._is_form_data:
                # https://datatracker.ietf.org/doc/html/rfc7578#section-4.2
                # Every form-data part must carry a Content-Disposition with
                # a name= parameter; append_payload() establishes this.
                assert CONTENT_DISPOSITION in part.headers
                assert "name=" in part.headers[CONTENT_DISPOSITION]

            await writer.write(b"--" + self._boundary + b"\r\n")
            await writer.write(part._binary_headers)

            if encoding or te_encoding:
                # Route the payload through a wrapper that applies compression
                # and/or transfer-encoding on the fly, then flush it.
                w = MultipartPayloadWriter(writer)
                if encoding:
                    w.enable_compression(encoding)
                if te_encoding:
                    w.enable_encoding(te_encoding)
                await part.write(w)  # type: ignore[arg-type]
                await w.write_eof()
            else:
                await part.write(writer)

            await writer.write(b"\r\n")

        if close_boundary:
            await writer.write(b"--" + self._boundary + b"--\r\n")

1131 

1132 async def close(self) -> None: 

1133 """ 

1134 Close all part payloads that need explicit closing. 

1135 

1136 IMPORTANT: This method must not await anything that might not finish 

1137 immediately, as it may be called during cleanup/cancellation. Schedule 

1138 any long-running operations without awaiting them. 

1139 """ 

1140 if self._consumed: 

1141 return 

1142 self._consumed = True 

1143 

1144 # Close all parts that need explicit closing 

1145 # We catch and log exceptions to ensure all parts get a chance to close 

1146 # we do not use asyncio.gather() here because we are not allowed 

1147 # to suspend given we may be called during cleanup 

1148 for idx, (part, _, _) in enumerate(self._parts): 

1149 if not part.autoclose and not part.consumed: 

1150 try: 

1151 await part.close() 

1152 except Exception as exc: 

1153 internal_logger.error( 

1154 "Failed to close multipart part %d: %s", idx, exc, exc_info=True 

1155 ) 

1156 

1157 

class MultipartPayloadWriter:
    """Stream-writer adapter that compresses and/or transfer-encodes chunks.

    Chunks passed to :meth:`write` are first run through an optional zlib
    compressor, then through an optional Content-Transfer-Encoding stage
    (base64 or quoted-printable) before reaching the wrapped writer.
    :meth:`write_eof` flushes both stages.
    """

    def __init__(self, writer: AbstractStreamWriter) -> None:
        self._writer = writer
        self._encoding: Optional[str] = None
        self._compress: Optional[ZLibCompressor] = None
        # Holds bytes not yet base64-encoded so input is consumed in whole
        # 3-byte groups, avoiding '=' padding in the middle of the stream.
        self._encoding_buffer: Optional[bytearray] = None

    def enable_encoding(self, encoding: str) -> None:
        """Turn on base64 or quoted-printable transfer encoding.

        Unknown values are silently ignored (validated by the caller).
        """
        if encoding == "base64":
            self._encoding = "base64"
            self._encoding_buffer = bytearray()
        elif encoding == "quoted-printable":
            self._encoding = "quoted-printable"

    def enable_compression(
        self, encoding: str = "deflate", strategy: Optional[int] = None
    ) -> None:
        """Turn on zlib compression ("deflate" or "gzip")."""
        self._compress = ZLibCompressor(
            encoding=encoding,
            suppress_deflate_header=True,
            strategy=strategy,
        )

    async def write_eof(self) -> None:
        """Flush the compressor, then any buffered base64 remainder."""
        compressor = self._compress
        if compressor is not None:
            tail = compressor.flush()
            if tail:
                # Drop the compressor before re-entering write() so the
                # flushed tail is not compressed a second time.
                self._compress = None
                await self.write(tail)

        if self._encoding == "base64" and self._encoding_buffer:
            # Encode the leftover 1-2 bytes, now with '=' padding.
            await self._writer.write(base64.b64encode(self._encoding_buffer))

    async def write(self, chunk: bytes) -> None:
        """Compress/encode ``chunk`` and forward it to the wrapped writer."""
        if self._compress is not None and chunk:
            chunk = await self._compress.compress(chunk)
            if not chunk:
                # Compressor retained the data; nothing to emit yet.
                return

        if self._encoding == "base64":
            buf = self._encoding_buffer
            assert buf is not None
            buf.extend(chunk)

            if buf:
                # Emit only whole 3-byte groups; buffer the remainder.
                whole = len(buf) // 3 * 3
                ready, self._encoding_buffer = buf[:whole], buf[whole:]
                if ready:
                    await self._writer.write(base64.b64encode(ready))
        elif self._encoding == "quoted-printable":
            await self._writer.write(binascii.b2a_qp(chunk))
        else:
            await self._writer.write(chunk)