Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/payload.py: 49%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import enum
3import io
4import json
5import mimetypes
6import os
7import sys
8import warnings
9from abc import ABC, abstractmethod
10from collections.abc import Iterable
11from itertools import chain
12from typing import IO, TYPE_CHECKING, Any, Final, TextIO
14from multidict import CIMultiDict
16from . import hdrs
17from .abc import AbstractStreamWriter
18from .helpers import (
19 _SENTINEL,
20 DEFAULT_CHUNK_SIZE,
21 content_disposition_header,
22 guess_filename,
23 parse_mimetype,
24 sentinel,
25)
26from .http_writer import _safe_header
27from .streams import StreamReader
28from .typedefs import JSONBytesEncoder, JSONEncoder, _CIMultiDict
# Public API re-exported by ``from ... payload import *``.
__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "JsonBytesPayload",
    "AsyncIterablePayload",
)

# Raw byte bodies larger than this (1 MiB) make BytesPayload emit a
# ResourceWarning suggesting an io.BytesIO object instead.
TOO_LARGE_BYTES_BODY: Final[int] = 1024 * 1024

# Strong references to executor futures that close file objects, so they are
# not garbage collected mid-flight; each future removes itself when done.
_CLOSE_FUTURES: set[asyncio.Future[None]] = set()
class LookupError(Exception):
    """Raised when no payload factory is found for the given data type.

    Note: inside this module this name shadows the builtin ``LookupError``;
    it derives directly from ``Exception``, not from the builtin.
    """
class Order(str, enum.Enum):
    """Priority bucket a payload factory is registered into."""

    # Default bucket; PayloadRegistry also indexes these by exact type.
    normal = "normal"
    # Checked before every other bucket, including the exact-type lookup.
    try_first = "try_first"
    # Checked only after the ``normal`` entries in the linear scan.
    try_last = "try_last"
def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    """Wrap *data* in a Payload chosen by the module-level registry.

    All extra positional and keyword arguments are forwarded unchanged to
    ``PAYLOAD_REGISTRY.get`` (the registry object is defined elsewhere in
    this module).
    """
    registry = PAYLOAD_REGISTRY
    return registry.get(data, *args, **kwargs)
def register_payload(
    factory: type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    """Register *factory* for values of *type* in the module-level registry.

    *order* selects the priority bucket (see ``Order``).
    """
    registry = PAYLOAD_REGISTRY
    registry.register(factory, type, order=order)
class payload_type:
    """Class decorator that registers a Payload subclass for a data type.

    Example::

        @payload_type(SomeType)
        class SomePayload(Payload):
            ...

    The decorated class is returned unchanged.
    """

    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        self.type = type
        self.order = order

    def __call__(self, factory: type["Payload"]) -> type["Payload"]:
        target_type, bucket = self.type, self.order
        register_payload(factory, target_type, order=bucket)
        return factory
# "Any Payload subclass"; a string forward reference because Payload is
# defined further down in this module.
PayloadType = type["Payload"]
# One registry entry: (factory, the type spec it was registered for).
_PayloadRegistryItem = tuple[PayloadType, Any]
class PayloadRegistry:
    """Payload registry.

    Maps data types to Payload factories in three priority buckets
    (``Order.try_first``, ``Order.normal``, ``Order.try_last``).  ``get()``
    tries, in order: every ``try_first`` entry; an exact-type dict lookup
    over ``normal`` entries; returning *data* unchanged if it already is a
    Payload; and finally a linear ``isinstance`` scan over the ``normal``
    and then the ``try_last`` entries.

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        self._first: list[_PayloadRegistryItem] = []
        self._normal: list[_PayloadRegistryItem] = []
        self._last: list[_PayloadRegistryItem] = []
        # Exact ``type(data)`` -> factory index over ``normal`` registrations.
        self._normal_lookup: dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        """Return a Payload wrapping *data*.

        Extra positional and keyword arguments are forwarded to the chosen
        factory.  If *data* is already a Payload and neither a ``try_first``
        factory nor an exact-type ``normal`` factory claims it, it is
        returned as-is.

        Raises:
            LookupError: (this module's class) when no factory matches; the
                message names the type of *data*.
        """
        if self._first:
            for factory, type_ in self._first:
                if isinstance(data, type_):
                    return factory(data, *args, **kwargs)
        # Try the fast exact-type lookup first
        if lookup_factory := self._normal_lookup.get(type(data)):
            return lookup_factory(data, *args, **kwargs)
        # Bail early if it's already a Payload
        if isinstance(data, Payload):
            return data
        # Fallback to the slower linear search (normal, then try_last)
        for factory, type_ in _CHAIN(self._normal, self._last):
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)
        raise LookupError(f"No payload factory registered for {type(data)!r}")

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        """Add *factory* for *type* to the bucket selected by *order*.

        For ``Order.normal`` the factory is also indexed by exact type; when
        *type* is iterable (e.g. a tuple of types) every member is indexed.

        Raises:
            ValueError: if *order* is not one of the ``Order`` members
                (compared by identity).
        """
        if order is Order.try_first:
            self._first.append((factory, type))
        elif order is Order.normal:
            self._normal.append((factory, type))
            if isinstance(type, Iterable):
                for t in type:
                    self._normal_lookup[t] = factory
            else:
                self._normal_lookup[type] = factory
        elif order is Order.try_last:
            self._last.append((factory, type))
        else:
            raise ValueError(f"Unsupported order {order!r}")
class Payload(ABC):
    """Abstract base class for HTTP message bodies.

    A payload wraps a value together with its headers (Content-Type is
    always populated), an optional filename and an optional text encoding.
    Subclasses must implement ``decode()`` and ``write()`` and should
    override ``write_with_length()``.
    """

    _default_content_type: str = "application/octet-stream"
    _size: int | None = None
    _consumed: bool = False  # Default: payload has not been consumed yet
    _autoclose: bool = False  # Default: assume resource needs explicit closing

    def __init__(
        self,
        value: Any,
        headers: (
            _CIMultiDict | dict[str, str] | Iterable[tuple[str, str]] | None
        ) = None,
        content_type: str | None | _SENTINEL = sentinel,
        filename: str | None = None,
        encoding: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the payload.

        Args:
            value: The wrapped body value.
            headers: Extra headers, merged in after Content-Type is set.
            content_type: Explicit Content-Type.  When omitted or None it is
                guessed from *filename*, falling back to
                ``_default_content_type``.
            filename: Optional filename associated with the body.
            encoding: Optional text encoding of the body.
            **kwargs: Accepted and ignored by this base class.
        """
        self._encoding = encoding
        self._filename = filename
        self._headers: _CIMultiDict = CIMultiDict()
        self._value = value
        if content_type is not sentinel and content_type is not None:
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            # guess_file_type() is the path-oriented variant added in 3.13;
            # older interpreters only have guess_type().
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        # Caller-supplied headers are merged in last.
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> int | None:
        """Size of the payload in bytes.

        Returns the number of bytes that will be transmitted when the payload
        is written. For string payloads, this is the size after encoding to bytes,
        not the length of the string.
        """
        return self._size

    @property
    def filename(self) -> str | None:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> _CIMultiDict:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        """Headers serialized as ``Name: value\\r\\n`` lines plus a blank line."""
        return (
            "".join(
                _safe_header(k) + ": " + _safe_header(v) + "\r\n"
                for k, v in self.headers.items()
            ).encode("utf-8")
            + b"\r\n"
        )

    @property
    def encoding(self) -> str | None:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    @property
    def consumed(self) -> bool:
        """Whether the payload has been consumed and cannot be reused."""
        return self._consumed

    @property
    def autoclose(self) -> bool:
        """
        Whether the payload can close itself automatically.

        Returns True if the payload has no file handles or resources that need
        explicit closing. If False, callers must await close() to release resources.
        """
        return self._autoclose

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: Any,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, **params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length constraints.

        Important:
            For new implementations, use write_with_length() instead of this method.
            This method is maintained for backwards compatibility and will eventually
            delegate to write_with_length(writer, None) in all implementations.

        All payload subclasses must override this method for backwards compatibility,
        but new code should use write_with_length for more flexibility and control.
        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Note:
            This base implementation ignores *content_length* and simply
            calls write(); subclasses should override it to honour the limit.
        """
        # Backwards compatibility for subclasses that don't override this method
        await self.write(writer)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Calls decode() and re-encodes the text.  The instance encoding, when
        set, takes precedence over *encoding*.  *errors* is applied to both
        the decode and the encode step, so a lossy decode (e.g. ``"replace"``)
        cannot fail again on re-encoding.
        """
        actual_encoding = self._encoding or encoding
        return self.decode(actual_encoding, errors).encode(actual_encoding, errors)

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility with code that
        needs to clean up payloads synchronously. In the future, we will
        drop this method and only support the async close() method.

        WARNING: This method must be safe to call from within the event loop
        without blocking. Subclasses should not perform any blocking I/O here.

        WARNING: This method must be called from within an event loop for
        certain payload types (e.g., IOBasePayload). Calling it outside an
        event loop may raise RuntimeError.
        """
        # This is a no-op by default, but subclasses can override it
        # for non-blocking cleanup operations.

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.

        In the future, this will be the only close method supported.
        """
        self._close()
class BytesPayload(Payload):
    """Payload for in-memory ``bytes``, ``bytearray`` or ``memoryview`` values."""

    _value: bytes
    # _consumed = False (inherited) - Bytes are immutable and can be reused
    _autoclose = True  # No file handle, just bytes in memory

    def __init__(
        self, value: bytes | bytearray | memoryview, *args: Any, **kwargs: Any
    ) -> None:
        """Wrap a bytes-like *value*.

        Raises:
            TypeError: if *value* is not bytes, bytearray or memoryview.
        """
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        if isinstance(value, memoryview):
            # nbytes, not len(): len() counts items, which differs for
            # memoryviews whose item size is larger than one byte.
            self._size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            self._size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if self._size > TOO_LARGE_BYTES_BODY:
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                source=self,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the bytes to ``str``.

        ``memoryview`` has no ``decode()`` method, so it is copied to
        ``bytes`` first; bytes and bytearray are decoded directly.
        """
        value = self._value
        if isinstance(value, memoryview):
            value = value.tobytes()
        return value.decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        ``bytes`` values are returned as-is (no copy).  ``bytearray`` and
        ``memoryview`` values are converted to ``bytes`` so the return type
        matches the annotation.  *encoding* and *errors* are unused.
        """
        value = self._value
        if isinstance(value, bytes):
            return value
        return bytes(value)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire bytes payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Equivalent to ``write_with_length(writer, None)``; kept for backwards
        compatibility.
        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write bytes payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Writes the whole value, or only its first *content_length* elements
        via slicing.
        """
        if content_length is not None:
            await writer.write(self._value[:content_length])
        else:
            await writer.write(self._value)
class StringPayload(BytesPayload):
    """Payload for ``str`` values; the text is encoded when constructed.

    The encoding comes from *encoding* when given, otherwise from the
    ``charset`` parameter of *content_type*, otherwise UTF-8.  Without a
    *content_type*, ``text/plain`` with a matching charset is used.
    """

    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        if encoding is not None:
            real_encoding = encoding
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            charset_params = parse_mimetype(content_type).parameters
            real_encoding = charset_params.get("charset", "utf-8")
        else:
            real_encoding = "utf-8"
            content_type = "text/plain; charset=utf-8"

        encoded = value.encode(real_encoding)
        super().__init__(
            encoded,
            *args,
            encoding=real_encoding,
            content_type=content_type,
            **kwargs,
        )
class StringIOPayload(StringPayload):
    """Payload that reads a whole text stream up front and sends it as a string."""

    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        text = value.read()
        super().__init__(text, *args, **kwargs)
class IOBasePayload(Payload):
    """Payload that streams the contents of a file-like object.

    Blocking reads run in the event loop's default executor.  The stream
    position at first use is remembered so the payload can be rewound and
    sent again (e.g. when a request is replayed on a 307/308 redirect).
    """

    _value: io.IOBase
    # _consumed = False (inherited) - File can be re-read from the same position
    _start_position: int | None = None
    # True once _close() has scheduled closing of the file object.  Kept
    # separate from _consumed, which is also set when the stream turns out to
    # be unseekable; such a stream still owns a handle that must be closed.
    _closed: bool = False
    # _autoclose = False (inherited) - Has file handle that needs explicit closing

    def __init__(
        self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
    ) -> None:
        """Wrap file-like *value*.

        The filename is guessed from *value* unless passed explicitly.  When
        a filename is known, a Content-Disposition header of type
        *disposition* is added unless one was supplied in the headers.
        """
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _set_or_restore_start_position(self) -> None:
        """Record the start position on first call, else seek back to it.

        If the position cannot be read or restored (unseekable stream, or an
        object lacking tell()/seek()), the payload is marked consumed.
        """
        if self._start_position is None:
            try:
                self._start_position = self._value.tell()
            except (OSError, AttributeError):
                self._consumed = True  # Cannot seek, mark as consumed
                return
        try:
            self._value.seek(self._start_position)
        except (OSError, AttributeError):
            # Failed to seek back - mark as consumed since we've already read
            self._consumed = True

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """
        Read the file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, DEFAULT_CHUNK_SIZE will be used as the default chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size cannot be determined)
            - The first chunk of bytes read from the file object

        Size calculation and the first read are done together so that a
        single executor job covers both.
        """
        self._set_or_restore_start_position()
        size = self.size  # Call size only once since it does I/O
        return size, self._value.read(
            min(
                DEFAULT_CHUNK_SIZE,
                size or DEFAULT_CHUNK_SIZE,
                remaining_content_len or DEFAULT_CHUNK_SIZE,
            )
        )

    def _read(self, remaining_content_len: int | None) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None (or 0), DEFAULT_CHUNK_SIZE is used.

        Used for the reads after the initial _read_and_available_len call.
        """
        return self._value.read(remaining_content_len or DEFAULT_CHUNK_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> int | None:
        """
        Size of the payload in bytes.

        Returns the file size minus the initial position, so Content-Length
        stays consistent when the same payload instance is reused (e.g. on
        307/308 redirects) even after it has been read to EOF.

        Returns None if the size cannot be determined (e.g., for unseekable streams).
        """
        try:
            # Store the start position on first access; without it, a reused
            # payload would measure from EOF and report a size of 0.
            if self._start_position is None:
                self._start_position = self._value.tell()

            return os.fstat(self._value.fileno()).st_size - self._start_position
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Delegates to write_with_length() with no length limit; kept for
        backwards compatibility.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Reads run in the default executor, chunk by chunk.  Writing stops when
        the known size has been written, when *content_length* bytes have
        been written, or when a read returns no data.
        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length

        # Get initial data and available length
        available_len, chunk = await loop.run_in_executor(
            None, self._read_and_available_len, remaining_content_len
        )
        # Process data chunks until done
        while chunk:
            chunk_len = len(chunk)

            # Write data with or without length constraint
            if remaining_content_len is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_content_len])
                remaining_content_len -= chunk_len

            total_written_len += chunk_len

            # Check if we're done writing
            if self._should_stop_writing(
                available_len, total_written_len, remaining_content_len
            ):
                return

            # Read next chunk
            chunk = await loop.run_in_executor(
                None,
                self._read,
                (
                    min(DEFAULT_CHUNK_SIZE, remaining_content_len)
                    if remaining_content_len is not None
                    else DEFAULT_CHUNK_SIZE
                ),
            )

    def _should_stop_writing(
        self,
        available_len: int | None,
        total_written_len: int,
        remaining_content_len: int | None,
    ) -> bool:
        """
        Determine if we should stop writing data.

        Returns True when the known size has been fully written, or when a
        content-length limit was given and it has been reached.
        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        Schedules ``close()`` of the file object in the default executor
        without awaiting it, and marks the payload consumed.  Repeat calls
        are no-ops.

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError; in that
        case the payload state is left untouched so a later call from inside
        a loop can still close the file.
        """
        if self._closed:
            return
        # Resolve the loop before mutating any state (see docstring).
        loop = asyncio.get_running_loop()
        self._closed = True
        self._consumed = True  # Mark as consumed to prevent further writes
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        return self._read_all().decode(encoding, errors)

    def _read_all(self) -> bytes:
        """Rewind to the start position and return the remaining content as bytes."""
        self._set_or_restore_start_position()
        return b"".join(self._value.readlines())

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Reads the whole content in the default executor so the event loop is
        not blocked.  *encoding* and *errors* are unused.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._read_all)
class TextIOPayload(IOBasePayload):
    """Payload streaming a text file object, encoding each chunk to bytes.

    The encoding comes from *encoding* when given, otherwise from the
    ``charset`` parameter of *content_type*, otherwise UTF-8.
    """

    _value: io.TextIOBase
    # _autoclose = False (inherited) - Has text file handle that needs explicit closing

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        if encoding is not None:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            charset_params = parse_mimetype(content_type).parameters
            encoding = charset_params.get("charset", "utf-8")
        else:
            encoding = "utf-8"
            content_type = "text/plain; charset=utf-8"

        super().__init__(
            value,
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """Rewind, measure the size and read the first chunk as encoded bytes.

        Returns ``(size, chunk)``: size is None when it cannot be determined.
        The read limit (a character count for text streams) is the smallest
        of DEFAULT_CHUNK_SIZE, the size and *remaining_content_len*; falsy
        values fall back to DEFAULT_CHUNK_SIZE.  The chunk is encoded with
        the payload encoding, or with ``str.encode()``'s default if unset.
        """
        self._set_or_restore_start_position()
        total = self.size
        limit = min(
            DEFAULT_CHUNK_SIZE,
            total or DEFAULT_CHUNK_SIZE,
            remaining_content_len or DEFAULT_CHUNK_SIZE,
        )
        text = self._value.read(limit)
        enc = self._encoding
        encoded = text.encode(enc) if enc else text.encode()
        return total, encoded

    def _read(self, remaining_content_len: int | None) -> bytes:
        """Read the next chunk of text and return it encoded to bytes.

        Reads *remaining_content_len* characters, or DEFAULT_CHUNK_SIZE when
        that is None or 0.
        """
        text = self._value.read(remaining_content_len or DEFAULT_CHUNK_SIZE)
        enc = self._encoding
        if enc:
            return text.encode(enc)
        return text.encode()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Rewind and return the remaining text.

        *encoding* and *errors* are ignored; the stream already yields text.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Read the whole text in the default executor and return it encoded.

        The instance encoding takes precedence over *encoding*; *errors* is
        passed to ``str.encode``.
        """
        target_encoding = self._encoding or encoding
        loop = asyncio.get_running_loop()

        def _read_and_encode() -> bytes:
            self._set_or_restore_start_position()
            return self._value.read().encode(target_encoding, errors)

        return await loop.run_in_executor(None, _read_and_encode)
class BytesIOPayload(IOBasePayload):
    """Payload backed by an in-memory ``io.BytesIO`` buffer."""

    _value: io.BytesIO
    _size: int  # Always initialized in __init__
    _autoclose = True  # BytesIO is in-memory, safe to auto-close

    def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)
        # Bytes remaining from the current position, computed once up front.
        buffer_len = len(self._value.getbuffer())
        self._size = buffer_len - self._value.tell()

    @property
    def size(self) -> int:
        """Byte count computed at construction (buffer length minus the
        position at that time)."""
        return self._size

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Rewind to the start position and decode the remaining bytes."""
        self._set_or_restore_start_position()
        raw = self._value.read()
        return raw.decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write everything; same as ``write_with_length(writer, None)``."""
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """Write the buffer in DEFAULT_CHUNK_SIZE pieces.

        Args:
            writer: Destination stream writer.
            content_length: Maximum number of bytes to write (None for unlimited).

        Rewinds to the start position first.  From the second chunk on,
        control is yielded to the event loop before each write so that a
        large buffer does not monopolise the loop.
        """
        self._set_or_restore_start_position()
        remaining = content_length
        is_first_chunk = True
        while True:
            piece = self._value.read(DEFAULT_CHUNK_SIZE)
            if not piece:
                return
            if not is_first_chunk:
                await asyncio.sleep(0)
            is_first_chunk = False
            if remaining is None:
                await writer.write(piece)
                continue
            await writer.write(piece[:remaining])
            remaining -= len(piece)
            if remaining <= 0:
                return

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Rewind and return the remaining buffer contents.

        Reads directly (no executor) since the data is in memory;
        *encoding* and *errors* are unused.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def close(self) -> None:
        """
        Close the BytesIO payload.

        This does nothing since BytesIO is in-memory and does not require explicit closing.
        """
class BufferedReaderPayload(IOBasePayload):
    """Payload whose value is a binary ``io.BufferedIOBase`` stream."""

    _value: io.BufferedIOBase
    # _autoclose = False (inherited) - Has buffered file handle that needs explicit closing

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Rewind to the start position and decode the remaining bytes.

        WARNING: performs blocking I/O; avoid calling it in the event loop.
        """
        self._set_or_restore_start_position()
        raw = self._value.read()
        return raw.decode(encoding, errors)
class JsonPayload(BytesPayload):
    """Payload holding *value* serialized with *dumps* and encoded to bytes."""

    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        serialized = dumps(value)
        body = serialized.encode(encoding)
        super().__init__(
            body,
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )
class JsonBytesPayload(BytesPayload):
    """JSON payload for encoders whose ``dumps`` already returns bytes.

    Suitable for encoders such as orjson; the output is sent as-is, with no
    str -> bytes encoding step.
    """

    def __init__(
        self,
        value: Any,
        dumps: JSONBytesEncoder,
        content_type: str = "application/json",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        body = dumps(value)
        super().__init__(body, *args, content_type=content_type, **kwargs)
from collections.abc import AsyncIterable, AsyncIterator

if TYPE_CHECKING:
    # Parametrized aliases, seen only by static type checkers.
    _AsyncIterator = AsyncIterator[bytes]
    _AsyncIterable = AsyncIterable[bytes]
else:
    # At runtime the aliases are the unparametrized ABCs themselves.
    _AsyncIterator = AsyncIterator
    _AsyncIterable = AsyncIterable
class AsyncIterablePayload(Payload):
    """Payload whose body is produced by an async iterable of ``bytes``.

    The iterator obtained in ``__init__`` is read at most once. ``as_bytes()``
    drains it into an in-memory chunk cache, which later writes and
    ``decode()`` reuse. A write made before any caching streams straight
    from the iterator and then marks the payload as consumed.
    """

    _iter: _AsyncIterator | None = None
    _value: _AsyncIterable
    # Populated by as_bytes(); None means the content has not been cached.
    _cached_chunks: list[bytes] | None = None
    # _consumed stays False to allow reuse with cached content
    _autoclose = True  # Iterator doesn't need explicit closing

    def __init__(self, value: _AsyncIterable, *args: Any, **kwargs: Any) -> None:
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                f"got {type(value)!r}"
            )

        # Default to a generic binary content type unless one was given.
        kwargs.setdefault("content_type", "application/octet-stream")
        super().__init__(value, *args, **kwargs)
        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the whole async iterable payload to *writer* with no length limit.

        Kept for backwards compatibility with existing callers. New code that
        needs a length limit should call ``write_with_length()`` directly.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write the payload to *writer*, emitting at most *content_length* bytes.

        Args:
            writer: An AbstractStreamWriter instance that receives the data.
            content_length: Byte limit, or None for no limit.

        Data comes from the chunk cache when ``as_bytes()`` has filled it.
        Otherwise it is pulled from the iterator one chunk at a time. This
        method never builds the cache itself.
        """
        cached = self._cached_chunks
        if cached is not None:
            if content_length is None:
                for piece in cached:
                    await writer.write(piece)
                return
            budget = content_length
            for piece in cached:
                if budget <= 0:
                    break
                await writer.write(piece[:budget])
                budget -= len(piece)
            return

        # Already streamed once without caching: nothing is left to send.
        if self._iter is None:
            return

        budget = content_length
        try:
            while True:
                piece = await anext(self._iter)
                if budget is None:
                    await writer.write(piece)
                    continue
                if budget > 0:
                    await writer.write(piece[:budget])
                    budget -= len(piece)
                # Past the limit the remaining chunks are still read and
                # discarded. Exhausting the iterator matters because it may
                # not close an underlying file handle otherwise.
        except StopAsyncIteration:
            # The source is exhausted and nothing was cached.
            self._iter = None
            self._consumed = True

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Decode the cached content to a string.

        Raises:
            TypeError: if ``as_bytes()`` has not cached the content yet.
        """
        if self._cached_chunks is None:
            raise TypeError("Unable to decode - content not cached. Call as_bytes() first.")
        return b"".join(self._cached_chunks).decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return the complete payload as bytes, caching the chunks on first use.

        The first call drains the iterator into ``_cached_chunks``. Later
        calls and later writes reuse that cache. If a streaming write already
        exhausted the iterator without caching, ``b""`` is returned.
        *encoding* and *errors* are accepted for interface compatibility and
        are not used.
        """
        if self._cached_chunks is None:
            if self._iter is None:
                return b""
            self._cached_chunks = [chunk async for chunk in self._iter]
            self._iter = None
            # _consumed is intentionally left False so the cache can be reused.
        return b"".join(self._cached_chunks)
class StreamReaderPayload(AsyncIterablePayload):
    """Async-iterable payload whose chunks come from ``StreamReader.iter_any()``."""

    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        chunk_source = value.iter_any()
        super().__init__(chunk_source, *args, **kwargs)
PAYLOAD_REGISTRY = PayloadRegistry()

# Built-in factories, registered in exactly this order with the registry's
# default ordering.
for _factory, _types in (
    (BytesPayload, (bytes, bytearray, memoryview)),
    (StringPayload, str),
    (StringIOPayload, io.StringIO),
    (TextIOPayload, io.TextIOBase),
    (BytesIOPayload, io.BytesIO),
    (BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom)),
    (IOBasePayload, io.IOBase),
    (StreamReaderPayload, StreamReader),
):
    PAYLOAD_REGISTRY.register(_factory, _types)
del _factory, _types

# try_last gives more specialized async iterables, such as
# multipart.BodyPartReaderPayload, a chance to override this default.
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)