Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/multipart.py: 30%

593 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import base64 

2import binascii 

3import json 

4import re 

5import uuid 

6import warnings 

7import zlib 

8from collections import deque 

9from types import TracebackType 

10from typing import ( 

11 TYPE_CHECKING, 

12 Any, 

13 AsyncIterator, 

14 Deque, 

15 Dict, 

16 Iterator, 

17 List, 

18 Mapping, 

19 Optional, 

20 Sequence, 

21 Tuple, 

22 Type, 

23 Union, 

24 cast, 

25) 

26from urllib.parse import parse_qsl, unquote, urlencode 

27 

28from multidict import CIMultiDict, CIMultiDictProxy, MultiMapping 

29 

30from .compression_utils import ZLibCompressor, ZLibDecompressor 

31from .hdrs import ( 

32 CONTENT_DISPOSITION, 

33 CONTENT_ENCODING, 

34 CONTENT_LENGTH, 

35 CONTENT_TRANSFER_ENCODING, 

36 CONTENT_TYPE, 

37) 

38from .helpers import CHAR, TOKEN, parse_mimetype, reify 

39from .http import HeadersParser 

40from .payload import ( 

41 JsonPayload, 

42 LookupError, 

43 Order, 

44 Payload, 

45 StringPayload, 

46 get_payload, 

47 payload_type, 

48) 

49from .streams import StreamReader 

50 

51__all__ = ( 

52 "MultipartReader", 

53 "MultipartWriter", 

54 "BodyPartReader", 

55 "BadContentDispositionHeader", 

56 "BadContentDispositionParam", 

57 "parse_content_disposition", 

58 "content_disposition_filename", 

59) 

60 

61 

62if TYPE_CHECKING: # pragma: no cover 

63 from .client_reqrep import ClientResponse 

64 

65 

class BadContentDispositionHeader(RuntimeWarning):
    """Warning raised via ``warnings.warn`` when a whole Content-Disposition
    header is rejected by ``parse_content_disposition``."""

68 

69 

class BadContentDispositionParam(RuntimeWarning):
    """Warning raised via ``warnings.warn`` when a single Content-Disposition
    parameter is skipped by ``parse_content_disposition``."""

72 

73 

def parse_content_disposition(
    header: Optional[str],
) -> Tuple[Optional[str], Dict[str, str]]:
    """Parse a Content-Disposition header value.

    Returns a ``(disposition_type, params)`` tuple.  The disposition type is
    lower-cased; parameter names are lower-cased and stripped.  When the
    header is empty/``None`` or structurally invalid the result is
    ``(None, {})``; in the invalid case a ``BadContentDispositionHeader``
    warning is issued.  A single unusable parameter is skipped with a
    ``BadContentDispositionParam`` warning instead of rejecting the header.
    """
    # Local import: at module level the name ``LookupError`` is rebound by
    # ``from .payload import LookupError``, so the builtin raised by the
    # codec machinery has to be referenced explicitly.
    import builtins

    def is_token(string: str) -> bool:
        return bool(string) and TOKEN >= set(string)

    def is_quoted(string: str) -> bool:
        # Guard against the empty string: ``filename=`` yields an empty value
        # and indexing it would raise IndexError.
        return bool(string) and string[0] == string[-1] == '"'

    def is_rfc5987(string: str) -> bool:
        return is_token(string) and string.count("'") == 2

    def is_extended_param(string: str) -> bool:
        return string.endswith("*")

    def is_continuous_param(string: str) -> bool:
        pos = string.find("*") + 1
        if not pos:
            return False
        substring = string[pos:-1] if string.endswith("*") else string[pos:]
        return substring.isdigit()

    def unescape(text: str, *, chars: str = "".join(map(re.escape, CHAR))) -> str:
        return re.sub(f"\\\\([{chars}])", "\\1", text)

    if not header:
        return None, {}

    disptype, *parts = header.split(";")
    if not is_token(disptype):
        warnings.warn(BadContentDispositionHeader(header))
        return None, {}

    params: Dict[str, str] = {}
    while parts:
        item = parts.pop(0)

        if "=" not in item:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        key, value = item.split("=", 1)
        key = key.lower().strip()
        value = value.lstrip()

        # A repeated parameter name invalidates the whole header.
        if key in params:
            warnings.warn(BadContentDispositionHeader(header))
            return None, {}

        if not is_token(key):
            warnings.warn(BadContentDispositionParam(item))
            continue

        elif is_continuous_param(key):
            # ``name*N`` / ``name*N*`` continuation segment.
            if is_quoted(value):
                value = unescape(value[1:-1])
            elif not is_token(value):
                warnings.warn(BadContentDispositionParam(item))
                continue

        elif is_extended_param(key):
            # ``name*`` extended value of the form charset'language'data.
            if is_rfc5987(value):
                encoding, _, value = value.split("'", 2)
                encoding = encoding or "utf-8"
            else:
                warnings.warn(BadContentDispositionParam(item))
                continue

            try:
                value = unquote(value, encoding, "strict")
            except (UnicodeDecodeError, builtins.LookupError):  # pragma: nocover
                # Undecodable bytes or an unknown charset label: drop only
                # this parameter instead of letting the exception escape.
                warnings.warn(BadContentDispositionParam(item))
                continue

        else:
            failed = True
            if is_quoted(value):
                failed = False
                value = unescape(value[1:-1].lstrip("\\/"))
            elif is_token(value):
                failed = False
            elif parts:
                # maybe just ; in filename, in any case this is just
                # one case fix, for proper fix we need to redesign parser
                _value = f"{value};{parts[0]}"
                if is_quoted(_value):
                    parts.pop(0)
                    value = unescape(_value[1:-1].lstrip("\\/"))
                    failed = False

            if failed:
                warnings.warn(BadContentDispositionHeader(header))
                return None, {}

        params[key] = value

    return disptype.lower(), params

171 

172 

def content_disposition_filename(
    params: Mapping[str, str], name: str = "filename"
) -> Optional[str]:
    """Pick the value of parameter *name* out of parsed disposition params.

    Lookup order: ``name*`` (extended), then plain ``name``, then the
    continuation segments ``name*0``, ``name*1``, ... (each optionally with a
    trailing ``*``) joined in numeric order.  Joining stops at the first
    missing index.  Returns ``None`` when nothing matches.

    If the joined continuation value carries a ``charset'language'`` prefix
    (at least two apostrophes) the remainder is percent-decoded with that
    charset, defaulting to UTF-8 when the charset is empty.
    """
    name_suf = "%s*" % name
    if not params:
        return None
    if name_suf in params:
        return params[name_suf]
    if name in params:
        return params[name]

    # Index the continuation segments by their textual number so that they
    # can be walked numerically; a lexicographic sort would place "name*10"
    # before "name*2".  For duplicate indices the lexicographically first
    # key wins.
    segments: Dict[str, str] = {}
    for key in sorted(params):
        if not key.startswith(name_suf):
            continue
        _, tail = key.split("*", 1)
        if tail.endswith("*"):
            tail = tail[:-1]
        segments.setdefault(tail, params[key])

    parts = []
    num = 0
    while str(num) in segments:
        parts.append(segments[str(num)])
        num += 1
    if not parts:
        return None

    value = "".join(parts)
    # Only treat the value as charset'lang'data when both apostrophes are
    # present; a single apostrophe is ordinary data and must not crash.
    if value.count("'") >= 2:
        encoding, _, value = value.split("'", 2)
        encoding = encoding or "utf-8"
        return unquote(value, encoding, "strict")
    return value

204 

205 

class MultipartResponseWrapper:
    """Couples a multipart reader with the response it reads from.

    Parts are pulled from the wrapped reader; as soon as that reader
    reports EOF the response is released.
    """

    def __init__(
        self,
        resp: "ClientResponse",
        stream: "MultipartReader",
    ) -> None:
        self.resp, self.stream = resp, stream

    def __aiter__(self) -> "MultipartResponseWrapper":
        return self

    async def __anext__(
        self,
    ) -> Union["MultipartReader", "BodyPartReader"]:
        upcoming = await self.next()
        if upcoming is not None:
            return upcoming
        raise StopAsyncIteration

    def at_eof(self) -> bool:
        """Tell whether the response content stream is at EOF."""
        content = self.resp.content
        return content.at_eof()

    async def next(
        self,
    ) -> Optional[Union["MultipartReader", "BodyPartReader"]]:
        """Return the next part from the wrapped reader.

        The response is released when the reader is at EOF afterwards.
        """
        part = await self.stream.next()
        if not self.stream.at_eof():
            return part
        await self.release()
        return part

    async def release(self) -> None:
        """Release the underlying response (delegates to ``resp.release()``)."""
        await self.resp.release()

251 

252 

class BodyPartReader:
    """Multipart reader for single body part.

    Reads one part of a multipart body from a shared stream, stopping at the
    part boundary.  Offers raw, chunked and line-wise reads plus helpers
    that decode the payload as text, JSON or urlencoded form data.
    """

    # Default chunk size; used by read() and release() and as the default
    # ``size`` of read_chunk().
    chunk_size = 8192

    def __init__(
        self,
        boundary: bytes,
        headers: "CIMultiDictProxy[str]",
        content: StreamReader,
        *,
        _newline: bytes = b"\r\n",
    ) -> None:
        """Set up reader state.

        boundary: delimiter line that terminates this part
        headers: headers of this part
        content: stream the part is read from
        _newline: line terminator expected around the boundary
        """
        self.headers = headers
        self._boundary = boundary
        self._newline = _newline
        self._content = content
        self._at_eof = False
        length = self.headers.get(CONTENT_LENGTH, None)
        # Declared part length from Content-Length, or None when absent.
        self._length = int(length) if length is not None else None
        # Running total of bytes handed out by read_chunk().
        self._read_bytes = 0
        # Lines pushed back by readline() (boundary / look-ahead lines).
        self._unread: Deque[bytes] = deque()
        # Look-ahead chunk held back by _read_chunk_from_stream().
        self._prev_chunk: Optional[bytes] = None
        # Number of stream reads after which the stream reported EOF;
        # _read_chunk_from_stream() asserts it stays below 3.
        self._content_eof = 0
        self._cache: Dict[str, Any] = {}

    def __aiter__(self) -> AsyncIterator["BodyPartReader"]:
        return self  # type: ignore[return-value]

    async def __anext__(self) -> bytes:
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    async def next(self) -> Optional[bytes]:
        """Return the result of read(), or None when it is empty."""
        item = await self.read()
        if not item:
            return None
        return item

    async def read(self, *, decode: bool = False) -> bytes:
        """Reads body part data.

        decode: Decodes data following by encoding
                method from Content-Encoding header. If it missed
                data remains untouched
        """
        if self._at_eof:
            return b""
        # Accumulate chunks until read_chunk() flags the end of the part.
        data = bytearray()
        while not self._at_eof:
            data.extend(await self.read_chunk(self.chunk_size))
        if decode:
            return self.decode(data)
        # NOTE: without ``decode`` the bytearray accumulator itself is returned.
        return data

    async def read_chunk(self, size: int = chunk_size) -> bytes:
        """Reads body part content chunk of the specified size.

        size: chunk size
        """
        if self._at_eof:
            return b""
        # A missing *or zero* Content-Length is falsy, so both take the
        # boundary-scanning path.
        if self._length:
            chunk = await self._read_chunk_from_length(size)
        else:
            chunk = await self._read_chunk_from_stream(size)

        # For the case of base64 data, we must read a fragment of size with a
        # remainder of 0 by dividing by 4 for string without symbols \n or \r
        encoding = self.headers.get(CONTENT_TRANSFER_ENCODING)
        if encoding and encoding.lower() == "base64":
            # Whitespace is ignored when counting base64 characters.
            stripped_chunk = b"".join(chunk.split())
            remainder = len(stripped_chunk) % 4

            while remainder != 0 and not self.at_eof():
                over_chunk_size = 4 - remainder
                over_chunk = b""

                # Take the missing bytes from the look-ahead chunk first ...
                if self._prev_chunk:
                    over_chunk = self._prev_chunk[:over_chunk_size]
                    self._prev_chunk = self._prev_chunk[len(over_chunk) :]

                # ... and top up from the stream if that was not enough.
                if len(over_chunk) != over_chunk_size:
                    over_chunk += await self._content.read(4 - len(over_chunk))

                # Nothing more available: stop padding.
                if not over_chunk:
                    self._at_eof = True

                stripped_chunk += b"".join(over_chunk.split())
                chunk += over_chunk
                remainder = len(stripped_chunk) % 4

        self._read_bytes += len(chunk)
        if self._read_bytes == self._length:
            self._at_eof = True
        if self._at_eof:
            # Consume the line terminator that follows the part body.
            newline = await self._content.readline()
            assert (
                newline == self._newline
            ), "reader did not read all the data or it is malformed"
        return chunk

    async def _read_chunk_from_length(self, size: int) -> bytes:
        # Reads body part content chunk of the specified size.
        # The body part must has Content-Length header with proper value.
        assert self._length is not None, "Content-Length required for chunked read"
        # Never request more than what is left of the declared length.
        chunk_size = min(size, self._length - self._read_bytes)
        chunk = await self._content.read(chunk_size)
        return chunk

    async def _read_chunk_from_stream(self, size: int) -> bytes:
        # Reads content chunk of body part with unknown length.
        # The Content-Length header for body part is not necessary.
        #
        # One chunk is always held back in self._prev_chunk so that a boundary
        # straddling two reads is found in the combined window.
        assert (
            size >= len(self._boundary) + 2
        ), "Chunk size must be greater or equal than boundary length + 2"
        first_chunk = self._prev_chunk is None
        if first_chunk:
            self._prev_chunk = await self._content.read(size)

        chunk = await self._content.read(size)
        self._content_eof += int(self._content.at_eof())
        assert self._content_eof < 3, "Reading after EOF"
        assert self._prev_chunk is not None
        window = self._prev_chunk + chunk

        intermeditate_boundary = self._newline + self._boundary

        # On later calls the head of the previous chunk was already searched;
        # only its tail (where a boundary could start) needs rescanning.
        if first_chunk:
            pos = 0
        else:
            pos = max(0, len(self._prev_chunk) - len(intermeditate_boundary))

        idx = window.find(intermeditate_boundary, pos)
        if idx >= 0:
            # pushing boundary back to content
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=DeprecationWarning)
                self._content.unread_data(window[idx:])
            # Boundary starts inside the held-back chunk: trim it there.
            if size > idx:
                self._prev_chunk = self._prev_chunk[:idx]
            chunk = window[len(self._prev_chunk) : idx]
            # No payload left between look-ahead and boundary: part is done.
            if not chunk:
                self._at_eof = True

        # Hand out the held-back chunk and keep the new one as look-ahead.
        result = self._prev_chunk
        self._prev_chunk = chunk
        return result

    async def readline(self) -> bytes:
        """Reads body part by line by line."""
        if self._at_eof:
            return b""

        # Serve a previously pushed-back line before touching the stream.
        if self._unread:
            line = self._unread.popleft()
        else:
            line = await self._content.readline()

        if line.startswith(self._boundary):
            # the very last boundary may not come with \r\n,
            # so set single rules for everyone
            sline = line.rstrip(b"\r\n")
            boundary = self._boundary
            last_boundary = self._boundary + b"--"
            # ensure that we read exactly the boundary, not something alike
            if sline == boundary or sline == last_boundary:
                self._at_eof = True
                # Keep the boundary line available in _unread.
                self._unread.append(line)
                return b""
        else:
            # Peek one line ahead: if it is the boundary, the terminator of
            # the current line is not part of the payload.
            next_line = await self._content.readline()
            if next_line.startswith(self._boundary):
                # strip newline but only once
                line = line[: -len(self._newline)]
            self._unread.append(next_line)

        return line

    async def release(self) -> None:
        """Like read(), but reads all the data to the void."""
        if self._at_eof:
            return
        while not self._at_eof:
            await self.read_chunk(self.chunk_size)

    async def text(self, *, encoding: Optional[str] = None) -> str:
        """Like read(), but assumes that body part contains text data."""
        data = await self.read(decode=True)
        # see https://www.w3.org/TR/html5/forms.html#multipart/form-data-encoding-algorithm # NOQA
        # and https://dvcs.w3.org/hg/xhr/raw-file/tip/Overview.html#dom-xmlhttprequest-send # NOQA
        # An explicit ``encoding`` wins over the Content-Type charset.
        encoding = encoding or self.get_charset(default="utf-8")
        return data.decode(encoding)

    async def json(self, *, encoding: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Like read(), but assumes that body parts contains JSON data.

        Returns None when the part is empty.
        """
        data = await self.read(decode=True)
        if not data:
            return None
        encoding = encoding or self.get_charset(default="utf-8")
        return cast(Dict[str, Any], json.loads(data.decode(encoding)))

    async def form(self, *, encoding: Optional[str] = None) -> List[Tuple[str, str]]:
        """Like read(), but assumes that body parts contain form urlencoded data.

        Returns an empty list for an empty part.  Raises ValueError when the
        data cannot be decoded with the selected encoding.
        """
        data = await self.read(decode=True)
        if not data:
            return []
        if encoding is not None:
            real_encoding = encoding
        else:
            real_encoding = self.get_charset(default="utf-8")
        try:
            decoded_data = data.rstrip().decode(real_encoding)
        except UnicodeDecodeError:
            raise ValueError("data cannot be decoded with %s encoding" % real_encoding)

        return parse_qsl(
            decoded_data,
            keep_blank_values=True,
            encoding=real_encoding,
        )

    def at_eof(self) -> bool:
        """Returns True if the boundary was reached or False otherwise."""
        return self._at_eof

    def decode(self, data: bytes) -> bytes:
        """Decodes data.

        Decoding is done according the specified Content-Encoding
        or Content-Transfer-Encoding headers value.
        """
        # Transfer encoding is undone first, content encoding second.
        if CONTENT_TRANSFER_ENCODING in self.headers:
            data = self._decode_content_transfer(data)
        if CONTENT_ENCODING in self.headers:
            return self._decode_content(data)
        return data

    def _decode_content(self, data: bytes) -> bytes:
        # Undo Content-Encoding: "identity" passes through, "deflate"/"gzip"
        # are decompressed, anything else raises RuntimeError.
        encoding = self.headers.get(CONTENT_ENCODING, "").lower()
        if encoding == "identity":
            return data
        if encoding in {"deflate", "gzip"}:
            return ZLibDecompressor(
                encoding=encoding,
                suppress_deflate_header=True,
            ).decompress_sync(data)

        raise RuntimeError(f"unknown content encoding: {encoding}")

    def _decode_content_transfer(self, data: bytes) -> bytes:
        # Undo Content-Transfer-Encoding: base64 and quoted-printable are
        # decoded, binary/8bit/7bit pass through, anything else raises
        # RuntimeError.
        encoding = self.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()

        if encoding == "base64":
            return base64.b64decode(data)
        elif encoding == "quoted-printable":
            return binascii.a2b_qp(data)
        elif encoding in ("binary", "8bit", "7bit"):
            return data
        else:
            raise RuntimeError(
                "unknown content transfer encoding: {}" "".format(encoding)
            )

    def get_charset(self, default: str) -> str:
        """Returns charset parameter from Content-Type header or default."""
        ctype = self.headers.get(CONTENT_TYPE, "")
        mimetype = parse_mimetype(ctype)
        return mimetype.parameters.get("charset", default)

    @reify
    def name(self) -> Optional[str]:
        """Returns name specified in Content-Disposition header.

        If the header is missing or malformed, returns None.
        """
        _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
        return content_disposition_filename(params, "name")

    @reify
    def filename(self) -> Optional[str]:
        """Returns filename specified in Content-Disposition header.

        Returns None if the header is missing or malformed.
        """
        _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
        return content_disposition_filename(params, "filename")

542 

543 

@payload_type(BodyPartReader, order=Order.try_first)
class BodyPartReaderPayload(Payload):
    """Payload that streams the content of a ``BodyPartReader``."""

    def __init__(self, value: BodyPartReader, *args: Any, **kwargs: Any) -> None:
        """Wrap *value*; its name/filename, if any, are copied into an
        ``attachment`` content disposition."""
        super().__init__(value, *args, **kwargs)

        disposition_params: Dict[str, str] = {}
        for attr in ("name", "filename"):
            attr_value = getattr(value, attr)
            if attr_value is not None:
                disposition_params[attr] = attr_value

        if disposition_params:
            self.set_content_disposition("attachment", True, **disposition_params)

    async def write(self, writer: Any) -> None:
        """Copy the part to *writer* in 64 KiB chunks, decoding each chunk."""
        part = self._value
        while True:
            data = await part.read_chunk(size=2**16)
            if not data:
                break
            await writer.write(part.decode(data))

564 

565 

class MultipartReader:
    """Reader that splits a multipart body into its parts.

    Each call to :meth:`next` yields a reader for the following part: a
    nested multipart reader for ``multipart/*`` parts, a body part reader
    otherwise.
    """

    #: Response wrapper, used when multipart readers constructs from response.
    response_wrapper_cls = MultipartResponseWrapper
    #: Multipart reader class, used to handle multipart/* body parts.
    #: None points to type(self)
    multipart_reader_cls = None
    #: Body part reader class for non multipart/* content types.
    part_reader_cls = BodyPartReader

    def __init__(
        self,
        headers: Mapping[str, str],
        content: StreamReader,
        *,
        _newline: bytes = b"\r\n",
    ) -> None:
        """Store the stream and derive the ``--boundary`` delimiter from the
        Content-Type header in *headers*."""
        self.headers = headers
        delimiter = "--" + self._get_boundary()
        self._boundary = delimiter.encode()
        self._newline = _newline
        self._content = content
        self._last_part: Optional[Union["MultipartReader", BodyPartReader]] = None
        self._at_eof = False
        self._at_bof = True
        # Lines handed back by a finished part or read past the end boundary.
        self._unread: List[bytes] = []

    def __aiter__(
        self,
    ) -> AsyncIterator["BodyPartReader"]:
        return self  # type: ignore[return-value]

    async def __anext__(
        self,
    ) -> Optional[Union["MultipartReader", BodyPartReader]]:
        upcoming = await self.next()
        if upcoming is not None:
            return upcoming
        raise StopAsyncIteration

    @classmethod
    def from_response(
        cls,
        response: "ClientResponse",
    ) -> MultipartResponseWrapper:
        """Build a reader over *response* and wrap it together with the response.

        :param response: :class:`~aiohttp.client.ClientResponse` instance
        """
        return cls.response_wrapper_cls(
            response, cls(response.headers, response.content)
        )

    def at_eof(self) -> bool:
        """Tell whether the closing boundary has been seen."""
        return self._at_eof

    async def next(
        self,
    ) -> Optional[Union["MultipartReader", BodyPartReader]]:
        """Return a reader for the next part, or None after the last one."""
        if self._at_eof:
            return None
        # The previous part must be fully consumed before moving on.
        await self._maybe_release_last_part()
        if not self._at_bof:
            await self._read_boundary()
        else:
            # First call: skip everything up to the opening boundary.
            await self._read_until_first_boundary()
            self._at_bof = False
        if self._at_eof:
            # The boundary just consumed was the closing one.
            return None
        self._last_part = await self.fetch_next_part()
        return self._last_part

    async def release(self) -> None:
        """Consume and discard every remaining part up to the final boundary."""
        while not self._at_eof:
            part = await self.next()
            if part is None:
                return
            await part.release()

    async def fetch_next_part(
        self,
    ) -> Union["MultipartReader", BodyPartReader]:
        """Read the part headers and build the matching part reader."""
        part_headers = await self._read_headers()
        return self._get_part_reader(part_headers)

    def _get_part_reader(
        self,
        headers: "CIMultiDictProxy[str]",
    ) -> Union["MultipartReader", BodyPartReader]:
        """Choose a reader class from the part's ``Content-Type``.

        :param dict headers: Response headers
        """
        mimetype = parse_mimetype(headers.get(CONTENT_TYPE, ""))

        if mimetype.type != "multipart":
            return self.part_reader_cls(
                self._boundary, headers, self._content, _newline=self._newline
            )

        nested_cls = self.multipart_reader_cls
        if nested_cls is None:
            return type(self)(headers, self._content)
        return nested_cls(headers, self._content, _newline=self._newline)

    def _get_boundary(self) -> str:
        """Extract the ``boundary`` parameter from the Content-Type header.

        Raises ValueError when it is absent or longer than 70 characters.
        """
        content_type = self.headers[CONTENT_TYPE]
        mimetype = parse_mimetype(content_type)

        assert mimetype.type == "multipart", "multipart/* content type expected"

        if "boundary" not in mimetype.parameters:
            raise ValueError("boundary missed for Content-Type: %s" % content_type)

        boundary = mimetype.parameters["boundary"]
        if len(boundary) > 70:
            raise ValueError("boundary %r is too long (70 chars max)" % boundary)

        return boundary

    async def _readline(self) -> bytes:
        """Return a pushed-back line if there is one, else read the stream."""
        if not self._unread:
            return await self._content.readline()
        return self._unread.pop()

    async def _read_until_first_boundary(self) -> None:
        """Skip lines up to the first boundary, learning the line terminator."""
        end_boundary = self._boundary + b"--"
        while True:
            line = await self._readline()
            if not line:
                raise ValueError(
                    "Could not find starting boundary %r" % (self._boundary)
                )

            # Whatever follows the boundary on its line is the terminator
            # used by this message.
            terminator = None
            if line.startswith(end_boundary):
                terminator = line[len(end_boundary) :]
            elif line.startswith(self._boundary):
                terminator = line[len(self._boundary) :]
            if terminator is not None:
                assert terminator in (b"\r\n", b"\n"), (
                    terminator,
                    line,
                    self._boundary,
                )
                self._newline = terminator

            bare = line.rstrip()
            if bare == self._boundary:
                return
            if bare == end_boundary:
                self._at_eof = True
                return

    async def _read_boundary(self) -> None:
        """Consume one boundary line; on the closing boundary flag EOF and
        push back what was read beyond it."""
        line = (await self._readline()).rstrip()
        if line == self._boundary:
            return
        if line != self._boundary + b"--":
            raise ValueError(f"Invalid boundary {line!r}, expected {self._boundary!r}")

        self._at_eof = True
        epilogue = await self._readline()
        following = await self._readline()

        if following[:2] == b"--":
            # An epilogue was present and the line after it looks like the
            # parent's boundary: give only that line back for the parent.
            self._unread.append(following)
        else:
            # Probably no epilogue at all: give both lines back (in pop
            # order) so the parent sees them as before.
            self._unread.extend([following, epilogue])

    async def _read_headers(self) -> "CIMultiDictProxy[str]":
        """Read header lines up to the first blank line and parse them."""
        collected = [b""]
        while True:
            stripped = (await self._content.readline()).strip()
            collected.append(stripped)
            if not stripped:
                break
        headers, _raw_headers = HeadersParser().parse_headers(collected)
        return headers

    async def _maybe_release_last_part(self) -> None:
        """Drain the previously returned part and take over its unread lines."""
        previous = self._last_part
        if previous is None:
            return
        if not previous.at_eof():
            await previous.release()
        self._unread.extend(previous._unread)
        self._last_part = None

768 

769 

# One entry queued in MultipartWriter: (payload, content encoding, content
# transfer encoding) as stored by MultipartWriter.append_payload().  Despite
# the ``str`` annotations, append_payload() stores None for "identity" /
# "binary" and an empty string when the header is absent.
_Part = Tuple[Payload, str, str]

771 

772 

class MultipartWriter(Payload):
    """Payload that serialises a sequence of parts as a multipart body."""

    def __init__(self, subtype: str = "mixed", boundary: Optional[str] = None) -> None:
        """Create a writer for ``multipart/<subtype>``.

        A random hex boundary is generated when *boundary* is None.  Raises
        ValueError for a non-ASCII boundary or one longer than 70 characters.
        """
        if boundary is None:
            boundary = uuid.uuid4().hex

        # The Payload API works with str while the wire format needs bytes;
        # restricting the boundary to ASCII keeps the two representations
        # losslessly convertible, whether it was supplied or generated.
        try:
            encoded = boundary.encode("ascii")
        except UnicodeEncodeError:
            raise ValueError("boundary should contain ASCII only chars") from None
        self._boundary = encoded

        if len(boundary) > 70:
            raise ValueError("boundary %r is too long (70 chars max)" % boundary)

        super().__init__(
            None,
            content_type=f"multipart/{subtype}; boundary={self._boundary_value}",
        )

        self._parts: List[_Part] = []

    def __enter__(self) -> "MultipartWriter":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Nothing to clean up; the context manager form is purely syntactic.
        return None

    def __iter__(self) -> Iterator[_Part]:
        return iter(self._parts)

    def __len__(self) -> int:
        return len(self._parts)

    def __bool__(self) -> bool:
        # Always truthy, even with no parts (``__len__`` would say otherwise).
        return True

    _valid_tchar_regex = re.compile(rb"\A[!#$%&'*+\-.^_`|~\w]+\Z")
    _invalid_qdtext_char_regex = re.compile(rb"[\x00-\x08\x0A-\x1F\x7F]")

    @property
    def _boundary_value(self) -> str:
        """Boundary as it must appear in the Content-Type parameter.

        Returned bare when it is a valid token, otherwise as a quoted-string
        with backslash and double quote escaped.  Raises ValueError when it
        contains characters not allowed in a quoted-string.
        """
        # Refer to RFCs 7231, 7230, 5234.
        #
        # parameter      = token "=" ( token / quoted-string )
        # token          = 1*tchar
        # quoted-string  = DQUOTE *( qdtext / quoted-pair ) DQUOTE
        # qdtext         = HTAB / SP / %x21 / %x23-5B / %x5D-7E / obs-text
        # obs-text       = %x80-FF
        # quoted-pair    = "\" ( HTAB / SP / VCHAR / obs-text )
        # tchar          = "!" / "#" / "$" / "%" / "&" / "'" / "*"
        #                  / "+" / "-" / "." / "^" / "_" / "`" / "|" / "~"
        #                  / DIGIT / ALPHA
        #                  ; any VCHAR, except delimiters
        # VCHAR           = %x21-7E
        raw = self._boundary
        if self._valid_tchar_regex.match(raw):
            # The boundary was produced by an ASCII encode, so this decode
            # cannot fail.
            return raw.decode("ascii")

        if self._invalid_qdtext_char_regex.search(raw):
            raise ValueError("boundary value contains invalid characters")

        # Escape backslash (%x5C) first, then double quote (%x22).
        escaped = raw.replace(b"\\", b"\\\\").replace(b'"', b'\\"')
        return '"{}"'.format(escaped.decode("ascii"))

    @property
    def boundary(self) -> str:
        """The boundary as text."""
        return self._boundary.decode("ascii")

    def append(self, obj: Any, headers: Optional[MultiMapping[str]] = None) -> Payload:
        """Add *obj* as a new part and return the payload that wraps it.

        A ``Payload`` instance is used as is (with *headers* merged into its
        own); anything else is converted with ``get_payload``.  Raises
        TypeError when no payload can be created for *obj*.
        """
        if headers is None:
            headers = CIMultiDict()

        if isinstance(obj, Payload):
            obj.headers.update(headers)
            return self.append_payload(obj)

        try:
            wrapped = get_payload(obj, headers=headers)
        except LookupError:
            raise TypeError("Cannot create payload from %r" % obj)
        return self.append_payload(wrapped)

    def append_payload(self, payload: Payload) -> Payload:
        """Queue *payload* as a new body part.

        Validates its Content-Encoding and Content-Transfer-Encoding headers
        (RuntimeError on unknown values) and sets Content-Length when the
        size is known and no encoding is applied.
        """
        headers = payload.headers

        # Compression: "identity" means none.
        encoding: Optional[str] = headers.get(CONTENT_ENCODING, "").lower()
        if encoding and encoding not in ("deflate", "gzip", "identity"):
            raise RuntimeError(f"unknown content encoding: {encoding}")
        if encoding == "identity":
            encoding = None

        # Transfer encoding: "binary" means none.
        te_encoding: Optional[str] = headers.get(CONTENT_TRANSFER_ENCODING, "").lower()
        if te_encoding not in ("", "base64", "quoted-printable", "binary"):
            raise RuntimeError(
                "unknown content transfer encoding: {}".format(te_encoding)
            )
        if te_encoding == "binary":
            te_encoding = None

        # The length is only predictable when the body goes out untouched.
        size = payload.size
        if size is not None and not encoding and not te_encoding:
            payload.headers[CONTENT_LENGTH] = str(size)

        self._parts.append((payload, encoding, te_encoding))  # type: ignore[arg-type]
        return payload

    def append_json(
        self, obj: Any, headers: Optional[MultiMapping[str]] = None
    ) -> Payload:
        """Add *obj* as a JSON part."""
        part_headers = CIMultiDict() if headers is None else headers
        return self.append_payload(JsonPayload(obj, headers=part_headers))

    def append_form(
        self,
        obj: Union[Sequence[Tuple[str, str]], Mapping[str, str]],
        headers: Optional[MultiMapping[str]] = None,
    ) -> Payload:
        """Add *obj* as an ``application/x-www-form-urlencoded`` part."""
        assert isinstance(obj, (Sequence, Mapping))

        if headers is None:
            headers = CIMultiDict()

        if isinstance(obj, Mapping):
            obj = list(obj.items())
        encoded_form = urlencode(obj, doseq=True)

        form_payload = StringPayload(
            encoded_form,
            headers=headers,
            content_type="application/x-www-form-urlencoded",
        )
        return self.append_payload(form_payload)

    @property
    def size(self) -> Optional[int]:
        """Total body size in bytes, or None when it cannot be known.

        Unknown as soon as one part is encoded or has no size of its own.
        """
        # b'--' + boundary + b'\r\n'
        delimiter_len = 2 + len(self._boundary) + 2

        total = 0
        for part, encoding, te_encoding in self._parts:
            if encoding or te_encoding or part.size is None:
                return None

            total += int(
                delimiter_len
                + part.size
                + len(part._binary_headers)
                + 2  # b'\r\n' after the part body
            )

        total += 2 + len(self._boundary) + 4  # b'--'+self._boundary+b'--\r\n'
        return total

    async def write(self, writer: Any, close_boundary: bool = True) -> None:
        """Write every part to *writer*, then the closing boundary unless
        *close_boundary* is false."""
        delimiter = b"--" + self._boundary

        for part, encoding, te_encoding in self._parts:
            await writer.write(delimiter + b"\r\n")
            await writer.write(part._binary_headers)

            if not (encoding or te_encoding):
                await part.write(writer)
            else:
                # Route the part through an encoding/compressing adapter.
                adapter = MultipartPayloadWriter(writer)
                if encoding:
                    adapter.enable_compression(encoding)
                if te_encoding:
                    adapter.enable_encoding(te_encoding)
                await part.write(adapter)  # type: ignore[arg-type]
                await adapter.write_eof()

            await writer.write(b"\r\n")

        if close_boundary:
            await writer.write(delimiter + b"--\r\n")

974 

975 

class MultipartPayloadWriter:
    """Writer adapter that optionally compresses and/or transfer-encodes
    everything written through it before passing it to the wrapped writer."""

    def __init__(self, writer: Any) -> None:
        self._writer = writer
        self._encoding: Optional[str] = None
        self._compress: Optional[ZLibCompressor] = None
        # Bytes waiting for a full 3-byte group in base64 mode.
        self._encoding_buffer: Optional[bytearray] = None

    def enable_encoding(self, encoding: str) -> None:
        """Turn on base64 or quoted-printable output; other values are ignored."""
        if encoding == "quoted-printable":
            self._encoding = "quoted-printable"
        elif encoding == "base64":
            self._encoding_buffer = bytearray()
            self._encoding = encoding

    def enable_compression(
        self, encoding: str = "deflate", strategy: int = zlib.Z_DEFAULT_STRATEGY
    ) -> None:
        """Install a compressor for *encoding* using *strategy*."""
        self._compress = ZLibCompressor(
            encoding=encoding,
            suppress_deflate_header=True,
            strategy=strategy,
        )

    async def write_eof(self) -> None:
        """Flush the compressor and any buffered base64 remainder."""
        compressor = self._compress
        if compressor is not None:
            tail = compressor.flush()
            if tail:
                # Drop the compressor first so the flushed data is not
                # compressed again on its way through write().
                self._compress = None
                await self.write(tail)

        if self._encoding == "base64" and self._encoding_buffer:
            await self._writer.write(base64.b64encode(self._encoding_buffer))

    async def write(self, chunk: bytes) -> None:
        """Compress (if enabled), encode (if enabled) and forward *chunk*."""
        if self._compress is not None and chunk:
            chunk = await self._compress.compress(chunk)
            if not chunk:
                # The compressor buffered everything; nothing to emit yet.
                return

        if self._encoding == "base64":
            pending = self._encoding_buffer
            assert pending is not None
            pending.extend(chunk)
            if not pending:
                return

            # Only whole 3-byte groups are encoded now, so no padding ends
            # up in the middle of the output; the rest stays buffered.
            cut = len(pending) - len(pending) % 3
            whole, self._encoding_buffer = pending[:cut], pending[cut:]
            if whole:
                await self._writer.write(base64.b64encode(whole))
            return

        if self._encoding == "quoted-printable":
            await self._writer.write(binascii.b2a_qp(chunk))
            return

        await self._writer.write(chunk)

1030 else: 

1031 await self._writer.write(chunk)