Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/payload.py: 49%


387 statements  

import asyncio
import enum
import io
import json
import mimetypes
import os
import sys
import warnings
from abc import ABC, abstractmethod
from collections.abc import Iterable
from itertools import chain
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Dict,
    Final,
    List,
    Optional,
    Set,
    TextIO,
    Tuple,
    Type,
    Union,
)

from multidict import CIMultiDict

from . import hdrs
from .abc import AbstractStreamWriter
from .helpers import (
    _SENTINEL,
    content_disposition_header,
    guess_filename,
    parse_mimetype,
    sentinel,
)
from .streams import StreamReader
from .typedefs import JSONEncoder, _CIMultiDict

__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "AsyncIterablePayload",
)

TOO_LARGE_BYTES_BODY: Final[int] = 2**20  # 1 MB
READ_SIZE: Final[int] = 2**16  # 64 KB
_CLOSE_FUTURES: Set[asyncio.Future[None]] = set()


class LookupError(Exception):
    """Raised when no payload factory is found for the given data type."""


class Order(str, enum.Enum):
    normal = "normal"
    try_first = "try_first"
    try_last = "try_last"


def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    return PAYLOAD_REGISTRY.get(data, *args, **kwargs)


def register_payload(
    factory: Type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    PAYLOAD_REGISTRY.register(factory, type, order=order)


class payload_type:
    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        self.type = type
        self.order = order

    def __call__(self, factory: Type["Payload"]) -> Type["Payload"]:
        register_payload(factory, self.type, order=self.order)
        return factory
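
# Editor's note: a minimal, hedged sketch of how the ``payload_type``
# decorator above can be used to register a custom payload factory.
# ``_example_register_fraction_payload`` and ``_FractionPayload`` are
# hypothetical names, not part of aiohttp; the sketch assumes only the
# decorator semantics defined above (register, then return the factory).
def _example_register_fraction_payload() -> None:
    from fractions import Fraction

    @payload_type(Fraction, order=Order.try_first)
    class _FractionPayload(StringPayload):  # hypothetical example class
        def __init__(self, value: Fraction, *args: Any, **kwargs: Any) -> None:
            super().__init__(str(value), *args, **kwargs)

    # After this call, PAYLOAD_REGISTRY.get(Fraction(1, 3)) would return
    # a _FractionPayload instance via the try_first list.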


PayloadType = Type["Payload"]
_PayloadRegistryItem = Tuple[PayloadType, Any]


class PayloadRegistry:
    """Payload registry.

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        self._first: List[_PayloadRegistryItem] = []
        self._normal: List[_PayloadRegistryItem] = []
        self._last: List[_PayloadRegistryItem] = []
        self._normal_lookup: Dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "Type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        if self._first:
            for factory, type_ in self._first:
                if isinstance(data, type_):
                    return factory(data, *args, **kwargs)
        # Try the fast lookup first
        if lookup_factory := self._normal_lookup.get(type(data)):
            return lookup_factory(data, *args, **kwargs)
        # Bail early if it's already a Payload
        if isinstance(data, Payload):
            return data
        # Fall back to the slower linear search
        for factory, type_ in _CHAIN(self._normal, self._last):
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)
        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        if order is Order.try_first:
            self._first.append((factory, type))
        elif order is Order.normal:
            self._normal.append((factory, type))
            if isinstance(type, Iterable):
                for t in type:
                    self._normal_lookup[t] = factory
            else:
                self._normal_lookup[type] = factory
        elif order is Order.try_last:
            self._last.append((factory, type))
        else:
            raise ValueError(f"Unsupported order {order!r}")
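
# Editor's note: a hedged sketch of the lookup order implemented by
# PayloadRegistry.get() above: try_first factories are scanned first,
# then the exact-type fast path, then normal/try_last factories via
# isinstance(). The function name is hypothetical, for illustration only.
def _example_registry_lookup() -> None:
    registry = PayloadRegistry()
    registry.register(BytesPayload, (bytes, bytearray, memoryview))
    payload = registry.get(b"hello")  # exact type(b"...") hits the fast path
    assert isinstance(payload, BytesPayload)
    registry.get(object())  # no factory matches, so this raises LookupError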


class Payload(ABC):

    _default_content_type: str = "application/octet-stream"
    _size: Optional[int] = None
    _consumed: bool = False  # Default: payload has not been consumed yet
    _autoclose: bool = False  # Default: assume resource needs explicit closing

    def __init__(
        self,
        value: Any,
        headers: Optional[
            Union[_CIMultiDict, Dict[str, str], Iterable[Tuple[str, str]]]
        ] = None,
        content_type: Union[str, None, _SENTINEL] = sentinel,
        filename: Optional[str] = None,
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self._encoding = encoding
        self._filename = filename
        self._headers: _CIMultiDict = CIMultiDict()
        self._value = value
        if content_type is not sentinel and content_type is not None:
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> Optional[int]:
        """Size of the payload in bytes.

        Returns the number of bytes that will be transmitted when the payload
        is written. For string payloads, this is the size after encoding to bytes,
        not the length of the string.
        """
        return self._size

    @property
    def filename(self) -> Optional[str]:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> _CIMultiDict:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        return (
            "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode(
                "utf-8"
            )
            + b"\r\n"
        )

    @property
    def encoding(self) -> Optional[str]:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    @property
    def consumed(self) -> bool:
        """Whether the payload has been consumed and cannot be reused."""
        return self._consumed

    @property
    def autoclose(self) -> bool:
        """
        Whether the payload can close itself automatically.

        Returns True if the payload has no file handles or resources that need
        explicit closing. If False, callers must await close() to release resources.
        """
        return self._autoclose

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: Any,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, **params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length constraints.

        Important:
            For new implementations, use write_with_length() instead of this method.
            This method is maintained for backwards compatibility and will eventually
            delegate to write_with_length(writer, None) in all implementations.

        All payload subclasses must override this method for backwards compatibility,
        but new code should use write_with_length for more flexibility and control.

        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method allows writing payload content with a specific length constraint,
        which is particularly useful for HTTP responses with Content-Length header.

        Note:
            This is the base implementation that provides backwards compatibility
            for subclasses that don't override this method. Specific payload types
            should override this method to implement proper length-constrained writing.

        """
        # Backwards compatibility for subclasses that don't override this method
        # and for the default implementation
        await self.write(writer)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This is a convenience method that calls decode() and encodes the result
        to bytes using the specified encoding.
        """
        # Use instance encoding if available, otherwise use parameter
        actual_encoding = self._encoding or encoding
        return self.decode(actual_encoding, errors).encode(actual_encoding)

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility with code that
        needs to clean up payloads synchronously. In the future, we will
        drop this method and only support the async close() method.

        WARNING: This method must be safe to call from within the event loop
        without blocking. Subclasses should not perform any blocking I/O here.

        WARNING: This method must be called from within an event loop for
        certain payload types (e.g., IOBasePayload). Calling it outside an
        event loop may raise RuntimeError.
        """
        # This is a no-op by default, but subclasses can override it
        # for non-blocking cleanup operations.

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.

        In the future, this will be the only close method supported.
        """
        self._close()
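
# Editor's note: a minimal, hedged sketch of a concrete Payload subclass,
# showing the two abstract methods that must be implemented. The class
# name is hypothetical and not part of aiohttp.
class _ExampleStaticPayload(Payload):  # hypothetical example class
    """Serve a fixed byte string; the size is known up front."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(b"static body", *args, **kwargs)
        self._size = len(self._value)

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        return self._value.decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        await writer.write(self._value)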


class BytesPayload(Payload):
    _value: bytes
    # _consumed = False (inherited) - Bytes are immutable and can be reused
    _autoclose = True  # No file handle, just bytes in memory

    def __init__(
        self, value: Union[bytes, bytearray, memoryview], *args: Any, **kwargs: Any
    ) -> None:
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        if isinstance(value, memoryview):
            self._size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            self._size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if self._size > TOO_LARGE_BYTES_BODY:
            kwargs = {"source": self}
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                **kwargs,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        return self._value.decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method returns the raw bytes content of the payload.
        It is equivalent to accessing the _value attribute directly.
        """
        return self._value

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire bytes payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire bytes content without any length constraint.

        Note:
            For new implementations that need length control, use write_with_length().
            This method is maintained for backwards compatibility and is equivalent
            to write_with_length(writer, None).

        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write bytes payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method writes either the entire byte sequence or a slice of it
        up to the specified content_length. For BytesPayload, this operation
        is performed efficiently using array slicing.

        """
        if content_length is not None:
            await writer.write(self._value[:content_length])
        else:
            await writer.write(self._value)
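
# Editor's note: a hedged sketch of the TOO_LARGE_BYTES_BODY behaviour
# above: raw-bytes bodies over 1 MB emit a ResourceWarning suggesting
# io.BytesIO, which is streamed in READ_SIZE chunks instead of written
# in one call. The function name is hypothetical, for illustration only.
def _example_large_body_choice() -> None:
    small = BytesPayload(b"x" * 1024)  # fine: written in a single call
    assert small.size == 1024
    big_buffer = io.BytesIO(b"x" * (2 * 1024 * 1024))
    streamed = BytesIOPayload(big_buffer)  # no warning: streamed in chunks
    assert streamed.size == 2 * 1024 * 1024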


class StringPayload(BytesPayload):
    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:

        if encoding is None:
            if content_type is None:
                real_encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                real_encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
            real_encoding = encoding

        super().__init__(
            value.encode(real_encoding),
            encoding=real_encoding,
            content_type=content_type,
            *args,
            **kwargs,
        )
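
# Editor's note: a hedged sketch of the charset resolution performed in
# StringPayload.__init__ above: an explicit encoding wins, otherwise the
# charset parameter of content_type is used, otherwise UTF-8. The
# function name is hypothetical, for illustration only.
def _example_string_payload_charsets() -> None:
    p1 = StringPayload("héllo")
    assert p1.encoding == "utf-8"
    assert p1.content_type == "text/plain; charset=utf-8"
    p2 = StringPayload("héllo", content_type="text/plain; charset=latin-1")
    assert p2.encoding == "latin-1"  # charset parsed from the content type
    p3 = StringPayload("héllo", encoding="utf-16")
    assert p3.content_type == "text/plain; charset=utf-16"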


class StringIOPayload(StringPayload):
    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        super().__init__(value.read(), *args, **kwargs)


class IOBasePayload(Payload):
    _value: io.IOBase
    # _consumed = False (inherited) - File can be re-read from the same position
    _start_position: Optional[int] = None
    # _autoclose = False (inherited) - Has file handle that needs explicit closing

    def __init__(
        self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
    ) -> None:
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _set_or_restore_start_position(self) -> None:
        """Set or restore the start position of the file-like object."""
        if self._start_position is None:
            try:
                self._start_position = self._value.tell()
            except OSError:
                self._consumed = True  # Cannot seek, mark as consumed
            return
        self._value.seek(self._start_position)

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """
        Read the file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size cannot be determined)
            - The first chunk of bytes read from the file object

        This method is optimized to perform both size calculation and initial read
        in a single operation, which is executed in a single executor job to minimize
        context switches and file operations when streaming content.

        """
        self._set_or_restore_start_position()
        size = self.size  # Call size only once since it does I/O
        return size, self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object, respecting the
            remaining_content_len limit if specified.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made.

        """
        return self._value.read(remaining_content_len or READ_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> Optional[int]:
        """
        Size of the payload in bytes.

        Returns the number of bytes remaining to be read from the file.
        Returns None if the size cannot be determined (e.g., for unseekable streams).
        """
        try:
            return os.fstat(self._value.fileno()).st_size - self._value.tell()
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire file content without any length constraint.
        It delegates to write_with_length() with no length limit for implementation
        consistency.

        Note:
            For new implementations that need length control, use write_with_length() directly.
            This method is maintained for backwards compatibility with existing code.

        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method implements optimized streaming of file content with length constraints:

        1. File reading is performed in a thread pool to avoid blocking the event loop
        2. Content is read and written in chunks to maintain memory efficiency
        3. Writing stops when either:
           - All available file content has been written (when size is known)
           - The specified content_length has been reached
        4. File resources are properly closed even if the operation is cancelled

        The implementation carefully handles both known-size and unknown-size payloads,
        as well as constrained and unconstrained content lengths.

        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length

        # Get initial data and available length
        available_len, chunk = await loop.run_in_executor(
            None, self._read_and_available_len, remaining_content_len
        )
        # Process data chunks until done
        while chunk:
            chunk_len = len(chunk)

            # Write data with or without length constraint
            if remaining_content_len is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_content_len])
                remaining_content_len -= chunk_len

            total_written_len += chunk_len

            # Check if we're done writing
            if self._should_stop_writing(
                available_len, total_written_len, remaining_content_len
            ):
                return

            # Read next chunk
            chunk = await loop.run_in_executor(
                None,
                self._read,
                (
                    min(READ_SIZE, remaining_content_len)
                    if remaining_content_len is not None
                    else READ_SIZE
                ),
            )

    def _should_stop_writing(
        self,
        available_len: Optional[int],
        total_written_len: int,
        remaining_content_len: Optional[int],
    ) -> bool:
        """
        Determine if we should stop writing data.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Remaining bytes to be written for content-length limited responses

        Returns:
            True if we should stop writing data, based on either:
            - Having written all available data (when size is known)
            - Having written all requested content (when content-length is specified)

        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility.
        Use the async close() method instead.

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError.
        """
        # Skip if already consumed
        if self._consumed:
            return
        self._consumed = True  # Mark as consumed to prevent further writes
        # Schedule file closing without awaiting to prevent cancellation issues
        loop = asyncio.get_running_loop()
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        return self._read_all().decode(encoding, errors)

    def _read_all(self) -> bytes:
        """Read the entire file-like object and return its content as bytes."""
        self._set_or_restore_start_position()
        # Use readlines() to ensure we get all content
        return b"".join(self._value.readlines())

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire file content and returns it as bytes.
        It is equivalent to reading the file-like object directly.
        The file reading is performed in an executor to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._read_all)
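
# Editor's note: a hedged sketch of using IOBasePayload above with a real
# file handle. Because _autoclose is False for file-backed payloads, the
# caller must await close() so the handle is released in an executor. The
# function name and path are hypothetical, for illustration only; a real
# application would also open the file off the event loop.
async def _example_file_payload(writer: AbstractStreamWriter) -> None:
    payload = IOBasePayload(open("example.bin", "rb"))  # hypothetical path
    try:
        await payload.write_with_length(writer, None)
    finally:
        await payload.close()  # schedules file.close() in an executor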


class TextIOPayload(IOBasePayload):
    _value: io.TextIOBase
    # _autoclose = False (inherited) - Has text file handle that needs explicit closing

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:

        if encoding is None:
            if content_type is None:
                encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding

        super().__init__(
            value,
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """
        Read the text file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size cannot be determined)
            - The first chunk of bytes read from the file object, encoded using the payload's encoding

        This method is optimized to perform both size calculation and initial read
        in a single operation, which is executed in a single executor job to minimize
        context switches and file operations when streaming content.

        Note:
            TextIOPayload handles encoding of the text content before writing it
            to the stream. If no encoding is specified, UTF-8 is used as the default.

        """
        self._set_or_restore_start_position()
        size = self.size
        chunk = self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )
        return size, chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """
        Read a chunk of data from the text file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object and encoded using the payload's
            encoding. The data is automatically converted from text to bytes.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made. It properly
        handles text encoding, converting the text content to bytes using
        the specified encoding (or UTF-8 if none was provided).

        """
        chunk = self._value.read(remaining_content_len or READ_SIZE)
        return chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire text file content and returns it as bytes.
        It encodes the text content using the specified encoding.
        The file reading is performed in an executor to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()

        # Use instance encoding if available, otherwise use parameter
        actual_encoding = self._encoding or encoding

        def _read_and_encode() -> bytes:
            self._set_or_restore_start_position()
            # TextIO read() always returns the full content
            return self._value.read().encode(actual_encoding, errors)

        return await loop.run_in_executor(None, _read_and_encode)


class BytesIOPayload(IOBasePayload):
    _value: io.BytesIO
    _size: int  # Always initialized in __init__
    _autoclose = True  # BytesIO is in-memory, safe to auto-close

    def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)
        # Calculate size once during initialization
        self._size = len(self._value.getbuffer()) - self._value.tell()

    @property
    def size(self) -> int:
        """Size of the payload in bytes.

        Returns the number of bytes in the BytesIO buffer that will be transmitted.
        This is calculated once during initialization for efficiency.
        """
        return self._size

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write BytesIO payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation is specifically optimized for BytesIO objects:

        1. Reads content in chunks to maintain memory efficiency
        2. Yields control back to the event loop periodically to prevent blocking
           when dealing with large BytesIO objects
        3. Respects content_length constraints when specified
        4. Properly cleans up by closing the BytesIO object when done or on error

        The periodic yielding to the event loop is important for maintaining
        responsiveness when processing large in-memory buffers.

        """
        self._set_or_restore_start_position()
        loop_count = 0
        remaining_bytes = content_length
        while chunk := self._value.read(READ_SIZE):
            if loop_count > 0:
                # Avoid blocking the event loop when the caller passes a
                # large BytesIO object; skip the yield on the first iteration.
                await asyncio.sleep(0)
            if remaining_bytes is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_bytes])
                remaining_bytes -= len(chunk)
                if remaining_bytes <= 0:
                    return
            loop_count += 1

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire BytesIO content and returns it as bytes.
        It is equivalent to accessing the _value attribute directly.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def close(self) -> None:
        """
        Close the BytesIO payload.

        This does nothing since BytesIO is in-memory and does not require explicit closing.
        """


class BufferedReaderPayload(IOBasePayload):
    _value: io.BufferedIOBase
    # _autoclose = False (inherited) - Has buffered file handle that needs explicit closing

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)


class JsonPayload(BytesPayload):
    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:

        super().__init__(
            dumps(value).encode(encoding),
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )
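
# Editor's note: a hedged sketch of JsonPayload above: the value is
# serialized eagerly in __init__, so size and content type are known
# immediately. The function name is hypothetical, for illustration only.
def _example_json_payload() -> None:
    payload = JsonPayload({"status": "ok", "count": 3})
    assert payload.content_type == "application/json"
    assert payload.decode() == '{"status": "ok", "count": 3}'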


if TYPE_CHECKING:
    from typing import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator[bytes]
    _AsyncIterable = AsyncIterable[bytes]
else:
    from collections.abc import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator
    _AsyncIterable = AsyncIterable


class AsyncIterablePayload(Payload):

    _iter: Optional[_AsyncIterator] = None
    _value: _AsyncIterable
    _cached_chunks: Optional[List[bytes]] = None
    # _consumed stays False to allow reuse with cached content
    _autoclose = True  # Iterator doesn't need explicit closing

    def __init__(self, value: _AsyncIterable, *args: Any, **kwargs: Any) -> None:
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                "got {!r}".format(type(value))
            )

        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire async iterable payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method iterates through the async iterable and writes each chunk
        to the writer without any length constraint.

        Note:
            For new implementations that need length control, use write_with_length() directly.
            This method is maintained for backwards compatibility with existing code.

        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write async iterable payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation handles streaming of async iterable content with length constraints:

        1. If cached chunks are available, writes from them
        2. Otherwise iterates through the async iterable one chunk at a time
        3. Respects content_length constraints when specified
        4. Does NOT generate cache - that's done by as_bytes()

        """
        # If we have cached chunks, use them
        if self._cached_chunks is not None:
            remaining_bytes = content_length
            for chunk in self._cached_chunks:
                if remaining_bytes is None:
                    await writer.write(chunk)
                elif remaining_bytes > 0:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                else:
                    break
            return

        # If iterator is exhausted and we don't have cached chunks, nothing to write
        if self._iter is None:
            return

        # Stream from the iterator
        remaining_bytes = content_length

        try:
            while True:
                if sys.version_info >= (3, 10):
                    chunk = await anext(self._iter)
                else:
                    chunk = await self._iter.__anext__()
                if remaining_bytes is None:
                    await writer.write(chunk)
                # If we have a content length limit
                elif remaining_bytes > 0:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                # We still want to exhaust the iterator even
                # if we have reached the content length limit
                # since the file handle may not get closed by
                # the iterator if we don't do this
        except StopAsyncIteration:
            # Iterator is exhausted
            self._iter = None
            self._consumed = True  # Mark as consumed when streamed without caching

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the payload content as a string if cached chunks are available."""
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks).decode(encoding, errors)
        raise TypeError("Unable to decode - content not cached. Call as_bytes() first.")

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire async iterable content and returns it as bytes.
        It generates and caches the chunks for future reuse.
        """
        # If we have cached chunks, return them joined
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks)

        # If iterator is exhausted and no cache, return empty
        if self._iter is None:
            return b""

        # Read all chunks and cache them
        chunks: List[bytes] = []
        async for chunk in self._iter:
            chunks.append(chunk)

        # Iterator is exhausted, cache the chunks
        self._iter = None
        self._cached_chunks = chunks
        # Keep _consumed as False to allow reuse with cached chunks

        return b"".join(chunks)
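
# Editor's note: a hedged sketch of feeding AsyncIterablePayload above
# from an async generator, and of the caching behaviour of as_bytes():
# once cached, the payload becomes reusable and decode() works. The
# function names are hypothetical, for illustration only.
async def _example_async_iterable_payload() -> None:
    async def _gen() -> _AsyncIterator:
        yield b"hello, "
        yield b"world"

    payload = AsyncIterablePayload(_gen())
    body = await payload.as_bytes()  # exhausts the iterator, caches chunks
    assert body == b"hello, world"
    assert payload.decode() == "hello, world"  # works only after caching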


class StreamReaderPayload(AsyncIterablePayload):
    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        super().__init__(value.iter_any(), *args, **kwargs)


PAYLOAD_REGISTRY = PayloadRegistry()
PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
PAYLOAD_REGISTRY.register(StringPayload, str)
PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO)
PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase)
PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO)
PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader)
# try_last gives more specialized async iterables, such as
# multipart.BodyPartReaderPayload, a chance to override the default
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)
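
# Editor's note: a hedged sketch of how the module-level registry set up
# above dispatches plain Python values to payload factories via
# get_payload(). The function name is hypothetical, for illustration only.
def _example_registry_dispatch() -> None:
    assert isinstance(get_payload(b"raw"), BytesPayload)
    assert isinstance(get_payload("text"), StringPayload)
    assert isinstance(get_payload(io.BytesIO(b"buffered")), BytesIOPayload)
    # The exact-type fast path maps io.StringIO directly to StringIOPayload;
    # subclasses instead fall back to the linear isinstance() scan, where
    # more specific types were registered before the io.IOBase fallback.
    assert isinstance(get_payload(io.StringIO("s")), StringIOPayload)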