Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/payload.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

386 statements  

1import asyncio 

2import enum 

3import io 

4import json 

5import mimetypes 

6import os 

7import sys 

8import warnings 

9from abc import ABC, abstractmethod 

10from collections.abc import AsyncIterable, AsyncIterator, Iterable 

11from itertools import chain 

12from typing import IO, Any, Final, TextIO 

13 

14from multidict import CIMultiDict 

15 

16from . import hdrs 

17from .abc import AbstractStreamWriter 

18from .helpers import ( 

19 _SENTINEL, 

20 DEFAULT_CHUNK_SIZE, 

21 content_disposition_header, 

22 guess_filename, 

23 parse_mimetype, 

24 sentinel, 

25) 

26from .http_writer import _safe_header 

27from .streams import StreamReader 

28from .typedefs import JSONBytesEncoder, JSONEncoder 

29 

# Public API re-exported by ``from ... payload import *``.
__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "JsonBytesPayload",
    "AsyncIterablePayload",
)

# Raw byte bodies larger than this make BytesPayload emit a ResourceWarning,
# because writing them in a single call can stall the event loop.
TOO_LARGE_BYTES_BODY: Final[int] = 1024 * 1024  # 1 MiB

# Strong references to the executor futures created by IOBasePayload._close();
# each future removes itself from the set once the file has been closed.
_CLOSE_FUTURES: set[asyncio.Future[None]] = set()

49 

50 

class LookupError(Exception):
    """Signal that no registered payload factory accepts the given value.

    Raised by ``PayloadRegistry.get`` once every bucket has been searched.
    Note that this class deliberately shadows the builtin ``LookupError``
    within this module.
    """

53 

54 

class Order(str, enum.Enum):
    """Priority bucket a payload factory is registered in.

    ``try_first`` factories are matched before anything else, ``normal``
    factories are also indexed by exact type for a fast path, and
    ``try_last`` factories are only consulted as a fallback.
    """

    normal = "normal"  # default bucket, eligible for exact-type lookup
    try_first = "try_first"  # scanned before every other bucket
    try_last = "try_last"  # scanned after the normal bucket

59 

60 

def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    """Wrap *data* in a Payload chosen by the module-level registry.

    Any extra positional and keyword arguments are forwarded unchanged to
    the selected payload factory.
    """
    registry = PAYLOAD_REGISTRY
    return registry.get(data, *args, **kwargs)

63 

64 

def register_payload(
    factory: type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    """Register *factory* for values of *type* in the module-level registry.

    *type* may be a single type or an iterable of types; *order* selects the
    priority bucket (see ``Order``).
    """
    registry = PAYLOAD_REGISTRY
    registry.register(factory, type, order=order)

69 

70 

class payload_type:
    """Class decorator that registers the decorated class as a payload factory.

    Example::

        @payload_type(SomeType)
        class SomePayload(Payload): ...
    """

    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        # Remember the registration arguments until a class is decorated.
        self.type, self.order = type, order

    def __call__(self, factory: type["Payload"]) -> type["Payload"]:
        register_payload(factory, self.type, order=self.order)
        # Return the class untouched so decoration is transparent.
        return factory

79 

80 

# The type of a payload *class* (used as a factory), not of an instance.
PayloadType = type["Payload"]
# One registry entry: (factory class, type or iterable of types it handles).
_PayloadRegistryItem = tuple[PayloadType, Any]

83 

84 

class PayloadRegistry:
    """Payload registry.

    Maps Python value types to the Payload factory used to wrap them.
    Factories live in three buckets (``try_first``, ``normal``,
    ``try_last``); ``normal`` registrations are also indexed by exact type
    so the common case avoids a linear ``isinstance`` scan.

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        # Highest-priority entries, matched with isinstance().
        self._first: list[_PayloadRegistryItem] = []
        # Default-priority entries, scanned after the exact-type fast path.
        self._normal: list[_PayloadRegistryItem] = []
        # Fallback entries, scanned last.
        self._last: list[_PayloadRegistryItem] = []
        # Exact type -> factory index covering the normal bucket.
        self._normal_lookup: dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        """Return a Payload wrapping *data*.

        Search order: the try_first bucket (isinstance), the exact-type
        index, *data* itself if it already is a Payload, then the normal
        and try_last buckets (isinstance). Extra arguments are passed to
        the chosen factory.

        Raises:
            LookupError: if no registered factory accepts *data*.
        """
        for factory, handled in self._first:
            if isinstance(data, handled):
                return factory(data, *args, **kwargs)

        exact_factory = self._normal_lookup.get(type(data))
        if exact_factory:
            return exact_factory(data, *args, **kwargs)

        if isinstance(data, Payload):
            # Already wrapped: hand it back untouched.
            return data

        for factory, handled in _CHAIN(self._normal, self._last):
            if isinstance(data, handled):
                return factory(data, *args, **kwargs)

        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        """Add *factory* for *type* (a type or an iterable of types).

        Raises:
            ValueError: if *order* is not one of the ``Order`` members.
        """
        entry = (factory, type)
        # Identity checks on purpose: plain strings equal to an Order value
        # are rejected rather than accepted.
        if order is Order.try_first:
            self._first.append(entry)
            return
        if order is Order.try_last:
            self._last.append(entry)
            return
        if order is not Order.normal:
            raise ValueError(f"Unsupported order {order!r}")

        self._normal.append(entry)
        exact_types = type if isinstance(type, Iterable) else (type,)
        for exact in exact_types:
            self._normal_lookup[exact] = factory

138 

139 

class Payload(ABC):
    """Abstract base for HTTP bodies and multipart parts.

    A payload wraps a value together with its headers (always including
    ``Content-Type``), an optional filename and encoding, and knows how to
    write itself to an ``AbstractStreamWriter``. Subclasses must implement
    ``decode()`` and ``write()`` and should override ``write_with_length()``.
    """

    _default_content_type: str = "application/octet-stream"
    _size: int | None = None
    _consumed: bool = False  # Default: payload has not been consumed yet
    _autoclose: bool = False  # Default: assume resource needs explicit closing

    def __init__(
        self,
        value: Any,
        headers: (
            CIMultiDict[str] | dict[str, str] | Iterable[tuple[str, str]] | None
        ) = None,
        content_type: None | str | _SENTINEL = sentinel,
        filename: str | None = None,
        encoding: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Store *value* and build the payload headers.

        ``Content-Type`` is resolved in this order:

        1. an explicit, non-``None`` *content_type*;
        2. a type guessed by :mod:`mimetypes` from *filename* when one is
           given (``_default_content_type`` if the guess fails);
        3. ``_default_content_type``.

        Entries from *headers* are applied afterwards, so they can override
        the computed ``Content-Type``. Extra *kwargs* are accepted and ignored.
        """
        self._encoding = encoding
        self._filename = filename
        self._headers = CIMultiDict[str]()
        self._value = value
        if content_type is not sentinel and content_type is not None:
            assert isinstance(content_type, str)
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            # Python 3.13 added guess_file_type() for paths and soft-deprecated
            # passing paths to guess_type().
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> int | None:
        """Size of the payload in bytes.

        Returns the number of bytes that will be transmitted when the payload
        is written. For string payloads, this is the size after encoding to bytes,
        not the length of the string.
        """
        return self._size

    @property
    def filename(self) -> str | None:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> CIMultiDict[str]:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        """Headers serialized as UTF-8 ``Name: value\\r\\n`` lines plus a blank line.

        Both names and values are passed through ``_safe_header`` first.
        """
        return (
            "".join(
                _safe_header(k) + ": " + _safe_header(v) + "\r\n"
                for k, v in self.headers.items()
            ).encode("utf-8")
            + b"\r\n"
        )

    @property
    def encoding(self) -> str | None:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    @property
    def consumed(self) -> bool:
        """Whether the payload has been consumed and cannot be reused."""
        return self._consumed

    @property
    def autoclose(self) -> bool:
        """
        Whether the payload can close itself automatically.

        Returns True if the payload has no file handles or resources that need
        explicit closing. If False, callers must await close() to release resources.
        """
        return self._autoclose

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: str,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, params=params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length constraints.

        Important:
            For new implementations, use write_with_length() instead of this method.
            This method is maintained for backwards compatibility and will eventually
            delegate to write_with_length(writer, None) in all implementations.

        All payload subclasses must override this method for backwards compatibility,
        but new code should use write_with_length for more flexibility and control.

        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Note:
            This base implementation ignores *content_length* and simply
            delegates to write(), for subclasses that do not override it.
            Specific payload types should override this method to implement
            proper length-constrained writing.

        """
        # Backwards compatibility for subclasses that don't override this method
        # and for the default implementation
        await self.write(writer)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Calls decode() and encodes the result back to bytes. The payload's own
        encoding takes precedence over *encoding*, and *errors* is applied to
        both the decode and the encode step.
        """
        # Use instance encoding if available, otherwise use parameter
        actual_encoding = self._encoding or encoding
        # Re-encode with the same error handler used for decoding so handlers
        # such as "surrogateescape" or "replace" round-trip instead of raising
        # on a strict encode.
        return self.decode(actual_encoding, errors).encode(actual_encoding, errors)

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility with code that
        needs to clean up payloads synchronously. In the future, we will
        drop this method and only support the async close() method.

        WARNING: This method must be safe to call from within the event loop
        without blocking. Subclasses should not perform any blocking I/O here.

        WARNING: This method must be called from within an event loop for
        certain payload types (e.g., IOBasePayload). Calling it outside an
        event loop may raise RuntimeError.
        """
        # This is a no-op by default, but subclasses can override it
        # for non-blocking cleanup operations.

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.

        In the future, this will be the only close method supported.
        """
        self._close()

338 

339 

class BytesPayload(Payload):
    """Payload backed by an in-memory bytes-like object.

    Accepts ``bytes``, ``bytearray`` or ``memoryview``. Nothing is consumed
    while writing, so the payload can be written repeatedly, and there is
    no handle to close.
    """

    _value: bytes | bytearray | memoryview
    # _consumed = False (inherited) - Bytes are immutable and can be reused
    _autoclose = True  # No file handle, just bytes in memory

    def __init__(
        self, value: bytes | bytearray | memoryview, *args: Any, **kwargs: Any
    ) -> None:
        """Wrap *value*; ``Content-Type`` defaults to application/octet-stream.

        Raises:
            TypeError: if *value* is not bytes, bytearray or memoryview.
        """
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        if isinstance(value, memoryview):
            # nbytes rather than len(): len() counts items, which differs
            # from the byte count for multi-byte formats.
            self._size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            self._size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if self._size > TOO_LARGE_BYTES_BODY:
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                source=self,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the payload bytes to ``str``.

        ``str(obj, encoding, errors)`` accepts any bytes-like object, so this
        also works for ``memoryview`` values, which have no ``.decode()``.
        """
        return str(self._value, encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        ``bytes`` values are returned as-is; ``bytearray`` and ``memoryview``
        values are copied into an immutable ``bytes`` object so the declared
        return type holds. *encoding* and *errors* are not used.
        """
        value = self._value
        if isinstance(value, bytes):
            return value
        return bytes(value)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire bytes payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Note:
            Equivalent to write_with_length(writer, None); kept for
            backwards compatibility.

        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write bytes payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Writes the whole value, or the slice ``[:content_length]`` of it when
        a limit is given.

        """
        if content_length is not None:
            await writer.write(self._value[:content_length])
        else:
            await writer.write(self._value)

417 

418 

class StringPayload(BytesPayload):
    """Payload for a ``str`` value, encoded to bytes at construction time.

    The text encoding is chosen as follows:

    * an explicit *encoding* wins, and ``Content-Type`` defaults to
      ``text/plain`` with that charset;
    * otherwise the ``charset`` parameter of *content_type* is used
      (utf-8 when absent);
    * with neither, utf-8 and ``text/plain; charset=utf-8``.
    """

    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        if encoding is not None:
            real_encoding = encoding
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            params = parse_mimetype(content_type).parameters
            real_encoding = params.get("charset", "utf-8")
        else:
            real_encoding = "utf-8"
            content_type = "text/plain; charset=utf-8"

        encoded = value.encode(real_encoding)
        super().__init__(
            encoded,
            *args,
            encoding=real_encoding,
            content_type=content_type,
            **kwargs,
        )

447 

448 

class StringIOPayload(StringPayload):
    """Payload that reads a text stream eagerly and sends it as a string.

    The stream is read in full once, during construction; all remaining
    arguments are handled by ``StringPayload``.
    """

    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        text = value.read()
        super().__init__(text, *args, **kwargs)

452 

453 

class IOBasePayload(Payload):
    """Payload streamed from a file-like object.

    Blocking reads run in the event loop's default executor. The stream
    position at first use is recorded so the payload can be re-sent from
    the same point (e.g. on 307/308 redirects); streams that cannot report
    or restore their position are marked consumed instead.
    """

    _value: io.IOBase
    # _consumed = False (inherited) - File can be re-read from the same position
    _start_position: int | None = None
    # _autoclose = False (inherited) - Has file handle that needs explicit closing
    # Whether _close() has already scheduled closing the file handle. Kept
    # separate from _consumed: an unseekable stream is marked consumed as soon
    # as it is read, but its handle must still be closed afterwards.
    _closed: bool = False

    def __init__(
        self,
        value: IO[Any],
        disposition: str | None = "attachment",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Wrap the file-like *value*.

        If no ``filename`` keyword is given it is guessed via
        ``guess_filename(value)``. When a filename is known and *disposition*
        is not None, a ``Content-Disposition`` header is added unless one is
        already present.
        """
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _set_or_restore_start_position(self) -> None:
        """Set or restore the start position of the file-like object.

        The first call records ``tell()``; later calls ``seek()`` back to it.
        If either operation fails, the payload is marked consumed.
        """
        if self._start_position is None:
            try:
                self._start_position = self._value.tell()
            except (OSError, AttributeError):
                self._consumed = True  # Cannot seek, mark as consumed
            return
        try:
            self._value.seek(self._start_position)
        except (OSError, AttributeError):
            # Failed to seek back - mark as consumed since we've already read
            self._consumed = True

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """
        Read the file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, DEFAULT_CHUNK_SIZE will be used as the default chunk size.

        Returns:
            A tuple containing:
            - The size from the recorded start position (None if it cannot be determined)
            - The first chunk of bytes read from the file object

        Size calculation and the first read are combined so they run in a
        single executor job.

        """
        self._set_or_restore_start_position()
        size = self.size  # Call size only once since it does I/O
        return size, self._value.read(
            min(
                DEFAULT_CHUNK_SIZE,
                size or DEFAULT_CHUNK_SIZE,
                remaining_content_len or DEFAULT_CHUNK_SIZE,
            )
        )

    def _read(self, remaining_content_len: int | None) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, DEFAULT_CHUNK_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object, respecting the
            remaining_content_len limit if specified.

        Used for the reads that follow the initial _read_and_available_len call.

        """
        return self._value.read(remaining_content_len or DEFAULT_CHUNK_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> int | None:
        """
        Size of the payload in bytes.

        Returns the total size of the payload content from the initial position.
        This ensures consistent Content-Length for requests, including 307/308 redirects
        where the same payload instance is reused.

        Returns None if the size cannot be determined (e.g., for unseekable streams).
        """
        try:
            # Store the start position on first access. Without it, after the
            # payload has been read once the position would be at EOF and the
            # size would be computed as 0 when the instance is reused.
            if self._start_position is None:
                self._start_position = self._value.tell()

            # Return the total size from the start position
            # This ensures Content-Length is correct even after reading
            return os.fstat(self._value.fileno()).st_size - self._start_position
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Delegates to write_with_length() with no length limit; kept for
        backwards compatibility.

        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        File reads run in the default executor and content is written chunk
        by chunk. Writing stops when the stream is exhausted, when all of the
        known size has been written, or when content_length bytes have been
        written. This method does not close the file; use close() for that.

        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length

        # Get initial data and available length
        available_len, chunk = await loop.run_in_executor(
            None, self._read_and_available_len, remaining_content_len
        )
        # Process data chunks until done
        while chunk:
            chunk_len = len(chunk)

            # Write data with or without length constraint
            if remaining_content_len is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_content_len])
                remaining_content_len -= chunk_len

            total_written_len += chunk_len

            # Check if we're done writing
            if self._should_stop_writing(
                available_len, total_written_len, remaining_content_len
            ):
                return

            # Read next chunk
            chunk = await loop.run_in_executor(
                None,
                self._read,
                (
                    min(DEFAULT_CHUNK_SIZE, remaining_content_len)
                    if remaining_content_len is not None
                    else DEFAULT_CHUNK_SIZE
                ),
            )

    def _should_stop_writing(
        self,
        available_len: int | None,
        total_written_len: int,
        remaining_content_len: int | None,
    ) -> bool:
        """
        Determine if we should stop writing data.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Remaining bytes to be written for content-length limited responses

        Returns:
            True if all known data or all requested content has been written.

        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        Schedules ``close()`` of the underlying file in the default executor
        without awaiting it, and marks the payload consumed. Repeated calls
        are no-ops.

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError.
        """
        # Do not key this on _consumed: unseekable streams are marked consumed
        # on first read and their handle would otherwise never be closed.
        if self._closed:
            return
        # Fetch the loop before touching any state, so a RuntimeError (no
        # running loop) still lets a later close() release the handle.
        loop = asyncio.get_running_loop()
        self._closed = True
        self._consumed = True  # Mark as consumed to prevent further writes
        # Schedule file closing without awaiting to prevent cancellation issues
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        return self._read_all().decode(encoding, errors)

    def _read_all(self) -> bytes:
        """Read the entire file-like object and return its content as bytes."""
        self._set_or_restore_start_position()
        # Use readlines() to ensure we get all content
        return b"".join(self._value.readlines())

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Reads the entire content from the recorded start position in the
        default executor, so the event loop is not blocked.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._read_all)

721 

722 

class TextIOPayload(IOBasePayload):
    """Payload streamed from a text-mode file object, encoded chunk by chunk.

    Encoding resolution mirrors StringPayload: an explicit *encoding* wins,
    then the ``charset`` of *content_type*, then utf-8.
    """

    _value: io.TextIOBase
    # _autoclose = False (inherited) - Has text file handle that needs explicit closing

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        if encoding is not None:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            params = parse_mimetype(content_type).parameters
            encoding = params.get("charset", "utf-8")
        else:
            encoding = "utf-8"
            content_type = "text/plain; charset=utf-8"

        super().__init__(
            value,
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """Return ``(size, first_chunk)`` in a single executor-friendly call.

        *size* is the byte size from the recorded start position (None if
        unknown). The read limit is applied to characters taken from the text
        stream, so for multi-byte encodings the encoded chunk may be longer.
        The chunk is encoded with the payload encoding (utf-8 if unset).
        """
        self._set_or_restore_start_position()
        size = self.size
        limit = min(
            DEFAULT_CHUNK_SIZE,
            size or DEFAULT_CHUNK_SIZE,
            remaining_content_len or DEFAULT_CHUNK_SIZE,
        )
        text = self._value.read(limit)
        return size, text.encode(self._encoding or "utf-8")

    def _read(self, remaining_content_len: int | None) -> bytes:
        """Read the next chunk of text and return it encoded to bytes.

        Reads at most *remaining_content_len* characters, or
        DEFAULT_CHUNK_SIZE when that is None/0, and encodes them with the
        payload encoding (utf-8 if unset).
        """
        text = self._value.read(remaining_content_len or DEFAULT_CHUNK_SIZE)
        return text.encode(self._encoding or "utf-8")

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return the text from the recorded start position to the end.

        The stream already yields ``str``, so *encoding* and *errors* are unused.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Read the whole text in the default executor and encode it.

        The payload's own encoding takes precedence over *encoding*; *errors*
        is passed to ``str.encode``.
        """
        loop = asyncio.get_running_loop()
        target_encoding = self._encoding or encoding

        def _slurp() -> bytes:
            self._set_or_restore_start_position()
            # A bare read() on a text stream returns everything that is left.
            return self._value.read().encode(target_encoding, errors)

        return await loop.run_in_executor(None, _slurp)

838 

839 

class BytesIOPayload(IOBasePayload):
    """Payload backed by an in-memory ``io.BytesIO`` buffer.

    The size is computed once at construction, and writing yields to the
    event loop between chunks so large buffers do not starve other tasks.
    """

    _value: io.BytesIO
    _size: int  # Always initialized in __init__
    _autoclose = True  # BytesIO is in-memory, safe to auto-close

    def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)
        # Bytes between the current position and the end of the buffer.
        buffer_len = len(self._value.getbuffer())
        self._size = buffer_len - self._value.tell()

    @property
    def size(self) -> int:
        """Number of bytes that will be sent, fixed when the payload was created."""
        return self._size

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the buffer contents from the recorded start position."""
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write the whole buffer; same as write_with_length(writer, None)."""
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """Write the buffer from its start position, optionally capped.

        Args:
            writer: Destination stream writer.
            content_length: Maximum number of bytes to write, or None for no cap.

        Data is read in DEFAULT_CHUNK_SIZE pieces. Before every chunk after
        the first, control is yielded to the event loop with
        ``asyncio.sleep(0)``. The buffer is not closed here.
        """
        self._set_or_restore_start_position()
        budget = content_length
        pieces = iter(lambda: self._value.read(DEFAULT_CHUNK_SIZE), b"")
        for index, piece in enumerate(pieces):
            if index:
                # Give other tasks a chance when streaming a large buffer.
                await asyncio.sleep(0)
            if budget is None:
                await writer.write(piece)
                continue
            await writer.write(piece[:budget])
            budget -= len(piece)
            if budget <= 0:
                return

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Return the buffer contents from the recorded start position.

        The read happens inline (no executor) since the data is in memory;
        *encoding* and *errors* are unused.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def close(self) -> None:
        """
        Intentionally a no-op: the buffer lives in memory, so there is no
        handle that has to be released.
        """

923 

924 

class BufferedReaderPayload(IOBasePayload):
    """Payload wrapping a buffered binary file object."""

    _value: io.BufferedIOBase
    # _autoclose stays False (inherited): the buffered file handle is expected
    # to be closed explicitly rather than automatically.

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Reposition via ``_set_or_restore_start_position()`` and decode the rest as text."""
        self._set_or_restore_start_position()
        content: bytes = self._value.read()
        return content.decode(encoding, errors)

932 

933 

class JsonPayload(BytesPayload):
    """Payload that serializes *value* with a str-returning JSON encoder."""

    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # Serialize to text, then encode with the same charset that is passed
        # on to BytesPayload.
        body = dumps(value).encode(encoding)
        super().__init__(
            body,
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )

951 

952 

class JsonBytesPayload(BytesPayload):
    """JSON payload for encoders that return bytes directly.

    Meant for encoders such as orjson whose ``dumps`` already yields bytes,
    so the str -> bytes encode/decode round trip is skipped.
    """

    def __init__(
        self,
        value: Any,
        dumps: JSONBytesEncoder,
        content_type: str = "application/json",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        encoded: bytes = dumps(value)
        super().__init__(encoded, *args, content_type=content_type, **kwargs)

974 

975 

class AsyncIterablePayload(Payload):
    """Payload backed by an async iterable of ``bytes`` chunks.

    The source can be streamed once through ``write()``/``write_with_length()``,
    or read fully through ``as_bytes()``. ``as_bytes()`` caches the chunks so
    that later writes, ``decode()`` and ``as_bytes()`` calls replay them.
    """

    _iter: AsyncIterator[bytes] | None = None
    _value: AsyncIterable[bytes]
    _cached_chunks: list[bytes] | None = None
    # _consumed is left False after caching so the cached content can be reused
    _autoclose = True  # an iterator has nothing that needs explicit closing

    def __init__(self, value: AsyncIterable[bytes], *args: Any, **kwargs: Any) -> None:
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                f"got {type(value)!r}"
            )

        # Fall back to a generic binary type unless the caller picked one
        kwargs.setdefault("content_type", "application/octet-stream")

        super().__init__(value, *args, **kwargs)

        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire async iterable payload to *writer* with no length limit.

        Kept for backwards compatibility; code that needs a length cap should
        call ``write_with_length()`` directly.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write the payload to *writer*, emitting at most *content_length* bytes.

        Args:
            writer: stream writer that receives the chunks
            content_length: byte limit, or None for no limit

        Cached chunks (produced by ``as_bytes()``) are replayed when present.
        Otherwise the live iterator is pulled one chunk at a time. Once the
        limit is reached, further chunks are still pulled but not written, so
        the source is fully drained. This method never fills the cache.
        """
        budget = content_length

        cached = self._cached_chunks
        if cached is not None:
            for piece in cached:
                if budget is not None:
                    if budget <= 0:
                        break
                    await writer.write(piece[:budget])
                    budget -= len(piece)
                else:
                    await writer.write(piece)
            return

        if self._iter is None:
            # Already drained by an earlier non-caching write: nothing to send
            return

        try:
            while True:
                piece = await anext(self._iter)
                if budget is None:
                    await writer.write(piece)
                elif budget > 0:
                    await writer.write(piece[:budget])
                    budget -= len(piece)
                # Past the limit, keep draining the iterator anyway: a
                # file-backed source may not close its handle otherwise.
        except StopAsyncIteration:
            self._iter = None
            self._consumed = True  # streamed without caching

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the cached content; raise TypeError if nothing has been cached yet."""
        cached = self._cached_chunks
        if cached is None:
            raise TypeError("Unable to decode - content not cached. Call as_bytes() first.")
        return b"".join(cached).decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return the full payload as bytes, caching the chunks for reuse.

        - If chunks are already cached, returns them joined.
        - If the iterator was drained by a non-caching write, returns ``b""``.
        - Otherwise reads the whole iterator, stores the chunks and returns
          them joined.

        ``encoding`` and ``errors`` are unused. ``_consumed`` is left untouched
        so the cached content stays reusable.
        """
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks)

        if self._iter is None:
            return b""

        collected: list[bytes] = [chunk async for chunk in self._iter]

        self._iter = None
        self._cached_chunks = collected
        return b"".join(collected)

1103 

1104 

class StreamReaderPayload(AsyncIterablePayload):
    """Payload that streams data from a StreamReader.

    Passes ``value.iter_any()`` to AsyncIterablePayload, which handles all
    writing and caching.
    """

    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        chunk_source = value.iter_any()
        super().__init__(chunk_source, *args, **kwargs)

1108 

1109 

# Module-wide registry used by get_payload() to map input data types to
# payload classes.
# NOTE(review): more specific types appear to be registered before their base
# classes (io.StringIO before io.TextIOBase, io.BytesIO and buffered files
# before io.IOBase). This suggests lookup order matters; the matching rules
# live in PayloadRegistry, which is not shown here, so confirm before
# reordering these lines.
PAYLOAD_REGISTRY = PayloadRegistry()
PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
PAYLOAD_REGISTRY.register(StringPayload, str)
PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO)
PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase)
PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO)
PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader)
# try_last gives more specialized async iterables (such as
# multipart.BodyPartReaderPayload) a chance to override this generic default.
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)