Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/payload.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

310 statements  

1import asyncio 

2import enum 

3import io 

4import json 

5import mimetypes 

6import os 

7import sys 

8import warnings 

9from abc import ABC, abstractmethod 

10from itertools import chain 

11from typing import ( 

12 IO, 

13 TYPE_CHECKING, 

14 Any, 

15 Dict, 

16 Final, 

17 Iterable, 

18 Optional, 

19 Set, 

20 TextIO, 

21 Tuple, 

22 Type, 

23 Union, 

24) 

25 

26from multidict import CIMultiDict 

27 

28from . import hdrs 

29from .abc import AbstractStreamWriter 

30from .helpers import ( 

31 _SENTINEL, 

32 content_disposition_header, 

33 guess_filename, 

34 parse_mimetype, 

35 sentinel, 

36) 

37from .streams import StreamReader 

38from .typedefs import JSONEncoder, _CIMultiDict 

39 

40__all__ = ( 

41 "PAYLOAD_REGISTRY", 

42 "get_payload", 

43 "payload_type", 

44 "Payload", 

45 "BytesPayload", 

46 "StringPayload", 

47 "IOBasePayload", 

48 "BytesIOPayload", 

49 "BufferedReaderPayload", 

50 "TextIOPayload", 

51 "StringIOPayload", 

52 "JsonPayload", 

53 "AsyncIterablePayload", 

54) 

55 

# Raw byte bodies larger than this trigger a ResourceWarning in BytesPayload.
TOO_LARGE_BYTES_BODY: Final[int] = 1 << 20  # 1 MB
# Default chunk size used when reading from file-like payloads.
READ_SIZE: Final[int] = 1 << 16  # 64 KB
# Strong references to in-flight "close the file" futures so they are not
# garbage collected before they complete.
_CLOSE_FUTURES: Set[asyncio.Future[None]] = set()

59 

60 

61if TYPE_CHECKING: 

62 from typing import List 

63 

64 

class LookupError(Exception):
    """Raised by the payload registry when no factory matches the data."""

67 

68 

class Order(str, enum.Enum):
    """Priority bucket a payload factory is registered under."""

    # Default bucket.
    normal = "normal"
    # Consulted before every other bucket.
    try_first = "try_first"
    # Consulted only after the normal bucket found nothing.
    try_last = "try_last"

73 

74 

def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    """Wrap *data* in a payload using the module-level ``PAYLOAD_REGISTRY``.

    Extra positional and keyword arguments are forwarded unchanged to the
    registry lookup.
    """
    payload = PAYLOAD_REGISTRY.get(data, *args, **kwargs)
    return payload

77 

78 

def register_payload(
    factory: Type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    """Register *factory* for *type* in the module-level ``PAYLOAD_REGISTRY``.

    Args:
        factory: Payload class used to wrap matching values.
        type: Type (or collection of types) the factory handles.
        order: Priority bucket the registration goes into.
    """
    registry = PAYLOAD_REGISTRY
    registry.register(factory, type, order=order)

83 

84 

class payload_type:
    """Class decorator that registers the decorated payload factory.

    The decorated class is registered for ``type`` with the given ``order``
    through ``register_payload`` and is returned unchanged.
    """

    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        self.type, self.order = type, order

    def __call__(self, factory: Type["Payload"]) -> Type["Payload"]:
        handled_type = self.type
        priority = self.order
        register_payload(factory, handled_type, order=priority)
        return factory

93 

94 

# A payload factory: any subclass of Payload.
PayloadType = Type["Payload"]
# One registry entry: (factory, type-or-types handled by that factory).
_PayloadRegistryItem = Tuple[PayloadType, Any]

97 

98 

class PayloadRegistry:
    """Registry mapping value types to payload factories.

    Factories live in three ordered buckets (first / normal / last). The
    normal bucket additionally keeps an exact-type dictionary so the common
    case avoids a linear ``isinstance`` scan.

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        self._first: List[_PayloadRegistryItem] = []
        self._normal: List[_PayloadRegistryItem] = []
        self._last: List[_PayloadRegistryItem] = []
        # Exact type -> factory, populated for normal-order registrations.
        self._normal_lookup: Dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "Type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        """Return a payload wrapping *data*.

        Lookup order: try_first entries, exact-type fast lookup, *data*
        itself if it is already a Payload, then a linear scan over the
        normal and try_last entries.

        Raises:
            LookupError: no registered factory accepts *data*.
        """
        # Iterating an empty list is a no-op, so no emptiness guard is needed.
        for first_factory, first_type in self._first:
            if isinstance(data, first_type):
                return first_factory(data, *args, **kwargs)

        # Fast path: exact type match.
        exact_factory = self._normal_lookup.get(type(data))
        if exact_factory:
            return exact_factory(data, *args, **kwargs)

        # Already a payload: hand it back untouched.
        if isinstance(data, Payload):
            return data

        # Slow path: isinstance-based linear search.
        for candidate, handled in _CHAIN(self._normal, self._last):
            if isinstance(data, handled):
                return candidate(data, *args, **kwargs)

        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        """Register *factory* as the handler for *type* under *order*.

        Raises:
            ValueError: *order* is not a supported ``Order`` member.
        """
        entry = (factory, type)
        if order is Order.try_first:
            self._first.append(entry)
        elif order is Order.try_last:
            self._last.append(entry)
        elif order is Order.normal:
            self._normal.append(entry)
            # An iterable of types registers each member for the fast lookup.
            exact_types = type if isinstance(type, Iterable) else (type,)
            for exact in exact_types:
                self._normal_lookup[exact] = factory
        else:
            raise ValueError(f"Unsupported order {order!r}")

152 

153 

class Payload(ABC):
    """Abstract base class for body payloads.

    Holds the wrapped value together with its headers, filename and
    encoding. Subclasses implement ``decode()`` and ``write()``.
    """

    _default_content_type: str = "application/octet-stream"
    _size: Optional[int] = None

    def __init__(
        self,
        value: Any,
        headers: Optional[
            Union[_CIMultiDict, Dict[str, str], Iterable[Tuple[str, str]]]
        ] = None,
        content_type: Union[None, str, _SENTINEL] = sentinel,
        filename: Optional[str] = None,
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Store the value and work out the Content-Type header.

        The Content-Type is, in order of preference: the explicit
        ``content_type`` argument, a guess based on ``filename``, or the
        class default. Entries in ``headers`` are applied last and may
        therefore override it.
        """
        self._encoding = encoding
        self._filename = filename
        self._headers: _CIMultiDict = CIMultiDict()
        self._value = value

        explicit_type = content_type is not sentinel and content_type is not None
        if explicit_type:
            assert isinstance(content_type, str)
            resolved_type = content_type
        elif self._filename is None:
            resolved_type = self._default_content_type
        else:
            # guess_file_type is used on 3.13+, guess_type on older versions.
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            resolved_type = guesser(self._filename)[0]
            if resolved_type is None:
                resolved_type = self._default_content_type
        self._headers[hdrs.CONTENT_TYPE] = resolved_type

        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> Optional[int]:
        """Size of the payload."""
        return self._size

    @property
    def filename(self) -> Optional[str]:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> _CIMultiDict:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        # "Name: value\r\n" per header, then a blank line, UTF-8 encoded.
        header_lines = (k + ": " + v + "\r\n" for k, v in self.headers.items())
        encoded = "".join(header_lines).encode("utf-8")
        return encoded + b"\r\n"

    @property
    def encoding(self) -> Optional[str]:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: str,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        header_value = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, params=params
        )
        self._headers[hdrs.CONTENT_DISPOSITION] = header_value

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write the whole payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Legacy entry point without any length constraint. Every subclass
        must implement it for backwards compatibility; new code should
        prefer write_with_length(), to which this method is expected to
        delegate (as ``write_with_length(writer, None)``) eventually.
        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """Write the payload, honouring an optional content length limit.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Note:
            This base implementation ignores ``content_length`` and simply
            calls write(); it exists so subclasses that do not override
            this method keep working. Payload types that can do so should
            override it with real length-constrained writing.
        """
        await self.write(writer)

285 

286 

class BytesPayload(Payload):
    """Payload backed by an in-memory bytes-like object."""

    _value: bytes

    def __init__(
        self, value: Union[bytes, bytearray, memoryview], *args: Any, **kwargs: Any
    ) -> None:
        """Wrap *value*, defaulting the content type to octet-stream.

        Raises:
            TypeError: *value* is not bytes, bytearray or memoryview.

        Emits a ResourceWarning when the body exceeds TOO_LARGE_BYTES_BODY.
        """
        kwargs.setdefault("content_type", "application/octet-stream")

        super().__init__(value, *args, **kwargs)

        if isinstance(value, memoryview):
            # len() of a memoryview counts items, nbytes counts bytes.
            self._size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            self._size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if self._size > TOO_LARGE_BYTES_BODY:
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                source=self,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the stored bytes into text."""
        return self._value.decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write the complete byte body to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Note:
            Kept for backwards compatibility; equivalent to
            write_with_length(writer, None). Use write_with_length() when
            length control is needed.
        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """Write the byte body, truncated to ``content_length`` if given.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)
        """
        if content_length is None:
            body = self._value
        else:
            body = self._value[:content_length]
        await writer.write(body)

351 

352 

class StringPayload(BytesPayload):
    """Payload for a ``str`` value, encoded to bytes at construction."""

    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Encode *value* and derive whichever of encoding/content type is missing.

        - Both missing: UTF-8 and ``text/plain; charset=utf-8``.
        - Only content type given: its ``charset`` parameter (UTF-8 if absent).
        - Encoding given: used as is; a missing content type becomes
          ``text/plain`` with that charset.
        """
        if encoding is not None:
            real_encoding = encoding
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            mimetype = parse_mimetype(content_type)
            real_encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            real_encoding = "utf-8"
            content_type = "text/plain; charset=utf-8"

        encoded_value = value.encode(real_encoding)
        super().__init__(
            encoded_value,
            *args,
            encoding=real_encoding,
            content_type=content_type,
            **kwargs,
        )

381 

382 

class StringIOPayload(StringPayload):
    """Payload for a text stream that is read fully at construction time."""

    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        # Consume the stream once and treat the result as a plain string.
        text = value.read()
        super().__init__(text, *args, **kwargs)

386 

387 

class IOBasePayload(Payload):
    """Payload that streams its body from a file-like object.

    Reads run in the default executor and are capped at READ_SIZE bytes per
    call so that a large content length never pulls the whole body into
    memory at once.
    """

    _value: io.IOBase

    def __init__(
        self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
    ) -> None:
        """Wrap *value* and set a Content-Disposition header when possible.

        Args:
            value: File-like object to stream from.
            disposition: Disposition type for the Content-Disposition header.

        If no ``filename`` keyword is given it is derived with
        guess_filename(). The header is only set when a filename is known
        and the caller did not already supply Content-Disposition.
        """
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """Return the remaining size and the first chunk in one call.

        Args:
            remaining_content_len: Optional limit on how many bytes to read.
                None (or 0) falls back to READ_SIZE.

        Returns:
            A tuple of:
            - the size of the unread content (None if it cannot be determined)
            - the first chunk read from the file object

        Both steps are done here so the caller can run them as a single
        executor job. The read is capped at READ_SIZE so that a large
        content length cannot force the whole file into memory.
        """
        size = self.size  # Call size only once since it does I/O
        return size, self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """Read the next chunk from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                None (or 0) falls back to READ_SIZE.

        Returns:
            At most ``min(READ_SIZE, remaining_content_len)`` bytes.

        Used for every read after the initial _read_and_available_len call.
        """
        # Cap at READ_SIZE: passing a large remaining length straight to
        # read() would load the rest of the body in a single call.
        return self._value.read(  # type: ignore[no-any-return]
            min(READ_SIZE, remaining_content_len or READ_SIZE)
        )

    @property
    def size(self) -> Optional[int]:
        """Bytes left between the current position and the end of the file.

        Returns None when the object has no usable file descriptor or
        position (AttributeError / OSError).
        """
        try:
            return os.fstat(self._value.fileno()).st_size - self._value.tell()
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Delegates to write_with_length() with no length limit. Kept for
        backwards compatibility; use write_with_length() directly when
        length control is needed.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """Stream the file content, writing at most ``content_length`` bytes.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Behaviour:

        1. Reads run in the default executor so the event loop is not blocked.
        2. Content is read and written in chunks of at most READ_SIZE.
        3. Writing stops when the reader returns no data, when all content
           known to be available has been written, or when
           ``content_length`` has been reached.
        4. Closing the file is scheduled in a ``finally`` block, so it also
           happens on error or cancellation.
        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length

        try:
            # First executor job: size lookup plus the initial chunk.
            available_len, chunk = await loop.run_in_executor(
                None, self._read_and_available_len, remaining_content_len
            )
            while chunk:
                chunk_len = len(chunk)

                if remaining_content_len is None:
                    await writer.write(chunk)
                else:
                    # Trim the last chunk so the limit is never exceeded.
                    await writer.write(chunk[:remaining_content_len])
                    remaining_content_len -= chunk_len

                total_written_len += chunk_len

                if self._should_stop_writing(
                    available_len, total_written_len, remaining_content_len
                ):
                    return

                chunk = await loop.run_in_executor(
                    None, self._read, remaining_content_len
                )
        finally:
            # Handle closing the file without awaiting to prevent cancellation issues
            # when the StreamReader reaches EOF
            self._schedule_file_close(loop)

    def _should_stop_writing(
        self,
        available_len: Optional[int],
        total_written_len: int,
        remaining_content_len: Optional[int],
    ) -> bool:
        """Tell whether the write loop is finished.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Bytes still allowed by the content length
                limit (None when unlimited)

        Returns:
            True when all known-available data has been written, or when the
            content length budget is used up.
        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _schedule_file_close(self, loop: asyncio.AbstractEventLoop) -> None:
        """Schedule file closing without awaiting to prevent cancellation issues."""
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Read all remaining lines and decode them into one string."""
        return "".join(r.decode(encoding, errors) for r in self._value.readlines())

565 

566 

class TextIOPayload(IOBasePayload):
    """Payload that streams from a text file-like object.

    Text read from the stream is encoded to bytes with the payload's
    encoding before being handed to the writer.
    """

    _value: io.TextIOBase

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Wrap *value* and derive whichever of encoding/content type is missing.

        - Both missing: UTF-8 and ``text/plain; charset=utf-8``.
        - Only content type given: its ``charset`` parameter (UTF-8 if absent).
        - Encoding given: used as is; a missing content type becomes
          ``text/plain`` with that charset.
        """
        if encoding is None:
            if content_type is None:
                encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding

        super().__init__(
            value,
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """Return the remaining size and the first encoded chunk in one call.

        Args:
            remaining_content_len: Optional limit for this read. None (or 0)
                falls back to READ_SIZE.

        Returns:
            A tuple of:
            - the size of the unread content (None if it cannot be determined)
            - the first chunk, encoded with the payload's encoding

        Both steps are done here so the caller can run them as a single
        executor job. The read is capped at READ_SIZE so that a large
        content length cannot force the whole stream into memory.
        """
        size = self.size
        chunk = self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )
        return size, chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """Read the next chunk of text and return it encoded as bytes.

        Args:
            remaining_content_len: Optional limit for this read. None (or 0)
                falls back to READ_SIZE.

        Returns:
            The chunk encoded with the payload's encoding (``str.encode()``
            default when no encoding is set).

        Used for every read after the initial _read_and_available_len call.
        """
        # Cap at READ_SIZE: passing a large remaining length straight to
        # read() would load the rest of the stream in a single call.
        chunk = self._value.read(min(READ_SIZE, remaining_content_len or READ_SIZE))
        return chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Return the remaining text; the stream already yields ``str``.

        ``encoding`` and ``errors`` are accepted for interface
        compatibility and are not used.
        """
        return self._value.read()

650 

651 

class BytesIOPayload(IOBasePayload):
    """Payload backed by an in-memory ``io.BytesIO`` buffer."""

    _value: io.BytesIO

    @property
    def size(self) -> int:
        """Bytes between the current position and the end of the buffer."""
        here = self._value.tell()
        # Seek to the end to learn the total length, then restore the position.
        total = self._value.seek(0, os.SEEK_END)
        self._value.seek(here)
        return total - here

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Read the rest of the buffer and decode it."""
        raw = self._value.read()
        return raw.decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write the whole buffer; same as write_with_length(writer, None)."""
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """Write the buffer in chunks, stopping at ``content_length`` if set.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        The buffer is consumed READ_SIZE bytes at a time. Before every
        chunk except the first, control is yielded to the event loop so a
        large buffer does not block it. The buffer is closed when the
        method finishes, including on error.
        """
        budget = content_length
        first_chunk = True
        try:
            while True:
                chunk = self._value.read(READ_SIZE)
                if not chunk:
                    break
                if not first_chunk:
                    # Avoid blocking the event loop when a large BytesIO
                    # object is passed; skipped for the first chunk.
                    await asyncio.sleep(0)
                first_chunk = False
                if budget is None:
                    await writer.write(chunk)
                    continue
                await writer.write(chunk[:budget])
                budget -= len(chunk)
                if budget <= 0:
                    return
        finally:
            self._value.close()

710 

711 

class BufferedReaderPayload(IOBasePayload):
    """Payload backed by a buffered binary reader."""

    _value: io.BufferedIOBase

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Read the rest of the stream and decode it."""
        raw = self._value.read()
        return raw.decode(encoding, errors)

717 

718 

class JsonPayload(BytesPayload):
    """Payload that serializes a value to JSON at construction time."""

    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Serialize *value* with *dumps* and store the encoded bytes.

        Args:
            value: Object to serialize.
            encoding: Text encoding applied to the serialized document.
            content_type: Content-Type header value.
            dumps: Callable that turns *value* into a JSON string.
        """
        document = dumps(value)
        body = document.encode(encoding)
        super().__init__(
            body,
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )

736 

737 

# Type checkers get the bytes-parametrized generic aliases; at runtime the
# collections.abc classes are used (AsyncIterable is needed for isinstance
# checks).
if TYPE_CHECKING:
    from typing import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator[bytes]
    _AsyncIterable = AsyncIterable[bytes]
else:
    from collections.abc import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator
    _AsyncIterable = AsyncIterable

748 

749 

class AsyncIterablePayload(Payload):
    """Payload that streams chunks produced by an async iterable."""

    # Iterator obtained from the value; reset to None once exhausted.
    _iter: Optional[_AsyncIterator] = None
    _value: _AsyncIterable

    def __init__(self, value: _AsyncIterable, *args: Any, **kwargs: Any) -> None:
        """Wrap *value*, defaulting the content type to octet-stream.

        Raises:
            TypeError: *value* does not implement AsyncIterable.
        """
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                "got {!r}".format(type(value))
            )

        kwargs.setdefault("content_type", "application/octet-stream")

        super().__init__(value, *args, **kwargs)

        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write every chunk of the async iterable to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Delegates to write_with_length() with no length limit. Kept for
        backwards compatibility; use write_with_length() directly when
        length control is needed.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """Write chunks from the iterable, at most ``content_length`` bytes.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        The iterable is consumed one chunk at a time. Once the limit is
        reached nothing more is written, but iteration continues until the
        iterable is exhausted. A second call after exhaustion returns
        immediately, because an async iterable cannot be restarted.
        """
        if self._iter is None:
            return

        budget = content_length

        try:
            while True:
                if sys.version_info >= (3, 10):
                    chunk = await anext(self._iter)
                else:
                    chunk = await self._iter.__anext__()

                if budget is None:
                    await writer.write(chunk)
                    continue
                if budget <= 0:
                    # Limit reached: keep draining without writing. We
                    # still want to exhaust the iterator since the file
                    # handle may not get closed by the iterator otherwise.
                    continue
                await writer.write(chunk[:budget])
                budget -= len(chunk)
        except StopAsyncIteration:
            # Iterator is exhausted
            self._iter = None

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Always raises TypeError: this payload cannot be decoded."""
        raise TypeError("Unable to decode.")

833 

834 

class StreamReaderPayload(AsyncIterablePayload):
    """Payload that streams from a StreamReader via ``iter_any()``."""

    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        chunks = value.iter_any()
        super().__init__(chunks, *args, **kwargs)

838 

839 

# Module-wide registry with the built-in payload factories. Registration
# order matters: the slow-path lookup scans entries in this order.
PAYLOAD_REGISTRY = PayloadRegistry()
register_payload(BytesPayload, (bytes, bytearray, memoryview))
register_payload(StringPayload, str)
register_payload(StringIOPayload, io.StringIO)
register_payload(TextIOPayload, io.TextIOBase)
register_payload(BytesIOPayload, io.BytesIO)
register_payload(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
register_payload(IOBasePayload, io.IOBase)
register_payload(StreamReaderPayload, StreamReader)
# try_last gives more specialized async iterables (such as
# BodyPartReaderPayload) a chance to override the default.
register_payload(AsyncIterablePayload, AsyncIterable, order=Order.try_last)