Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/payload.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

385 statements  

1import asyncio 

2import enum 

3import io 

4import json 

5import mimetypes 

6import os 

7import sys 

8import warnings 

9from abc import ABC, abstractmethod 

10from collections.abc import AsyncIterable, AsyncIterator, Iterable 

11from itertools import chain 

12from typing import IO, Any, Final, TextIO 

13 

14from multidict import CIMultiDict 

15 

16from . import hdrs 

17from .abc import AbstractStreamWriter 

18from .helpers import ( 

19 _SENTINEL, 

20 DEFAULT_CHUNK_SIZE, 

21 content_disposition_header, 

22 guess_filename, 

23 parse_mimetype, 

24 sentinel, 

25) 

26from .streams import StreamReader 

27from .typedefs import JSONBytesEncoder, JSONEncoder 

28 

# Names exported by ``from aiohttp.payload import *``.
__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "JsonBytesPayload",
    "AsyncIterablePayload",
)

# Size threshold (1 MiB) above which BytesPayload emits a ResourceWarning
# recommending io.BytesIO instead of raw bytes.
TOO_LARGE_BYTES_BODY: Final[int] = 1 << 20

# Strong references to in-flight executor futures that close file objects.
# Without them the futures could be garbage collected before finishing; each
# future removes itself from this set via a done-callback.
_CLOSE_FUTURES: set[asyncio.Future[None]] = set()

48 

49 

class LookupError(Exception):
    """Raised by ``PayloadRegistry.get`` when no payload factory matches.

    Note: inside this module this name shadows the builtin ``LookupError``.
    It derives from ``Exception`` directly, not from the builtin class.
    """

52 

53 

class Order(str, enum.Enum):
    """Lookup bucket a payload factory is registered into.

    ``PayloadRegistry.get`` consults ``try_first`` factories before anything
    else, then ``normal`` factories (with an exact-type fast path), and
    ``try_last`` factories at the very end.
    """

    normal = "normal"
    try_first = "try_first"
    try_last = "try_last"

58 

59 

def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    """Wrap *data* in a payload via the module-wide ``PAYLOAD_REGISTRY``.

    Every extra positional and keyword argument is forwarded unchanged to
    ``PayloadRegistry.get``.
    """
    registry = PAYLOAD_REGISTRY
    return registry.get(data, *args, **kwargs)

62 

63 

def register_payload(
    factory: type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    """Register *factory* for *type* in the module-wide ``PAYLOAD_REGISTRY``.

    A thin wrapper around ``PayloadRegistry.register``; *order* selects the
    lookup bucket the factory is placed in.
    """
    registry = PAYLOAD_REGISTRY
    registry.register(factory, type, order=order)

68 

69 

class payload_type:
    """Class decorator registering the decorated payload class.

    Usage::

        @payload_type(MyData)
        class MyDataPayload(Payload):
            ...

    The decorated class is returned unchanged after registration.
    """

    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        # Remember the registration target until the decorator is applied.
        self.type = type
        self.order = order

    def __call__(self, factory: type["Payload"]) -> type["Payload"]:
        target_type = self.type
        register_payload(factory, target_type, order=self.order)
        return factory

78 

79 

80PayloadType = type["Payload"] 

81_PayloadRegistryItem = tuple[PayloadType, Any] 

82 

83 

class PayloadRegistry:
    """Payload registry.

    Maps value types to ``Payload`` factories. Factories live in three
    buckets (see ``Order``) and are consulted in the order
    ``try_first`` -> ``normal`` -> ``try_last``. ``normal`` registrations are
    additionally indexed by exact type for a dictionary fast path.

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        self._first: list[_PayloadRegistryItem] = []
        self._normal: list[_PayloadRegistryItem] = []
        self._last: list[_PayloadRegistryItem] = []
        # Exact ``type(data)`` -> factory index for Order.normal entries.
        self._normal_lookup: dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        """Wrap *data* using the first matching factory.

        Extra positional and keyword arguments are forwarded to the factory.
        If *data* is already a ``Payload`` and neither a ``try_first`` entry
        nor an exact-type ``normal`` entry claims it, it is returned as-is.

        Raises:
            LookupError: (this module's class) if no factory matches.
        """
        if self._first:
            for factory, type_ in self._first:
                if isinstance(data, type_):
                    return factory(data, *args, **kwargs)
        # Try the fast lookup first
        if lookup_factory := self._normal_lookup.get(type(data)):
            return lookup_factory(data, *args, **kwargs)
        # Bail early if it's already a Payload
        if isinstance(data, Payload):
            return data
        # Fallback to the slower linear search (handles subclasses)
        for factory, type_ in _CHAIN(self._normal, self._last):
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)
        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        """Register *factory* for values that are instances of *type*.

        Args:
            factory: Callable (normally a ``Payload`` subclass) that wraps
                matching data.
            type: A type, or an iterable of types, matched with ``isinstance``.
            order: Lookup bucket for the factory. Because ``Order`` is a
                ``str`` enum, plain strings equal to a member value (e.g.
                ``"try_first"``) are accepted as well as members.

        Raises:
            ValueError: if *order* is not a valid ``Order``.
        """
        try:
            # Normalise "try_first"-style strings to members; members map to
            # themselves, so existing callers are unaffected.
            order = Order(order)
        except ValueError:
            raise ValueError(f"Unsupported order {order!r}") from None
        if order is Order.try_first:
            self._first.append((factory, type))
        elif order is Order.normal:
            self._normal.append((factory, type))
            # Index each concrete type for the fast path in get(). A later
            # registration for the same type replaces the earlier one here.
            if isinstance(type, Iterable):
                for t in type:
                    self._normal_lookup[t] = factory
            else:
                self._normal_lookup[type] = factory
        else:
            # Only Order.try_last remains after normalisation.
            self._last.append((factory, type))

137 

138 

class Payload(ABC):
    """Abstract base class for data written to an ``AbstractStreamWriter``.

    A payload wraps a value together with its headers (a ``Content-Type`` is
    always set), an optional filename and encoding, and flags telling callers
    whether it can be reused (``consumed``) and whether it holds resources
    that need explicit closing (``autoclose``).
    """

    _default_content_type: str = "application/octet-stream"
    _size: int | None = None
    _consumed: bool = False  # Default: payload has not been consumed yet
    _autoclose: bool = False  # Default: assume resource needs explicit closing

    def __init__(
        self,
        value: Any,
        headers: (
            CIMultiDict[str] | dict[str, str] | Iterable[tuple[str, str]] | None
        ) = None,
        content_type: None | str | _SENTINEL = sentinel,
        filename: str | None = None,
        encoding: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Store *value* and build the payload headers.

        Content-Type resolution: an explicit *content_type* wins; otherwise
        it is guessed from *filename* with ``mimetypes``; otherwise (or if
        the guess fails) ``_default_content_type`` is used. *headers* are
        applied last, so they may override Content-Type. Extra keyword
        arguments are accepted and ignored.
        """
        self._encoding = encoding
        self._filename = filename
        self._headers = CIMultiDict[str]()
        self._value = value
        if content_type is not sentinel and content_type is not None:
            assert isinstance(content_type, str)
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            # guess_file_type (3.13+) is the path-oriented replacement for
            # guess_type, which is soft-deprecated for paths.
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> int | None:
        """Size of the payload in bytes.

        Returns the number of bytes that will be transmitted when the payload
        is written. For string payloads, this is the size after encoding to bytes,
        not the length of the string.
        """
        return self._size

    @property
    def filename(self) -> str | None:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> CIMultiDict[str]:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        """Headers as UTF-8 ``Name: value\\r\\n`` lines plus a blank line."""
        return (
            "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode(
                "utf-8"
            )
            + b"\r\n"
        )

    @property
    def encoding(self) -> str | None:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    @property
    def consumed(self) -> bool:
        """Whether the payload has been consumed and cannot be reused."""
        return self._consumed

    @property
    def autoclose(self) -> bool:
        """
        Whether the payload can close itself automatically.

        Returns True if the payload has no file handles or resources that need
        explicit closing. If False, callers must await close() to release resources.
        """
        return self._autoclose

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: str,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, params=params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length constraints.

        Important:
            For new implementations, use write_with_length() instead of this method.
            This method is maintained for backwards compatibility and will eventually
            delegate to write_with_length(writer, None) in all implementations.

        All payload subclasses must override this method for backwards compatibility,
        but new code should use write_with_length for more flexibility and control.

        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method allows writing payload content with a specific length constraint,
        which is particularly useful for HTTP responses with Content-Length header.

        Note:
            This is the base implementation that provides backwards compatibility
            for subclasses that don't override this method. It ignores
            content_length and writes the whole payload via write(). Specific
            payload types should override this method to implement proper
            length-constrained writing.

        """
        # Backwards compatibility for subclasses that don't override this method
        # and for the default implementation
        await self.write(writer)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This is a convenience method that calls decode() and encodes the result
        back to bytes. The payload's own encoding takes precedence over the
        *encoding* argument, and *errors* is applied in both directions.
        """
        # Use instance encoding if available, otherwise use parameter
        actual_encoding = self._encoding or encoding
        # Pass ``errors`` to encode() too: with e.g. errors="replace" and a
        # narrow charset such as ascii, decode() yields U+FFFD, which a strict
        # encode() would then reject with UnicodeEncodeError.
        return self.decode(actual_encoding, errors).encode(actual_encoding, errors)

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility with code that
        needs to clean up payloads synchronously. In the future, we will
        drop this method and only support the async close() method.

        WARNING: This method must be safe to call from within the event loop
        without blocking. Subclasses should not perform any blocking I/O here.

        WARNING: This method must be called from within an event loop for
        certain payload types (e.g., IOBasePayload). Calling it outside an
        event loop may raise RuntimeError.
        """
        # This is a no-op by default, but subclasses can override it
        # for non-blocking cleanup operations.

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.

        In the future, this will be the only close method supported.
        """
        self._close()

336 

337 

class BytesPayload(Payload):
    """Payload wrapping an in-memory ``bytes``, ``bytearray`` or ``memoryview``."""

    _value: bytes
    # _consumed = False (inherited) - the in-memory value can be written any
    # number of times
    _autoclose = True  # No file handle, just bytes in memory

    def __init__(
        self, value: bytes | bytearray | memoryview, *args: Any, **kwargs: Any
    ) -> None:
        """Wrap *value*; Content-Type defaults to ``application/octet-stream``.

        Raises:
            TypeError: if *value* is not bytes, bytearray or memoryview.

        Emits a ``ResourceWarning`` when the body is larger than
        ``TOO_LARGE_BYTES_BODY`` bytes.
        """
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        if isinstance(value, memoryview):
            # nbytes rather than len(): len() counts items, which differs
            # from the byte count for multi-byte item formats.
            self._size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            self._size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if self._size > TOO_LARGE_BYTES_BODY:
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                source=self,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the body to text.

        Uses ``str(buffer, encoding, errors)`` instead of ``.decode()``
        because ``memoryview`` (an accepted value type) has no ``decode``
        method; ``str()`` accepts any bytes-like buffer.
        """
        return str(self._value, encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Returns the raw payload content as ``bytes``. A ``bytearray`` or
        ``memoryview`` value is converted so the result honours the declared
        return type; *encoding* and *errors* are unused.
        """
        # For an exact ``bytes`` object CPython's bytes() returns the same
        # object, so the common case makes no copy.
        return bytes(self._value)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire bytes payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire bytes content without any length constraint.

        Note:
            For new implementations that need length control, use write_with_length().
            This method is maintained for backwards compatibility and is equivalent
            to write_with_length(writer, None).

        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write bytes payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Writes either the entire value or a slice of it truncated to
        content_length, in a single write() call.

        """
        if content_length is not None:
            await writer.write(self._value[:content_length])
        else:
            await writer.write(self._value)

415 

416 

class StringPayload(BytesPayload):
    """Text payload: a ``str`` encoded to bytes at construction time.

    The charset is *encoding* when given; otherwise the ``charset`` parameter
    of *content_type*; otherwise UTF-8. When *content_type* is omitted a
    ``text/plain`` type naming the charset is generated.
    """

    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        if encoding is not None:
            # Caller chose the charset explicitly.
            real_encoding = encoding
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            # Derive the charset from the supplied media type.
            params = parse_mimetype(content_type).parameters
            real_encoding = params.get("charset", "utf-8")
        else:
            real_encoding = "utf-8"
            content_type = "text/plain; charset=utf-8"

        encoded = value.encode(real_encoding)
        super().__init__(
            encoded,
            *args,
            encoding=real_encoding,
            content_type=content_type,
            **kwargs,
        )

445 

446 

class StringIOPayload(StringPayload):
    """String payload built from a text stream such as ``io.StringIO``.

    The stream is read from its current position to EOF once, at
    construction time; all other arguments go to ``StringPayload``.
    """

    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        text = value.read()
        super().__init__(text, *args, **kwargs)

450 

451 

class IOBasePayload(Payload):
    """Payload streaming from a file-like object.

    Blocking reads in ``write_with_length`` and ``as_bytes`` run in the
    default executor. The initial stream position is remembered so the same
    instance can be written again (e.g. after a 307/308 redirect) when the
    stream is seekable.
    """

    _value: io.IOBase
    # _consumed = False (inherited) - File can be re-read from the same position
    _start_position: int | None = None
    # _autoclose = False (inherited) - Has file handle that needs explicit closing
    # Whether closing has already been scheduled. Kept separate from
    # _consumed, which is also set when the stream cannot tell()/seek();
    # using _consumed as the close guard left such streams open forever.
    _closed: bool = False

    def __init__(
        self,
        value: IO[Any],
        disposition: str | None = "attachment",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Wrap *value*.

        If no ``filename`` keyword is given it is guessed from *value*. When a
        filename is known, *disposition* is not None, and no
        Content-Disposition header was supplied, one is set automatically.
        """
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _set_or_restore_start_position(self) -> None:
        """Set or restore the start position of the file-like object.

        On first call records ``tell()``; afterwards seeks back to it. If the
        stream cannot tell/seek, the payload is marked consumed.
        """
        if self._start_position is None:
            try:
                self._start_position = self._value.tell()
            except (OSError, AttributeError):
                self._consumed = True  # Cannot seek, mark as consumed
            return
        try:
            self._value.seek(self._start_position)
        except (OSError, AttributeError):
            # Failed to seek back - mark as consumed since we've already read
            self._consumed = True

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """
        Read the file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, DEFAULT_CHUNK_SIZE will be used as the default chunk size.

        Returns:
            A tuple containing:
            - The total size of the content from the start position (None if size cannot be determined)
            - The first chunk of bytes read from the file object

        This method is optimized to perform both size calculation and initial read
        in a single operation, which is executed in a single executor job to minimize
        context switches and file operations when streaming content.

        """
        self._set_or_restore_start_position()
        size = self.size  # Call size only once since it does I/O
        return size, self._value.read(
            min(
                DEFAULT_CHUNK_SIZE,
                size or DEFAULT_CHUNK_SIZE,
                remaining_content_len or DEFAULT_CHUNK_SIZE,
            )
        )

    def _read(self, remaining_content_len: int | None) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, DEFAULT_CHUNK_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object, respecting the
            remaining_content_len limit if specified.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made.

        """
        return self._value.read(remaining_content_len or DEFAULT_CHUNK_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> int | None:
        """
        Size of the payload in bytes.

        Returns the total size of the payload content from the initial position.
        This ensures consistent Content-Length for requests, including 307/308 redirects
        where the same payload instance is reused.

        Returns None if the size cannot be determined (e.g., for unseekable streams).
        """
        try:
            # Store the start position on first access.
            # This is critical when the same payload instance is reused (e.g., 307/308
            # redirects). Without storing the initial position, after the payload is
            # read once, the file position would be at EOF, which would cause the
            # size calculation to return 0 (file_size - EOF position).
            # By storing the start position, we ensure the size calculation always
            # returns the correct total size for any subsequent use.
            if self._start_position is None:
                self._start_position = self._value.tell()

            # Return the total size from the start position
            # This ensures Content-Length is correct even after reading
            return os.fstat(self._value.fileno()).st_size - self._start_position
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire file content without any length constraint.
        It delegates to write_with_length() with no length limit for implementation
        consistency.

        Note:
            For new implementations that need length control, use write_with_length() directly.
            This method is maintained for backwards compatibility with existing code.

        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method implements optimized streaming of file content with length constraints:

        1. File reading is performed in a thread pool to avoid blocking the event loop
        2. Content is read and written in chunks to maintain memory efficiency
        3. Writing stops when either:
           - All available file content has been written (when size is known)
           - The specified content_length has been reached
           - A read returns no data

        The file is NOT closed here; callers must await close() to release it.

        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length

        # Get initial data and available length
        available_len, chunk = await loop.run_in_executor(
            None, self._read_and_available_len, remaining_content_len
        )
        # Process data chunks until done
        while chunk:
            chunk_len = len(chunk)

            # Write data with or without length constraint
            if remaining_content_len is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_content_len])
                remaining_content_len -= chunk_len

            total_written_len += chunk_len

            # Check if we're done writing
            if self._should_stop_writing(
                available_len, total_written_len, remaining_content_len
            ):
                return

            # Read next chunk
            chunk = await loop.run_in_executor(
                None,
                self._read,
                (
                    min(DEFAULT_CHUNK_SIZE, remaining_content_len)
                    if remaining_content_len is not None
                    else DEFAULT_CHUNK_SIZE
                ),
            )

    def _should_stop_writing(
        self,
        available_len: int | None,
        total_written_len: int,
        remaining_content_len: int | None,
    ) -> bool:
        """
        Determine if we should stop writing data.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Remaining bytes to be written for content-length limited responses

        Returns:
            True if we should stop writing data, based on either:
            - Having written all available data (when size is known)
            - Having written all requested content (when content-length is specified)

        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards
        compatibility. Use the async close() method instead.

        Schedules ``self._value.close`` in the default executor without
        awaiting it, and marks the payload consumed. Repeated calls are no-ops.

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError.
        """
        # Skip if closing was already scheduled. Deliberately not keyed on
        # _consumed: unseekable streams are marked consumed after their first
        # read but still hold an open handle that must be released.
        if self._closed:
            return
        # Resolve the loop before touching any state, so a RuntimeError from
        # a call outside a loop leaves the payload closable later.
        loop = asyncio.get_running_loop()
        self._closed = True
        self._consumed = True  # Mark as consumed to prevent further writes
        # Schedule file closing without awaiting to prevent cancellation issues
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        return self._read_all().decode(encoding, errors)

    def _read_all(self) -> bytes:
        """Read the entire file-like object and return its content as bytes."""
        self._set_or_restore_start_position()
        # Use readlines() to ensure we get all content
        return b"".join(self._value.readlines())

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire file content (from the start position)
        and returns it as bytes. The file reading is performed in an executor
        to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._read_all)

719 

720 

class TextIOPayload(IOBasePayload):
    """Payload streaming a text file-like object, encoding chunks to bytes.

    The charset is *encoding* when given; otherwise the ``charset`` parameter
    of *content_type*; otherwise UTF-8. A ``text/plain`` Content-Type naming
    the charset is generated when *content_type* is omitted.
    """

    _value: io.TextIOBase
    # _autoclose = False (inherited) - Has text file handle that needs explicit closing

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        if encoding is not None:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            params = parse_mimetype(content_type).parameters
            encoding = params.get("charset", "utf-8")
        else:
            encoding = "utf-8"
            content_type = "text/plain; charset=utf-8"

        super().__init__(
            value,
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )

    def _encode_text(self, text: str) -> bytes:
        """Encode *text* with the payload encoding, or ``str.encode``'s default if unset."""
        if self._encoding:
            return text.encode(self._encoding)
        return text.encode()

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """Rewind, compute the size, and read + encode the first chunk.

        Args:
            remaining_content_len: Optional cap on the read size; when falsy
                DEFAULT_CHUNK_SIZE is used.

        Returns:
            ``(size, first_chunk)`` where *size* is ``self.size`` (None when
            unknown) and *first_chunk* is the text read, encoded to bytes.

        Note: the read size is passed to the text stream's ``read()``, so it
        counts characters; the encoded chunk may be longer in bytes.
        Size calculation and the first read share one executor job.
        """
        self._set_or_restore_start_position()
        size = self.size
        limit = min(
            DEFAULT_CHUNK_SIZE,
            size or DEFAULT_CHUNK_SIZE,
            remaining_content_len or DEFAULT_CHUNK_SIZE,
        )
        text = self._value.read(limit)
        return size, self._encode_text(text)

    def _read(self, remaining_content_len: int | None) -> bytes:
        """Read the next text chunk and return it encoded to bytes.

        Args:
            remaining_content_len: Optional cap on the read size; when falsy
                DEFAULT_CHUNK_SIZE is used.

        Used for every read after the initial ``_read_and_available_len``.
        """
        text = self._value.read(remaining_content_len or DEFAULT_CHUNK_SIZE)
        return self._encode_text(text)

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Return the text from the start position to EOF.

        *encoding* and *errors* are unused because the stream already yields
        ``str``. Performs blocking reads; do not call from the event loop.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Read all text from the start position and return it encoded.

        The payload's own encoding takes precedence over *encoding*; *errors*
        is passed to ``str.encode``. The read runs in the default executor.
        """
        loop = asyncio.get_running_loop()
        target_encoding = self._encoding or encoding

        def _load_encoded() -> bytes:
            self._set_or_restore_start_position()
            # read() without a size returns everything up to EOF.
            whole_text = self._value.read()
            return whole_text.encode(target_encoding, errors)

        return await loop.run_in_executor(None, _load_encoded)

836 

837 

class BytesIOPayload(IOBasePayload):
    """Payload backed by an in-memory ``io.BytesIO`` buffer."""

    _value: io.BytesIO
    _size: int  # Always initialized in __init__
    _autoclose = True  # BytesIO is in-memory, safe to auto-close

    def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)
        # Calculate size once during initialization: bytes from the current
        # position to the end of the buffer. getbuffer() exports a view of
        # the buffer; release it deterministically via ``with`` because while
        # an export is alive the BytesIO cannot be resized (BufferError).
        with self._value.getbuffer() as view:
            self._size = len(view) - self._value.tell()

    @property
    def size(self) -> int:
        """Size of the payload in bytes.

        Returns the number of bytes in the BytesIO buffer that will be transmitted.
        This is calculated once during initialization for efficiency.
        """
        return self._size

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the buffer contents from the start position to the end."""
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write the whole buffer; same as ``write_with_length(writer, None)``."""
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write BytesIO payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation is specifically optimized for BytesIO objects:

        1. Reads content in chunks of DEFAULT_CHUNK_SIZE, starting from the
           remembered start position
        2. Yields control back to the event loop between chunks to prevent
           blocking when dealing with large BytesIO objects
        3. Respects content_length constraints when specified

        The BytesIO object is not closed here.

        """
        self._set_or_restore_start_position()
        loop_count = 0
        remaining_bytes = content_length
        while chunk := self._value.read(DEFAULT_CHUNK_SIZE):
            if loop_count > 0:
                # Avoid blocking the event loop
                # if they pass a large BytesIO object
                # and we are not in the first iteration
                # of the loop
                await asyncio.sleep(0)
            if remaining_bytes is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_bytes])
                remaining_bytes -= len(chunk)
                if remaining_bytes <= 0:
                    return
            loop_count += 1

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Rewinds to the start position and returns everything from there to
        the end of the buffer. *encoding* and *errors* are unused.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def close(self) -> None:
        """
        Close the BytesIO payload.

        This does nothing since BytesIO is in-memory and does not require explicit closing.
        """

921 

922 

923class BufferedReaderPayload(IOBasePayload): 

924 _value: io.BufferedIOBase 

925 # _autoclose = False (inherited) - Has buffered file handle that needs explicit closing 

926 

927 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: 

928 self._set_or_restore_start_position() 

929 return self._value.read().decode(encoding, errors) 

930 

931 

932class JsonPayload(BytesPayload): 

933 def __init__( 

934 self, 

935 value: Any, 

936 encoding: str = "utf-8", 

937 content_type: str = "application/json", 

938 dumps: JSONEncoder = json.dumps, 

939 *args: Any, 

940 **kwargs: Any, 

941 ) -> None: 

942 super().__init__( 

943 dumps(value).encode(encoding), 

944 content_type=content_type, 

945 encoding=encoding, 

946 *args, 

947 **kwargs, 

948 ) 

949 

950 

951class JsonBytesPayload(BytesPayload): 

952 """JSON payload for encoders that return bytes directly. 

953 

954 Use this when your JSON encoder (like orjson) returns bytes 

955 instead of str, avoiding the encode/decode overhead. 

956 """ 

957 

958 def __init__( 

959 self, 

960 value: Any, 

961 dumps: JSONBytesEncoder, 

962 content_type: str = "application/json", 

963 *args: Any, 

964 **kwargs: Any, 

965 ) -> None: 

966 super().__init__( 

967 dumps(value), 

968 content_type=content_type, 

969 *args, 

970 **kwargs, 

971 ) 

972 

973 

974class AsyncIterablePayload(Payload): 

975 _iter: AsyncIterator[bytes] | None = None 

976 _value: AsyncIterable[bytes] 

977 _cached_chunks: list[bytes] | None = None 

978 # _consumed stays False to allow reuse with cached content 

979 _autoclose = True # Iterator doesn't need explicit closing 

980 

981 def __init__(self, value: AsyncIterable[bytes], *args: Any, **kwargs: Any) -> None: 

982 if not isinstance(value, AsyncIterable): 

983 raise TypeError( 

984 "value argument must support " 

985 "collections.abc.AsyncIterable interface, " 

986 f"got {type(value)!r}" 

987 ) 

988 

989 if "content_type" not in kwargs: 

990 kwargs["content_type"] = "application/octet-stream" 

991 

992 super().__init__(value, *args, **kwargs) 

993 

994 self._iter = value.__aiter__() 

995 

996 async def write(self, writer: AbstractStreamWriter) -> None: 

997 """ 

998 Write the entire async iterable payload to the writer stream. 

999 

1000 Args: 

1001 writer: An AbstractStreamWriter instance that handles the actual writing 

1002 

1003 This method iterates through the async iterable and writes each chunk 

1004 to the writer without any length constraint. 

1005 

1006 Note: 

1007 For new implementations that need length control, use write_with_length() directly. 

1008 This method is maintained for backwards compatibility with existing code. 

1009 

1010 """ 

1011 await self.write_with_length(writer, None) 

1012 

1013 async def write_with_length( 

1014 self, writer: AbstractStreamWriter, content_length: int | None 

1015 ) -> None: 

1016 """ 

1017 Write async iterable payload with a specific content length constraint. 

1018 

1019 Args: 

1020 writer: An AbstractStreamWriter instance that handles the actual writing 

1021 content_length: Maximum number of bytes to write (None for unlimited) 

1022 

1023 This implementation handles streaming of async iterable content with length constraints: 

1024 

1025 1. If cached chunks are available, writes from them 

1026 2. Otherwise iterates through the async iterable one chunk at a time 

1027 3. Respects content_length constraints when specified 

1028 4. Does NOT generate cache - that's done by as_bytes() 

1029 

1030 """ 

1031 # If we have cached chunks, use them 

1032 if self._cached_chunks is not None: 

1033 remaining_bytes = content_length 

1034 for chunk in self._cached_chunks: 

1035 if remaining_bytes is None: 

1036 await writer.write(chunk) 

1037 elif remaining_bytes > 0: 

1038 await writer.write(chunk[:remaining_bytes]) 

1039 remaining_bytes -= len(chunk) 

1040 else: 

1041 break 

1042 return 

1043 

1044 # If iterator is exhausted and we don't have cached chunks, nothing to write 

1045 if self._iter is None: 

1046 return 

1047 

1048 # Stream from the iterator 

1049 remaining_bytes = content_length 

1050 

1051 try: 

1052 while True: 

1053 chunk = await anext(self._iter) 

1054 if remaining_bytes is None: 

1055 await writer.write(chunk) 

1056 # If we have a content length limit 

1057 elif remaining_bytes > 0: 

1058 await writer.write(chunk[:remaining_bytes]) 

1059 remaining_bytes -= len(chunk) 

1060 # We still want to exhaust the iterator even 

1061 # if we have reached the content length limit 

1062 # since the file handle may not get closed by 

1063 # the iterator if we don't do this 

1064 except StopAsyncIteration: 

1065 # Iterator is exhausted 

1066 self._iter = None 

1067 self._consumed = True # Mark as consumed when streamed without caching 

1068 

1069 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str: 

1070 """Decode the payload content as a string if cached chunks are available.""" 

1071 if self._cached_chunks is not None: 

1072 return b"".join(self._cached_chunks).decode(encoding, errors) 

1073 raise TypeError("Unable to decode - content not cached. Call as_bytes() first.") 

1074 

1075 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes: 

1076 """ 

1077 Return bytes representation of the value. 

1078 

1079 This method reads the entire async iterable content and returns it as bytes. 

1080 It generates and caches the chunks for future reuse. 

1081 """ 

1082 # If we have cached chunks, return them joined 

1083 if self._cached_chunks is not None: 

1084 return b"".join(self._cached_chunks) 

1085 

1086 # If iterator is exhausted and no cache, return empty 

1087 if self._iter is None: 

1088 return b"" 

1089 

1090 # Read all chunks and cache them 

1091 chunks: list[bytes] = [] 

1092 async for chunk in self._iter: 

1093 chunks.append(chunk) 

1094 

1095 # Iterator is exhausted, cache the chunks 

1096 self._iter = None 

1097 self._cached_chunks = chunks 

1098 # Keep _consumed as False to allow reuse with cached chunks 

1099 

1100 return b"".join(chunks) 

1101 

1102 

1103class StreamReaderPayload(AsyncIterablePayload): 

1104 def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None: 

1105 super().__init__(value.iter_any(), *args, **kwargs) 

1106 

1107 

1108PAYLOAD_REGISTRY = PayloadRegistry() 

1109PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview)) 

1110PAYLOAD_REGISTRY.register(StringPayload, str) 

1111PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO) 

1112PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase) 

1113PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO) 

1114PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom)) 

1115PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase) 

1116PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader) 

1117# try_last for giving a chance to more specialized async interables like 

1118# multipart.BodyPartReaderPayload override the default 

1119PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)