import asyncio
import enum
import io
import json
import mimetypes
import os
import sys
import warnings
from abc import ABC, abstractmethod
from collections.abc import Iterable
from itertools import chain
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Dict,
    Final,
    List,
    Optional,
    Set,
    TextIO,
    Tuple,
    Type,
    Union,
)

from multidict import CIMultiDict

from . import hdrs
from .abc import AbstractStreamWriter
from .helpers import (
    _SENTINEL,
    content_disposition_header,
    guess_filename,
    parse_mimetype,
    sentinel,
)
from .streams import StreamReader
from .typedefs import JSONEncoder, _CIMultiDict

__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "AsyncIterablePayload",
)

TOO_LARGE_BYTES_BODY: Final[int] = 2**20  # 1 MB
READ_SIZE: Final[int] = 2**16  # 64 KB

_CLOSE_FUTURES: Set[asyncio.Future[None]] = set()

class LookupError(Exception):
    """Raised when no payload factory is found for the given data type."""


class Order(str, enum.Enum):
    normal = "normal"
    try_first = "try_first"
    try_last = "try_last"

def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    return PAYLOAD_REGISTRY.get(data, *args, **kwargs)


def register_payload(
    factory: Type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    PAYLOAD_REGISTRY.register(factory, type, order=order)

class payload_type:
    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        self.type = type
        self.order = order

    def __call__(self, factory: Type["Payload"]) -> Type["Payload"]:
        register_payload(factory, self.type, order=self.order)
        return factory

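# Usage sketch (illustrative, not part of this module): registering a custom
# payload factory with the ``payload_type`` decorator. ``Point`` and
# ``PointPayload`` are hypothetical names invented for this example.
#
#     class Point:
#         def __init__(self, x: int, y: int) -> None:
#             self.x, self.y = x, y
#
#     @payload_type(Point, order=Order.try_first)
#     class PointPayload(StringPayload):
#         def __init__(self, value: Point, *args: Any, **kwargs: Any) -> None:
#             super().__init__(f"{value.x},{value.y}", *args, **kwargs)
#
# After registration, ``get_payload(Point(1, 2))`` returns a ``PointPayload``.
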
PayloadType = Type["Payload"]
_PayloadRegistryItem = Tuple[PayloadType, Any]

class PayloadRegistry:
    """Payload registry.

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        self._first: List[_PayloadRegistryItem] = []
        self._normal: List[_PayloadRegistryItem] = []
        self._last: List[_PayloadRegistryItem] = []
        self._normal_lookup: Dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "Type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        if self._first:
            for factory, type_ in self._first:
                if isinstance(data, type_):
                    return factory(data, *args, **kwargs)
        # Try the fast lookup first
        if lookup_factory := self._normal_lookup.get(type(data)):
            return lookup_factory(data, *args, **kwargs)
        # Bail early if it's already a Payload
        if isinstance(data, Payload):
            return data
        # Fall back to the slower linear search
        for factory, type_ in _CHAIN(self._normal, self._last):
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)
        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        if order is Order.try_first:
            self._first.append((factory, type))
        elif order is Order.normal:
            self._normal.append((factory, type))
            if isinstance(type, Iterable):
                for t in type:
                    self._normal_lookup[t] = factory
            else:
                self._normal_lookup[type] = factory
        elif order is Order.try_last:
            self._last.append((factory, type))
        else:
            raise ValueError(f"Unsupported order {order!r}")

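# Dispatch sketch (illustrative): how ``get`` resolves a factory. Exact-type
# hits go through the ``_normal_lookup`` dict; subclasses of registered types
# fall back to the linear isinstance scan. ``MyBytes`` is hypothetical.
#
#     registry = PayloadRegistry()
#     registry.register(BytesPayload, (bytes, bytearray, memoryview))
#     payload = registry.get(b"data")           # fast dict lookup on type(data)
#
#     class MyBytes(bytes): ...
#     payload = registry.get(MyBytes(b"data"))  # falls back to isinstance scan
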
class Payload(ABC):

    _default_content_type: str = "application/octet-stream"
    _size: Optional[int] = None
    _consumed: bool = False  # Default: payload has not been consumed yet
    _autoclose: bool = False  # Default: assume resource needs explicit closing

    def __init__(
        self,
        value: Any,
        headers: Optional[
            Union[_CIMultiDict, Dict[str, str], Iterable[Tuple[str, str]]]
        ] = None,
        content_type: Union[str, None, _SENTINEL] = sentinel,
        filename: Optional[str] = None,
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self._encoding = encoding
        self._filename = filename
        self._headers: _CIMultiDict = CIMultiDict()
        self._value = value
        if content_type is not sentinel and content_type is not None:
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> Optional[int]:
        """Size of the payload in bytes.

        Returns the number of bytes that will be transmitted when the payload
        is written. For string payloads, this is the size after encoding to
        bytes, not the length of the string.
        """
        return self._size

    @property
    def filename(self) -> Optional[str]:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> _CIMultiDict:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        return (
            "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode(
                "utf-8"
            )
            + b"\r\n"
        )

    @property
    def encoding(self) -> Optional[str]:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    @property
    def consumed(self) -> bool:
        """Whether the payload has been consumed and cannot be reused."""
        return self._consumed

    @property
    def autoclose(self) -> bool:
        """
        Whether the payload can close itself automatically.

        Returns True if the payload has no file handles or resources that need
        explicit closing. If False, callers must await close() to release
        resources.
        """
        return self._autoclose

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: Any,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, **params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length
        constraints.

        Important:
            For new implementations, use write_with_length() instead of this
            method. This method is maintained for backwards compatibility and
            will eventually delegate to write_with_length(writer, None) in all
            implementations.

        All payload subclasses must override this method for backwards
        compatibility, but new code should use write_with_length for more
        flexibility and control.
        """

    # write_with_length is new in aiohttp 3.12;
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method allows writing payload content with a specific length
        constraint, which is particularly useful for HTTP responses with a
        Content-Length header.

        Note:
            This is the base implementation that provides backwards
            compatibility for subclasses that don't override this method.
            Specific payload types should override it to implement proper
            length-constrained writing.
        """
        # Backwards compatibility for subclasses that don't override this
        # method, and for the default implementation
        await self.write(writer)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This is a convenience method that calls decode() and encodes the
        result to bytes using the specified encoding.
        """
        # Use the instance encoding if available, otherwise use the parameter
        actual_encoding = self._encoding or encoding
        return self.decode(actual_encoding, errors).encode(actual_encoding)

    def _close(self) -> None:
        """
        Async-safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility with code that
        needs to clean up payloads synchronously. In the future, we will drop
        this method and only support the async close() method.

        WARNING: This method must be safe to call from within the event loop
        without blocking. Subclasses should not perform any blocking I/O here.

        WARNING: This method must be called from within an event loop for
        certain payload types (e.g., IOBasePayload). Calling it outside an
        event loop may raise RuntimeError.
        """
        # This is a no-op by default, but subclasses can override it
        # for non-blocking cleanup operations.

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.

        In the future, this will be the only close method supported.
        """
        self._close()

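# Subclassing sketch (illustrative): the minimal surface a custom Payload must
# provide is decode() and write(); write_with_length() is inherited and falls
# back to write(). ``ReprPayload`` is a hypothetical name for this example.
#
#     class ReprPayload(Payload):
#         _autoclose = True  # holds no file handles
#
#         def __init__(self, value: Any, *args: Any, **kwargs: Any) -> None:
#             super().__init__(value, *args, **kwargs)
#             self._data = repr(value).encode("utf-8")
#             self._size = len(self._data)
#
#         def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
#             return self._data.decode(encoding, errors)
#
#         async def write(self, writer: AbstractStreamWriter) -> None:
#             await writer.write(self._data)
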
class BytesPayload(Payload):
    _value: bytes
    # _consumed = False (inherited) - Bytes are immutable and can be reused
    _autoclose = True  # No file handle, just bytes in memory

    def __init__(
        self, value: Union[bytes, bytearray, memoryview], *args: Any, **kwargs: Any
    ) -> None:
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        if isinstance(value, memoryview):
            self._size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            self._size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if self._size > TOO_LARGE_BYTES_BODY:
            kwargs = {"source": self}
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                **kwargs,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        return self._value.decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method returns the raw bytes content of the payload.
        It is equivalent to accessing the _value attribute directly.
        """
        return self._value

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire bytes payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire bytes content without any length
        constraint.

        Note:
            For new implementations that need length control, use
            write_with_length(). This method is maintained for backwards
            compatibility and is equivalent to write_with_length(writer, None).
        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write bytes payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method writes either the entire byte sequence or a slice of it
        up to the specified content_length. For BytesPayload, this operation
        is performed efficiently using array slicing.
        """
        if content_length is not None:
            await writer.write(self._value[:content_length])
        else:
            await writer.write(self._value)

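# Usage sketch (illustrative): a BytesPayload truncated by content_length.
# ``CollectingWriter`` is a hypothetical stand-in for an AbstractStreamWriter;
# only the write() method used here is implemented.
#
#     class CollectingWriter:
#         def __init__(self) -> None:
#             self.buffer = bytearray()
#
#         async def write(self, chunk: bytes) -> None:
#             self.buffer.extend(chunk)
#
#     async def demo() -> None:
#         payload = BytesPayload(b"hello world")
#         writer = CollectingWriter()
#         await payload.write_with_length(writer, 5)
#         assert bytes(writer.buffer) == b"hello"
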
class StringPayload(BytesPayload):
    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:

        if encoding is None:
            if content_type is None:
                real_encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                real_encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
            real_encoding = encoding

        super().__init__(
            value.encode(real_encoding),
            encoding=real_encoding,
            content_type=content_type,
            *args,
            **kwargs,
        )

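# Encoding resolution sketch (illustrative): when only a content type is
# given, StringPayload takes the charset parameter from it; the byte size
# therefore depends on the resolved encoding.
#
#     p = StringPayload("héllo", content_type="text/plain; charset=latin-1")
#     assert p.encoding == "latin-1"
#     assert p.size == len("héllo".encode("latin-1"))  # 5 bytes, not 6 as in UTF-8
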
class StringIOPayload(StringPayload):
    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        super().__init__(value.read(), *args, **kwargs)

class IOBasePayload(Payload):
    _value: io.IOBase
    # _consumed = False (inherited) - File can be re-read from the same position
    _start_position: Optional[int] = None
    # _autoclose = False (inherited) - Has file handle that needs explicit closing

    def __init__(
        self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
    ) -> None:
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _set_or_restore_start_position(self) -> None:
        """Set or restore the start position of the file-like object."""
        if self._start_position is None:
            try:
                self._start_position = self._value.tell()
            except OSError:
                self._consumed = True  # Cannot seek, mark as consumed
            return
        self._value.seek(self._start_position)

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """
        Read the file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in
                this operation. If None, READ_SIZE will be used as the default
                chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if the size
              cannot be determined)
            - The first chunk of bytes read from the file object

        This method is optimized to perform both the size calculation and the
        initial read in a single operation, which is executed in a single
        executor job to minimize context switches and file operations when
        streaming content.
        """
        self._set_or_restore_start_position()
        size = self.size  # Call size only once since it does I/O
        return size, self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object, respecting the
            remaining_content_len limit if specified.

        This method is used for subsequent reads during streaming after the
        initial _read_and_available_len call has been made.
        """
        return self._value.read(remaining_content_len or READ_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> Optional[int]:
        """
        Size of the payload in bytes.

        Returns the number of bytes remaining to be read from the file, or
        None if the size cannot be determined (e.g., for unseekable streams).
        """
        try:
            return os.fstat(self._value.fileno()).st_size - self._value.tell()
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire file content without any length
        constraint. It delegates to write_with_length() with no length limit
        for implementation consistency.

        Note:
            For new implementations that need length control, use
            write_with_length() directly. This method is maintained for
            backwards compatibility with existing code.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method implements optimized streaming of file content with length
        constraints:

        1. File reading is performed in a thread pool to avoid blocking the event loop
        2. Content is read and written in chunks to maintain memory efficiency
        3. Writing stops when either:
           - All available file content has been written (when the size is known)
           - The specified content_length has been reached
        4. File resources are properly closed even if the operation is cancelled

        The implementation carefully handles both known-size and unknown-size
        payloads, as well as constrained and unconstrained content lengths.
        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length

        # Get the initial data and available length
        available_len, chunk = await loop.run_in_executor(
            None, self._read_and_available_len, remaining_content_len
        )
        # Process data chunks until done
        while chunk:
            chunk_len = len(chunk)

            # Write data with or without length constraint
            if remaining_content_len is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_content_len])
                remaining_content_len -= chunk_len

            total_written_len += chunk_len

            # Check if we're done writing
            if self._should_stop_writing(
                available_len, total_written_len, remaining_content_len
            ):
                return

            # Read the next chunk
            chunk = await loop.run_in_executor(
                None,
                self._read,
                (
                    min(READ_SIZE, remaining_content_len)
                    if remaining_content_len is not None
                    else READ_SIZE
                ),
            )

    def _should_stop_writing(
        self,
        available_len: Optional[int],
        total_written_len: int,
        remaining_content_len: Optional[int],
    ) -> bool:
        """
        Determine if we should stop writing data.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Remaining bytes to be written for
                content-length limited responses

        Returns:
            True if we should stop writing data, based on either:
            - Having written all available data (when the size is known)
            - Having written all requested content (when a content length is specified)
        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _close(self) -> None:
        """
        Async-safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility. Use the async
        close() method instead.

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError.
        """
        # Skip if already consumed
        if self._consumed:
            return
        self._consumed = True  # Mark as consumed to prevent further writes
        # Schedule file closing without awaiting to prevent cancellation issues
        loop = asyncio.get_running_loop()
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the
        event loop.
        """
        return self._read_all().decode(encoding, errors)

    def _read_all(self) -> bytes:
        """Read the entire file-like object and return its content as bytes."""
        self._set_or_restore_start_position()
        # Use readlines() to ensure we get all content
        return b"".join(self._value.readlines())

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire file content and returns it as bytes.
        It is equivalent to reading the file-like object directly. The file
        reading is performed in an executor to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._read_all)

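# Usage sketch (illustrative): streaming a file through an IOBasePayload.
# Reads happen in the default executor chunk by chunk; close() schedules the
# underlying file close without blocking the event loop. The filename
# ``data.bin`` and the writer are assumptions for the example.
#
#     async def send_file(writer: AbstractStreamWriter) -> None:
#         payload = IOBasePayload(open("data.bin", "rb"))
#         try:
#             await payload.write_with_length(writer, None)
#         finally:
#             await payload.close()  # must run inside the event loop
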
class TextIOPayload(IOBasePayload):
    _value: io.TextIOBase
    # _autoclose = False (inherited) - Has text file handle that needs explicit closing

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:

        if encoding is None:
            if content_type is None:
                encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding

        super().__init__(
            value,
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """
        Read the text file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in
                this operation. If None, READ_SIZE will be used as the default
                chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if the size
              cannot be determined)
            - The first chunk of bytes read from the file object, encoded
              using the payload's encoding

        This method is optimized to perform both the size calculation and the
        initial read in a single operation, which is executed in a single
        executor job to minimize context switches and file operations when
        streaming content.

        Note:
            TextIOPayload handles encoding of the text content before writing
            it to the stream. If no encoding is specified, UTF-8 is used as
            the default.
        """
        self._set_or_restore_start_position()
        size = self.size
        chunk = self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )
        return size, chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """
        Read a chunk of data from the text file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object and encoded using the
            payload's encoding. The data is automatically converted from text
            to bytes.

        This method is used for subsequent reads during streaming after the
        initial _read_and_available_len call has been made. It properly
        handles text encoding, converting the text content to bytes using the
        specified encoding (or UTF-8 if none was provided).
        """
        chunk = self._value.read(remaining_content_len or READ_SIZE)
        return chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the
        event loop.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire text file content and returns it as
        bytes, encoded using the specified encoding. The file reading is
        performed in an executor to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()

        # Use the instance encoding if available, otherwise use the parameter
        actual_encoding = self._encoding or encoding

        def _read_and_encode() -> bytes:
            self._set_or_restore_start_position()
            # TextIO read() always returns the full content
            return self._value.read().encode(actual_encoding, errors)

        return await loop.run_in_executor(None, _read_and_encode)

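# Usage sketch (illustrative): with no explicit encoding or content type,
# TextIOPayload defaults to UTF-8 and encodes text chunks on the fly as they
# are streamed. The filename ``notes.txt`` is an assumption for the example.
#
#     p = TextIOPayload(open("notes.txt", encoding="utf-8"))
#     assert p.content_type == "text/plain; charset=utf-8"
#     assert p.encoding == "utf-8"
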
class BytesIOPayload(IOBasePayload):
    _value: io.BytesIO
    _size: int  # Always initialized in __init__
    _autoclose = True  # BytesIO is in-memory, safe to auto-close

    def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)
        # Calculate the size once during initialization
        self._size = len(self._value.getbuffer()) - self._value.tell()

    @property
    def size(self) -> int:
        """Size of the payload in bytes.

        Returns the number of bytes in the BytesIO buffer that will be
        transmitted. This is calculated once during initialization for
        efficiency.
        """
        return self._size

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write BytesIO payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation is specifically optimized for BytesIO objects:

        1. Reads content in chunks to maintain memory efficiency
        2. Yields control back to the event loop periodically to prevent
           blocking when dealing with large BytesIO objects
        3. Respects content_length constraints when specified
        4. Requires no explicit cleanup afterwards, since the in-memory
           buffer holds no OS resources (see close())

        The periodic yielding to the event loop is important for maintaining
        responsiveness when processing large in-memory buffers.
        """
        self._set_or_restore_start_position()
        loop_count = 0
        remaining_bytes = content_length
        while chunk := self._value.read(READ_SIZE):
            if loop_count > 0:
                # Avoid blocking the event loop
                # if they pass a large BytesIO object
                # and we are not in the first iteration
                # of the loop
                await asyncio.sleep(0)
            if remaining_bytes is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_bytes])
                remaining_bytes -= len(chunk)
                if remaining_bytes <= 0:
                    return
            loop_count += 1

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire BytesIO content and returns it as bytes.
        It is equivalent to accessing the _value attribute directly.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def close(self) -> None:
        """
        Close the BytesIO payload.

        This does nothing since BytesIO is in-memory and does not require
        explicit closing.
        """

class BufferedReaderPayload(IOBasePayload):
    _value: io.BufferedIOBase
    # _autoclose = False (inherited) - Has buffered file handle that needs explicit closing

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)

class JsonPayload(BytesPayload):
    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:

        super().__init__(
            dumps(value).encode(encoding),
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )

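# Usage sketch (illustrative): JsonPayload serializes eagerly at construction,
# so size and content are fixed from then on.
#
#     p = JsonPayload({"status": "ok"})
#     assert p.content_type == "application/json"
#     assert p.decode() == '{"status": "ok"}'
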
if TYPE_CHECKING:
    from typing import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator[bytes]
    _AsyncIterable = AsyncIterable[bytes]
else:
    from collections.abc import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator
    _AsyncIterable = AsyncIterable

class AsyncIterablePayload(Payload):

    _iter: Optional[_AsyncIterator] = None
    _value: _AsyncIterable
    _cached_chunks: Optional[List[bytes]] = None
    # _consumed stays False to allow reuse with cached content
    _autoclose = True  # Iterator doesn't need explicit closing

    def __init__(self, value: _AsyncIterable, *args: Any, **kwargs: Any) -> None:
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                "got {!r}".format(type(value))
            )

        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire async iterable payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method iterates through the async iterable and writes each chunk
        to the writer without any length constraint.

        Note:
            For new implementations that need length control, use
            write_with_length() directly. This method is maintained for
            backwards compatibility with existing code.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write async iterable payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation handles streaming of async iterable content with
        length constraints:

        1. If cached chunks are available, writes from them
        2. Otherwise iterates through the async iterable one chunk at a time
        3. Respects content_length constraints when specified
        4. Does NOT generate the cache - that's done by as_bytes()
        """
        # If we have cached chunks, use them
        if self._cached_chunks is not None:
            remaining_bytes = content_length
            for chunk in self._cached_chunks:
                if remaining_bytes is None:
                    await writer.write(chunk)
                elif remaining_bytes > 0:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                else:
                    break
            return

        # If the iterator is exhausted and we don't have cached chunks,
        # there is nothing to write
        if self._iter is None:
            return

        # Stream from the iterator
        remaining_bytes = content_length

        try:
            while True:
                if sys.version_info >= (3, 10):
                    chunk = await anext(self._iter)
                else:
                    chunk = await self._iter.__anext__()
                if remaining_bytes is None:
                    await writer.write(chunk)
                # If we have a content length limit
                elif remaining_bytes > 0:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                # We still want to exhaust the iterator even
                # if we have reached the content length limit
                # since the file handle may not get closed by
                # the iterator if we don't do this
        except StopAsyncIteration:
            # Iterator is exhausted
            self._iter = None
            self._consumed = True  # Mark as consumed when streamed without caching

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the payload content as a string if cached chunks are available."""
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks).decode(encoding, errors)
        raise TypeError("Unable to decode - content not cached. Call as_bytes() first.")

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire async iterable content and returns it as
        bytes. It generates and caches the chunks for future reuse.
        """
        # If we have cached chunks, return them joined
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks)

        # If the iterator is exhausted and there is no cache, return empty
        if self._iter is None:
            return b""

        # Read all chunks and cache them
        chunks: List[bytes] = []
        async for chunk in self._iter:
            chunks.append(chunk)

        # Iterator is exhausted, cache the chunks
        self._iter = None
        self._cached_chunks = chunks
        # Keep _consumed as False to allow reuse with cached chunks

        return b"".join(chunks)

class StreamReaderPayload(AsyncIterablePayload):
    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        super().__init__(value.iter_any(), *args, **kwargs)

PAYLOAD_REGISTRY = PayloadRegistry()
PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
PAYLOAD_REGISTRY.register(StringPayload, str)
PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO)
PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase)
PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO)
PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader)
# try_last gives more specialized async iterables, like
# multipart.BodyPartReaderPayload, a chance to override the default
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)
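
# Resolution sketch (illustrative): exactly registered types resolve via the
# fast dict lookup; anything else falls through to the isinstance scan, with
# try_last entries such as AsyncIterablePayload checked last.
#
#     get_payload(b"raw")              # -> BytesPayload (exact-type fast lookup)
#     get_payload(io.BytesIO(b"raw"))  # -> BytesIOPayload
#
#     async def gen() -> _AsyncIterator:
#         yield b"chunk"
#
#     get_payload(gen())               # -> AsyncIterablePayload (try_last scan)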