import asyncio
import enum
import io
import json
import mimetypes
import os
import sys
import warnings
from abc import ABC, abstractmethod
from collections.abc import Iterable
from itertools import chain
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Dict,
    Final,
    List,
    Optional,
    Set,
    TextIO,
    Tuple,
    Type,
    Union,
)

from multidict import CIMultiDict

from . import hdrs
from .abc import AbstractStreamWriter
from .helpers import (
    _SENTINEL,
    content_disposition_header,
    guess_filename,
    parse_mimetype,
    sentinel,
)
from .streams import StreamReader
from .typedefs import JSONEncoder, _CIMultiDict

__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "AsyncIterablePayload",
)

TOO_LARGE_BYTES_BODY: Final[int] = 2**20  # 1 MB
READ_SIZE: Final[int] = 2**16  # 64 KB
_CLOSE_FUTURES: Set[asyncio.Future[None]] = set()


class LookupError(Exception):
    """Raised when no payload factory is found for the given data type."""


class Order(str, enum.Enum):
    normal = "normal"
    try_first = "try_first"
    try_last = "try_last"


def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    return PAYLOAD_REGISTRY.get(data, *args, **kwargs)


def register_payload(
    factory: Type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    PAYLOAD_REGISTRY.register(factory, type, order=order)


class payload_type:
    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        self.type = type
        self.order = order

    def __call__(self, factory: Type["Payload"]) -> Type["Payload"]:
        register_payload(factory, self.type, order=self.order)
        return factory
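
# A minimal usage sketch (illustrative, not part of the module): registering a
# payload factory for a user-defined type with the ``payload_type`` decorator.
# ``Token`` and ``TokenPayload`` are hypothetical names; note that calling this
# function mutates the module-level PAYLOAD_REGISTRY.
def _example_payload_type_decorator() -> None:
    class Token:  # hypothetical user type
        def __init__(self, value: str) -> None:
            self.value = value

    @payload_type(Token)
    class TokenPayload(StringPayload):
        def __init__(self, value: Token, *args: Any, **kwargs: Any) -> None:
            super().__init__(value.value, *args, **kwargs)

    # After registration, get_payload(Token("abc")) dispatches to TokenPayload.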


PayloadType = Type["Payload"]
_PayloadRegistryItem = Tuple[PayloadType, Any]


class PayloadRegistry:
    """Payload registry.

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        self._first: List[_PayloadRegistryItem] = []
        self._normal: List[_PayloadRegistryItem] = []
        self._last: List[_PayloadRegistryItem] = []
        self._normal_lookup: Dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "Type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        if self._first:
            for factory, type_ in self._first:
                if isinstance(data, type_):
                    return factory(data, *args, **kwargs)
        # Try the fast lookup first
        if lookup_factory := self._normal_lookup.get(type(data)):
            return lookup_factory(data, *args, **kwargs)
        # Bail early if it's already a Payload
        if isinstance(data, Payload):
            return data
        # Fall back to the slower linear search
        for factory, type_ in _CHAIN(self._normal, self._last):
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)
        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        if order is Order.try_first:
            self._first.append((factory, type))
        elif order is Order.normal:
            self._normal.append((factory, type))
            if isinstance(type, Iterable):
                for t in type:
                    self._normal_lookup[t] = factory
            else:
                self._normal_lookup[type] = factory
        elif order is Order.try_last:
            self._last.append((factory, type))
        else:
            raise ValueError(f"Unsupported order {order!r}")
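
# A minimal sketch (illustrative): dispatching on a standalone registry. get()
# tries try_first factories, then an exact-type fast path, then a linear
# isinstance scan over the normal and try_last entries.
def _example_payload_registry() -> None:
    registry = PayloadRegistry()
    registry.register(StringPayload, str)
    registry.register(BytesPayload, (bytes, bytearray), order=Order.try_last)

    assert isinstance(registry.get("hi"), StringPayload)  # exact-type fast path
    assert isinstance(registry.get(bytearray(b"hi")), BytesPayload)  # linear scan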


class Payload(ABC):

    _default_content_type: str = "application/octet-stream"
    _size: Optional[int] = None
    _consumed: bool = False  # Default: payload has not been consumed yet
    _autoclose: bool = False  # Default: assume resource needs explicit closing

    def __init__(
        self,
        value: Any,
        headers: Optional[
            Union[_CIMultiDict, Dict[str, str], Iterable[Tuple[str, str]]]
        ] = None,
        content_type: Union[str, None, _SENTINEL] = sentinel,
        filename: Optional[str] = None,
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self._encoding = encoding
        self._filename = filename
        self._headers: _CIMultiDict = CIMultiDict()
        self._value = value
        if content_type is not sentinel and content_type is not None:
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> Optional[int]:
        """Size of the payload in bytes.

        Returns the number of bytes that will be transmitted when the payload
        is written. For string payloads, this is the size after encoding to
        bytes, not the length of the string.
        """
        return self._size

    @property
    def filename(self) -> Optional[str]:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> _CIMultiDict:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        return (
            "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode(
                "utf-8"
            )
            + b"\r\n"
        )

    @property
    def encoding(self) -> Optional[str]:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    @property
    def consumed(self) -> bool:
        """Whether the payload has been consumed and cannot be reused."""
        return self._consumed

    @property
    def autoclose(self) -> bool:
        """
        Whether the payload can close itself automatically.

        Returns True if the payload has no file handles or resources that need
        explicit closing. If False, callers must await close() to release
        resources.
        """
        return self._autoclose

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: Any,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, **params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length
        constraints.

        Important:
            For new implementations, use write_with_length() instead of this
            method. This method is maintained for backwards compatibility and
            will eventually delegate to write_with_length(writer, None) in all
            implementations.

        All payload subclasses must override this method for backwards
        compatibility, but new code should use write_with_length for more
        flexibility and control.

        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method allows writing payload content with a specific length
        constraint, which is particularly useful for HTTP responses with a
        Content-Length header.

        Note:
            This is the base implementation that provides backwards
            compatibility for subclasses that don't override this method.
            Specific payload types should override this method to implement
            proper length-constrained writing.

        """
        # Backwards compatibility for subclasses that don't override this
        # method and for the default implementation
        await self.write(writer)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This is a convenience method that calls decode() and encodes the
        result to bytes using the specified encoding.
        """
        # Use the instance encoding if available, otherwise use the parameter
        actual_encoding = self._encoding or encoding
        return self.decode(actual_encoding, errors).encode(actual_encoding)

    def _close(self) -> None:
        """
        Async-safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility with code that
        needs to clean up payloads synchronously. In the future, we will
        drop this method and only support the async close() method.

        WARNING: This method must be safe to call from within the event loop
        without blocking. Subclasses should not perform any blocking I/O here.

        WARNING: This method must be called from within an event loop for
        certain payload types (e.g., IOBasePayload). Calling it outside an
        event loop may raise RuntimeError.
        """
        # This is a no-op by default, but subclasses can override it
        # for non-blocking cleanup operations.

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.

        In the future, this will be the only close method supported.
        """
        self._close()
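
# A minimal sketch of a concrete Payload subclass (hypothetical, illustration
# only): subclasses must implement decode() and write(); size and
# write_with_length() fall back to the base implementations.
def _example_minimal_payload_subclass() -> None:
    class RepeatPayload(Payload):
        """Hypothetical payload that writes ``value`` ``count`` times."""

        def __init__(self, value: bytes, count: int, *args: Any, **kwargs: Any) -> None:
            super().__init__(value, *args, **kwargs)
            self._count = count
            self._size = len(value) * count

        def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
            return (self._value * self._count).decode(encoding, errors)

        async def write(self, writer: AbstractStreamWriter) -> None:
            for _ in range(self._count):
                await writer.write(self._value)

    assert RepeatPayload(b"ab", 3).size == 6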


class BytesPayload(Payload):
    _value: bytes
    # _consumed = False (inherited) - bytes are immutable and can be reused
    _autoclose = True  # No file handle, just bytes in memory

    def __init__(
        self, value: Union[bytes, bytearray, memoryview], *args: Any, **kwargs: Any
    ) -> None:
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        if isinstance(value, memoryview):
            self._size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            self._size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if self._size > TOO_LARGE_BYTES_BODY:
            kwargs = {"source": self}
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                **kwargs,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        return self._value.decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method returns the raw bytes content of the payload.
        It is equivalent to accessing the _value attribute directly.
        """
        return self._value

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire bytes payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire bytes content without any length
        constraint.

        Note:
            For new implementations that need length control, use
            write_with_length(). This method is maintained for backwards
            compatibility and is equivalent to write_with_length(writer, None).

        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write bytes payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method writes either the entire byte sequence or a slice of it
        up to the specified content_length. For BytesPayload, this operation
        is performed efficiently using array slicing.

        """
        if content_length is not None:
            await writer.write(self._value[:content_length])
        else:
            await writer.write(self._value)
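
# A quick usage sketch (illustrative, not part of the module): BytesPayload
# knows its size up front, and the immutable buffer makes it safe to reuse.
def _example_bytes_payload() -> None:
    p = BytesPayload(b"hello world")
    assert p.size == 11
    assert p.content_type == "application/octet-stream"
    assert p.decode() == "hello world"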


class StringPayload(BytesPayload):
    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        if encoding is None:
            if content_type is None:
                real_encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                real_encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
            real_encoding = encoding

        super().__init__(
            value.encode(real_encoding),
            encoding=real_encoding,
            content_type=content_type,
            *args,
            **kwargs,
        )
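
# A sketch of how StringPayload derives its charset (illustrative): from an
# explicit ``encoding``, from the ``content_type`` parameter, or UTF-8.
def _example_string_payload_charset() -> None:
    p1 = StringPayload("héllo")
    assert p1.encoding == "utf-8"
    assert p1.content_type == "text/plain; charset=utf-8"

    p2 = StringPayload("héllo", content_type="text/plain; charset=latin-1")
    assert p2.encoding == "latin-1"
    assert p2.size == len("héllo".encode("latin-1"))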


class StringIOPayload(StringPayload):
    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        super().__init__(value.read(), *args, **kwargs)


class IOBasePayload(Payload):
    _value: io.IOBase
    # _consumed = False (inherited) - file can be re-read from the same position
    _start_position: Optional[int] = None
    # _autoclose = False (inherited) - has a file handle that needs explicit closing

    def __init__(
        self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
    ) -> None:
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _set_or_restore_start_position(self) -> None:
        """Set or restore the start position of the file-like object."""
        if self._start_position is None:
            try:
                self._start_position = self._value.tell()
            except (OSError, AttributeError):
                self._consumed = True  # Cannot seek, mark as consumed
            return
        try:
            self._value.seek(self._start_position)
        except (OSError, AttributeError):
            # Failed to seek back - mark as consumed since we've already read
            self._consumed = True

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """
        Read the file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in
                this operation. If None, READ_SIZE will be used as the default
                chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if the size
              cannot be determined)
            - The first chunk of bytes read from the file object

        This method is optimized to perform both the size calculation and the
        initial read in a single operation, which is executed in a single
        executor job to minimize context switches and file operations when
        streaming content.

        """
        self._set_or_restore_start_position()
        size = self.size  # Call size only once since it does I/O
        return size, self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object, respecting the
            remaining_content_len limit if specified.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made.

        """
        return self._value.read(remaining_content_len or READ_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> Optional[int]:
        """
        Size of the payload in bytes.

        Returns the total size of the payload content from the initial
        position. This ensures consistent Content-Length for requests,
        including 307/308 redirects where the same payload instance is reused.

        Returns None if the size cannot be determined (e.g., for unseekable
        streams).
        """
        try:
            # Store the start position on first access.
            # This is critical when the same payload instance is reused (e.g.,
            # 307/308 redirects). Without storing the initial position, after
            # the payload is read once, the file position would be at EOF,
            # which would cause the size calculation to return 0 (file_size -
            # EOF position). By storing the start position, we ensure the size
            # calculation always returns the correct total size for any
            # subsequent use.
            if self._start_position is None:
                self._start_position = self._value.tell()

            # Return the total size from the start position.
            # This ensures Content-Length is correct even after reading.
            return os.fstat(self._value.fileno()).st_size - self._start_position
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire file content without any length
        constraint. It delegates to write_with_length() with no length limit
        for implementation consistency.

        Note:
            For new implementations that need length control, use
            write_with_length() directly. This method is maintained for
            backwards compatibility with existing code.

        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method implements optimized streaming of file content with length
        constraints:

        1. File reading is performed in a thread pool to avoid blocking the event loop
        2. Content is read and written in chunks to maintain memory efficiency
        3. Writing stops when either:
           - All available file content has been written (when the size is known)
           - The specified content_length has been reached
        4. File resources are properly closed even if the operation is cancelled

        The implementation carefully handles both known-size and unknown-size
        payloads, as well as constrained and unconstrained content lengths.

        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length

        # Get the initial data and the available length
        available_len, chunk = await loop.run_in_executor(
            None, self._read_and_available_len, remaining_content_len
        )
        # Process data chunks until done
        while chunk:
            chunk_len = len(chunk)

            # Write data with or without a length constraint
            if remaining_content_len is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_content_len])
                remaining_content_len -= chunk_len

            total_written_len += chunk_len

            # Check if we're done writing
            if self._should_stop_writing(
                available_len, total_written_len, remaining_content_len
            ):
                return

            # Read the next chunk
            chunk = await loop.run_in_executor(
                None,
                self._read,
                (
                    min(READ_SIZE, remaining_content_len)
                    if remaining_content_len is not None
                    else READ_SIZE
                ),
            )

    def _should_stop_writing(
        self,
        available_len: Optional[int],
        total_written_len: int,
        remaining_content_len: Optional[int],
    ) -> bool:
        """
        Determine if we should stop writing data.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Remaining bytes to be written for
                content-length limited responses

        Returns:
            True if we should stop writing data, based on either:
            - Having written all available data (when the size is known)
            - Having written all requested content (when a content length is specified)

        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _close(self) -> None:
        """
        Async-safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility. Use the async
        close() method instead.

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError.
        """
        # Skip if already consumed
        if self._consumed:
            return
        self._consumed = True  # Mark as consumed to prevent further writes
        # Schedule file closing without awaiting to prevent cancellation issues
        loop = asyncio.get_running_loop()
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the
        event loop.
        """
        return self._read_all().decode(encoding, errors)

    def _read_all(self) -> bytes:
        """Read the entire file-like object and return its content as bytes."""
        self._set_or_restore_start_position()
        # Use readlines() to ensure we get all the content
        return b"".join(self._value.readlines())

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire file content and returns it as bytes.
        It is equivalent to reading the file-like object directly.
        The file reading is performed in an executor to avoid blocking the
        event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._read_all)
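
# A usage sketch (hypothetical path, illustrative only): streaming a file via
# IOBasePayload. size is derived from os.fstat() relative to the start
# position, and close() must be awaited because autoclose is False.
async def _example_file_payload() -> None:
    f = open("/tmp/example.bin", "rb")  # hypothetical file
    payload = IOBasePayload(f)
    try:
        data = await payload.as_bytes()  # reads in an executor
        assert payload.size == len(data)
    finally:
        await payload.close()  # schedules f.close() in an executor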


class TextIOPayload(IOBasePayload):
    _value: io.TextIOBase
    # _autoclose = False (inherited) - has a text file handle that needs explicit closing

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        if encoding is None:
            if content_type is None:
                encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding

        super().__init__(
            value,
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """
        Read the text file-like object and return both its total size and the
        first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in
                this operation. If None, READ_SIZE will be used as the default
                chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if the size
              cannot be determined)
            - The first chunk of bytes read from the file object, encoded
              using the payload's encoding

        This method is optimized to perform both the size calculation and the
        initial read in a single operation, which is executed in a single
        executor job to minimize context switches and file operations when
        streaming content.

        Note:
            TextIOPayload handles encoding of the text content before writing
            it to the stream. If no encoding is specified, UTF-8 is used as
            the default.

        """
        self._set_or_restore_start_position()
        size = self.size
        chunk = self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )
        return size, chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """
        Read a chunk of data from the text file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object and encoded using the
            payload's encoding. The data is automatically converted from text
            to bytes.

        This method is used for subsequent reads during streaming after the
        initial _read_and_available_len call has been made. It properly
        handles text encoding, converting the text content to bytes using
        the specified encoding (or UTF-8 if none was provided).

        """
        chunk = self._value.read(remaining_content_len or READ_SIZE)
        return chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the
        event loop.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire text file content and returns it as
        bytes. It encodes the text content using the specified encoding.
        The file reading is performed in an executor to avoid blocking the
        event loop.
        """
        loop = asyncio.get_running_loop()

        # Use the instance encoding if available, otherwise use the parameter
        actual_encoding = self._encoding or encoding

        def _read_and_encode() -> bytes:
            self._set_or_restore_start_position()
            # TextIO read() always returns the full content
            return self._value.read().encode(actual_encoding, errors)

        return await loop.run_in_executor(None, _read_and_encode)


class BytesIOPayload(IOBasePayload):
    _value: io.BytesIO
    _size: int  # Always initialized in __init__
    _autoclose = True  # BytesIO is in-memory, safe to auto-close

    def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)
        # Calculate the size once during initialization
        self._size = len(self._value.getbuffer()) - self._value.tell()

    @property
    def size(self) -> int:
        """Size of the payload in bytes.

        Returns the number of bytes in the BytesIO buffer that will be
        transmitted. This is calculated once during initialization for
        efficiency.
        """
        return self._size

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write BytesIO payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation is specifically optimized for BytesIO objects:

        1. Reads content in chunks to maintain memory efficiency
        2. Yields control back to the event loop periodically to prevent
           blocking when dealing with large BytesIO objects
        3. Respects content_length constraints when specified
        4. Properly cleans up by closing the BytesIO object when done or on error

        The periodic yielding to the event loop is important for maintaining
        responsiveness when processing large in-memory buffers.

        """
        self._set_or_restore_start_position()
        loop_count = 0
        remaining_bytes = content_length
        while chunk := self._value.read(READ_SIZE):
            if loop_count > 0:
                # Avoid blocking the event loop when a caller passes a
                # large BytesIO object; yield control on every iteration
                # after the first one
                await asyncio.sleep(0)
            if remaining_bytes is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_bytes])
                remaining_bytes -= len(chunk)
                if remaining_bytes <= 0:
                    return
            loop_count += 1

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire BytesIO content and returns it as bytes.
        It is equivalent to accessing the _value attribute directly.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def close(self) -> None:
        """
        Close the BytesIO payload.

        This does nothing since BytesIO is in-memory and does not require
        explicit closing.
        """
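
# A short sketch (illustrative): wrapping a large body in BytesIO instead of
# raw bytes avoids the ResourceWarning emitted by BytesPayload above, and the
# chunked writer above periodically yields to the event loop.
def _example_bytesio_payload() -> None:
    big = io.BytesIO(b"x" * (2 * TOO_LARGE_BYTES_BODY))
    p = BytesIOPayload(big)
    assert p.size == 2 * TOO_LARGE_BYTES_BODY
    assert p.autoclose  # in-memory buffer, no explicit close required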


class BufferedReaderPayload(IOBasePayload):
    _value: io.BufferedIOBase
    # _autoclose = False (inherited) - has a buffered file handle that needs explicit closing

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)


class JsonPayload(BytesPayload):
    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(
            dumps(value).encode(encoding),
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )
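
# A usage sketch (illustrative): JSON bodies are serialized eagerly, so a
# JsonPayload is just a BytesPayload whose size is known immediately.
def _example_json_payload() -> None:
    p = JsonPayload({"answer": 42})
    assert p.content_type == "application/json"
    assert p.decode() == '{"answer": 42}'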


if TYPE_CHECKING:
    from typing import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator[bytes]
    _AsyncIterable = AsyncIterable[bytes]
else:
    from collections.abc import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator
    _AsyncIterable = AsyncIterable


class AsyncIterablePayload(Payload):

    _iter: Optional[_AsyncIterator] = None
    _value: _AsyncIterable
    _cached_chunks: Optional[List[bytes]] = None
    # _consumed stays False to allow reuse with cached content
    _autoclose = True  # The iterator doesn't need explicit closing

    def __init__(self, value: _AsyncIterable, *args: Any, **kwargs: Any) -> None:
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                "got {!r}".format(type(value))
            )

        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire async iterable payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method iterates through the async iterable and writes each chunk
        to the writer without any length constraint.

        Note:
            For new implementations that need length control, use
            write_with_length() directly. This method is maintained for
            backwards compatibility with existing code.

        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write async iterable payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation handles streaming of async iterable content with
        length constraints:

        1. If cached chunks are available, writes from them
        2. Otherwise iterates through the async iterable one chunk at a time
        3. Respects content_length constraints when specified
        4. Does NOT generate the cache - that's done by as_bytes()

        """
        # If we have cached chunks, use them
        if self._cached_chunks is not None:
            remaining_bytes = content_length
            for chunk in self._cached_chunks:
                if remaining_bytes is None:
                    await writer.write(chunk)
                elif remaining_bytes > 0:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                else:
                    break
            return

        # If the iterator is exhausted and we don't have cached chunks,
        # there is nothing to write
        if self._iter is None:
            return

        # Stream from the iterator
        remaining_bytes = content_length

        try:
            while True:
                if sys.version_info >= (3, 10):
                    chunk = await anext(self._iter)
                else:
                    chunk = await self._iter.__anext__()
                if remaining_bytes is None:
                    await writer.write(chunk)
                # If we have a content length limit
                elif remaining_bytes > 0:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                # We still want to exhaust the iterator even
                # if we have reached the content length limit
                # since the file handle may not get closed by
                # the iterator if we don't do this
        except StopAsyncIteration:
            # The iterator is exhausted
            self._iter = None
            self._consumed = True  # Mark as consumed when streamed without caching

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the payload content as a string if cached chunks are available."""
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks).decode(encoding, errors)
        raise TypeError("Unable to decode - content not cached. Call as_bytes() first.")

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire async iterable content and returns it as
        bytes. It generates and caches the chunks for future reuse.
        """
        # If we have cached chunks, return them joined
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks)

        # If the iterator is exhausted and there is no cache, return empty bytes
        if self._iter is None:
            return b""

        # Read all chunks and cache them
        chunks: List[bytes] = []
        async for chunk in self._iter:
            chunks.append(chunk)

        # The iterator is exhausted; cache the chunks
        self._iter = None
        self._cached_chunks = chunks
        # Keep _consumed as False to allow reuse with the cached chunks

        return b"".join(chunks)
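
# A sketch with a hypothetical async generator (illustrative): as_bytes()
# drains the iterator once and caches the chunks, so the payload stays
# reusable; plain write() without caching marks it as consumed instead.
async def _example_async_iterable_payload() -> None:
    async def gen() -> _AsyncIterator:
        yield b"chunk-1"
        yield b"chunk-2"

    p = AsyncIterablePayload(gen())
    assert await p.as_bytes() == b"chunk-1chunk-2"
    # The chunks are now cached; decode() works and the payload is reusable.
    assert p.decode() == "chunk-1chunk-2"
    assert not p.consumed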


class StreamReaderPayload(AsyncIterablePayload):
    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        super().__init__(value.iter_any(), *args, **kwargs)


PAYLOAD_REGISTRY = PayloadRegistry()
PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
PAYLOAD_REGISTRY.register(StringPayload, str)
PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO)
PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase)
PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO)
PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader)
# try_last gives more specialized async iterables like
# multipart.BodyPartReaderPayload a chance to override the default
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)
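
# A final usage sketch (illustrative, not part of the module): with the
# registrations above, get_payload() dispatches on the data type.
def _example_registry_dispatch() -> None:
    assert isinstance(get_payload(b"raw"), BytesPayload)
    assert isinstance(get_payload("text"), StringPayload)
    assert isinstance(get_payload(io.BytesIO(b"buf")), BytesIOPayload)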