Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/payload.py: 48%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import enum
3import io
4import json
5import mimetypes
6import os
7import sys
8import warnings
9from abc import ABC, abstractmethod
10from collections.abc import Iterable
11from itertools import chain
12from typing import (
13 IO,
14 TYPE_CHECKING,
15 Any,
16 Dict,
17 Final,
18 List,
19 Optional,
20 Set,
21 TextIO,
22 Tuple,
23 Type,
24 Union,
25)
27from multidict import CIMultiDict
29from . import hdrs
30from .abc import AbstractStreamWriter
31from .helpers import (
32 _SENTINEL,
33 content_disposition_header,
34 guess_filename,
35 parse_mimetype,
36 sentinel,
37)
38from .streams import StreamReader
39from .typedefs import JSONEncoder, _CIMultiDict
41__all__ = (
42 "PAYLOAD_REGISTRY",
43 "get_payload",
44 "payload_type",
45 "Payload",
46 "BytesPayload",
47 "StringPayload",
48 "IOBasePayload",
49 "BytesIOPayload",
50 "BufferedReaderPayload",
51 "TextIOPayload",
52 "StringIOPayload",
53 "JsonPayload",
54 "AsyncIterablePayload",
55)
57TOO_LARGE_BYTES_BODY: Final[int] = 2**20 # 1 MB
58READ_SIZE: Final[int] = 2**16 # 64 KB
59_CLOSE_FUTURES: Set[asyncio.Future[None]] = set()
62class LookupError(Exception):
63 """Raised when no payload factory is found for the given data type."""
66class Order(str, enum.Enum):
67 normal = "normal"
68 try_first = "try_first"
69 try_last = "try_last"
72def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
73 return PAYLOAD_REGISTRY.get(data, *args, **kwargs)
76def register_payload(
77 factory: Type["Payload"], type: Any, *, order: Order = Order.normal
78) -> None:
79 PAYLOAD_REGISTRY.register(factory, type, order=order)
82class payload_type:
83 def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
84 self.type = type
85 self.order = order
87 def __call__(self, factory: Type["Payload"]) -> Type["Payload"]:
88 register_payload(factory, self.type, order=self.order)
89 return factory
92PayloadType = Type["Payload"]
93_PayloadRegistryItem = Tuple[PayloadType, Any]
96class PayloadRegistry:
97 """Payload registry.
99 note: we need zope.interface for more efficient adapter search
100 """
102 __slots__ = ("_first", "_normal", "_last", "_normal_lookup")
104 def __init__(self) -> None:
105 self._first: List[_PayloadRegistryItem] = []
106 self._normal: List[_PayloadRegistryItem] = []
107 self._last: List[_PayloadRegistryItem] = []
108 self._normal_lookup: Dict[Any, PayloadType] = {}
110 def get(
111 self,
112 data: Any,
113 *args: Any,
114 _CHAIN: "Type[chain[_PayloadRegistryItem]]" = chain,
115 **kwargs: Any,
116 ) -> "Payload":
117 if self._first:
118 for factory, type_ in self._first:
119 if isinstance(data, type_):
120 return factory(data, *args, **kwargs)
121 # Try the fast lookup first
122 if lookup_factory := self._normal_lookup.get(type(data)):
123 return lookup_factory(data, *args, **kwargs)
124 # Bail early if its already a Payload
125 if isinstance(data, Payload):
126 return data
127 # Fallback to the slower linear search
128 for factory, type_ in _CHAIN(self._normal, self._last):
129 if isinstance(data, type_):
130 return factory(data, *args, **kwargs)
131 raise LookupError()
133 def register(
134 self, factory: PayloadType, type: Any, *, order: Order = Order.normal
135 ) -> None:
136 if order is Order.try_first:
137 self._first.append((factory, type))
138 elif order is Order.normal:
139 self._normal.append((factory, type))
140 if isinstance(type, Iterable):
141 for t in type:
142 self._normal_lookup[t] = factory
143 else:
144 self._normal_lookup[type] = factory
145 elif order is Order.try_last:
146 self._last.append((factory, type))
147 else:
148 raise ValueError(f"Unsupported order {order!r}")
151class Payload(ABC):
153 _default_content_type: str = "application/octet-stream"
154 _size: Optional[int] = None
155 _consumed: bool = False # Default: payload has not been consumed yet
156 _autoclose: bool = False # Default: assume resource needs explicit closing
158 def __init__(
159 self,
160 value: Any,
161 headers: Optional[
162 Union[_CIMultiDict, Dict[str, str], Iterable[Tuple[str, str]]]
163 ] = None,
164 content_type: Union[str, None, _SENTINEL] = sentinel,
165 filename: Optional[str] = None,
166 encoding: Optional[str] = None,
167 **kwargs: Any,
168 ) -> None:
169 self._encoding = encoding
170 self._filename = filename
171 self._headers: _CIMultiDict = CIMultiDict()
172 self._value = value
173 if content_type is not sentinel and content_type is not None:
174 self._headers[hdrs.CONTENT_TYPE] = content_type
175 elif self._filename is not None:
176 if sys.version_info >= (3, 13):
177 guesser = mimetypes.guess_file_type
178 else:
179 guesser = mimetypes.guess_type
180 content_type = guesser(self._filename)[0]
181 if content_type is None:
182 content_type = self._default_content_type
183 self._headers[hdrs.CONTENT_TYPE] = content_type
184 else:
185 self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
186 if headers:
187 self._headers.update(headers)
189 @property
190 def size(self) -> Optional[int]:
191 """Size of the payload in bytes.
193 Returns the number of bytes that will be transmitted when the payload
194 is written. For string payloads, this is the size after encoding to bytes,
195 not the length of the string.
196 """
197 return self._size
199 @property
200 def filename(self) -> Optional[str]:
201 """Filename of the payload."""
202 return self._filename
204 @property
205 def headers(self) -> _CIMultiDict:
206 """Custom item headers"""
207 return self._headers
209 @property
210 def _binary_headers(self) -> bytes:
211 return (
212 "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode(
213 "utf-8"
214 )
215 + b"\r\n"
216 )
218 @property
219 def encoding(self) -> Optional[str]:
220 """Payload encoding"""
221 return self._encoding
223 @property
224 def content_type(self) -> str:
225 """Content type"""
226 return self._headers[hdrs.CONTENT_TYPE]
228 @property
229 def consumed(self) -> bool:
230 """Whether the payload has been consumed and cannot be reused."""
231 return self._consumed
233 @property
234 def autoclose(self) -> bool:
235 """
236 Whether the payload can close itself automatically.
238 Returns True if the payload has no file handles or resources that need
239 explicit closing. If False, callers must await close() to release resources.
240 """
241 return self._autoclose
243 def set_content_disposition(
244 self,
245 disptype: str,
246 quote_fields: bool = True,
247 _charset: str = "utf-8",
248 **params: Any,
249 ) -> None:
250 """Sets ``Content-Disposition`` header."""
251 self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
252 disptype, quote_fields=quote_fields, _charset=_charset, **params
253 )
255 @abstractmethod
256 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
257 """
258 Return string representation of the value.
260 This is named decode() to allow compatibility with bytes objects.
261 """
263 @abstractmethod
264 async def write(self, writer: AbstractStreamWriter) -> None:
265 """
266 Write payload to the writer stream.
268 Args:
269 writer: An AbstractStreamWriter instance that handles the actual writing
271 This is a legacy method that writes the entire payload without length constraints.
273 Important:
274 For new implementations, use write_with_length() instead of this method.
275 This method is maintained for backwards compatibility and will eventually
276 delegate to write_with_length(writer, None) in all implementations.
278 All payload subclasses must override this method for backwards compatibility,
279 but new code should use write_with_length for more flexibility and control.
281 """
283 # write_with_length is new in aiohttp 3.12
284 # it should be overridden by subclasses
285 async def write_with_length(
286 self, writer: AbstractStreamWriter, content_length: Optional[int]
287 ) -> None:
288 """
289 Write payload with a specific content length constraint.
291 Args:
292 writer: An AbstractStreamWriter instance that handles the actual writing
293 content_length: Maximum number of bytes to write (None for unlimited)
295 This method allows writing payload content with a specific length constraint,
296 which is particularly useful for HTTP responses with Content-Length header.
298 Note:
299 This is the base implementation that provides backwards compatibility
300 for subclasses that don't override this method. Specific payload types
301 should override this method to implement proper length-constrained writing.
303 """
304 # Backwards compatibility for subclasses that don't override this method
305 # and for the default implementation
306 await self.write(writer)
308 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
309 """
310 Return bytes representation of the value.
312 This is a convenience method that calls decode() and encodes the result
313 to bytes using the specified encoding.
314 """
315 # Use instance encoding if available, otherwise use parameter
316 actual_encoding = self._encoding or encoding
317 return self.decode(actual_encoding, errors).encode(actual_encoding)
319 def _close(self) -> None:
320 """
321 Async safe synchronous close operations for backwards compatibility.
323 This method exists only for backwards compatibility with code that
324 needs to clean up payloads synchronously. In the future, we will
325 drop this method and only support the async close() method.
327 WARNING: This method must be safe to call from within the event loop
328 without blocking. Subclasses should not perform any blocking I/O here.
330 WARNING: This method must be called from within an event loop for
331 certain payload types (e.g., IOBasePayload). Calling it outside an
332 event loop may raise RuntimeError.
333 """
334 # This is a no-op by default, but subclasses can override it
335 # for non-blocking cleanup operations.
337 async def close(self) -> None:
338 """
339 Close the payload if it holds any resources.
341 IMPORTANT: This method must not await anything that might not finish
342 immediately, as it may be called during cleanup/cancellation. Schedule
343 any long-running operations without awaiting them.
345 In the future, this will be the only close method supported.
346 """
347 self._close()
350class BytesPayload(Payload):
351 _value: bytes
352 # _consumed = False (inherited) - Bytes are immutable and can be reused
353 _autoclose = True # No file handle, just bytes in memory
355 def __init__(
356 self, value: Union[bytes, bytearray, memoryview], *args: Any, **kwargs: Any
357 ) -> None:
358 if "content_type" not in kwargs:
359 kwargs["content_type"] = "application/octet-stream"
361 super().__init__(value, *args, **kwargs)
363 if isinstance(value, memoryview):
364 self._size = value.nbytes
365 elif isinstance(value, (bytes, bytearray)):
366 self._size = len(value)
367 else:
368 raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")
370 if self._size > TOO_LARGE_BYTES_BODY:
371 kwargs = {"source": self}
372 warnings.warn(
373 "Sending a large body directly with raw bytes might"
374 " lock the event loop. You should probably pass an "
375 "io.BytesIO object instead",
376 ResourceWarning,
377 **kwargs,
378 )
380 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
381 return self._value.decode(encoding, errors)
383 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
384 """
385 Return bytes representation of the value.
387 This method returns the raw bytes content of the payload.
388 It is equivalent to accessing the _value attribute directly.
389 """
390 return self._value
392 async def write(self, writer: AbstractStreamWriter) -> None:
393 """
394 Write the entire bytes payload to the writer stream.
396 Args:
397 writer: An AbstractStreamWriter instance that handles the actual writing
399 This method writes the entire bytes content without any length constraint.
401 Note:
402 For new implementations that need length control, use write_with_length().
403 This method is maintained for backwards compatibility and is equivalent
404 to write_with_length(writer, None).
406 """
407 await writer.write(self._value)
409 async def write_with_length(
410 self, writer: AbstractStreamWriter, content_length: Optional[int]
411 ) -> None:
412 """
413 Write bytes payload with a specific content length constraint.
415 Args:
416 writer: An AbstractStreamWriter instance that handles the actual writing
417 content_length: Maximum number of bytes to write (None for unlimited)
419 This method writes either the entire byte sequence or a slice of it
420 up to the specified content_length. For BytesPayload, this operation
421 is performed efficiently using array slicing.
423 """
424 if content_length is not None:
425 await writer.write(self._value[:content_length])
426 else:
427 await writer.write(self._value)
430class StringPayload(BytesPayload):
431 def __init__(
432 self,
433 value: str,
434 *args: Any,
435 encoding: Optional[str] = None,
436 content_type: Optional[str] = None,
437 **kwargs: Any,
438 ) -> None:
440 if encoding is None:
441 if content_type is None:
442 real_encoding = "utf-8"
443 content_type = "text/plain; charset=utf-8"
444 else:
445 mimetype = parse_mimetype(content_type)
446 real_encoding = mimetype.parameters.get("charset", "utf-8")
447 else:
448 if content_type is None:
449 content_type = "text/plain; charset=%s" % encoding
450 real_encoding = encoding
452 super().__init__(
453 value.encode(real_encoding),
454 encoding=real_encoding,
455 content_type=content_type,
456 *args,
457 **kwargs,
458 )
461class StringIOPayload(StringPayload):
462 def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
463 super().__init__(value.read(), *args, **kwargs)
466class IOBasePayload(Payload):
467 _value: io.IOBase
468 # _consumed = False (inherited) - File can be re-read from the same position
469 _start_position: Optional[int] = None
470 # _autoclose = False (inherited) - Has file handle that needs explicit closing
472 def __init__(
473 self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
474 ) -> None:
475 if "filename" not in kwargs:
476 kwargs["filename"] = guess_filename(value)
478 super().__init__(value, *args, **kwargs)
480 if self._filename is not None and disposition is not None:
481 if hdrs.CONTENT_DISPOSITION not in self.headers:
482 self.set_content_disposition(disposition, filename=self._filename)
484 def _set_or_restore_start_position(self) -> None:
485 """Set or restore the start position of the file-like object."""
486 if self._start_position is None:
487 try:
488 self._start_position = self._value.tell()
489 except (OSError, AttributeError):
490 self._consumed = True # Cannot seek, mark as consumed
491 return
492 try:
493 self._value.seek(self._start_position)
494 except (OSError, AttributeError):
495 # Failed to seek back - mark as consumed since we've already read
496 self._consumed = True
498 def _read_and_available_len(
499 self, remaining_content_len: Optional[int]
500 ) -> Tuple[Optional[int], bytes]:
501 """
502 Read the file-like object and return both its total size and the first chunk.
504 Args:
505 remaining_content_len: Optional limit on how many bytes to read in this operation.
506 If None, READ_SIZE will be used as the default chunk size.
508 Returns:
509 A tuple containing:
510 - The total size of the remaining unread content (None if size cannot be determined)
511 - The first chunk of bytes read from the file object
513 This method is optimized to perform both size calculation and initial read
514 in a single operation, which is executed in a single executor job to minimize
515 context switches and file operations when streaming content.
517 """
518 self._set_or_restore_start_position()
519 size = self.size # Call size only once since it does I/O
520 return size, self._value.read(
521 min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
522 )
524 def _read(self, remaining_content_len: Optional[int]) -> bytes:
525 """
526 Read a chunk of data from the file-like object.
528 Args:
529 remaining_content_len: Optional maximum number of bytes to read.
530 If None, READ_SIZE will be used as the default chunk size.
532 Returns:
533 A chunk of bytes read from the file object, respecting the
534 remaining_content_len limit if specified.
536 This method is used for subsequent reads during streaming after
537 the initial _read_and_available_len call has been made.
539 """
540 return self._value.read(remaining_content_len or READ_SIZE) # type: ignore[no-any-return]
542 @property
543 def size(self) -> Optional[int]:
544 """
545 Size of the payload in bytes.
547 Returns the total size of the payload content from the initial position.
548 This ensures consistent Content-Length for requests, including 307/308 redirects
549 where the same payload instance is reused.
551 Returns None if the size cannot be determined (e.g., for unseekable streams).
552 """
553 try:
554 # Store the start position on first access.
555 # This is critical when the same payload instance is reused (e.g., 307/308
556 # redirects). Without storing the initial position, after the payload is
557 # read once, the file position would be at EOF, which would cause the
558 # size calculation to return 0 (file_size - EOF position).
559 # By storing the start position, we ensure the size calculation always
560 # returns the correct total size for any subsequent use.
561 if self._start_position is None:
562 try:
563 self._start_position = self._value.tell()
564 except (OSError, AttributeError):
565 # Can't get position, can't determine size
566 return None
568 # Return the total size from the start position
569 # This ensures Content-Length is correct even after reading
570 return os.fstat(self._value.fileno()).st_size - self._start_position
571 except (AttributeError, OSError):
572 return None
574 async def write(self, writer: AbstractStreamWriter) -> None:
575 """
576 Write the entire file-like payload to the writer stream.
578 Args:
579 writer: An AbstractStreamWriter instance that handles the actual writing
581 This method writes the entire file content without any length constraint.
582 It delegates to write_with_length() with no length limit for implementation
583 consistency.
585 Note:
586 For new implementations that need length control, use write_with_length() directly.
587 This method is maintained for backwards compatibility with existing code.
589 """
590 await self.write_with_length(writer, None)
592 async def write_with_length(
593 self, writer: AbstractStreamWriter, content_length: Optional[int]
594 ) -> None:
595 """
596 Write file-like payload with a specific content length constraint.
598 Args:
599 writer: An AbstractStreamWriter instance that handles the actual writing
600 content_length: Maximum number of bytes to write (None for unlimited)
602 This method implements optimized streaming of file content with length constraints:
604 1. File reading is performed in a thread pool to avoid blocking the event loop
605 2. Content is read and written in chunks to maintain memory efficiency
606 3. Writing stops when either:
607 - All available file content has been written (when size is known)
608 - The specified content_length has been reached
609 4. File resources are properly closed even if the operation is cancelled
611 The implementation carefully handles both known-size and unknown-size payloads,
612 as well as constrained and unconstrained content lengths.
614 """
615 loop = asyncio.get_running_loop()
616 total_written_len = 0
617 remaining_content_len = content_length
619 # Get initial data and available length
620 available_len, chunk = await loop.run_in_executor(
621 None, self._read_and_available_len, remaining_content_len
622 )
623 # Process data chunks until done
624 while chunk:
625 chunk_len = len(chunk)
627 # Write data with or without length constraint
628 if remaining_content_len is None:
629 await writer.write(chunk)
630 else:
631 await writer.write(chunk[:remaining_content_len])
632 remaining_content_len -= chunk_len
634 total_written_len += chunk_len
636 # Check if we're done writing
637 if self._should_stop_writing(
638 available_len, total_written_len, remaining_content_len
639 ):
640 return
642 # Read next chunk
643 chunk = await loop.run_in_executor(
644 None,
645 self._read,
646 (
647 min(READ_SIZE, remaining_content_len)
648 if remaining_content_len is not None
649 else READ_SIZE
650 ),
651 )
653 def _should_stop_writing(
654 self,
655 available_len: Optional[int],
656 total_written_len: int,
657 remaining_content_len: Optional[int],
658 ) -> bool:
659 """
660 Determine if we should stop writing data.
662 Args:
663 available_len: Known size of the payload if available (None if unknown)
664 total_written_len: Number of bytes already written
665 remaining_content_len: Remaining bytes to be written for content-length limited responses
667 Returns:
668 True if we should stop writing data, based on either:
669 - Having written all available data (when size is known)
670 - Having written all requested content (when content-length is specified)
672 """
673 return (available_len is not None and total_written_len >= available_len) or (
674 remaining_content_len is not None and remaining_content_len <= 0
675 )
677 def _close(self) -> None:
678 """
679 Async safe synchronous close operations for backwards compatibility.
681 This method exists only for backwards
682 compatibility. Use the async close() method instead.
684 WARNING: This method MUST be called from within an event loop.
685 Calling it outside an event loop will raise RuntimeError.
686 """
687 # Skip if already consumed
688 if self._consumed:
689 return
690 self._consumed = True # Mark as consumed to prevent further writes
691 # Schedule file closing without awaiting to prevent cancellation issues
692 loop = asyncio.get_running_loop()
693 close_future = loop.run_in_executor(None, self._value.close)
694 # Hold a strong reference to the future to prevent it from being
695 # garbage collected before it completes.
696 _CLOSE_FUTURES.add(close_future)
697 close_future.add_done_callback(_CLOSE_FUTURES.remove)
699 async def close(self) -> None:
700 """
701 Close the payload if it holds any resources.
703 IMPORTANT: This method must not await anything that might not finish
704 immediately, as it may be called during cleanup/cancellation. Schedule
705 any long-running operations without awaiting them.
706 """
707 self._close()
709 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
710 """
711 Return string representation of the value.
713 WARNING: This method does blocking I/O and should not be called in the event loop.
714 """
715 return self._read_all().decode(encoding, errors)
717 def _read_all(self) -> bytes:
718 """Read the entire file-like object and return its content as bytes."""
719 self._set_or_restore_start_position()
720 # Use readlines() to ensure we get all content
721 return b"".join(self._value.readlines())
723 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
724 """
725 Return bytes representation of the value.
727 This method reads the entire file content and returns it as bytes.
728 It is equivalent to reading the file-like object directly.
729 The file reading is performed in an executor to avoid blocking the event loop.
730 """
731 loop = asyncio.get_running_loop()
732 return await loop.run_in_executor(None, self._read_all)
735class TextIOPayload(IOBasePayload):
736 _value: io.TextIOBase
737 # _autoclose = False (inherited) - Has text file handle that needs explicit closing
739 def __init__(
740 self,
741 value: TextIO,
742 *args: Any,
743 encoding: Optional[str] = None,
744 content_type: Optional[str] = None,
745 **kwargs: Any,
746 ) -> None:
748 if encoding is None:
749 if content_type is None:
750 encoding = "utf-8"
751 content_type = "text/plain; charset=utf-8"
752 else:
753 mimetype = parse_mimetype(content_type)
754 encoding = mimetype.parameters.get("charset", "utf-8")
755 else:
756 if content_type is None:
757 content_type = "text/plain; charset=%s" % encoding
759 super().__init__(
760 value,
761 content_type=content_type,
762 encoding=encoding,
763 *args,
764 **kwargs,
765 )
767 def _read_and_available_len(
768 self, remaining_content_len: Optional[int]
769 ) -> Tuple[Optional[int], bytes]:
770 """
771 Read the text file-like object and return both its total size and the first chunk.
773 Args:
774 remaining_content_len: Optional limit on how many bytes to read in this operation.
775 If None, READ_SIZE will be used as the default chunk size.
777 Returns:
778 A tuple containing:
779 - The total size of the remaining unread content (None if size cannot be determined)
780 - The first chunk of bytes read from the file object, encoded using the payload's encoding
782 This method is optimized to perform both size calculation and initial read
783 in a single operation, which is executed in a single executor job to minimize
784 context switches and file operations when streaming content.
786 Note:
787 TextIOPayload handles encoding of the text content before writing it
788 to the stream. If no encoding is specified, UTF-8 is used as the default.
790 """
791 self._set_or_restore_start_position()
792 size = self.size
793 chunk = self._value.read(
794 min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
795 )
796 return size, chunk.encode(self._encoding) if self._encoding else chunk.encode()
798 def _read(self, remaining_content_len: Optional[int]) -> bytes:
799 """
800 Read a chunk of data from the text file-like object.
802 Args:
803 remaining_content_len: Optional maximum number of bytes to read.
804 If None, READ_SIZE will be used as the default chunk size.
806 Returns:
807 A chunk of bytes read from the file object and encoded using the payload's
808 encoding. The data is automatically converted from text to bytes.
810 This method is used for subsequent reads during streaming after
811 the initial _read_and_available_len call has been made. It properly
812 handles text encoding, converting the text content to bytes using
813 the specified encoding (or UTF-8 if none was provided).
815 """
816 chunk = self._value.read(remaining_content_len or READ_SIZE)
817 return chunk.encode(self._encoding) if self._encoding else chunk.encode()
819 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
820 """
821 Return string representation of the value.
823 WARNING: This method does blocking I/O and should not be called in the event loop.
824 """
825 self._set_or_restore_start_position()
826 return self._value.read()
828 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
829 """
830 Return bytes representation of the value.
832 This method reads the entire text file content and returns it as bytes.
833 It encodes the text content using the specified encoding.
834 The file reading is performed in an executor to avoid blocking the event loop.
835 """
836 loop = asyncio.get_running_loop()
838 # Use instance encoding if available, otherwise use parameter
839 actual_encoding = self._encoding or encoding
841 def _read_and_encode() -> bytes:
842 self._set_or_restore_start_position()
843 # TextIO read() always returns the full content
844 return self._value.read().encode(actual_encoding, errors)
846 return await loop.run_in_executor(None, _read_and_encode)
849class BytesIOPayload(IOBasePayload):
850 _value: io.BytesIO
851 _size: int # Always initialized in __init__
852 _autoclose = True # BytesIO is in-memory, safe to auto-close
854 def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
855 super().__init__(value, *args, **kwargs)
856 # Calculate size once during initialization
857 self._size = len(self._value.getbuffer()) - self._value.tell()
859 @property
860 def size(self) -> int:
861 """Size of the payload in bytes.
863 Returns the number of bytes in the BytesIO buffer that will be transmitted.
864 This is calculated once during initialization for efficiency.
865 """
866 return self._size
868 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
869 self._set_or_restore_start_position()
870 return self._value.read().decode(encoding, errors)
872 async def write(self, writer: AbstractStreamWriter) -> None:
873 return await self.write_with_length(writer, None)
875 async def write_with_length(
876 self, writer: AbstractStreamWriter, content_length: Optional[int]
877 ) -> None:
878 """
879 Write BytesIO payload with a specific content length constraint.
881 Args:
882 writer: An AbstractStreamWriter instance that handles the actual writing
883 content_length: Maximum number of bytes to write (None for unlimited)
885 This implementation is specifically optimized for BytesIO objects:
887 1. Reads content in chunks to maintain memory efficiency
888 2. Yields control back to the event loop periodically to prevent blocking
889 when dealing with large BytesIO objects
890 3. Respects content_length constraints when specified
891 4. Properly cleans up by closing the BytesIO object when done or on error
893 The periodic yielding to the event loop is important for maintaining
894 responsiveness when processing large in-memory buffers.
896 """
897 self._set_or_restore_start_position()
898 loop_count = 0
899 remaining_bytes = content_length
900 while chunk := self._value.read(READ_SIZE):
901 if loop_count > 0:
902 # Avoid blocking the event loop
903 # if they pass a large BytesIO object
904 # and we are not in the first iteration
905 # of the loop
906 await asyncio.sleep(0)
907 if remaining_bytes is None:
908 await writer.write(chunk)
909 else:
910 await writer.write(chunk[:remaining_bytes])
911 remaining_bytes -= len(chunk)
912 if remaining_bytes <= 0:
913 return
914 loop_count += 1
916 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
917 """
918 Return bytes representation of the value.
920 This method reads the entire BytesIO content and returns it as bytes.
921 It is equivalent to accessing the _value attribute directly.
922 """
923 self._set_or_restore_start_position()
924 return self._value.read()
926 async def close(self) -> None:
927 """
928 Close the BytesIO payload.
930 This does nothing since BytesIO is in-memory and does not require explicit closing.
931 """
934class BufferedReaderPayload(IOBasePayload):
935 _value: io.BufferedIOBase
936 # _autoclose = False (inherited) - Has buffered file handle that needs explicit closing
938 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
939 self._set_or_restore_start_position()
940 return self._value.read().decode(encoding, errors)
943class JsonPayload(BytesPayload):
944 def __init__(
945 self,
946 value: Any,
947 encoding: str = "utf-8",
948 content_type: str = "application/json",
949 dumps: JSONEncoder = json.dumps,
950 *args: Any,
951 **kwargs: Any,
952 ) -> None:
954 super().__init__(
955 dumps(value).encode(encoding),
956 content_type=content_type,
957 encoding=encoding,
958 *args,
959 **kwargs,
960 )
963if TYPE_CHECKING:
964 from typing import AsyncIterable, AsyncIterator
966 _AsyncIterator = AsyncIterator[bytes]
967 _AsyncIterable = AsyncIterable[bytes]
968else:
969 from collections.abc import AsyncIterable, AsyncIterator
971 _AsyncIterator = AsyncIterator
972 _AsyncIterable = AsyncIterable
975class AsyncIterablePayload(Payload):
977 _iter: Optional[_AsyncIterator] = None
978 _value: _AsyncIterable
979 _cached_chunks: Optional[List[bytes]] = None
980 # _consumed stays False to allow reuse with cached content
981 _autoclose = True # Iterator doesn't need explicit closing
983 def __init__(self, value: _AsyncIterable, *args: Any, **kwargs: Any) -> None:
984 if not isinstance(value, AsyncIterable):
985 raise TypeError(
986 "value argument must support "
987 "collections.abc.AsyncIterable interface, "
988 "got {!r}".format(type(value))
989 )
991 if "content_type" not in kwargs:
992 kwargs["content_type"] = "application/octet-stream"
994 super().__init__(value, *args, **kwargs)
996 self._iter = value.__aiter__()
998 async def write(self, writer: AbstractStreamWriter) -> None:
999 """
1000 Write the entire async iterable payload to the writer stream.
1002 Args:
1003 writer: An AbstractStreamWriter instance that handles the actual writing
1005 This method iterates through the async iterable and writes each chunk
1006 to the writer without any length constraint.
1008 Note:
1009 For new implementations that need length control, use write_with_length() directly.
1010 This method is maintained for backwards compatibility with existing code.
1012 """
1013 await self.write_with_length(writer, None)
1015 async def write_with_length(
1016 self, writer: AbstractStreamWriter, content_length: Optional[int]
1017 ) -> None:
1018 """
1019 Write async iterable payload with a specific content length constraint.
1021 Args:
1022 writer: An AbstractStreamWriter instance that handles the actual writing
1023 content_length: Maximum number of bytes to write (None for unlimited)
1025 This implementation handles streaming of async iterable content with length constraints:
1027 1. If cached chunks are available, writes from them
1028 2. Otherwise iterates through the async iterable one chunk at a time
1029 3. Respects content_length constraints when specified
1030 4. Does NOT generate cache - that's done by as_bytes()
1032 """
1033 # If we have cached chunks, use them
1034 if self._cached_chunks is not None:
1035 remaining_bytes = content_length
1036 for chunk in self._cached_chunks:
1037 if remaining_bytes is None:
1038 await writer.write(chunk)
1039 elif remaining_bytes > 0:
1040 await writer.write(chunk[:remaining_bytes])
1041 remaining_bytes -= len(chunk)
1042 else:
1043 break
1044 return
1046 # If iterator is exhausted and we don't have cached chunks, nothing to write
1047 if self._iter is None:
1048 return
1050 # Stream from the iterator
1051 remaining_bytes = content_length
1053 try:
1054 while True:
1055 if sys.version_info >= (3, 10):
1056 chunk = await anext(self._iter)
1057 else:
1058 chunk = await self._iter.__anext__()
1059 if remaining_bytes is None:
1060 await writer.write(chunk)
1061 # If we have a content length limit
1062 elif remaining_bytes > 0:
1063 await writer.write(chunk[:remaining_bytes])
1064 remaining_bytes -= len(chunk)
1065 # We still want to exhaust the iterator even
1066 # if we have reached the content length limit
1067 # since the file handle may not get closed by
1068 # the iterator if we don't do this
1069 except StopAsyncIteration:
1070 # Iterator is exhausted
1071 self._iter = None
1072 self._consumed = True # Mark as consumed when streamed without caching
1074 def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
1075 """Decode the payload content as a string if cached chunks are available."""
1076 if self._cached_chunks is not None:
1077 return b"".join(self._cached_chunks).decode(encoding, errors)
1078 raise TypeError("Unable to decode - content not cached. Call as_bytes() first.")
1080 async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
1081 """
1082 Return bytes representation of the value.
1084 This method reads the entire async iterable content and returns it as bytes.
1085 It generates and caches the chunks for future reuse.
1086 """
1087 # If we have cached chunks, return them joined
1088 if self._cached_chunks is not None:
1089 return b"".join(self._cached_chunks)
1091 # If iterator is exhausted and no cache, return empty
1092 if self._iter is None:
1093 return b""
1095 # Read all chunks and cache them
1096 chunks: List[bytes] = []
1097 async for chunk in self._iter:
1098 chunks.append(chunk)
1100 # Iterator is exhausted, cache the chunks
1101 self._iter = None
1102 self._cached_chunks = chunks
1103 # Keep _consumed as False to allow reuse with cached chunks
1105 return b"".join(chunks)
1108class StreamReaderPayload(AsyncIterablePayload):
1109 def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
1110 super().__init__(value.iter_any(), *args, **kwargs)
1113PAYLOAD_REGISTRY = PayloadRegistry()
1114PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
1115PAYLOAD_REGISTRY.register(StringPayload, str)
1116PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO)
1117PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase)
1118PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO)
1119PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
1120PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
1121PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader)
1122# try_last for giving a chance to more specialized async interables like
1123# multipart.BodyPartReaderPayload override the default
1124PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)