Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/payload.py: 50%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import enum
3import io
4import json
5import mimetypes
6import os
7import sys
8import warnings
9from abc import ABC, abstractmethod
10from collections.abc import AsyncIterable, AsyncIterator, Iterable
11from itertools import chain
12from typing import IO, Any, Final, TextIO
14from multidict import CIMultiDict
16from . import hdrs
17from .abc import AbstractStreamWriter
18from .helpers import (
19 _SENTINEL,
20 DEFAULT_CHUNK_SIZE,
21 content_disposition_header,
22 guess_filename,
23 parse_mimetype,
24 sentinel,
25)
26from .streams import StreamReader
27from .typedefs import JSONBytesEncoder, JSONEncoder
# Names exported by ``from <this module> import *``; everything else in the
# module is treated as non-public.
__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "JsonBytesPayload",
    "AsyncIterablePayload",
)
# Size threshold (1 MiB) above which BytesPayload emits a ResourceWarning.
TOO_LARGE_BYTES_BODY: Final[int] = 1024 * 1024
# Strong references to pending executor "close" futures so they are not
# garbage collected before they finish; entries are removed on completion.
_CLOSE_FUTURES: set[asyncio.Future[None]] = set()
class LookupError(Exception):
    """Signal that the registry has no payload factory for the given data.

    Inside this module the name shadows the builtin ``LookupError``; this
    class derives directly from ``Exception``.
    """
class Order(str, enum.Enum):
    """Priority group a payload factory is registered under.

    Members are also ``str`` instances whose value equals their name.
    """

    # Regular priority; these registrations also feed the exact-type lookup.
    normal = "normal"
    # Consulted before every other group.
    try_first = "try_first"
    # Consulted only after the normal group.
    try_last = "try_last"
def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    """Look up a payload for *data* in the module-level ``PAYLOAD_REGISTRY``.

    All extra positional and keyword arguments are forwarded unchanged to
    the registry's ``get`` method, whose result is returned as-is.
    """
    registry = PAYLOAD_REGISTRY
    return registry.get(data, *args, **kwargs)
def register_payload(
    factory: type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    """Register *factory* for *type* in the module-level ``PAYLOAD_REGISTRY``.

    Args:
        factory: Payload class to register.
        type: Type (or types) the factory is registered for.
        order: Priority group; keyword-only, defaults to ``Order.normal``.
    """
    registry = PAYLOAD_REGISTRY
    registry.register(factory, type, order=order)
class payload_type:
    """Class decorator that registers the decorated payload factory.

    The instance remembers the target type and priority; applying it to a
    class calls ``register_payload`` and hands the class back unchanged.
    """

    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        self.order = order
        self.type = type

    def __call__(self, factory: type["Payload"]) -> type["Payload"]:
        accepted, priority = self.type, self.order
        register_payload(factory, accepted, order=priority)
        return factory
# A payload factory: the ``Payload`` class (or a subclass) itself, not an instance.
PayloadType = type["Payload"]
# One registry entry: a factory paired with the type (or types) it was
# registered for.
_PayloadRegistryItem = tuple[PayloadType, Any]
class PayloadRegistry:
    """Registry mapping data types to payload factories.

    Entries live in three lists, one per ``Order`` group. Entries of the
    ``normal`` group are additionally indexed by exact type in a dict so
    the common case avoids a linear ``isinstance`` scan.

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        self._first: list[_PayloadRegistryItem] = []
        self._normal: list[_PayloadRegistryItem] = []
        self._last: list[_PayloadRegistryItem] = []
        self._normal_lookup: dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        """Return a payload wrapping *data*.

        Resolution order: ``try_first`` entries (``isinstance`` scan), the
        exact-type dict of ``normal`` entries, *data* itself when it already
        is a ``Payload``, then an ``isinstance`` scan over the ``normal`` and
        ``try_last`` entries.

        Raises:
            LookupError: No registered factory matches *data*.
        """
        # Highest-priority entries; iterating an empty list is a no-op.
        for candidate, accepted in self._first:
            if isinstance(data, accepted):
                return candidate(data, *args, **kwargs)

        # Exact-type fast path.
        exact_match = self._normal_lookup.get(type(data))
        if exact_match:
            return exact_match(data, *args, **kwargs)

        # Already a payload: hand it back untouched.
        if isinstance(data, Payload):
            return data

        # Slow path: subclass-aware linear search.
        for candidate, accepted in _CHAIN(self._normal, self._last):
            if isinstance(data, accepted):
                return candidate(data, *args, **kwargs)

        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        """Add *factory* for *type* to the group selected by *order*.

        For ``Order.normal`` the exact-type dict is updated too: once per
        element when *type* is iterable, otherwise for *type* itself.

        Raises:
            ValueError: *order* is not one of the ``Order`` members.
        """
        entry = (factory, type)
        if order is Order.try_first:
            self._first.append(entry)
            return
        if order is Order.try_last:
            self._last.append(entry)
            return
        if order is not Order.normal:
            raise ValueError(f"Unsupported order {order!r}")

        self._normal.append(entry)
        exact_types = type if isinstance(type, Iterable) else (type,)
        for exact in exact_types:
            self._normal_lookup[exact] = factory
class Payload(ABC):
    """Abstract base class for body payloads.

    Stores the wrapped value together with its headers, filename and
    encoding, and defines the interface subclasses implement to turn the
    value into text (``decode``) or to stream it to a writer (``write`` /
    ``write_with_length``).
    """

    _default_content_type: str = "application/octet-stream"
    _size: int | None = None
    _consumed: bool = False  # Default: payload has not been consumed yet
    _autoclose: bool = False  # Default: assume resource needs explicit closing

    def __init__(
        self,
        value: Any,
        headers: (
            CIMultiDict[str] | dict[str, str] | Iterable[tuple[str, str]] | None
        ) = None,
        content_type: None | str | _SENTINEL = sentinel,
        filename: str | None = None,
        encoding: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Store the value and compute the initial headers.

        The Content-Type header is chosen as follows: an explicit
        *content_type* wins; otherwise it is guessed from *filename* (falling
        back to the class default when the guess fails); otherwise the class
        default is used. Entries of *headers* are applied last and may
        therefore override the computed Content-Type.
        """
        self._encoding = encoding
        self._filename = filename
        self._headers = CIMultiDict[str]()
        self._value = value
        if content_type is not sentinel and content_type is not None:
            assert isinstance(content_type, str)
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            # guess_file_type is only looked up on Python 3.13+.
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> int | None:
        """Size of the payload in bytes.

        Returns the number of bytes that will be transmitted when the payload
        is written. For string payloads, this is the size after encoding to bytes,
        not the length of the string.
        """
        return self._size

    @property
    def filename(self) -> str | None:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> CIMultiDict[str]:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        """Headers as ``Name: value`` CRLF lines plus a blank line, UTF-8 encoded."""
        return (
            "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode(
                "utf-8"
            )
            + b"\r\n"
        )

    @property
    def encoding(self) -> str | None:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    @property
    def consumed(self) -> bool:
        """Whether the payload has been consumed and cannot be reused."""
        return self._consumed

    @property
    def autoclose(self) -> bool:
        """
        Whether the payload can close itself automatically.

        Returns True if the payload has no file handles or resources that need
        explicit closing. If False, callers must await close() to release resources.
        """
        return self._autoclose

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: str,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, params=params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length constraints.

        Important:
            For new implementations, use write_with_length() instead of this method.
            This method is maintained for backwards compatibility and will eventually
            delegate to write_with_length(writer, None) in all implementations.

        All payload subclasses must override this method for backwards compatibility,
        but new code should use write_with_length for more flexibility and control.

        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Note:
            This base implementation ignores *content_length* and simply
            delegates to write(); it exists for subclasses that do not
            override this method. Payload types that can honour the limit
            should override it.

        """
        # Backwards compatibility for subclasses that don't override this method
        # and for the default implementation
        await self.write(writer)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This is a convenience method that calls decode() and encodes the result
        to bytes. The instance encoding, when set, takes precedence over the
        *encoding* argument. *errors* is applied to both the decode and the
        encode step, so handlers such as ``surrogateescape`` round-trip
        instead of failing on the strict re-encode.
        """
        # Use instance encoding if available, otherwise use parameter
        actual_encoding = self._encoding or encoding
        text = self.decode(actual_encoding, errors)
        return text.encode(actual_encoding, errors)

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility with code that
        needs to clean up payloads synchronously. In the future, we will
        drop this method and only support the async close() method.

        WARNING: This method must be safe to call from within the event loop
        without blocking. Subclasses should not perform any blocking I/O here.

        WARNING: This method must be called from within an event loop for
        certain payload types (e.g., IOBasePayload). Calling it outside an
        event loop may raise RuntimeError.
        """
        # This is a no-op by default, but subclasses can override it
        # for non-blocking cleanup operations.

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.

        In the future, this will be the only close method supported.
        """
        self._close()
class BytesPayload(Payload):
    """Payload wrapping an in-memory bytes-like object.

    Accepts ``bytes``, ``bytearray`` and ``memoryview`` values.
    """

    # Annotated as bytes, but __init__ also accepts bytearray and memoryview.
    _value: bytes
    # _consumed = False (inherited) - Bytes are immutable and can be reused
    _autoclose = True  # No file handle, just bytes in memory

    def __init__(
        self, value: bytes | bytearray | memoryview, *args: Any, **kwargs: Any
    ) -> None:
        """Store *value*, record its size and warn when it is very large.

        Raises:
            TypeError: *value* is not bytes, bytearray or memoryview.
        """
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        if isinstance(value, memoryview):
            # len() counts items, not bytes, for multi-byte item formats.
            self._size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            self._size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if self._size > TOO_LARGE_BYTES_BODY:
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                source=self,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Return the payload decoded to text.

        memoryview has no ``decode`` method, so any value that is not bytes
        or bytearray is copied into a bytes object first.
        """
        raw = self._value
        if not isinstance(raw, (bytes, bytearray)):
            raw = bytes(raw)
        return raw.decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        A bytes value is returned as-is; bytearray and memoryview values are
        copied into a new bytes object so the declared return type holds.
        *encoding* and *errors* are accepted for interface compatibility and
        are not used.
        """
        raw = self._value
        return raw if isinstance(raw, bytes) else bytes(raw)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire bytes payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire bytes content without any length constraint.

        Note:
            For new implementations that need length control, use write_with_length().
            This method is maintained for backwards compatibility and is equivalent
            to write_with_length(writer, None).

        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write bytes payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Writes either the whole value or, when content_length is given, the
        slice ``value[:content_length]`` in a single writer call.

        """
        if content_length is not None:
            await writer.write(self._value[:content_length])
        else:
            await writer.write(self._value)
class StringPayload(BytesPayload):
    """Bytes payload built by encoding a ``str`` value."""

    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Encode *value* and initialise the underlying bytes payload.

        Charset resolution:
          * explicit *encoding*: used as-is; a missing *content_type* becomes
            ``text/plain`` with that charset;
          * only *content_type*: its ``charset`` parameter, else UTF-8;
          * neither: UTF-8 with ``text/plain; charset=utf-8``.
        """
        if encoding is not None:
            charset = encoding
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            parsed = parse_mimetype(content_type)
            charset = parsed.parameters.get("charset", "utf-8")
        else:
            charset = "utf-8"
            content_type = "text/plain; charset=utf-8"

        encoded = value.encode(charset)
        super().__init__(
            encoded,
            *args,
            encoding=charset,
            content_type=content_type,
            **kwargs,
        )
class StringIOPayload(StringPayload):
    """String payload whose text is read once from a text stream at construction."""

    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        # The stream is read eagerly; only the resulting text is kept.
        text = value.read()
        super().__init__(text, *args, **kwargs)
class IOBasePayload(Payload):
    """Payload streaming its content from a file-like object."""

    _value: io.IOBase
    # _consumed = False (inherited) - File can be re-read from the same position
    _start_position: int | None = None
    # _autoclose = False (inherited) - Has file handle that needs explicit closing

    def __init__(
        self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
    ) -> None:
        """Wrap *value*, guessing a filename and setting Content-Disposition.

        When no ``filename`` keyword is supplied it is derived with
        ``guess_filename``. The Content-Disposition header is set only when a
        filename is known, *disposition* is not None and the header is not
        already present.
        """
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        wants_disposition = self._filename is not None and disposition is not None
        if wants_disposition and hdrs.CONTENT_DISPOSITION not in self.headers:
            self.set_content_disposition(disposition, filename=self._filename)

    def _set_or_restore_start_position(self) -> None:
        """Remember the stream position on first use, rewind to it afterwards.

        If the stream cannot report or restore its position, the payload is
        marked as consumed instead of raising.
        """
        if self._start_position is not None:
            try:
                self._value.seek(self._start_position)
            except (OSError, AttributeError):
                # Rewinding failed, so the content already read is gone.
                self._consumed = True
            return

        try:
            self._start_position = self._value.tell()
        except (OSError, AttributeError):
            # Position unknown: the stream cannot be replayed later.
            self._consumed = True

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """
        Rewind if needed, then return the payload size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read.
                A falsy value (None or 0) falls back to DEFAULT_CHUNK_SIZE.

        Returns:
            A tuple of the value of the ``size`` property (None when it cannot
            be determined) and the first chunk read from the stream. The read
            length is the smallest of DEFAULT_CHUNK_SIZE, the size and the
            limit, ignoring falsy values.

        Both steps live in one function so the caller can run them as a
        single executor job.

        """
        self._set_or_restore_start_position()
        known_size = self.size  # Evaluate once: the property does I/O
        read_len = min(
            DEFAULT_CHUNK_SIZE,
            known_size or DEFAULT_CHUNK_SIZE,
            remaining_content_len or DEFAULT_CHUNK_SIZE,
        )
        return known_size, self._value.read(read_len)

    def _read(self, remaining_content_len: int | None) -> bytes:
        """
        Read the next chunk from the stream.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                A falsy value (None or 0) falls back to DEFAULT_CHUNK_SIZE.

        Used for every read after the initial _read_and_available_len call.

        """
        read_len = remaining_content_len or DEFAULT_CHUNK_SIZE
        return self._value.read(read_len)  # type: ignore[no-any-return]

    @property
    def size(self) -> int | None:
        """
        Size of the payload in bytes, measured from the start position.

        The start position is captured on first access and reused afterwards,
        so the reported size stays the same after the stream has been read
        (e.g. when the same payload is sent again for a 307/308 redirect).

        Returns None when the position or the file size cannot be obtained
        (AttributeError or OSError from tell()/fileno()/fstat()).
        """
        try:
            if self._start_position is None:
                self._start_position = self._value.tell()
            file_size = os.fstat(self._value.fileno()).st_size
            return file_size - self._start_position
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Equivalent to write_with_length(writer, None).

        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Stream the file content to *writer*, optionally capped in length.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Reads are executed through the running loop's default executor, one
        chunk at a time. Streaming stops when a read returns no data, when
        the number of bytes read reaches the known payload size, or when the
        content_length budget is used up.

        """
        loop = asyncio.get_running_loop()
        consumed_len = 0
        budget = content_length

        # First executor job: rewind, size lookup and initial read together.
        known_size, chunk = await loop.run_in_executor(
            None, self._read_and_available_len, budget
        )
        while chunk:
            this_len = len(chunk)

            if budget is None:
                await writer.write(chunk)
            else:
                # Trim the chunk so no more than the remaining budget is sent.
                await writer.write(chunk[:budget])
                budget -= this_len

            consumed_len += this_len

            if self._should_stop_writing(known_size, consumed_len, budget):
                return

            if budget is None:
                next_len = DEFAULT_CHUNK_SIZE
            else:
                next_len = min(DEFAULT_CHUNK_SIZE, budget)
            chunk = await loop.run_in_executor(None, self._read, next_len)

    def _should_stop_writing(
        self,
        available_len: int | None,
        total_written_len: int,
        remaining_content_len: int | None,
    ) -> bool:
        """
        Decide whether streaming is finished.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes processed so far
            remaining_content_len: Bytes left in the content-length budget (None if unlimited)

        Returns:
            True when the known size has been reached, or when a budget exists
            and is exhausted; False otherwise.

        """
        if available_len is not None and total_written_len >= available_len:
            return True
        return remaining_content_len is not None and remaining_content_len <= 0

    def _close(self) -> None:
        """
        Schedule closing of the underlying file without blocking.

        This method exists only for backwards
        compatibility. Use the async close() method instead.

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError.
        """
        # NOTE(review): _consumed is also set when tell()/seek() fails, in
        # which case this returns without closing the file - confirm intended.
        if self._consumed:
            return
        self._consumed = True  # Prevent further writes and repeated closes
        # Closing runs in the executor and is deliberately not awaited.
        pending = asyncio.get_running_loop().run_in_executor(None, self._value.close)
        # Keep a strong reference until the job is done so the future is not
        # garbage collected early; the callback drops it again.
        _CLOSE_FUTURES.add(pending)
        pending.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        raw = self._read_all()
        return raw.decode(encoding, errors)

    def _read_all(self) -> bytes:
        """Rewind to the start position and return the whole content as bytes."""
        self._set_or_restore_start_position()
        # readlines() drains the stream; the pieces are joined back together.
        pieces = self._value.readlines()
        return b"".join(pieces)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        The whole content is read via _read_all() in the running loop's
        default executor, so the event loop is not blocked. *encoding* and
        *errors* are not used.
        """
        return await asyncio.get_running_loop().run_in_executor(None, self._read_all)
class TextIOPayload(IOBasePayload):
    """File-like payload reading text and encoding it to bytes while streaming."""

    _value: io.TextIOBase
    # _autoclose = False (inherited) - Has text file handle that needs explicit closing

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Resolve encoding and content type, then initialise the base payload.

        An explicit *encoding* is kept and only fills in a missing
        *content_type*. Without it, the charset parameter of *content_type*
        is used (UTF-8 when absent); with neither, UTF-8 and
        ``text/plain; charset=utf-8`` apply.
        """
        if encoding is not None:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            parsed = parse_mimetype(content_type)
            encoding = parsed.parameters.get("charset", "utf-8")
        else:
            encoding = "utf-8"
            content_type = "text/plain; charset=utf-8"

        super().__init__(
            value,
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """
        Rewind if needed, then return the payload size and the first encoded chunk.

        Args:
            remaining_content_len: Optional limit passed to the first read.
                A falsy value (None or 0) falls back to DEFAULT_CHUNK_SIZE.

        Returns:
            A tuple of the value of the ``size`` property (None when it cannot
            be determined) and the first chunk, encoded with the payload's
            encoding, or with str.encode()'s default when no encoding is set.

        Both steps live in one function so the caller can run them as a
        single executor job.

        """
        self._set_or_restore_start_position()
        known_size = self.size
        read_len = min(
            DEFAULT_CHUNK_SIZE,
            known_size or DEFAULT_CHUNK_SIZE,
            remaining_content_len or DEFAULT_CHUNK_SIZE,
        )
        text = self._value.read(read_len)
        if self._encoding:
            return known_size, text.encode(self._encoding)
        return known_size, text.encode()

    def _read(self, remaining_content_len: int | None) -> bytes:
        """
        Read the next chunk of text and return it encoded.

        Args:
            remaining_content_len: Optional limit passed to read().
                A falsy value (None or 0) falls back to DEFAULT_CHUNK_SIZE.

        Returns:
            The chunk encoded with the payload's encoding, or with
            str.encode()'s default when no encoding is set.

        Used for every read after the initial _read_and_available_len call.

        """
        text = self._value.read(remaining_content_len or DEFAULT_CHUNK_SIZE)
        if self._encoding:
            return text.encode(self._encoding)
        return text.encode()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        The stream already yields text, so *encoding* and *errors* are not used.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        The whole text content is read and encoded in the running loop's
        default executor. The instance encoding, when set, takes precedence
        over the *encoding* argument; *errors* is passed to the encode step.
        """
        loop = asyncio.get_running_loop()

        # Instance encoding wins over the parameter.
        target_encoding = self._encoding or encoding

        def _drain_and_encode() -> bytes:
            self._set_or_restore_start_position()
            # read() without a size returns everything up to EOF.
            text = self._value.read()
            return text.encode(target_encoding, errors)

        return await loop.run_in_executor(None, _drain_and_encode)
class BytesIOPayload(IOBasePayload):
    """File-like payload backed by an in-memory ``io.BytesIO`` buffer."""

    _value: io.BytesIO
    _size: int  # Always initialized in __init__
    _autoclose = True  # BytesIO is in-memory, safe to auto-close

    def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)
        # Size is fixed at construction: buffer length minus current position.
        buffer_len = len(self._value.getbuffer())
        self._size = buffer_len - self._value.tell()

    @property
    def size(self) -> int:
        """Size of the payload in bytes.

        This is the value computed once in ``__init__``: the number of bytes
        between the stream position at construction time and the buffer end.
        """
        return self._size

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Rewind to the start position and return the remaining content decoded."""
        self._set_or_restore_start_position()
        raw = self._value.read()
        return raw.decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write the whole buffer; same as write_with_length(writer, None)."""
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write BytesIO payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        The buffer is rewound to its start position and read synchronously in
        DEFAULT_CHUNK_SIZE pieces. Before every chunk except the first,
        control is yielded to the event loop with ``asyncio.sleep(0)`` so a
        large buffer does not monopolise it. When content_length is given,
        each chunk is trimmed to the remaining budget and writing stops once
        the budget is used up.

        """
        self._set_or_restore_start_position()
        budget = content_length
        is_first_chunk = True
        while True:
            chunk = self._value.read(DEFAULT_CHUNK_SIZE)
            if not chunk:
                break
            if not is_first_chunk:
                # Let other tasks run between chunks of a large buffer.
                await asyncio.sleep(0)
            if budget is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:budget])
                budget -= len(chunk)
                if budget <= 0:
                    return
            is_first_chunk = False

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Rewinds to the start position and returns everything read from there
        to the end of the buffer. *encoding* and *errors* are not used.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def close(self) -> None:
        """
        Close the BytesIO payload.

        This does nothing since BytesIO is in-memory and does not require explicit closing.
        """
class BufferedReaderPayload(IOBasePayload):
    """File-like payload for buffered binary streams."""

    _value: io.BufferedIOBase
    # _autoclose = False (inherited) - Has buffered file handle that needs explicit closing

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Rewind to the start position and return the remaining content decoded."""
        self._set_or_restore_start_position()
        raw = self._value.read()
        return raw.decode(encoding, errors)
class JsonPayload(BytesPayload):
    """Bytes payload holding *value* serialised by a str-returning JSON encoder."""

    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # Serialise first, then encode the text with the requested encoding.
        serialized = dumps(value)
        body = serialized.encode(encoding)
        super().__init__(
            body,
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )
class JsonBytesPayload(BytesPayload):
    """JSON payload for encoders that return bytes directly.

    Use this when your JSON encoder (like orjson) returns bytes
    instead of str; the encoder output is stored without an extra
    encoding step.
    """

    def __init__(
        self,
        value: Any,
        dumps: JSONBytesEncoder,
        content_type: str = "application/json",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # The encoder output is used as the payload body unchanged.
        body = dumps(value)
        super().__init__(body, *args, content_type=content_type, **kwargs)
class AsyncIterablePayload(Payload):
    """Payload whose body comes from an async iterable of bytes chunks.

    ``write_with_length()`` streams chunks from the iterator without caching
    them, while ``as_bytes()`` drains the iterator once and keeps the chunks
    so that later writes, ``decode()`` and ``as_bytes()`` calls reuse them.
    """

    _iter: AsyncIterator[bytes] | None = None
    _value: AsyncIterable[bytes]
    _cached_chunks: list[bytes] | None = None
    # _consumed stays False to allow reuse with cached content
    _autoclose = True  # Iterator doesn't need explicit closing

    def __init__(self, value: AsyncIterable[bytes], *args: Any, **kwargs: Any) -> None:
        """Validate ``value``, default the content type and obtain its iterator.

        Raises:
            TypeError: if ``value`` is not an ``AsyncIterable``.
        """
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                f"got {type(value)!r}"
            )

        # Only fill in the content type when the caller did not pass one.
        kwargs.setdefault("content_type", "application/octet-stream")

        super().__init__(value, *args, **kwargs)

        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the whole payload to ``writer`` with no length limit.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Note:
            This simply delegates to write_with_length() with a length of None;
            call write_with_length() directly when a length limit is needed.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write the payload to ``writer``, sending at most ``content_length`` bytes.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Cached chunks (created by as_bytes()) are replayed when present.
        Otherwise chunks are pulled from the iterator one at a time; once it
        is exhausted the iterator reference is dropped and the payload is
        marked as consumed. This method never populates the cache.
        """
        budget = content_length

        # Replay previously cached chunks when they exist.
        cached = self._cached_chunks
        if cached is not None:
            for piece in cached:
                if budget is None:
                    await writer.write(piece)
                    continue
                if budget <= 0:
                    break
                await writer.write(piece[:budget])
                budget -= len(piece)
            return

        # Iterator already exhausted and nothing cached: nothing to write.
        if self._iter is None:
            return

        try:
            while True:
                piece = await anext(self._iter)
                if budget is None:
                    await writer.write(piece)
                    continue
                if budget <= 0:
                    # Keep draining the iterator even after the length limit
                    # is reached, since the file handle may not get closed
                    # by the iterator if we stop early.
                    continue
                await writer.write(piece[:budget])
                budget -= len(piece)
        except StopAsyncIteration:
            # Iterator is exhausted
            self._iter = None
            self._consumed = True  # Mark as consumed when streamed without caching

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the cached chunks as text; raise TypeError when nothing is cached."""
        cached = self._cached_chunks
        if cached is None:
            raise TypeError(
                "Unable to decode - content not cached. Call as_bytes() first."
            )
        return b"".join(cached).decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return the complete payload content as bytes.

        The first call drains the async iterator and caches the chunks;
        later calls join the cached chunks again. ``encoding`` and ``errors``
        are accepted but not used by this method.
        """
        cached = self._cached_chunks
        if cached is not None:
            return b"".join(cached)

        # Iterator already exhausted by streaming and nothing was cached.
        if self._iter is None:
            return b""

        # Drain the iterator, keeping every chunk for later reuse.
        collected: list[bytes] = [piece async for piece in self._iter]

        self._iter = None
        self._cached_chunks = collected
        # _consumed is left as False so the cached chunks can be reused.

        return b"".join(collected)
class StreamReaderPayload(AsyncIterablePayload):
    """AsyncIterablePayload fed by the ``iter_any()`` iterator of a StreamReader."""

    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        chunk_source = value.iter_any()
        super().__init__(chunk_source, *args, **kwargs)
# Module-wide registry used by get_payload() and register_payload().
PAYLOAD_REGISTRY = PayloadRegistry()

# Default-order registrations, performed in exactly this sequence.
# NOTE(review): the sequence presumably matters for lookup (the specific IO
# types are registered ahead of io.IOBase) - confirm against PayloadRegistry.
for _factory, _handled in (
    (BytesPayload, (bytes, bytearray, memoryview)),
    (StringPayload, str),
    (StringIOPayload, io.StringIO),
    (TextIOPayload, io.TextIOBase),
    (BytesIOPayload, io.BytesIO),
    (BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom)),
    (IOBasePayload, io.IOBase),
    (StreamReaderPayload, StreamReader),
):
    PAYLOAD_REGISTRY.register(_factory, _handled)
del _factory, _handled  # keep the loop variables out of the module namespace

# try_last gives more specialized async iterables, such as
# multipart.BodyPartReaderPayload, a chance to override the default
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)