Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/payload.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

386 statements  

1import asyncio 

2import enum 

3import io 

4import json 

5import mimetypes 

6import os 

7import sys 

8import warnings 

9from abc import ABC, abstractmethod 

10from collections.abc import AsyncIterable, AsyncIterator, Iterable 

11from itertools import chain 

12from typing import IO, Any, Final, TextIO 

13 

14from multidict import CIMultiDict 

15 

16from . import hdrs 

17from .abc import AbstractStreamWriter 

18from .helpers import ( 

19 _SENTINEL, 

20 content_disposition_header, 

21 guess_filename, 

22 parse_mimetype, 

23 sentinel, 

24) 

25from .streams import StreamReader 

26from .typedefs import JSONBytesEncoder, JSONEncoder 

27 

28__all__ = ( 

29 "PAYLOAD_REGISTRY", 

30 "get_payload", 

31 "payload_type", 

32 "Payload", 

33 "BytesPayload", 

34 "StringPayload", 

35 "IOBasePayload", 

36 "BytesIOPayload", 

37 "BufferedReaderPayload", 

38 "TextIOPayload", 

39 "StringIOPayload", 

40 "JsonPayload", 

41 "JsonBytesPayload", 

42 "AsyncIterablePayload", 

43) 

44 

45TOO_LARGE_BYTES_BODY: Final[int] = 2**20 # 1 MB 

46READ_SIZE: Final[int] = 2**16 # 64 KB 

47_CLOSE_FUTURES: set[asyncio.Future[None]] = set() 

48 

49 

50class LookupError(Exception): 

51 """Raised when no payload factory is found for the given data type.""" 

52 

53 

54class Order(str, enum.Enum): 

55 normal = "normal" 

56 try_first = "try_first" 

57 try_last = "try_last" 

58 

59 

60def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload": 

61 return PAYLOAD_REGISTRY.get(data, *args, **kwargs) 

62 

63 

64def register_payload( 

65 factory: type["Payload"], type: Any, *, order: Order = Order.normal 

66) -> None: 

67 PAYLOAD_REGISTRY.register(factory, type, order=order) 

68 

69 

70class payload_type: 

71 def __init__(self, type: Any, *, order: Order = Order.normal) -> None: 

72 self.type = type 

73 self.order = order 

74 

75 def __call__(self, factory: type["Payload"]) -> type["Payload"]: 

76 register_payload(factory, self.type, order=self.order) 

77 return factory 

78 

79 

80PayloadType = type["Payload"] 

81_PayloadRegistryItem = tuple[PayloadType, Any] 

82 

83 

84class PayloadRegistry: 

85 """Payload registry. 

86 

87 note: we need zope.interface for more efficient adapter search 

88 """ 

89 

90 __slots__ = ("_first", "_normal", "_last", "_normal_lookup") 

91 

92 def __init__(self) -> None: 

93 self._first: list[_PayloadRegistryItem] = [] 

94 self._normal: list[_PayloadRegistryItem] = [] 

95 self._last: list[_PayloadRegistryItem] = [] 

96 self._normal_lookup: dict[Any, PayloadType] = {} 

97 

98 def get( 

99 self, 

100 data: Any, 

101 *args: Any, 

102 _CHAIN: "type[chain[_PayloadRegistryItem]]" = chain, 

103 **kwargs: Any, 

104 ) -> "Payload": 

105 if self._first: 

106 for factory, type_ in self._first: 

107 if isinstance(data, type_): 

108 return factory(data, *args, **kwargs) 

109 # Try the fast lookup first 

110 if lookup_factory := self._normal_lookup.get(type(data)): 

111 return lookup_factory(data, *args, **kwargs) 

112 # Bail early if its already a Payload 

113 if isinstance(data, Payload): 

114 return data 

115 # Fallback to the slower linear search 

116 for factory, type_ in _CHAIN(self._normal, self._last): 

117 if isinstance(data, type_): 

118 return factory(data, *args, **kwargs) 

119 raise LookupError() 

120 

121 def register( 

122 self, factory: PayloadType, type: Any, *, order: Order = Order.normal 

123 ) -> None: 

124 if order is Order.try_first: 

125 self._first.append((factory, type)) 

126 elif order is Order.normal: 

127 self._normal.append((factory, type)) 

128 if isinstance(type, Iterable): 

129 for t in type: 

130 self._normal_lookup[t] = factory 

131 else: 

132 self._normal_lookup[type] = factory 

133 elif order is Order.try_last: 

134 self._last.append((factory, type)) 

135 else: 

136 raise ValueError(f"Unsupported order {order!r}") 

137 

138 

139class Payload(ABC): 

140 _default_content_type: str = "application/octet-stream" 

141 _size: int | None = None 

142 _consumed: bool = False # Default: payload has not been consumed yet 

143 _autoclose: bool = False # Default: assume resource needs explicit closing 

144 

145 def __init__( 

146 self, 

147 value: Any, 

148 headers: ( 

149 CIMultiDict[str] | dict[str, str] | Iterable[tuple[str, str]] | None 

150 ) = None, 

151 content_type: None | str | _SENTINEL = sentinel, 

152 filename: str | None = None, 

153 encoding: str | None = None, 

154 **kwargs: Any, 

155 ) -> None: 

156 self._encoding = encoding 

157 self._filename = filename 

158 self._headers = CIMultiDict[str]() 

159 self._value = value 

160 if content_type is not sentinel and content_type is not None: 

161 assert isinstance(content_type, str) 

162 self._headers[hdrs.CONTENT_TYPE] = content_type 

163 elif self._filename is not None: 

164 if sys.version_info >= (3, 13): 

165 guesser = mimetypes.guess_file_type 

166 else: 

167 guesser = mimetypes.guess_type 

168 content_type = guesser(self._filename)[0] 

169 if content_type is None: 

170 content_type = self._default_content_type 

171 self._headers[hdrs.CONTENT_TYPE] = content_type 

172 else: 

173 self._headers[hdrs.CONTENT_TYPE] = self._default_content_type 

174 if headers: 

175 self._headers.update(headers) 

176 

177 @property 

178 def size(self) -> int | None: 

179 """Size of the payload in bytes. 

180 

181 Returns the number of bytes that will be transmitted when the payload 

182 is written. For string payloads, this is the size after encoding to bytes, 

183 not the length of the string. 

184 """ 

185 return self._size 

186 

187 @property 

188 def filename(self) -> str | None: 

189 """Filename of the payload.""" 

190 return self._filename 

191 

192 @property 

193 def headers(self) -> CIMultiDict[str]: 

194 """Custom item headers""" 

195 return self._headers 

196 

197 @property 

198 def _binary_headers(self) -> bytes: 

199 return ( 

200 "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode( 

201 "utf-8" 

202 ) 

203 + b"\r\n" 

204 ) 

205 

206 @property 

207 def encoding(self) -> str | None: 

208 """Payload encoding""" 

209 return self._encoding 

210 

211 @property 

212 def content_type(self) -> str: 

213 """Content type""" 

214 return self._headers[hdrs.CONTENT_TYPE] 

215 

216 @property 

217 def consumed(self) -> bool: 

218 """Whether the payload has been consumed and cannot be reused.""" 

219 return self._consumed 

220 

221 @property 

222 def autoclose(self) -> bool: 

223 """ 

224 Whether the payload can close itself automatically. 

225 

226 Returns True if the payload has no file handles or resources that need 

227 explicit closing. If False, callers must await close() to release resources. 

228 """ 

229 return self._autoclose 

230 

231 def set_content_disposition( 

232 self, 

233 disptype: str, 

234 quote_fields: bool = True, 

235 _charset: str = "utf-8", 

236 **params: str, 

237 ) -> None: 

238 """Sets ``Content-Disposition`` header.""" 

239 self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header( 

240 disptype, quote_fields=quote_fields, _charset=_charset, params=params 

241 ) 

242 

243 @abstractmethod 

244 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: 

245 """ 

246 Return string representation of the value. 

247 

248 This is named decode() to allow compatibility with bytes objects. 

249 """ 

250 

251 @abstractmethod 

252 async def write(self, writer: AbstractStreamWriter) -> None: 

253 """ 

254 Write payload to the writer stream. 

255 

256 Args: 

257 writer: An AbstractStreamWriter instance that handles the actual writing 

258 

259 This is a legacy method that writes the entire payload without length constraints. 

260 

261 Important: 

262 For new implementations, use write_with_length() instead of this method. 

263 This method is maintained for backwards compatibility and will eventually 

264 delegate to write_with_length(writer, None) in all implementations. 

265 

266 All payload subclasses must override this method for backwards compatibility, 

267 but new code should use write_with_length for more flexibility and control. 

268 

269 """ 

270 

271 # write_with_length is new in aiohttp 3.12 

272 # it should be overridden by subclasses 

273 async def write_with_length( 

274 self, writer: AbstractStreamWriter, content_length: int | None 

275 ) -> None: 

276 """ 

277 Write payload with a specific content length constraint. 

278 

279 Args: 

280 writer: An AbstractStreamWriter instance that handles the actual writing 

281 content_length: Maximum number of bytes to write (None for unlimited) 

282 

283 This method allows writing payload content with a specific length constraint, 

284 which is particularly useful for HTTP responses with Content-Length header. 

285 

286 Note: 

287 This is the base implementation that provides backwards compatibility 

288 for subclasses that don't override this method. Specific payload types 

289 should override this method to implement proper length-constrained writing. 

290 

291 """ 

292 # Backwards compatibility for subclasses that don't override this method 

293 # and for the default implementation 

294 await self.write(writer) 

295 

296 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes: 

297 """ 

298 Return bytes representation of the value. 

299 

300 This is a convenience method that calls decode() and encodes the result 

301 to bytes using the specified encoding. 

302 """ 

303 # Use instance encoding if available, otherwise use parameter 

304 actual_encoding = self._encoding or encoding 

305 return self.decode(actual_encoding, errors).encode(actual_encoding) 

306 

307 def _close(self) -> None: 

308 """ 

309 Async safe synchronous close operations for backwards compatibility. 

310 

311 This method exists only for backwards compatibility with code that 

312 needs to clean up payloads synchronously. In the future, we will 

313 drop this method and only support the async close() method. 

314 

315 WARNING: This method must be safe to call from within the event loop 

316 without blocking. Subclasses should not perform any blocking I/O here. 

317 

318 WARNING: This method must be called from within an event loop for 

319 certain payload types (e.g., IOBasePayload). Calling it outside an 

320 event loop may raise RuntimeError. 

321 """ 

322 # This is a no-op by default, but subclasses can override it 

323 # for non-blocking cleanup operations. 

324 

325 async def close(self) -> None: 

326 """ 

327 Close the payload if it holds any resources. 

328 

329 IMPORTANT: This method must not await anything that might not finish 

330 immediately, as it may be called during cleanup/cancellation. Schedule 

331 any long-running operations without awaiting them. 

332 

333 In the future, this will be the only close method supported. 

334 """ 

335 self._close() 

336 

337 

338class BytesPayload(Payload): 

339 _value: bytes 

340 # _consumed = False (inherited) - Bytes are immutable and can be reused 

341 _autoclose = True # No file handle, just bytes in memory 

342 

343 def __init__( 

344 self, value: bytes | bytearray | memoryview, *args: Any, **kwargs: Any 

345 ) -> None: 

346 if "content_type" not in kwargs: 

347 kwargs["content_type"] = "application/octet-stream" 

348 

349 super().__init__(value, *args, **kwargs) 

350 

351 if isinstance(value, memoryview): 

352 self._size = value.nbytes 

353 elif isinstance(value, (bytes, bytearray)): 

354 self._size = len(value) 

355 else: 

356 raise TypeError(f"value argument must be byte-ish, not {type(value)!r}") 

357 

358 if self._size > TOO_LARGE_BYTES_BODY: 

359 warnings.warn( 

360 "Sending a large body directly with raw bytes might" 

361 " lock the event loop. You should probably pass an " 

362 "io.BytesIO object instead", 

363 ResourceWarning, 

364 source=self, 

365 ) 

366 

367 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: 

368 return self._value.decode(encoding, errors) 

369 

370 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes: 

371 """ 

372 Return bytes representation of the value. 

373 

374 This method returns the raw bytes content of the payload. 

375 It is equivalent to accessing the _value attribute directly. 

376 """ 

377 return self._value 

378 

379 async def write(self, writer: AbstractStreamWriter) -> None: 

380 """ 

381 Write the entire bytes payload to the writer stream. 

382 

383 Args: 

384 writer: An AbstractStreamWriter instance that handles the actual writing 

385 

386 This method writes the entire bytes content without any length constraint. 

387 

388 Note: 

389 For new implementations that need length control, use write_with_length(). 

390 This method is maintained for backwards compatibility and is equivalent 

391 to write_with_length(writer, None). 

392 

393 """ 

394 await writer.write(self._value) 

395 

396 async def write_with_length( 

397 self, writer: AbstractStreamWriter, content_length: int | None 

398 ) -> None: 

399 """ 

400 Write bytes payload with a specific content length constraint. 

401 

402 Args: 

403 writer: An AbstractStreamWriter instance that handles the actual writing 

404 content_length: Maximum number of bytes to write (None for unlimited) 

405 

406 This method writes either the entire byte sequence or a slice of it 

407 up to the specified content_length. For BytesPayload, this operation 

408 is performed efficiently using array slicing. 

409 

410 """ 

411 if content_length is not None: 

412 await writer.write(self._value[:content_length]) 

413 else: 

414 await writer.write(self._value) 

415 

416 

417class StringPayload(BytesPayload): 

418 def __init__( 

419 self, 

420 value: str, 

421 *args: Any, 

422 encoding: str | None = None, 

423 content_type: str | None = None, 

424 **kwargs: Any, 

425 ) -> None: 

426 if encoding is None: 

427 if content_type is None: 

428 real_encoding = "utf-8" 

429 content_type = "text/plain; charset=utf-8" 

430 else: 

431 mimetype = parse_mimetype(content_type) 

432 real_encoding = mimetype.parameters.get("charset", "utf-8") 

433 else: 

434 if content_type is None: 

435 content_type = "text/plain; charset=%s" % encoding 

436 real_encoding = encoding 

437 

438 super().__init__( 

439 value.encode(real_encoding), 

440 encoding=real_encoding, 

441 content_type=content_type, 

442 *args, 

443 **kwargs, 

444 ) 

445 

446 

447class StringIOPayload(StringPayload): 

448 def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None: 

449 super().__init__(value.read(), *args, **kwargs) 

450 

451 

452class IOBasePayload(Payload): 

453 _value: io.IOBase 

454 # _consumed = False (inherited) - File can be re-read from the same position 

455 _start_position: int | None = None 

456 # _autoclose = False (inherited) - Has file handle that needs explicit closing 

457 

458 def __init__( 

459 self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any 

460 ) -> None: 

461 if "filename" not in kwargs: 

462 kwargs["filename"] = guess_filename(value) 

463 

464 super().__init__(value, *args, **kwargs) 

465 

466 if self._filename is not None and disposition is not None: 

467 if hdrs.CONTENT_DISPOSITION not in self.headers: 

468 self.set_content_disposition(disposition, filename=self._filename) 

469 

470 def _set_or_restore_start_position(self) -> None: 

471 """Set or restore the start position of the file-like object.""" 

472 if self._start_position is None: 

473 try: 

474 self._start_position = self._value.tell() 

475 except (OSError, AttributeError): 

476 self._consumed = True # Cannot seek, mark as consumed 

477 return 

478 try: 

479 self._value.seek(self._start_position) 

480 except (OSError, AttributeError): 

481 # Failed to seek back - mark as consumed since we've already read 

482 self._consumed = True 

483 

484 def _read_and_available_len( 

485 self, remaining_content_len: int | None 

486 ) -> tuple[int | None, bytes]: 

487 """ 

488 Read the file-like object and return both its total size and the first chunk. 

489 

490 Args: 

491 remaining_content_len: Optional limit on how many bytes to read in this operation. 

492 If None, READ_SIZE will be used as the default chunk size. 

493 

494 Returns: 

495 A tuple containing: 

496 - The total size of the remaining unread content (None if size cannot be determined) 

497 - The first chunk of bytes read from the file object 

498 

499 This method is optimized to perform both size calculation and initial read 

500 in a single operation, which is executed in a single executor job to minimize 

501 context switches and file operations when streaming content. 

502 

503 """ 

504 self._set_or_restore_start_position() 

505 size = self.size # Call size only once since it does I/O 

506 return size, self._value.read( 

507 min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE) 

508 ) 

509 

510 def _read(self, remaining_content_len: int | None) -> bytes: 

511 """ 

512 Read a chunk of data from the file-like object. 

513 

514 Args: 

515 remaining_content_len: Optional maximum number of bytes to read. 

516 If None, READ_SIZE will be used as the default chunk size. 

517 

518 Returns: 

519 A chunk of bytes read from the file object, respecting the 

520 remaining_content_len limit if specified. 

521 

522 This method is used for subsequent reads during streaming after 

523 the initial _read_and_available_len call has been made. 

524 

525 """ 

526 return self._value.read(remaining_content_len or READ_SIZE) # type: ignore[no-any-return] 

527 

528 @property 

529 def size(self) -> int | None: 

530 """ 

531 Size of the payload in bytes. 

532 

533 Returns the total size of the payload content from the initial position. 

534 This ensures consistent Content-Length for requests, including 307/308 redirects 

535 where the same payload instance is reused. 

536 

537 Returns None if the size cannot be determined (e.g., for unseekable streams). 

538 """ 

539 try: 

540 # Store the start position on first access. 

541 # This is critical when the same payload instance is reused (e.g., 307/308 

542 # redirects). Without storing the initial position, after the payload is 

543 # read once, the file position would be at EOF, which would cause the 

544 # size calculation to return 0 (file_size - EOF position). 

545 # By storing the start position, we ensure the size calculation always 

546 # returns the correct total size for any subsequent use. 

547 if self._start_position is None: 

548 self._start_position = self._value.tell() 

549 

550 # Return the total size from the start position 

551 # This ensures Content-Length is correct even after reading 

552 return os.fstat(self._value.fileno()).st_size - self._start_position 

553 except (AttributeError, OSError): 

554 return None 

555 

556 async def write(self, writer: AbstractStreamWriter) -> None: 

557 """ 

558 Write the entire file-like payload to the writer stream. 

559 

560 Args: 

561 writer: An AbstractStreamWriter instance that handles the actual writing 

562 

563 This method writes the entire file content without any length constraint. 

564 It delegates to write_with_length() with no length limit for implementation 

565 consistency. 

566 

567 Note: 

568 For new implementations that need length control, use write_with_length() directly. 

569 This method is maintained for backwards compatibility with existing code. 

570 

571 """ 

572 await self.write_with_length(writer, None) 

573 

574 async def write_with_length( 

575 self, writer: AbstractStreamWriter, content_length: int | None 

576 ) -> None: 

577 """ 

578 Write file-like payload with a specific content length constraint. 

579 

580 Args: 

581 writer: An AbstractStreamWriter instance that handles the actual writing 

582 content_length: Maximum number of bytes to write (None for unlimited) 

583 

584 This method implements optimized streaming of file content with length constraints: 

585 

586 1. File reading is performed in a thread pool to avoid blocking the event loop 

587 2. Content is read and written in chunks to maintain memory efficiency 

588 3. Writing stops when either: 

589 - All available file content has been written (when size is known) 

590 - The specified content_length has been reached 

591 4. File resources are properly closed even if the operation is cancelled 

592 

593 The implementation carefully handles both known-size and unknown-size payloads, 

594 as well as constrained and unconstrained content lengths. 

595 

596 """ 

597 loop = asyncio.get_running_loop() 

598 total_written_len = 0 

599 remaining_content_len = content_length 

600 

601 # Get initial data and available length 

602 available_len, chunk = await loop.run_in_executor( 

603 None, self._read_and_available_len, remaining_content_len 

604 ) 

605 # Process data chunks until done 

606 while chunk: 

607 chunk_len = len(chunk) 

608 

609 # Write data with or without length constraint 

610 if remaining_content_len is None: 

611 await writer.write(chunk) 

612 else: 

613 await writer.write(chunk[:remaining_content_len]) 

614 remaining_content_len -= chunk_len 

615 

616 total_written_len += chunk_len 

617 

618 # Check if we're done writing 

619 if self._should_stop_writing( 

620 available_len, total_written_len, remaining_content_len 

621 ): 

622 return 

623 

624 # Read next chunk 

625 chunk = await loop.run_in_executor( 

626 None, 

627 self._read, 

628 ( 

629 min(READ_SIZE, remaining_content_len) 

630 if remaining_content_len is not None 

631 else READ_SIZE 

632 ), 

633 ) 

634 

635 def _should_stop_writing( 

636 self, 

637 available_len: int | None, 

638 total_written_len: int, 

639 remaining_content_len: int | None, 

640 ) -> bool: 

641 """ 

642 Determine if we should stop writing data. 

643 

644 Args: 

645 available_len: Known size of the payload if available (None if unknown) 

646 total_written_len: Number of bytes already written 

647 remaining_content_len: Remaining bytes to be written for content-length limited responses 

648 

649 Returns: 

650 True if we should stop writing data, based on either: 

651 - Having written all available data (when size is known) 

652 - Having written all requested content (when content-length is specified) 

653 

654 """ 

655 return (available_len is not None and total_written_len >= available_len) or ( 

656 remaining_content_len is not None and remaining_content_len <= 0 

657 ) 

658 

659 def _close(self) -> None: 

660 """ 

661 Async safe synchronous close operations for backwards compatibility. 

662 

663 This method exists only for backwards 

664 compatibility. Use the async close() method instead. 

665 

666 WARNING: This method MUST be called from within an event loop. 

667 Calling it outside an event loop will raise RuntimeError. 

668 """ 

669 # Skip if already consumed 

670 if self._consumed: 

671 return 

672 self._consumed = True # Mark as consumed to prevent further writes 

673 # Schedule file closing without awaiting to prevent cancellation issues 

674 loop = asyncio.get_running_loop() 

675 close_future = loop.run_in_executor(None, self._value.close) 

676 # Hold a strong reference to the future to prevent it from being 

677 # garbage collected before it completes. 

678 _CLOSE_FUTURES.add(close_future) 

679 close_future.add_done_callback(_CLOSE_FUTURES.remove) 

680 

681 async def close(self) -> None: 

682 """ 

683 Close the payload if it holds any resources. 

684 

685 IMPORTANT: This method must not await anything that might not finish 

686 immediately, as it may be called during cleanup/cancellation. Schedule 

687 any long-running operations without awaiting them. 

688 """ 

689 self._close() 

690 

691 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: 

692 """ 

693 Return string representation of the value. 

694 

695 WARNING: This method does blocking I/O and should not be called in the event loop. 

696 """ 

697 return self._read_all().decode(encoding, errors) 

698 

699 def _read_all(self) -> bytes: 

700 """Read the entire file-like object and return its content as bytes.""" 

701 self._set_or_restore_start_position() 

702 # Use readlines() to ensure we get all content 

703 return b"".join(self._value.readlines()) 

704 

705 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes: 

706 """ 

707 Return bytes representation of the value. 

708 

709 This method reads the entire file content and returns it as bytes. 

710 It is equivalent to reading the file-like object directly. 

711 The file reading is performed in an executor to avoid blocking the event loop. 

712 """ 

713 loop = asyncio.get_running_loop() 

714 return await loop.run_in_executor(None, self._read_all) 

715 

716 

717class TextIOPayload(IOBasePayload): 

718 _value: io.TextIOBase 

719 # _autoclose = False (inherited) - Has text file handle that needs explicit closing 

720 

721 def __init__( 

722 self, 

723 value: TextIO, 

724 *args: Any, 

725 encoding: str | None = None, 

726 content_type: str | None = None, 

727 **kwargs: Any, 

728 ) -> None: 

729 if encoding is None: 

730 if content_type is None: 

731 encoding = "utf-8" 

732 content_type = "text/plain; charset=utf-8" 

733 else: 

734 mimetype = parse_mimetype(content_type) 

735 encoding = mimetype.parameters.get("charset", "utf-8") 

736 else: 

737 if content_type is None: 

738 content_type = "text/plain; charset=%s" % encoding 

739 

740 super().__init__( 

741 value, 

742 content_type=content_type, 

743 encoding=encoding, 

744 *args, 

745 **kwargs, 

746 ) 

747 

748 def _read_and_available_len( 

749 self, remaining_content_len: int | None 

750 ) -> tuple[int | None, bytes]: 

751 """ 

752 Read the text file-like object and return both its total size and the first chunk. 

753 

754 Args: 

755 remaining_content_len: Optional limit on how many bytes to read in this operation. 

756 If None, READ_SIZE will be used as the default chunk size. 

757 

758 Returns: 

759 A tuple containing: 

760 - The total size of the remaining unread content (None if size cannot be determined) 

761 - The first chunk of bytes read from the file object, encoded using the payload's encoding 

762 

763 This method is optimized to perform both size calculation and initial read 

764 in a single operation, which is executed in a single executor job to minimize 

765 context switches and file operations when streaming content. 

766 

767 Note: 

768 TextIOPayload handles encoding of the text content before writing it 

769 to the stream. If no encoding is specified, UTF-8 is used as the default. 

770 

771 """ 

772 self._set_or_restore_start_position() 

773 size = self.size 

774 chunk = self._value.read( 

775 min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE) 

776 ) 

777 return size, chunk.encode(self._encoding) if self._encoding else chunk.encode() 

778 

779 def _read(self, remaining_content_len: int | None) -> bytes: 

780 """ 

781 Read a chunk of data from the text file-like object. 

782 

783 Args: 

784 remaining_content_len: Optional maximum number of bytes to read. 

785 If None, READ_SIZE will be used as the default chunk size. 

786 

787 Returns: 

788 A chunk of bytes read from the file object and encoded using the payload's 

789 encoding. The data is automatically converted from text to bytes. 

790 

791 This method is used for subsequent reads during streaming after 

792 the initial _read_and_available_len call has been made. It properly 

793 handles text encoding, converting the text content to bytes using 

794 the specified encoding (or UTF-8 if none was provided). 

795 

796 """ 

797 chunk = self._value.read(remaining_content_len or READ_SIZE) 

798 return chunk.encode(self._encoding) if self._encoding else chunk.encode() 

799 

800 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: 

801 """ 

802 Return string representation of the value. 

803 

804 WARNING: This method does blocking I/O and should not be called in the event loop. 

805 """ 

806 self._set_or_restore_start_position() 

807 return self._value.read() 

808 

809 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes: 

810 """ 

811 Return bytes representation of the value. 

812 

813 This method reads the entire text file content and returns it as bytes. 

814 It encodes the text content using the specified encoding. 

815 The file reading is performed in an executor to avoid blocking the event loop. 

816 """ 

817 loop = asyncio.get_running_loop() 

818 

819 # Use instance encoding if available, otherwise use parameter 

820 actual_encoding = self._encoding or encoding 

821 

822 def _read_and_encode() -> bytes: 

823 self._set_or_restore_start_position() 

824 # TextIO read() always returns the full content 

825 return self._value.read().encode(actual_encoding, errors) 

826 

827 return await loop.run_in_executor(None, _read_and_encode) 

828 

829 

830class BytesIOPayload(IOBasePayload): 

831 _value: io.BytesIO 

832 _size: int # Always initialized in __init__ 

833 _autoclose = True # BytesIO is in-memory, safe to auto-close 

834 

835 def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None: 

836 super().__init__(value, *args, **kwargs) 

837 # Calculate size once during initialization 

838 self._size = len(self._value.getbuffer()) - self._value.tell() 

839 

840 @property 

841 def size(self) -> int: 

842 """Size of the payload in bytes. 

843 

844 Returns the number of bytes in the BytesIO buffer that will be transmitted. 

845 This is calculated once during initialization for efficiency. 

846 """ 

847 return self._size 

848 

849 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: 

850 self._set_or_restore_start_position() 

851 return self._value.read().decode(encoding, errors) 

852 

853 async def write(self, writer: AbstractStreamWriter) -> None: 

854 return await self.write_with_length(writer, None) 

855 

856 async def write_with_length( 

857 self, writer: AbstractStreamWriter, content_length: int | None 

858 ) -> None: 

859 """ 

860 Write BytesIO payload with a specific content length constraint. 

861 

862 Args: 

863 writer: An AbstractStreamWriter instance that handles the actual writing 

864 content_length: Maximum number of bytes to write (None for unlimited) 

865 

866 This implementation is specifically optimized for BytesIO objects: 

867 

868 1. Reads content in chunks to maintain memory efficiency 

869 2. Yields control back to the event loop periodically to prevent blocking 

870 when dealing with large BytesIO objects 

871 3. Respects content_length constraints when specified 

872 4. Properly cleans up by closing the BytesIO object when done or on error 

873 

874 The periodic yielding to the event loop is important for maintaining 

875 responsiveness when processing large in-memory buffers. 

876 

877 """ 

878 self._set_or_restore_start_position() 

879 loop_count = 0 

880 remaining_bytes = content_length 

881 while chunk := self._value.read(READ_SIZE): 

882 if loop_count > 0: 

883 # Avoid blocking the event loop 

884 # if they pass a large BytesIO object 

885 # and we are not in the first iteration 

886 # of the loop 

887 await asyncio.sleep(0) 

888 if remaining_bytes is None: 

889 await writer.write(chunk) 

890 else: 

891 await writer.write(chunk[:remaining_bytes]) 

892 remaining_bytes -= len(chunk) 

893 if remaining_bytes <= 0: 

894 return 

895 loop_count += 1 

896 

897 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes: 

898 """ 

899 Return bytes representation of the value. 

900 

901 This method reads the entire BytesIO content and returns it as bytes. 

902 It is equivalent to accessing the _value attribute directly. 

903 """ 

904 self._set_or_restore_start_position() 

905 return self._value.read() 

906 

907 async def close(self) -> None: 

908 """ 

909 Close the BytesIO payload. 

910 

911 This does nothing since BytesIO is in-memory and does not require explicit closing. 

912 """ 

913 

914 

915class BufferedReaderPayload(IOBasePayload): 

916 _value: io.BufferedIOBase 

917 # _autoclose = False (inherited) - Has buffered file handle that needs explicit closing 

918 

919 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: 

920 self._set_or_restore_start_position() 

921 return self._value.read().decode(encoding, errors) 

922 

923 

924class JsonPayload(BytesPayload): 

925 def __init__( 

926 self, 

927 value: Any, 

928 encoding: str = "utf-8", 

929 content_type: str = "application/json", 

930 dumps: JSONEncoder = json.dumps, 

931 *args: Any, 

932 **kwargs: Any, 

933 ) -> None: 

934 super().__init__( 

935 dumps(value).encode(encoding), 

936 content_type=content_type, 

937 encoding=encoding, 

938 *args, 

939 **kwargs, 

940 ) 

941 

942 

943class JsonBytesPayload(BytesPayload): 

944 """JSON payload for encoders that return bytes directly. 

945 

946 Use this when your JSON encoder (like orjson) returns bytes 

947 instead of str, avoiding the encode/decode overhead. 

948 """ 

949 

950 def __init__( 

951 self, 

952 value: Any, 

953 dumps: JSONBytesEncoder, 

954 content_type: str = "application/json", 

955 *args: Any, 

956 **kwargs: Any, 

957 ) -> None: 

958 super().__init__( 

959 dumps(value), 

960 content_type=content_type, 

961 *args, 

962 **kwargs, 

963 ) 

964 

965 

966class AsyncIterablePayload(Payload): 

967 _iter: AsyncIterator[bytes] | None = None 

968 _value: AsyncIterable[bytes] 

969 _cached_chunks: list[bytes] | None = None 

970 # _consumed stays False to allow reuse with cached content 

971 _autoclose = True # Iterator doesn't need explicit closing 

972 

973 def __init__(self, value: AsyncIterable[bytes], *args: Any, **kwargs: Any) -> None: 

974 if not isinstance(value, AsyncIterable): 

975 raise TypeError( 

976 "value argument must support " 

977 "collections.abc.AsyncIterable interface, " 

978 f"got {type(value)!r}" 

979 ) 

980 

981 if "content_type" not in kwargs: 

982 kwargs["content_type"] = "application/octet-stream" 

983 

984 super().__init__(value, *args, **kwargs) 

985 

986 self._iter = value.__aiter__() 

987 

988 async def write(self, writer: AbstractStreamWriter) -> None: 

989 """ 

990 Write the entire async iterable payload to the writer stream. 

991 

992 Args: 

993 writer: An AbstractStreamWriter instance that handles the actual writing 

994 

995 This method iterates through the async iterable and writes each chunk 

996 to the writer without any length constraint. 

997 

998 Note: 

999 For new implementations that need length control, use write_with_length() directly. 

1000 This method is maintained for backwards compatibility with existing code. 

1001 

1002 """ 

1003 await self.write_with_length(writer, None) 

1004 

1005 async def write_with_length( 

1006 self, writer: AbstractStreamWriter, content_length: int | None 

1007 ) -> None: 

1008 """ 

1009 Write async iterable payload with a specific content length constraint. 

1010 

1011 Args: 

1012 writer: An AbstractStreamWriter instance that handles the actual writing 

1013 content_length: Maximum number of bytes to write (None for unlimited) 

1014 

1015 This implementation handles streaming of async iterable content with length constraints: 

1016 

1017 1. If cached chunks are available, writes from them 

1018 2. Otherwise iterates through the async iterable one chunk at a time 

1019 3. Respects content_length constraints when specified 

1020 4. Does NOT generate cache - that's done by as_bytes() 

1021 

1022 """ 

1023 # If we have cached chunks, use them 

1024 if self._cached_chunks is not None: 

1025 remaining_bytes = content_length 

1026 for chunk in self._cached_chunks: 

1027 if remaining_bytes is None: 

1028 await writer.write(chunk) 

1029 elif remaining_bytes > 0: 

1030 await writer.write(chunk[:remaining_bytes]) 

1031 remaining_bytes -= len(chunk) 

1032 else: 

1033 break 

1034 return 

1035 

1036 # If iterator is exhausted and we don't have cached chunks, nothing to write 

1037 if self._iter is None: 

1038 return 

1039 

1040 # Stream from the iterator 

1041 remaining_bytes = content_length 

1042 

1043 try: 

1044 while True: 

1045 chunk = await anext(self._iter) 

1046 if remaining_bytes is None: 

1047 await writer.write(chunk) 

1048 # If we have a content length limit 

1049 elif remaining_bytes > 0: 

1050 await writer.write(chunk[:remaining_bytes]) 

1051 remaining_bytes -= len(chunk) 

1052 # We still want to exhaust the iterator even 

1053 # if we have reached the content length limit 

1054 # since the file handle may not get closed by 

1055 # the iterator if we don't do this 

1056 except StopAsyncIteration: 

1057 # Iterator is exhausted 

1058 self._iter = None 

1059 self._consumed = True # Mark as consumed when streamed without caching 

1060 

1061 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: 

1062 """Decode the payload content as a string if cached chunks are available.""" 

1063 if self._cached_chunks is not None: 

1064 return b"".join(self._cached_chunks).decode(encoding, errors) 

1065 raise TypeError("Unable to decode - content not cached. Call as_bytes() first.") 

1066 

1067 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes: 

1068 """ 

1069 Return bytes representation of the value. 

1070 

1071 This method reads the entire async iterable content and returns it as bytes. 

1072 It generates and caches the chunks for future reuse. 

1073 """ 

1074 # If we have cached chunks, return them joined 

1075 if self._cached_chunks is not None: 

1076 return b"".join(self._cached_chunks) 

1077 

1078 # If iterator is exhausted and no cache, return empty 

1079 if self._iter is None: 

1080 return b"" 

1081 

1082 # Read all chunks and cache them 

1083 chunks: list[bytes] = [] 

1084 async for chunk in self._iter: 

1085 chunks.append(chunk) 

1086 

1087 # Iterator is exhausted, cache the chunks 

1088 self._iter = None 

1089 self._cached_chunks = chunks 

1090 # Keep _consumed as False to allow reuse with cached chunks 

1091 

1092 return b"".join(chunks) 

1093 

1094 

1095class StreamReaderPayload(AsyncIterablePayload): 

1096 def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None: 

1097 super().__init__(value.iter_any(), *args, **kwargs) 

1098 

1099 

1100PAYLOAD_REGISTRY = PayloadRegistry() 

1101PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview)) 

1102PAYLOAD_REGISTRY.register(StringPayload, str) 

1103PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO) 

1104PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase) 

1105PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO) 

1106PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom)) 

1107PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase) 

1108PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader) 

1109# try_last for giving a chance to more specialized async interables like 

1110# multipart.BodyPartReaderPayload override the default 

1111PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)