import asyncio
import enum
import io
import json
import mimetypes
import os
import sys
import warnings
from abc import ABC, abstractmethod
from collections.abc import Iterable
from itertools import chain
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Dict,
    Final,
    List,
    Optional,
    Set,
    TextIO,
    Tuple,
    Type,
    Union,
)

from multidict import CIMultiDict

from . import hdrs
from .abc import AbstractStreamWriter
from .helpers import (
    _SENTINEL,
    content_disposition_header,
    guess_filename,
    parse_mimetype,
    sentinel,
)
from .streams import StreamReader
from .typedefs import JSONEncoder, _CIMultiDict

__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "AsyncIterablePayload",
)

TOO_LARGE_BYTES_BODY: Final[int] = 2**20  # 1 MB
READ_SIZE: Final[int] = 2**16  # 64 KB
_CLOSE_FUTURES: Set[asyncio.Future[None]] = set()


class LookupError(Exception):
    """Raised when no payload factory is found for the given data type."""


class Order(str, enum.Enum):
    normal = "normal"
    try_first = "try_first"
    try_last = "try_last"


def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    return PAYLOAD_REGISTRY.get(data, *args, **kwargs)


def register_payload(
    factory: Type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    PAYLOAD_REGISTRY.register(factory, type, order=order)


class payload_type:
    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        self.type = type
        self.order = order

    def __call__(self, factory: Type["Payload"]) -> Type["Payload"]:
        register_payload(factory, self.type, order=self.order)
        return factory
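

# Illustrative sketch (not part of this module): the decorator registers a
# payload factory for a user-defined type, after which get_payload() resolves
# instances of that type. ``Point`` and ``PointPayload`` are hypothetical names.
#
#     class Point:
#         def __init__(self, x: int, y: int) -> None:
#             self.x, self.y = x, y
#
#     @payload_type(Point)
#     class PointPayload(StringPayload):
#         def __init__(self, value: Point, *args: Any, **kwargs: Any) -> None:
#             super().__init__(f"{value.x},{value.y}", *args, **kwargs)
#
#     payload = get_payload(Point(1, 2))  # -> PointPayload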


PayloadType = Type["Payload"]
_PayloadRegistryItem = Tuple[PayloadType, Any]


class PayloadRegistry:
    """Payload registry.

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        self._first: List[_PayloadRegistryItem] = []
        self._normal: List[_PayloadRegistryItem] = []
        self._last: List[_PayloadRegistryItem] = []
        self._normal_lookup: Dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "Type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        if self._first:
            for factory, type_ in self._first:
                if isinstance(data, type_):
                    return factory(data, *args, **kwargs)
        # Try the fast lookup first
        if lookup_factory := self._normal_lookup.get(type(data)):
            return lookup_factory(data, *args, **kwargs)
        # Bail early if it's already a Payload
        if isinstance(data, Payload):
            return data
        # Fall back to the slower linear search
        for factory, type_ in _CHAIN(self._normal, self._last):
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)
        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        if order is Order.try_first:
            self._first.append((factory, type))
        elif order is Order.normal:
            self._normal.append((factory, type))
            if isinstance(type, Iterable):
                for t in type:
                    self._normal_lookup[t] = factory
            else:
                self._normal_lookup[type] = factory
        elif order is Order.try_last:
            self._last.append((factory, type))
        else:
            raise ValueError(f"Unsupported order {order!r}")
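

# Lookup-order sketch (illustrative): get() consults try_first entries, then
# the exact-type dict (one hash lookup, no isinstance scan), then falls back
# to a linear isinstance() scan over normal and try_last entries.
#
#     registry = PayloadRegistry()
#     registry.register(BytesPayload, (bytes, bytearray, memoryview))
#     registry.get(b"abc")        # exact type(b"abc") hit in _normal_lookup
#     registry.get(bytearray(3))  # also an exact hit; no linear scan needed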


class Payload(ABC):

    _default_content_type: str = "application/octet-stream"
    _size: Optional[int] = None
    _consumed: bool = False  # Default: payload has not been consumed yet
    _autoclose: bool = False  # Default: assume resource needs explicit closing

    def __init__(
        self,
        value: Any,
        headers: Optional[
            Union[_CIMultiDict, Dict[str, str], Iterable[Tuple[str, str]]]
        ] = None,
        content_type: Union[str, None, _SENTINEL] = sentinel,
        filename: Optional[str] = None,
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self._encoding = encoding
        self._filename = filename
        self._headers: _CIMultiDict = CIMultiDict()
        self._value = value
        if content_type is not sentinel and content_type is not None:
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> Optional[int]:
        """Size of the payload in bytes.

        Returns the number of bytes that will be transmitted when the payload
        is written. For string payloads, this is the size after encoding to
        bytes, not the length of the string.
        """
        return self._size

    @property
    def filename(self) -> Optional[str]:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> _CIMultiDict:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        return (
            "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode(
                "utf-8"
            )
            + b"\r\n"
        )

    @property
    def encoding(self) -> Optional[str]:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    @property
    def consumed(self) -> bool:
        """Whether the payload has been consumed and cannot be reused."""
        return self._consumed

    @property
    def autoclose(self) -> bool:
        """
        Whether the payload can close itself automatically.

        Returns True if the payload has no file handles or resources that need
        explicit closing. If False, callers must await close() to release
        resources.
        """
        return self._autoclose

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: Any,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, **params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length
        constraints.

        Important:
            For new implementations, use write_with_length() instead of this
            method. This method is maintained for backwards compatibility and
            will eventually delegate to write_with_length(writer, None) in all
            implementations.

        All payload subclasses must override this method for backwards
        compatibility, but new code should use write_with_length for more
        flexibility and control.
        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method allows writing payload content with a specific length
        constraint, which is particularly useful for HTTP responses with a
        Content-Length header.

        Note:
            This is the base implementation that provides backwards
            compatibility for subclasses that don't override this method.
            Specific payload types should override this method to implement
            proper length-constrained writing.
        """
        # Backwards compatibility for subclasses that don't override this
        # method and for the default implementation
        await self.write(writer)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This is a convenience method that calls decode() and encodes the
        result to bytes using the specified encoding.
        """
        # Use instance encoding if available, otherwise use parameter
        actual_encoding = self._encoding or encoding
        return self.decode(actual_encoding, errors).encode(actual_encoding)

    def _close(self) -> None:
        """
        Async-safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility with code that
        needs to clean up payloads synchronously. In the future, we will
        drop this method and only support the async close() method.

        WARNING: This method must be safe to call from within the event loop
        without blocking. Subclasses should not perform any blocking I/O here.

        WARNING: This method must be called from within an event loop for
        certain payload types (e.g., IOBasePayload). Calling it outside an
        event loop may raise RuntimeError.
        """
        # This is a no-op by default, but subclasses can override it
        # for non-blocking cleanup operations.

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.

        In the future, this will be the only close method supported.
        """
        self._close()
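

# Minimal subclass sketch (illustrative only): a concrete Payload needs
# decode() and write(); write_with_length() is inherited but should be
# overridden by real streaming types. ``HexPayload`` is a hypothetical name.
#
#     class HexPayload(Payload):
#         _value: bytes
#         _autoclose = True  # no external resources to release
#
#         def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
#             return self._value.hex()
#
#         async def write(self, writer: AbstractStreamWriter) -> None:
#             await writer.write(self._value.hex().encode("ascii"))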


class BytesPayload(Payload):
    _value: bytes
    # _consumed = False (inherited) - Bytes are immutable and can be reused
    _autoclose = True  # No file handle, just bytes in memory

    def __init__(
        self, value: Union[bytes, bytearray, memoryview], *args: Any, **kwargs: Any
    ) -> None:
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        if isinstance(value, memoryview):
            self._size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            self._size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if self._size > TOO_LARGE_BYTES_BODY:
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                source=self,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        return self._value.decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method returns the raw bytes content of the payload.
        It is equivalent to accessing the _value attribute directly.
        """
        return self._value

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire bytes payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire bytes content without any length constraint.

        Note:
            For new implementations that need length control, use
            write_with_length(). This method is maintained for backwards
            compatibility and is equivalent to write_with_length(writer, None).
        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write bytes payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method writes either the entire byte sequence or a slice of it
        up to the specified content_length. For BytesPayload, this operation
        is performed efficiently using array slicing.
        """
        if content_length is not None:
            await writer.write(self._value[:content_length])
        else:
            await writer.write(self._value)
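

# Usage sketch (illustrative): bytes-like bodies resolve to BytesPayload and
# report their size up front.
#
#     p = BytesPayload(b"hello")
#     p.size          # 5
#     p.content_type  # "application/octet-stream"
#     p.decode()      # "hello"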


class StringPayload(BytesPayload):
    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        if encoding is None:
            if content_type is None:
                real_encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                real_encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
            real_encoding = encoding

        super().__init__(
            value.encode(real_encoding),
            encoding=real_encoding,
            content_type=content_type,
            *args,
            **kwargs,
        )
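

# Encoding-resolution sketch (illustrative): the charset comes from the
# explicit ``encoding`` argument, else from the content type, else UTF-8.
#
#     StringPayload("héllo").size                     # 6 (UTF-8 bytes)
#     StringPayload("héllo", encoding="latin-1").size  # 5
#     StringPayload(
#         "héllo", content_type="text/plain; charset=latin-1"
#     ).size                                           # 5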


class StringIOPayload(StringPayload):
    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        super().__init__(value.read(), *args, **kwargs)


class IOBasePayload(Payload):
    _value: io.IOBase
    # _consumed = False (inherited) - File can be re-read from the same position
    _start_position: Optional[int] = None
    # _autoclose = False (inherited) - Has file handle that needs explicit closing

    def __init__(
        self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
    ) -> None:
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _set_or_restore_start_position(self) -> None:
        """Set or restore the start position of the file-like object."""
        if self._start_position is None:
            try:
                self._start_position = self._value.tell()
            except (OSError, AttributeError):
                self._consumed = True  # Cannot seek, mark as consumed
            return
        try:
            self._value.seek(self._start_position)
        except (OSError, AttributeError):
            # Failed to seek back - mark as consumed since we've already read
            self._consumed = True

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """
        Read the file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in
                this operation. If None, READ_SIZE will be used as the default
                chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size
              cannot be determined)
            - The first chunk of bytes read from the file object

        This method is optimized to perform both size calculation and initial
        read in a single operation, which is executed in a single executor job
        to minimize context switches and file operations when streaming content.
        """
        self._set_or_restore_start_position()
        size = self.size  # Call size only once since it does I/O
        return size, self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object, respecting the
            remaining_content_len limit if specified.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made.
        """
        return self._value.read(remaining_content_len or READ_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> Optional[int]:
        """
        Size of the payload in bytes.

        Returns the total size of the payload content from the initial
        position. This ensures a consistent Content-Length for requests,
        including 307/308 redirects where the same payload instance is reused.

        Returns None if the size cannot be determined (e.g., for unseekable
        streams).
        """
        try:
            # Store the start position on first access.
            # This is critical when the same payload instance is reused (e.g.,
            # 307/308 redirects). Without storing the initial position, after
            # the payload is read once, the file position would be at EOF,
            # which would cause the size calculation to return 0 (file_size -
            # EOF position). By storing the start position, we ensure the size
            # calculation always returns the correct total size for any
            # subsequent use.
            if self._start_position is None:
                self._start_position = self._value.tell()

            # Return the total size from the start position.
            # This ensures Content-Length is correct even after reading.
            return os.fstat(self._value.fileno()).st_size - self._start_position
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire file content without any length
        constraint. It delegates to write_with_length() with no length limit
        for implementation consistency.

        Note:
            For new implementations that need length control, use
            write_with_length() directly. This method is maintained for
            backwards compatibility with existing code.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method implements optimized streaming of file content with length
        constraints:

        1. File reading is performed in a thread pool to avoid blocking the event loop
        2. Content is read and written in chunks to maintain memory efficiency
        3. Writing stops when either:
           - All available file content has been written (when size is known)
           - The specified content_length has been reached
        4. File resources are properly closed even if the operation is cancelled

        The implementation carefully handles both known-size and unknown-size
        payloads, as well as constrained and unconstrained content lengths.
        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length

        # Get initial data and available length
        available_len, chunk = await loop.run_in_executor(
            None, self._read_and_available_len, remaining_content_len
        )
        # Process data chunks until done
        while chunk:
            chunk_len = len(chunk)

            # Write data with or without length constraint
            if remaining_content_len is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_content_len])
                remaining_content_len -= chunk_len

            total_written_len += chunk_len

            # Check if we're done writing
            if self._should_stop_writing(
                available_len, total_written_len, remaining_content_len
            ):
                return

            # Read next chunk
            chunk = await loop.run_in_executor(
                None,
                self._read,
                (
                    min(READ_SIZE, remaining_content_len)
                    if remaining_content_len is not None
                    else READ_SIZE
                ),
            )

    def _should_stop_writing(
        self,
        available_len: Optional[int],
        total_written_len: int,
        remaining_content_len: Optional[int],
    ) -> bool:
        """
        Determine if we should stop writing data.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Remaining bytes to be written for
                content-length limited responses

        Returns:
            True if we should stop writing data, based on either:
            - Having written all available data (when size is known)
            - Having written all requested content (when content-length is specified)
        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _close(self) -> None:
        """
        Async-safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility. Use the async
        close() method instead.

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError.
        """
        # Skip if already consumed
        if self._consumed:
            return
        self._consumed = True  # Mark as consumed to prevent further writes
        # Schedule file closing without awaiting to prevent cancellation issues
        loop = asyncio.get_running_loop()
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the
        event loop.
        """
        return self._read_all().decode(encoding, errors)

    def _read_all(self) -> bytes:
        """Read the entire file-like object and return its content as bytes."""
        self._set_or_restore_start_position()
        # Use readlines() to ensure we get all content
        return b"".join(self._value.readlines())

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire file content and returns it as bytes.
        It is equivalent to reading the file-like object directly.
        The file reading is performed in an executor to avoid blocking the
        event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._read_all)
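

# Streaming sketch (illustrative): file payloads remember their start offset
# so the same instance can be replayed, e.g. on a 307/308 redirect. Reads run
# in the default executor; ``writer`` below stands for a hypothetical
# AbstractStreamWriter implementation.
#
#     f = open("body.bin", "rb")
#     payload = IOBasePayload(f)
#     payload.size           # file size minus the current offset
#     await payload.write_with_length(writer, 1024)  # at most 1024 bytes
#     await payload.close()  # schedules f.close() in the executor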


class TextIOPayload(IOBasePayload):
    _value: io.TextIOBase
    # _autoclose = False (inherited) - Has text file handle that needs explicit closing

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        if encoding is None:
            if content_type is None:
                encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding

        super().__init__(
            value,
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """
        Read the text file-like object and return both its total size and the
        first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in
                this operation. If None, READ_SIZE will be used as the default
                chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size
              cannot be determined)
            - The first chunk of bytes read from the file object, encoded
              using the payload's encoding

        This method is optimized to perform both size calculation and initial
        read in a single operation, which is executed in a single executor job
        to minimize context switches and file operations when streaming content.

        Note:
            TextIOPayload handles encoding of the text content before writing
            it to the stream. If no encoding is specified, UTF-8 is used as
            the default.
        """
        self._set_or_restore_start_position()
        size = self.size
        chunk = self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )
        return size, chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """
        Read a chunk of data from the text file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object and encoded using the
            payload's encoding. The data is automatically converted from text
            to bytes.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made. It properly
        handles text encoding, converting the text content to bytes using
        the specified encoding (or UTF-8 if none was provided).
        """
        chunk = self._value.read(remaining_content_len or READ_SIZE)
        return chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the
        event loop.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire text file content and returns it as
        bytes. It encodes the text content using the specified encoding.
        The file reading is performed in an executor to avoid blocking the
        event loop.
        """
        loop = asyncio.get_running_loop()

        # Use instance encoding if available, otherwise use parameter
        actual_encoding = self._encoding or encoding

        def _read_and_encode() -> bytes:
            self._set_or_restore_start_position()
            # TextIO read() always returns the full content
            return self._value.read().encode(actual_encoding, errors)

        return await loop.run_in_executor(None, _read_and_encode)


class BytesIOPayload(IOBasePayload):
    _value: io.BytesIO
    _size: int  # Always initialized in __init__
    _autoclose = True  # BytesIO is in-memory, safe to auto-close

    def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)
        # Calculate size once during initialization
        self._size = len(self._value.getbuffer()) - self._value.tell()

    @property
    def size(self) -> int:
        """Size of the payload in bytes.

        Returns the number of bytes in the BytesIO buffer that will be
        transmitted. This is calculated once during initialization for
        efficiency.
        """
        return self._size

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write BytesIO payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation is specifically optimized for BytesIO objects:

        1. Reads content in chunks to maintain memory efficiency
        2. Yields control back to the event loop periodically to prevent
           blocking when dealing with large BytesIO objects
        3. Respects content_length constraints when specified
        4. Properly cleans up by closing the BytesIO object when done or on error

        The periodic yielding to the event loop is important for maintaining
        responsiveness when processing large in-memory buffers.
        """
        self._set_or_restore_start_position()
        loop_count = 0
        remaining_bytes = content_length
        while chunk := self._value.read(READ_SIZE):
            if loop_count > 0:
                # Avoid blocking the event loop when given a large BytesIO
                # object: yield on every iteration after the first one
                await asyncio.sleep(0)
            if remaining_bytes is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_bytes])
                remaining_bytes -= len(chunk)
                if remaining_bytes <= 0:
                    return
            loop_count += 1

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire BytesIO content and returns it as bytes.
        It is equivalent to accessing the _value attribute directly.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def close(self) -> None:
        """
        Close the BytesIO payload.

        This is a no-op: BytesIO is in-memory and does not require explicit
        closing.
        """


class BufferedReaderPayload(IOBasePayload):
    _value: io.BufferedIOBase
    # _autoclose = False (inherited) - Has buffered file handle that needs explicit closing

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)


class JsonPayload(BytesPayload):
    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(
            dumps(value).encode(encoding),
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )
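

# Usage sketch (illustrative): any JSON-serializable value is dumped and
# encoded once at construction time; a custom ``dumps`` can be supplied.
#
#     p = JsonPayload({"a": 1})
#     p.content_type  # "application/json"
#     p.decode()      # '{"a": 1}'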


if TYPE_CHECKING:
    from typing import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator[bytes]
    _AsyncIterable = AsyncIterable[bytes]
else:
    from collections.abc import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator
    _AsyncIterable = AsyncIterable


class AsyncIterablePayload(Payload):

    _iter: Optional[_AsyncIterator] = None
    _value: _AsyncIterable
    _cached_chunks: Optional[List[bytes]] = None
    # _consumed stays False to allow reuse with cached content
    _autoclose = True  # Iterator doesn't need explicit closing

    def __init__(self, value: _AsyncIterable, *args: Any, **kwargs: Any) -> None:
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                "got {!r}".format(type(value))
            )

        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire async iterable payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method iterates through the async iterable and writes each chunk
        to the writer without any length constraint.

        Note:
            For new implementations that need length control, use
            write_with_length() directly. This method is maintained for
            backwards compatibility with existing code.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write async iterable payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation handles streaming of async iterable content with
        length constraints:

        1. If cached chunks are available, writes from them
        2. Otherwise iterates through the async iterable one chunk at a time
        3. Respects content_length constraints when specified
        4. Does NOT generate the cache - that is done by as_bytes()
        """
        # If we have cached chunks, use them
        if self._cached_chunks is not None:
            remaining_bytes = content_length
            for chunk in self._cached_chunks:
                if remaining_bytes is None:
                    await writer.write(chunk)
                elif remaining_bytes > 0:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                else:
                    break
            return

        # If the iterator is exhausted and we don't have cached chunks,
        # there is nothing to write
        if self._iter is None:
            return

        # Stream from the iterator
        remaining_bytes = content_length

        try:
            while True:
                if sys.version_info >= (3, 10):
                    chunk = await anext(self._iter)
                else:
                    chunk = await self._iter.__anext__()
                if remaining_bytes is None:
                    await writer.write(chunk)
                # If we have a content length limit
                elif remaining_bytes > 0:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                # We still want to exhaust the iterator even if we have
                # reached the content length limit, since the file handle
                # may not get closed by the iterator if we don't do this
        except StopAsyncIteration:
            # Iterator is exhausted
            self._iter = None
            self._consumed = True  # Mark as consumed when streamed without caching

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the payload content as a string if cached chunks are available."""
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks).decode(encoding, errors)
        raise TypeError("Unable to decode - content not cached. Call as_bytes() first.")

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire async iterable content and returns it as
        bytes. It generates and caches the chunks for future reuse.
        """
        # If we have cached chunks, return them joined
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks)

        # If the iterator is exhausted and there is no cache, return empty bytes
        if self._iter is None:
            return b""

        # Read all chunks and cache them
        chunks: List[bytes] = []
        async for chunk in self._iter:
            chunks.append(chunk)

        # Iterator is exhausted, cache the chunks
        self._iter = None
        self._cached_chunks = chunks
        # Keep _consumed as False to allow reuse with cached chunks

        return b"".join(chunks)
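

# Usage sketch (illustrative): any async iterable of bytes works. Streaming
# without caching consumes the payload; calling as_bytes() first caches the
# chunks so the payload can be replayed. ``writer`` is hypothetical.
#
#     async def gen() -> AsyncIterator[bytes]:
#         yield b"hello, "
#         yield b"world"
#
#     p = AsyncIterablePayload(gen())
#     body = await p.as_bytes()   # b"hello, world", chunks now cached
#     p.decode()                  # "hello, world"
#     await p.write_with_length(writer, None)  # replays from the cache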


class StreamReaderPayload(AsyncIterablePayload):
    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        super().__init__(value.iter_any(), *args, **kwargs)


PAYLOAD_REGISTRY = PayloadRegistry()
PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
PAYLOAD_REGISTRY.register(StringPayload, str)
PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO)
PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase)
PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO)
PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader)
# try_last gives more specialized async iterables, such as
# multipart.BodyPartReaderPayload, a chance to override the default
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)
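

# Resolution sketch (illustrative): get_payload() tries the exact-type dict
# first, so specific types win over io.IOBase, and the try_last AsyncIterable
# entry is only reached by the final linear scan.
#
#     get_payload(b"abc")              # -> BytesPayload
#     get_payload(io.BytesIO(b"abc"))  # -> BytesIOPayload
#     get_payload(open("f", "rb"))     # -> BufferedReaderPayload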