Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/payload.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

393 statements  

1import asyncio 

2import enum 

3import io 

4import json 

5import mimetypes 

6import os 

7import sys 

8import warnings 

9from abc import ABC, abstractmethod 

10from collections.abc import Iterable 

11from itertools import chain 

12from typing import IO, TYPE_CHECKING, Any, Final, TextIO 

13 

14from multidict import CIMultiDict 

15 

16from . import hdrs 

17from .abc import AbstractStreamWriter 

18from .helpers import ( 

19 _SENTINEL, 

20 DEFAULT_CHUNK_SIZE, 

21 content_disposition_header, 

22 guess_filename, 

23 parse_mimetype, 

24 sentinel, 

25) 

26from .http_writer import _safe_header 

27from .streams import StreamReader 

28from .typedefs import JSONBytesEncoder, JSONEncoder, _CIMultiDict 

29 

# Public API of this module (what ``from ... import *`` exposes).
__all__ = (
    # registry helpers
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    # payload classes
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "JsonBytesPayload",
    "AsyncIterablePayload",
)

46 

# Raw bytes bodies larger than this (1 MiB) make BytesPayload emit a
# ResourceWarning suggesting io.BytesIO instead.
TOO_LARGE_BYTES_BODY: Final[int] = 1 << 20

# Strong references to pending executor futures that close file handles, so
# they are not garbage collected before they complete.
_CLOSE_FUTURES: set[asyncio.Future[None]] = set()

49 

50 

class LookupError(Exception):
    """Signal that the payload registry has no factory for a value's type.

    Note: this name shadows the builtin ``LookupError`` inside this module,
    and the class derives directly from ``Exception`` (not from the builtin).
    """

53 

54 

class Order(str, enum.Enum):
    """Bucket a payload factory is registered into.

    ``try_first`` factories are checked before all others, ``normal`` ones
    next, and ``try_last`` ones as the final fallback.
    """

    normal = "normal"
    try_first = "try_first"
    try_last = "try_last"

59 

60 

def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    """Return a payload for *data* chosen by ``PAYLOAD_REGISTRY``.

    Extra positional and keyword arguments are forwarded unchanged to
    ``PAYLOAD_REGISTRY.get``.
    """
    registry = PAYLOAD_REGISTRY
    return registry.get(data, *args, **kwargs)

63 

64 

def register_payload(
    factory: type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    """Register *factory* for values of *type* in ``PAYLOAD_REGISTRY``.

    *order* selects the lookup bucket (see :class:`Order`).
    """
    registry = PAYLOAD_REGISTRY
    registry.register(factory, type, order=order)

69 

70 

class payload_type:
    """Class decorator that registers the decorated payload factory.

    Usage::

        @payload_type(SomeType)
        class SomePayload(Payload): ...

    The decorated class is returned unchanged after registration.
    """

    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        # Remember the registration parameters until the decorator is applied.
        self.order = order
        self.type = type

    def __call__(self, factory: type["Payload"]) -> type["Payload"]:
        register_payload(factory, self.type, order=self.order)
        return factory

79 

80 

# A payload factory: a Payload subclass, called with the data to wrap.
PayloadType = type["Payload"]
# One registry entry: (factory, type-or-iterable-of-types it handles).
_PayloadRegistryItem = tuple[PayloadType, Any]

83 

84 

class PayloadRegistry:
    """Payload registry.

    Factories are kept in three buckets (``try_first``, ``normal``,
    ``try_last``). ``normal`` registrations are also indexed by exact type
    for a dictionary fast path.

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        self._first: list[_PayloadRegistryItem] = []
        self._normal: list[_PayloadRegistryItem] = []
        self._last: list[_PayloadRegistryItem] = []
        # exact type -> factory; filled only for Order.normal registrations
        self._normal_lookup: dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        """Return a payload wrapping *data*.

        Resolution order:

        1. ``try_first`` factories, by ``isinstance``;
        2. a ``normal`` factory registered for exactly ``type(data)``;
        3. *data* itself, if it already is a :class:`Payload`;
        4. ``normal`` then ``try_last`` factories, by ``isinstance``.

        Extra arguments are forwarded to the chosen factory. Raises this
        module's ``LookupError`` when nothing matches.
        """
        for factory, type_ in self._first:
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)

        exact_factory = self._normal_lookup.get(type(data))
        if exact_factory:
            return exact_factory(data, *args, **kwargs)

        if isinstance(data, Payload):
            return data

        for factory, type_ in _CHAIN(self._normal, self._last):
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)
        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        """Add *factory* for *type* to the bucket chosen by *order*.

        For ``Order.normal`` the exact-type index is updated as well; an
        iterable *type* (such as a tuple of types) indexes each element.

        Raises:
            ValueError: if *order* is not an ``Order`` member.
        """
        entry = (factory, type)
        if order is Order.try_first:
            self._first.append(entry)
            return
        if order is Order.try_last:
            self._last.append(entry)
            return
        if order is not Order.normal:
            raise ValueError(f"Unsupported order {order!r}")

        self._normal.append(entry)
        keys = type if isinstance(type, Iterable) else (type,)
        for key in keys:
            self._normal_lookup[key] = factory

138 

139 

class Payload(ABC):
    """Abstract base class for HTTP message bodies.

    A payload wraps a value, owns a case-insensitive header mapping that
    always contains ``Content-Type``, and knows how to write itself to an
    :class:`AbstractStreamWriter`.
    """

    _default_content_type: str = "application/octet-stream"
    _size: int | None = None
    _consumed: bool = False  # Default: payload has not been consumed yet
    _autoclose: bool = False  # Default: assume resource needs explicit closing

    def __init__(
        self,
        value: Any,
        headers: (
            _CIMultiDict | dict[str, str] | Iterable[tuple[str, str]] | None
        ) = None,
        content_type: str | None | _SENTINEL = sentinel,
        filename: str | None = None,
        encoding: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the payload.

        Args:
            value: The wrapped value.
            headers: Extra headers, merged in after Content-Type has been set.
            content_type: Explicit MIME type. When omitted or None, it is
                guessed from *filename*, falling back to the class default.
            filename: Optional filename; also used for MIME type guessing.
            encoding: Optional text encoding of the payload.
            **kwargs: Accepted but unused here (presumably so subclasses can
                pass extra options through).
        """
        self._encoding = encoding
        self._filename = filename
        self._headers: _CIMultiDict = CIMultiDict()
        self._value = value
        if content_type is not sentinel and content_type is not None:
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            # guess_file_type is the path-oriented API added in Python 3.13.
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> int | None:
        """Size of the payload in bytes.

        Returns the number of bytes that will be transmitted when the payload
        is written. For string payloads, this is the size after encoding to bytes,
        not the length of the string.
        """
        return self._size

    @property
    def filename(self) -> str | None:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> _CIMultiDict:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        """Headers serialized as ``Name: value\\r\\n`` lines plus a blank line.

        Names and values pass through ``_safe_header``; the result is UTF-8.
        """
        return (
            "".join(
                _safe_header(k) + ": " + _safe_header(v) + "\r\n"
                for k, v in self.headers.items()
            ).encode("utf-8")
            + b"\r\n"
        )

    @property
    def encoding(self) -> str | None:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    @property
    def consumed(self) -> bool:
        """Whether the payload has been consumed and cannot be reused."""
        return self._consumed

    @property
    def autoclose(self) -> bool:
        """
        Whether the payload can close itself automatically.

        Returns True if the payload has no file handles or resources that need
        explicit closing. If False, callers must await close() to release resources.
        """
        return self._autoclose

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: Any,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, **params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length constraints.

        Important:
            For new implementations, use write_with_length() instead of this method.
            This method is maintained for backwards compatibility and will eventually
            delegate to write_with_length(writer, None) in all implementations.

        All payload subclasses must override this method for backwards compatibility,
        but new code should use write_with_length for more flexibility and control.
        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Note:
            This base implementation ignores *content_length* and delegates
            to write(), for subclasses that don't override this method.
            Specific payload types should override it to honor the limit.
        """
        await self.write(writer)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Calls decode() and encodes the result back to bytes. The instance
        encoding takes precedence over *encoding*, and *errors* is applied
        to both the decode and the encode step.
        """
        actual_encoding = self._encoding or encoding
        # Pass errors to encode() too, so e.g. errors="replace" cannot fail on
        # characters the target codec cannot represent.
        return self.decode(actual_encoding, errors).encode(actual_encoding, errors)

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility with code that
        needs to clean up payloads synchronously. In the future, we will
        drop this method and only support the async close() method.

        WARNING: This method must be safe to call from within the event loop
        without blocking. Subclasses should not perform any blocking I/O here.

        WARNING: This method must be called from within an event loop for
        certain payload types (e.g., IOBasePayload). Calling it outside an
        event loop may raise RuntimeError.
        """
        # This is a no-op by default, but subclasses can override it
        # for non-blocking cleanup operations.

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.

        In the future, this will be the only close method supported.
        """
        self._close()

338 

339 

class BytesPayload(Payload):
    """Payload backed by an in-memory bytes-like object."""

    _value: bytes | bytearray | memoryview
    # _consumed = False (inherited) - in-memory data can be written repeatedly
    _autoclose = True  # No file handle, just bytes in memory

    def __init__(
        self, value: bytes | bytearray | memoryview, *args: Any, **kwargs: Any
    ) -> None:
        """Wrap *value*.

        Raises:
            TypeError: if *value* is not bytes, bytearray or memoryview.
        """
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        # nbytes counts bytes even for memoryviews with a multi-byte format.
        if isinstance(value, memoryview):
            self._size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            self._size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if self._size > TOO_LARGE_BYTES_BODY:
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                source=self,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the stored bytes to ``str``."""
        value = self._value
        if isinstance(value, memoryview):
            # memoryview has no decode(); copy its contents out first.
            value = value.tobytes()
        return value.decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method returns the stored value as-is (no copy); note that this
        is the original bytearray/memoryview object when one was given.
        """
        return self._value

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire bytes payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Note:
            Equivalent to write_with_length(writer, None); kept for backwards
            compatibility.
        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write bytes payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Writes either the whole value or its first *content_length* bytes
        (via slicing) in a single write call.
        """
        if content_length is not None:
            await writer.write(self._value[:content_length])
        else:
            await writer.write(self._value)

418 

419 

class StringPayload(BytesPayload):
    """Payload for a ``str`` value, stored as encoded bytes.

    The codec is *encoding* if given, otherwise the ``charset`` parameter of
    *content_type*, otherwise ``utf-8``. When *content_type* is omitted a
    ``text/plain`` type carrying the charset is used.
    """

    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        if encoding is not None:
            real_encoding = encoding
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            params = parse_mimetype(content_type).parameters
            real_encoding = params.get("charset", "utf-8")
        else:
            real_encoding = "utf-8"
            content_type = "text/plain; charset=utf-8"

        encoded = value.encode(real_encoding)
        super().__init__(
            encoded,
            *args,
            encoding=real_encoding,
            content_type=content_type,
            **kwargs,
        )

449 

450 

class StringIOPayload(StringPayload):
    """Payload that reads a text stream up front and sends it as a string."""

    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        text = value.read()  # consume the whole stream eagerly
        super().__init__(text, *args, **kwargs)

454 

455 

class IOBasePayload(Payload):
    """Payload streamed from a file-like object.

    Blocking reads are dispatched to the loop's default executor. The stream
    position at first use is remembered so the payload can be written again
    (e.g. on 307/308 redirects) when the stream supports tell()/seek().
    """

    _value: io.IOBase
    # _consumed = False (inherited) - File can be re-read from the same position
    _start_position: int | None = None
    # _autoclose = False (inherited) - Has file handle that needs explicit closing
    # Whether closing the underlying stream has been scheduled. Kept separate
    # from ``_consumed``: a stream whose position cannot be recorded/restored
    # is marked consumed while reading, yet it still must be closed.
    _closed: bool = False

    def __init__(
        self,
        value: IO[Any],
        disposition: str | None = "attachment",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Wrap *value*.

        Args:
            value: The file-like object to stream.
            disposition: Content-Disposition type used when a filename is
                known; ``None`` disables setting the header.
            *args, **kwargs: Forwarded to :class:`Payload`. If no
                ``filename`` is given, one is guessed from *value*.
        """
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            # Don't override a Content-Disposition supplied via headers.
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _set_or_restore_start_position(self) -> None:
        """Set or restore the start position of the file-like object.

        The first call records the current position; later calls seek back
        to it. If tell()/seek() fail, the payload is marked as consumed.
        """
        if self._start_position is None:
            try:
                self._start_position = self._value.tell()
            except (OSError, AttributeError):
                self._consumed = True  # Cannot seek, mark as consumed
            return
        try:
            self._value.seek(self._start_position)
        except (OSError, AttributeError):
            # Failed to seek back - mark as consumed since we've already read
            self._consumed = True

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """
        Read the file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, DEFAULT_CHUNK_SIZE will be used as the default chunk size.

        Returns:
            A tuple containing:
            - The total size from the start position (None if it cannot be determined)
            - The first chunk of bytes read from the file object

        Size calculation and the first read happen in one call so that they
        run in a single executor job.
        """
        self._set_or_restore_start_position()
        size = self.size  # Call size only once since it does I/O
        return size, self._value.read(
            min(
                DEFAULT_CHUNK_SIZE,
                size or DEFAULT_CHUNK_SIZE,
                remaining_content_len or DEFAULT_CHUNK_SIZE,
            )
        )

    def _read(self, remaining_content_len: int | None) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None (or 0), DEFAULT_CHUNK_SIZE is used.

        Used for the reads that follow the initial _read_and_available_len call.
        """
        return self._value.read(remaining_content_len or DEFAULT_CHUNK_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> int | None:
        """
        Size of the payload in bytes.

        Returns the total size of the payload content from the initial position.
        This ensures consistent Content-Length for requests, including 307/308 redirects
        where the same payload instance is reused.

        Returns None if the size cannot be determined (e.g., for unseekable streams).
        """
        try:
            # Record the start position on first access. Without it, after one
            # full read the position would be at EOF and a reused payload
            # (e.g. on a 307/308 redirect) would report size 0.
            if self._start_position is None:
                self._start_position = self._value.tell()

            return os.fstat(self._value.fileno()).st_size - self._start_position
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Delegates to write_with_length() with no length limit; kept for
        backwards compatibility.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        File reads run in the default executor; content is written chunk by
        chunk. Writing stops when the stream is exhausted, when all known
        available bytes have been written, or when *content_length* bytes
        have been written.
        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length

        # Get initial data and available length
        available_len, chunk = await loop.run_in_executor(
            None, self._read_and_available_len, remaining_content_len
        )
        # Process data chunks until done
        while chunk:
            chunk_len = len(chunk)

            # Write data with or without length constraint
            if remaining_content_len is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_content_len])
                remaining_content_len -= chunk_len

            total_written_len += chunk_len

            # Check if we're done writing
            if self._should_stop_writing(
                available_len, total_written_len, remaining_content_len
            ):
                return

            # Read next chunk
            chunk = await loop.run_in_executor(
                None,
                self._read,
                (
                    min(DEFAULT_CHUNK_SIZE, remaining_content_len)
                    if remaining_content_len is not None
                    else DEFAULT_CHUNK_SIZE
                ),
            )

    def _should_stop_writing(
        self,
        available_len: int | None,
        total_written_len: int,
        remaining_content_len: int | None,
    ) -> bool:
        """
        Determine if we should stop writing data.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Remaining bytes to be written for content-length limited responses

        Returns:
            True once all known available data has been written, or once the
            requested content length has been reached.
        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        Marks the payload consumed and schedules closing of the underlying
        stream in the default executor without awaiting it. Idempotent: the
        close is scheduled at most once.

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError.
        """
        # Guard on _closed, not _consumed: unseekable streams are marked
        # consumed during reading, and checking _consumed here would leak
        # their file handles.
        if self._closed:
            return
        # Acquire the loop before mutating state so a call outside a loop
        # (RuntimeError) doesn't prevent a later, valid close.
        loop = asyncio.get_running_loop()
        self._closed = True
        self._consumed = True  # Mark as consumed to prevent further writes
        # Schedule file closing without awaiting to prevent cancellation issues
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        return self._read_all().decode(encoding, errors)

    def _read_all(self) -> bytes:
        """Read the file-like object from its start position and return all bytes."""
        self._set_or_restore_start_position()
        # Use readlines() to ensure we get all content
        return b"".join(self._value.readlines())

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Reads the entire file content in the default executor so the event
        loop is not blocked.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._read_all)

723 

724 

class TextIOPayload(IOBasePayload):
    """Payload streamed from a text file-like object, encoded chunk by chunk.

    The codec is *encoding* if given, else the ``charset`` parameter of
    *content_type*, else ``utf-8``.
    """

    _value: io.TextIOBase
    # _autoclose = False (inherited) - Has text file handle that needs explicit closing

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        if encoding is not None:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            params = parse_mimetype(content_type).parameters
            encoding = params.get("charset", "utf-8")
        else:
            encoding = "utf-8"
            content_type = "text/plain; charset=utf-8"

        super().__init__(
            value,
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """Return ``(size, first_chunk)`` with the chunk encoded to bytes.

        Restores (or records) the start position, computes the size once,
        then reads at most ``min(DEFAULT_CHUNK_SIZE, size, remaining)``
        characters, treating a falsy size/remaining as DEFAULT_CHUNK_SIZE.
        The chunk is encoded with the payload encoding (UTF-8 if unset).
        Both steps happen in one call so they share a single executor job.
        """
        self._set_or_restore_start_position()
        total = self.size
        limit = min(
            DEFAULT_CHUNK_SIZE,
            total or DEFAULT_CHUNK_SIZE,
            remaining_content_len or DEFAULT_CHUNK_SIZE,
        )
        text = self._value.read(limit)
        if self._encoding:
            return total, text.encode(self._encoding)
        return total, text.encode()

    def _read(self, remaining_content_len: int | None) -> bytes:
        """Read the next text chunk and return it encoded to bytes.

        Reads *remaining_content_len* characters (DEFAULT_CHUNK_SIZE when it
        is falsy) and encodes them with the payload encoding (UTF-8 if unset).
        """
        text = self._value.read(remaining_content_len or DEFAULT_CHUNK_SIZE)
        if self._encoding:
            return text.encode(self._encoding)
        return text.encode()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Return the text from the start position to EOF.

        The stream already yields ``str``, so *encoding*/*errors* are unused.
        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Read all text in the default executor and return it encoded.

        The payload's own encoding wins over *encoding*; *errors* is passed
        to ``str.encode``.
        """
        loop = asyncio.get_running_loop()
        codec = self._encoding or encoding

        def _read_and_encode() -> bytes:
            self._set_or_restore_start_position()
            text = self._value.read()  # read() with no size reads to EOF
            return text.encode(codec, errors)

        return await loop.run_in_executor(None, _read_and_encode)

841 

842 

class BytesIOPayload(IOBasePayload):
    """Payload streamed from an in-memory ``io.BytesIO`` buffer."""

    _value: io.BytesIO
    _size: int  # Always initialized in __init__
    _autoclose = True  # BytesIO is in-memory, safe to auto-close

    def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)
        # Size = bytes from the current position to the end, computed once.
        # Release the getbuffer() view immediately: while a view exists the
        # BytesIO cannot be resized or closed, and without the context
        # manager the release depends on garbage collection timing.
        with self._value.getbuffer() as view:
            self._size = len(view) - self._value.tell()

    @property
    def size(self) -> int:
        """Size of the payload in bytes.

        Returns the number of bytes in the BytesIO buffer that will be transmitted.
        This is calculated once during initialization for efficiency.
        """
        return self._size

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the buffer contents from the start position to the end."""
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write the whole buffer; same as write_with_length(writer, None)."""
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write BytesIO payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Reads DEFAULT_CHUNK_SIZE chunks directly (no executor, the data is in
        memory), yields to the event loop between chunks so a large buffer
        does not monopolize it, and stops once *content_length* bytes have
        been written.
        """
        self._set_or_restore_start_position()
        loop_count = 0
        remaining_bytes = content_length
        while chunk := self._value.read(DEFAULT_CHUNK_SIZE):
            if loop_count > 0:
                # Avoid blocking the event loop if they pass a large BytesIO
                # object and we are not in the first iteration of the loop.
                await asyncio.sleep(0)
            if remaining_bytes is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_bytes])
                remaining_bytes -= len(chunk)
                if remaining_bytes <= 0:
                    return
            loop_count += 1

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Reads the buffer from the start position to the end (synchronously;
        the data is in memory).
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def close(self) -> None:
        """
        Close the BytesIO payload.

        This does nothing since BytesIO is in-memory and does not require explicit closing.
        """

926 

927 

class BufferedReaderPayload(IOBasePayload):
    """Payload backed by a buffered binary file object."""

    _value: io.BufferedIOBase
    # _autoclose is inherited (False): the wrapped buffered file handle must
    # be closed explicitly rather than automatically.

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Read the file to its end and decode the bytes as text.

        The read position is reset first via ``_set_or_restore_start_position()``.
        """
        self._set_or_restore_start_position()
        content = self._value.read()
        return content.decode(encoding, errors)

935 

936 

class JsonPayload(BytesPayload):
    """Payload that serializes a value to JSON text and encodes it to bytes."""

    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Args:
            value: Object to serialize.
            encoding: Codec used to encode the JSON text; it is also forwarded
                to ``BytesPayload``.
            content_type: Content type forwarded to ``BytesPayload``.
            dumps: Callable that turns *value* into a ``str``.
            *args, **kwargs: Passed through to ``BytesPayload``.
        """
        body = dumps(value).encode(encoding)
        # Extra positional arguments follow the body directly instead of being
        # unpacked after keyword arguments (flake8-bugbear B026); the call is
        # semantically the same but no longer reads as if *args came last.
        super().__init__(
            body,
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )

955 

956 

class JsonBytesPayload(BytesPayload):
    """JSON payload for encoders that return bytes directly.

    Use this when your JSON encoder (like orjson) returns bytes
    instead of str, avoiding the encode/decode overhead.
    """

    def __init__(
        self,
        value: Any,
        dumps: JSONBytesEncoder,
        content_type: str = "application/json",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Args:
            value: Object to serialize.
            dumps: Callable returning the serialized *value* as bytes.
            content_type: Content type forwarded to ``BytesPayload``.
            *args, **kwargs: Passed through to ``BytesPayload``.
        """
        # Extra positional arguments follow the body directly instead of being
        # unpacked after a keyword argument (flake8-bugbear B026).
        super().__init__(
            dumps(value),
            *args,
            content_type=content_type,
            **kwargs,
        )

978 

979 

from collections.abc import AsyncIterable, AsyncIterator

# Aliases used in annotations below: type checkers see the bytes-parameterised
# forms, while at runtime the bare ABCs are bound.
if TYPE_CHECKING:
    _AsyncIterator = AsyncIterator[bytes]
    _AsyncIterable = AsyncIterable[bytes]
else:
    _AsyncIterator = AsyncIterator
    _AsyncIterable = AsyncIterable

990 

991 

class AsyncIterablePayload(Payload):
    """Payload whose body is produced by an async iterable of ``bytes`` chunks.

    The underlying iterator can only be drained once. Streaming it through
    ``write()``/``write_with_length()`` marks the payload consumed, whereas
    ``as_bytes()`` keeps every chunk in memory so the payload can be written
    or decoded again afterwards.
    """

    _iter: _AsyncIterator | None = None
    _value: _AsyncIterable
    _cached_chunks: list[bytes] | None = None
    # When chunks are cached, _consumed is left False so they can be replayed.
    _autoclose = True  # Nothing to close explicitly for an iterator.

    def __init__(self, value: _AsyncIterable, *args: Any, **kwargs: Any) -> None:
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                f"got {type(value)!r}"
            )
        kwargs.setdefault("content_type", "application/octet-stream")
        super().__init__(value, *args, **kwargs)
        self._iter = value.__aiter__()

    @staticmethod
    async def _write_limited_chunk(
        writer: AbstractStreamWriter, chunk: bytes, remaining: int | None
    ) -> int | None:
        """Write *chunk*, truncated to *remaining* bytes, and return the new budget.

        ``None`` means no limit. A budget of zero or less writes nothing and
        is returned unchanged.
        """
        if remaining is None:
            await writer.write(chunk)
            return None
        if remaining > 0:
            await writer.write(chunk[:remaining])
            return remaining - len(chunk)
        return remaining

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write every chunk of the async iterable to *writer*, with no length limit.

        Kept for backwards compatibility; code that needs a length limit should
        call ``write_with_length()`` directly.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write async iterable payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Chunks cached by ``as_bytes()`` are replayed when present, which leaves
        the payload reusable. Otherwise the iterator is streamed one chunk at a
        time and, once exhausted, the payload is marked consumed. This method
        never fills the cache itself.
        """
        budget = content_length

        cached = self._cached_chunks
        if cached is not None:
            for chunk in cached:
                if budget is not None and budget <= 0:
                    break
                budget = await self._write_limited_chunk(writer, chunk, budget)
            return

        if self._iter is None:
            # Iterator already exhausted by an earlier streaming write and
            # nothing was cached, so there is nothing left to send.
            return

        try:
            while True:
                chunk = await anext(self._iter)
                # Keep draining even after the limit is reached: the source may
                # only release resources (e.g. a file handle) once exhausted.
                budget = await self._write_limited_chunk(writer, chunk, budget)
        except StopAsyncIteration:
            self._iter = None
            self._consumed = True  # Streamed without caching; cannot replay.

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the cached content as text; requires a prior ``as_bytes()`` call."""
        if self._cached_chunks is None:
            raise TypeError(
                "Unable to decode - content not cached. Call as_bytes() first."
            )
        return b"".join(self._cached_chunks).decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return the full content of the async iterable as bytes.

        On first use the iterator is drained and its chunks are cached, so
        later writes and ``decode()`` can reuse them. If the iterator was
        already exhausted by a streaming write, ``b""`` is returned.
        """
        if self._cached_chunks is None:
            if self._iter is None:
                return b""
            collected = [chunk async for chunk in self._iter]
            self._iter = None
            self._cached_chunks = collected
            # _consumed intentionally stays False so the cache can be reused.
        return b"".join(self._cached_chunks)

1120 

1121 

class StreamReaderPayload(AsyncIterablePayload):
    """Payload that streams the data of a ``StreamReader``.

    The reader is consumed through its ``iter_any()`` async iterator.
    """

    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        chunk_source = value.iter_any()
        super().__init__(chunk_source, *args, **kwargs)

1125 

1126 

# Module-wide registry consulted by get_payload() to choose the Payload class
# that wraps a given data object.
PAYLOAD_REGISTRY = PayloadRegistry()

# Registered in exactly this order with the default priority.
# NOTE(review): presumably lookup within one priority follows registration
# order, which would explain why the specific io types precede the io.IOBase
# catch-all -- confirm against PayloadRegistry.get before reordering.
for _factory, _data_types in (
    (BytesPayload, (bytes, bytearray, memoryview)),
    (StringPayload, str),
    (StringIOPayload, io.StringIO),
    (TextIOPayload, io.TextIOBase),
    (BytesIOPayload, io.BytesIO),
    (BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom)),
    (IOBasePayload, io.IOBase),
    (StreamReaderPayload, StreamReader),
):
    PAYLOAD_REGISTRY.register(_factory, _data_types)
del _factory, _data_types

# try_last gives more specialized async iterables (such as
# multipart.BodyPartReaderPayload) a chance to override this generic default.
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)