import asyncio
import enum
import io
import json
import mimetypes
import os
import sys
import warnings
from abc import ABC, abstractmethod
from itertools import chain
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    Dict,
    Final,
    Iterable,
    Optional,
    Set,
    TextIO,
    Tuple,
    Type,
    Union,
)

from multidict import CIMultiDict

from . import hdrs
from .abc import AbstractStreamWriter
from .helpers import (
    _SENTINEL,
    content_disposition_header,
    guess_filename,
    parse_mimetype,
    sentinel,
)
from .streams import StreamReader
from .typedefs import JSONEncoder, _CIMultiDict

__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "AsyncIterablePayload",
)

TOO_LARGE_BYTES_BODY: Final[int] = 2**20  # 1 MB
READ_SIZE: Final[int] = 2**16  # 64 KB
_CLOSE_FUTURES: Set[asyncio.Future[None]] = set()

if TYPE_CHECKING:
    from typing import List


class LookupError(Exception):
    pass


class Order(str, enum.Enum):
    normal = "normal"
    try_first = "try_first"
    try_last = "try_last"

def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    return PAYLOAD_REGISTRY.get(data, *args, **kwargs)


def register_payload(
    factory: Type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    PAYLOAD_REGISTRY.register(factory, type, order=order)


class payload_type:
    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        self.type = type
        self.order = order

    def __call__(self, factory: Type["Payload"]) -> Type["Payload"]:
        register_payload(factory, self.type, order=self.order)
        return factory
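
# Example (illustrative sketch, not part of this module): ``payload_type`` is
# the decorator form of ``register_payload``; a hypothetical payload class
# could self-register for ``os.PathLike`` values like this::
#
#     @payload_type(os.PathLike)
#     class PathPayload(Payload):
#         ...  # implement decode() and write()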

PayloadType = Type["Payload"]
_PayloadRegistryItem = Tuple[PayloadType, Any]


class PayloadRegistry:
    """Payload registry.

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        self._first: List[_PayloadRegistryItem] = []
        self._normal: List[_PayloadRegistryItem] = []
        self._last: List[_PayloadRegistryItem] = []
        self._normal_lookup: Dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "Type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        if self._first:
            for factory, type_ in self._first:
                if isinstance(data, type_):
                    return factory(data, *args, **kwargs)
        # Try the fast lookup first
        if lookup_factory := self._normal_lookup.get(type(data)):
            return lookup_factory(data, *args, **kwargs)
        # Bail early if it's already a Payload
        if isinstance(data, Payload):
            return data
        # Fall back to the slower linear search
        for factory, type_ in _CHAIN(self._normal, self._last):
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)
        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        if order is Order.try_first:
            self._first.append((factory, type))
        elif order is Order.normal:
            self._normal.append((factory, type))
            if isinstance(type, Iterable):
                for t in type:
                    self._normal_lookup[t] = factory
            else:
                self._normal_lookup[type] = factory
        elif order is Order.try_last:
            self._last.append((factory, type))
        else:
            raise ValueError(f"Unsupported order {order!r}")
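
# Example (illustrative sketch): lookup order in ``PayloadRegistry.get`` is
# try_first factories, then the exact-type fast path, then a linear
# ``isinstance`` scan over normal and try_last registrations::
#
#     registry = PayloadRegistry()
#     registry.register(BytesPayload, (bytes, bytearray, memoryview))
#     assert type(registry.get(b"data")) is BytesPayload  # fast path
#
#     class MyBytes(bytes):
#         pass
#
#     assert type(registry.get(MyBytes(b"x"))) is BytesPayload  # linear scan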

class Payload(ABC):
    _default_content_type: str = "application/octet-stream"
    _size: Optional[int] = None

    def __init__(
        self,
        value: Any,
        headers: Optional[
            Union[_CIMultiDict, Dict[str, str], Iterable[Tuple[str, str]]]
        ] = None,
        content_type: Union[None, str, _SENTINEL] = sentinel,
        filename: Optional[str] = None,
        encoding: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self._encoding = encoding
        self._filename = filename
        self._headers: _CIMultiDict = CIMultiDict()
        self._value = value
        if content_type is not sentinel and content_type is not None:
            assert isinstance(content_type, str)
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> Optional[int]:
        """Size of the payload."""
        return self._size

    @property
    def filename(self) -> Optional[str]:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> _CIMultiDict:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        return (
            "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode(
                "utf-8"
            )
            + b"\r\n"
        )

    @property
    def encoding(self) -> Optional[str]:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: str,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, params=params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length constraints.

        Important:
            For new implementations, use write_with_length() instead of this method.
            This method is maintained for backwards compatibility and will eventually
            delegate to write_with_length(writer, None) in all implementations.

        All payload subclasses must override this method for backwards compatibility,
        but new code should use write_with_length for more flexibility and control.
        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method allows writing payload content with a specific length constraint,
        which is particularly useful for HTTP responses with a Content-Length header.

        Note:
            This is the base implementation that provides backwards compatibility
            for subclasses that don't override this method. Specific payload types
            should override this method to implement proper length-constrained writing.
        """
        # Backwards compatibility for subclasses that don't override this method
        # and for the default implementation
        await self.write(writer)
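
# Example (illustrative sketch, hypothetical subclass): a custom payload only
# has to implement ``decode`` and the legacy ``write``; the base
# ``write_with_length`` above then delegates to ``write``::
#
#     class UUIDPayload(Payload):
#         def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
#             return str(self._value)
#
#         async def write(self, writer: AbstractStreamWriter) -> None:
#             await writer.write(str(self._value).encode("ascii"))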

class BytesPayload(Payload):
    _value: bytes

    def __init__(
        self, value: Union[bytes, bytearray, memoryview], *args: Any, **kwargs: Any
    ) -> None:
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        if isinstance(value, memoryview):
            self._size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            self._size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if self._size > TOO_LARGE_BYTES_BODY:
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                source=self,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        return self._value.decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write the entire bytes payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire bytes content without any length constraint.

        Note:
            For new implementations that need length control, use write_with_length().
            This method is maintained for backwards compatibility and is equivalent
            to write_with_length(writer, None).
        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write bytes payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method writes either the entire byte sequence or a slice of it
        up to the specified content_length. For BytesPayload, this operation
        is performed efficiently with a single slice.
        """
        if content_length is not None:
            await writer.write(self._value[:content_length])
        else:
            await writer.write(self._value)
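
# Example (illustrative sketch): per the ResourceWarning above, a large
# in-memory body is better wrapped in io.BytesIO so it is streamed in
# READ_SIZE chunks instead of being written in one blocking call::
#
#     big = b"x" * (10 * 2**20)               # 10 MB, over TOO_LARGE_BYTES_BODY
#     payload = get_payload(io.BytesIO(big))  # dispatches to BytesIOPayload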

class StringPayload(BytesPayload):
    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        if encoding is None:
            if content_type is None:
                real_encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                real_encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
            real_encoding = encoding

        super().__init__(
            value.encode(real_encoding),
            encoding=real_encoding,
            content_type=content_type,
            *args,
            **kwargs,
        )


class StringIOPayload(StringPayload):
    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        super().__init__(value.read(), *args, **kwargs)
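
# Example (illustrative sketch): StringPayload resolves its charset from an
# explicit ``encoding`` argument, else from the ``content_type`` parameters,
# else defaults to UTF-8::
#
#     p = StringPayload("héllo", content_type="text/plain; charset=latin-1")
#     assert p.encoding == "latin-1"
#     assert p.decode("latin-1") == "héllo"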

class IOBasePayload(Payload):
    _value: io.IOBase

    def __init__(
        self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
    ) -> None:
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """
        Read the file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size cannot be determined)
            - The first chunk of bytes read from the file object

        This method performs both the size calculation and the initial read in a
        single executor job, minimizing context switches and file operations when
        streaming content.
        """
        size = self.size  # Call size only once since it does I/O
        return size, self._value.read(
            min(size or READ_SIZE, remaining_content_len or READ_SIZE)
        )

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object, respecting the
            remaining_content_len limit if specified.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made.
        """
        return self._value.read(remaining_content_len or READ_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> Optional[int]:
        try:
            return os.fstat(self._value.fileno()).st_size - self._value.tell()
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire file content without any length constraint.
        It delegates to write_with_length() with no length limit for implementation
        consistency.

        Note:
            For new implementations that need length control, use write_with_length() directly.
            This method is maintained for backwards compatibility with existing code.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method implements optimized streaming of file content with length constraints:

        1. File reading is performed in a thread pool to avoid blocking the event loop
        2. Content is read and written in chunks to maintain memory efficiency
        3. Writing stops when either:
           - All available file content has been written (when size is known)
           - The specified content_length has been reached
        4. File resources are properly closed even if the operation is cancelled

        The implementation carefully handles both known-size and unknown-size payloads,
        as well as constrained and unconstrained content lengths.
        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length

        try:
            # Get initial data and available length
            available_len, chunk = await loop.run_in_executor(
                None, self._read_and_available_len, remaining_content_len
            )
            # Process data chunks until done
            while chunk:
                chunk_len = len(chunk)

                # Write data with or without length constraint
                if remaining_content_len is None:
                    await writer.write(chunk)
                else:
                    await writer.write(chunk[:remaining_content_len])
                    remaining_content_len -= chunk_len

                total_written_len += chunk_len

                # Check if we're done writing
                if self._should_stop_writing(
                    available_len, total_written_len, remaining_content_len
                ):
                    return

                # Read next chunk
                chunk = await loop.run_in_executor(
                    None, self._read, remaining_content_len
                )
        finally:
            # Handle closing the file without awaiting to prevent cancellation issues
            # when the StreamReader reaches EOF
            self._schedule_file_close(loop)

    def _should_stop_writing(
        self,
        available_len: Optional[int],
        total_written_len: int,
        remaining_content_len: Optional[int],
    ) -> bool:
        """
        Determine if we should stop writing data.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Remaining bytes to be written for content-length limited responses

        Returns:
            True if we should stop writing data, based on either:
            - Having written all available data (when size is known)
            - Having written all requested content (when content-length is specified)
        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _schedule_file_close(self, loop: asyncio.AbstractEventLoop) -> None:
        """Schedule file closing without awaiting to prevent cancellation issues."""
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        return "".join(r.decode(encoding, errors) for r in self._value.readlines())
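
# Example (illustrative sketch): IOBasePayload derives ``size`` from
# ``os.fstat`` minus the current offset and streams READ_SIZE chunks through
# the executor; the file is closed in a detached future once writing ends::
#
#     f = open("report.bin", "rb")   # hypothetical file
#     payload = IOBasePayload(f)     # sets Content-Disposition: attachment
#     # await payload.write_with_length(writer, 1024)  # first KiB only;
#     # the file handle is then closed via _schedule_file_close()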

class TextIOPayload(IOBasePayload):
    _value: io.TextIOBase

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: Optional[str] = None,
        content_type: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        if encoding is None:
            if content_type is None:
                encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding

        super().__init__(
            value,
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: Optional[int]
    ) -> Tuple[Optional[int], bytes]:
        """
        Read the text file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size cannot be determined)
            - The first chunk of bytes read from the file object, encoded using the payload's encoding

        This method performs both the size calculation and the initial read in a
        single executor job, minimizing context switches and file operations when
        streaming content.

        Note:
            TextIOPayload handles encoding of the text content before writing it
            to the stream. If no encoding is specified, UTF-8 is used as the default.
        """
        size = self.size
        chunk = self._value.read(
            min(size or READ_SIZE, remaining_content_len or READ_SIZE)
        )
        return size, chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def _read(self, remaining_content_len: Optional[int]) -> bytes:
        """
        Read a chunk of data from the text file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object and encoded using the payload's
            encoding. The data is automatically converted from text to bytes.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made. It properly
        handles text encoding, converting the text content to bytes using
        the specified encoding (or UTF-8 if none was provided).
        """
        chunk = self._value.read(remaining_content_len or READ_SIZE)
        return chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        return self._value.read()
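
# Example (illustrative sketch): TextIOPayload encodes text chunks on the fly
# with the negotiated charset before handing them to the writer::
#
#     p = TextIOPayload(io.StringIO("héllo"),
#                       content_type="text/plain; charset=latin-1")
#     assert p.encoding == "latin-1"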

class BytesIOPayload(IOBasePayload):
    _value: io.BytesIO

    @property
    def size(self) -> int:
        position = self._value.tell()
        end = self._value.seek(0, os.SEEK_END)
        self._value.seek(position)
        return end - position

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        return self._value.read().decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write BytesIO payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation is specifically optimized for BytesIO objects:

        1. Reads content in chunks to maintain memory efficiency
        2. Yields control back to the event loop periodically to prevent blocking
           when dealing with large BytesIO objects
        3. Respects content_length constraints when specified
        4. Properly cleans up by closing the BytesIO object when done or on error

        The periodic yielding to the event loop is important for maintaining
        responsiveness when processing large in-memory buffers.
        """
        loop_count = 0
        remaining_bytes = content_length
        try:
            while chunk := self._value.read(READ_SIZE):
                if loop_count > 0:
                    # Yield to the event loop on every iteration after the
                    # first, so a large BytesIO object cannot block it
                    await asyncio.sleep(0)
                if remaining_bytes is None:
                    await writer.write(chunk)
                else:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                    if remaining_bytes <= 0:
                        return
                loop_count += 1
        finally:
            self._value.close()
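
# Example (illustrative sketch): BytesIOPayload.size counts only the bytes
# remaining from the current seek position, mirroring the file-based size::
#
#     buf = io.BytesIO(b"abcdef")
#     buf.seek(2)
#     assert BytesIOPayload(buf).size == 4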

class BufferedReaderPayload(IOBasePayload):
    _value: io.BufferedIOBase

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        return self._value.read().decode(encoding, errors)


class JsonPayload(BytesPayload):
    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(
            dumps(value).encode(encoding),
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )
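
# Example (illustrative sketch): JsonPayload serializes eagerly at
# construction time, so its size is known up front::
#
#     p = JsonPayload({"status": "ok"})
#     assert p.content_type == "application/json"
#     assert p.decode() == '{"status": "ok"}'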

if TYPE_CHECKING:
    from typing import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator[bytes]
    _AsyncIterable = AsyncIterable[bytes]
else:
    from collections.abc import AsyncIterable, AsyncIterator

    _AsyncIterator = AsyncIterator
    _AsyncIterable = AsyncIterable

class AsyncIterablePayload(Payload):
    _iter: Optional[_AsyncIterator] = None
    _value: _AsyncIterable

    def __init__(self, value: _AsyncIterable, *args: Any, **kwargs: Any) -> None:
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                "got {!r}".format(type(value))
            )

        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire async iterable payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method iterates through the async iterable and writes each chunk
        to the writer without any length constraint.

        Note:
            For new implementations that need length control, use write_with_length() directly.
            This method is maintained for backwards compatibility with existing code.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: Optional[int]
    ) -> None:
        """
        Write async iterable payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation handles streaming of async iterable content with length constraints:

        1. Iterates through the async iterable one chunk at a time
        2. Respects content_length constraints when specified
        3. Returns immediately if the iterator has already been exhausted

        Since async iterables are consumed as they are iterated, there is no way to
        restart the iteration once it is in progress or completed.
        """
        if self._iter is None:
            return

        remaining_bytes = content_length

        try:
            while True:
                if sys.version_info >= (3, 10):
                    chunk = await anext(self._iter)
                else:
                    chunk = await self._iter.__anext__()
                if remaining_bytes is None:
                    await writer.write(chunk)
                # If we have a content length limit
                elif remaining_bytes > 0:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                # We still want to exhaust the iterator even if we have
                # reached the content length limit, since the underlying
                # file handle may not get closed by the iterator otherwise
        except StopAsyncIteration:
            # Iterator is exhausted
            self._iter = None

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        raise TypeError("Unable to decode.")
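
# Example (illustrative sketch): any async iterable of bytes can serve as a
# payload body; an async generator is the typical source::
#
#     async def gen():
#         yield b"part one, "
#         yield b"part two"
#
#     payload = AsyncIterablePayload(gen())
#     # payload.size is None, so such bodies are typically sent with
#     # chunked transfer encoding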

class StreamReaderPayload(AsyncIterablePayload):
    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        super().__init__(value.iter_any(), *args, **kwargs)


PAYLOAD_REGISTRY = PayloadRegistry()
PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
PAYLOAD_REGISTRY.register(StringPayload, str)
PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO)
PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase)
PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO)
PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader)
# try_last gives more specialized async iterables, such as
# multipart.BodyPartReaderPayload, a chance to override the default
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)
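
# Example (illustrative sketch): dispatch results for common inputs, given
# the registrations above::
#
#     get_payload(b"raw bytes")            # -> BytesPayload
#     get_payload("plain text")            # -> StringPayload
#     get_payload(io.BytesIO(b"raw"))      # -> BytesIOPayload
#     with open(__file__, "rb") as f:      # open() returns io.BufferedReader
#         get_payload(f)                   # -> BufferedReaderPayload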