Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/payload.py: 50%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import enum
3import io
4import json
5import mimetypes
6import os
7import sys
8import warnings
9from abc import ABC, abstractmethod
10from collections.abc import AsyncIterable, AsyncIterator, Iterable
11from itertools import chain
12from typing import IO, Any, Final, TextIO
14from multidict import CIMultiDict
16from . import hdrs
17from .abc import AbstractStreamWriter
18from .helpers import (
19 _SENTINEL,
20 DEFAULT_CHUNK_SIZE,
21 content_disposition_header,
22 guess_filename,
23 parse_mimetype,
24 sentinel,
25)
26from .http_writer import _safe_header
27from .streams import StreamReader
28from .typedefs import JSONBytesEncoder, JSONEncoder
__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "JsonBytesPayload",
    "AsyncIterablePayload",
)

# Threshold above which BytesPayload emits a ResourceWarning, because the whole
# buffer is handed to the writer in a single call.
TOO_LARGE_BYTES_BODY: Final[int] = 1024 * 1024  # 1 MiB

# Strong references to executor futures that close file objects (scheduled by
# IOBasePayload._close); each future removes itself once it completes.
_CLOSE_FUTURES: set[asyncio.Future[None]] = set()
class LookupError(Exception):
    """Signal that no payload factory accepts the given value.

    Raised by ``PayloadRegistry.get`` once every registered factory has been
    tried and the value is not already a ``Payload``. Within this module the
    name shadows the builtin ``LookupError``; this class derives directly from
    ``Exception``, not from the builtin.
    """
class Order(str, enum.Enum):
    """Priority bucket chosen when registering a payload factory.

    * ``try_first`` - checked before anything else, via ``isinstance``.
    * ``normal`` - exact-type fast path first, then an ``isinstance`` scan.
    * ``try_last`` - scanned only after all ``normal`` entries.

    Members are ``str`` instances that compare equal to their value.
    """

    normal = "normal"
    try_first = "try_first"
    try_last = "try_last"
def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    """Wrap *data* in a ``Payload`` selected by the module-level registry.

    Any extra positional/keyword arguments are forwarded untouched to the
    chosen payload factory. Raises this module's ``LookupError`` when no
    factory matches *data*.
    """
    registry = PAYLOAD_REGISTRY
    return registry.get(data, *args, **kwargs)
def register_payload(
    factory: type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    """Register *factory* in the module-level registry for *type*.

    *type* may be a single class or an iterable of classes; *order* selects
    the priority bucket (see ``Order``).
    """
    registry = PAYLOAD_REGISTRY
    registry.register(factory, type, order=order)
class payload_type:
    """Class decorator that registers the decorated payload factory.

    Usage::

        @payload_type(SomeType)
        class SomePayload(Payload): ...

    The factory is passed to ``register_payload`` with the stored type and
    order, and is returned unchanged.
    """

    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        # Remember the registration arguments until the class is decorated.
        self.order = order
        self.type = type

    def __call__(self, factory: type["Payload"]) -> type["Payload"]:
        register_payload(factory, self.type, order=self.order)
        return factory
# Any Payload subclass usable as a registry factory.
PayloadType = type["Payload"]
# One registry entry: (factory, class-or-iterable-of-classes).
_PayloadRegistryItem = tuple[type["Payload"], Any]
class PayloadRegistry:
    """Payload registry.

    Maps value types to the ``Payload`` factory used to wrap them. Entries
    live in three ordered buckets (``try_first``, ``normal``, ``try_last``);
    ``normal`` registrations also populate an exact-type table that is
    consulted before the linear ``isinstance`` scan.

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        self._first: list[_PayloadRegistryItem] = []
        self._normal: list[_PayloadRegistryItem] = []
        self._last: list[_PayloadRegistryItem] = []
        # Exact ``type(data)`` -> factory. A later normal registration for the
        # same class overwrites an earlier one in this table.
        self._normal_lookup: dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        # Step 1: try_first factories win unconditionally (isinstance match).
        for factory, type_ in self._first:
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)

        # Step 2: exact-type hit among normal registrations.
        exact_factory = self._normal_lookup.get(type(data))
        if exact_factory:
            return exact_factory(data, *args, **kwargs)

        # Step 3: an existing Payload is passed through as-is
        # (extra args/kwargs are ignored in that case).
        if isinstance(data, Payload):
            return data

        # Step 4: slow isinstance scan over normal, then try_last entries.
        for factory, type_ in _CHAIN(self._normal, self._last):
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)

        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        """Add *factory* for *type* (a class or an iterable of classes)."""
        if order is Order.try_first:
            self._first.append((factory, type))
            return
        if order is Order.try_last:
            self._last.append((factory, type))
            return
        if order is not Order.normal:
            raise ValueError(f"Unsupported order {order!r}")

        self._normal.append((factory, type))
        lookup_keys = type if isinstance(type, Iterable) else (type,)
        for key in lookup_keys:
            self._normal_lookup[key] = factory
class Payload(ABC):
    """Abstract base class for HTTP bodies.

    Holds the wrapped value, an optional filename and encoding, and a
    case-insensitive header map that always contains ``Content-Type``.
    Subclasses must implement ``decode()`` and ``write()``, and should
    override ``write_with_length()``.
    """

    _default_content_type: str = "application/octet-stream"
    _size: int | None = None
    _consumed: bool = False  # Default: payload has not been consumed yet
    _autoclose: bool = False  # Default: assume resource needs explicit closing

    def __init__(
        self,
        value: Any,
        headers: (
            CIMultiDict[str] | dict[str, str] | Iterable[tuple[str, str]] | None
        ) = None,
        content_type: None | str | _SENTINEL = sentinel,
        filename: str | None = None,
        encoding: str | None = None,
        **kwargs: Any,
    ) -> None:
        self._encoding = encoding
        self._filename = filename
        self._headers = CIMultiDict[str]()
        self._value = value
        # Content-Type precedence: explicit argument, then a guess from the
        # filename, then the class default. *headers* is applied afterwards
        # with update(), so entries there take effect last.
        if content_type is not sentinel and content_type is not None:
            assert isinstance(content_type, str)
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> int | None:
        """Size of the payload in bytes.

        Returns the number of bytes that will be transmitted when the payload
        is written. For string payloads, this is the size after encoding to bytes,
        not the length of the string.
        """
        return self._size

    @property
    def filename(self) -> str | None:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> CIMultiDict[str]:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        # Serialized "Name: value\r\n" lines plus the terminating blank line.
        return (
            "".join(
                _safe_header(k) + ": " + _safe_header(v) + "\r\n"
                for k, v in self.headers.items()
            ).encode("utf-8")
            + b"\r\n"
        )

    @property
    def encoding(self) -> str | None:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    @property
    def consumed(self) -> bool:
        """Whether the payload has been consumed and cannot be reused."""
        return self._consumed

    @property
    def autoclose(self) -> bool:
        """
        Whether the payload can close itself automatically.

        Returns True if the payload has no file handles or resources that need
        explicit closing. If False, callers must await close() to release resources.
        """
        return self._autoclose

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: str,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, params=params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length constraints.

        Important:
            For new implementations, use write_with_length() instead of this method.
            This method is maintained for backwards compatibility and will eventually
            delegate to write_with_length(writer, None) in all implementations.

        All payload subclasses must override this method for backwards compatibility,
        but new code should use write_with_length for more flexibility and control.
        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Note:
            This base implementation ignores content_length and simply calls
            write(); it exists for subclasses that don't override this method.
            Specific payload types should override it to honour the limit.
        """
        await self.write(writer)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Calls decode() and encodes the result. The instance encoding, when set,
        takes precedence over *encoding*. *errors* is applied to both the decode
        and the encode step, so e.g. ``errors="replace"`` cannot fail on the
        re-encode of replacement characters.
        """
        actual_encoding = self._encoding or encoding
        return self.decode(actual_encoding, errors).encode(actual_encoding, errors)

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility with code that
        needs to clean up payloads synchronously. In the future, we will
        drop this method and only support the async close() method.

        WARNING: This method must be safe to call from within the event loop
        without blocking. Subclasses should not perform any blocking I/O here.

        WARNING: This method must be called from within an event loop for
        certain payload types (e.g., IOBasePayload). Calling it outside an
        event loop may raise RuntimeError.
        """
        # This is a no-op by default, but subclasses can override it
        # for non-blocking cleanup operations.

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.

        In the future, this will be the only close method supported.
        """
        self._close()
class BytesPayload(Payload):
    """Payload wrapping an in-memory bytes-like value.

    Accepts ``bytes``, ``bytearray`` and ``memoryview``. The whole buffer is
    handed to the writer in one call, so bodies larger than
    ``TOO_LARGE_BYTES_BODY`` trigger a ``ResourceWarning``.
    """

    _value: bytes | bytearray | memoryview
    # _consumed = False (inherited) - Bytes are immutable and can be reused
    _autoclose = True  # No file handle, just bytes in memory

    def __init__(
        self, value: bytes | bytearray | memoryview, *args: Any, **kwargs: Any
    ) -> None:
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        # memoryview length counts elements, not bytes; nbytes is the byte size.
        if isinstance(value, memoryview):
            self._size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            self._size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")

        if self._size > TOO_LARGE_BYTES_BODY:
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                source=self,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the payload to ``str``.

        ``memoryview`` has no ``decode()`` method, so it is first copied to
        ``bytes``; ``bytes``/``bytearray`` values are decoded directly.
        """
        value = self._value
        if isinstance(value, memoryview):
            value = value.tobytes()
        return value.decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return the payload content as ``bytes``.

        ``bytes`` values are returned without copying. ``bytearray`` and
        ``memoryview`` values are copied into an immutable ``bytes`` object so
        the result honours the return annotation and cannot be used to mutate
        the payload's buffer. *encoding* and *errors* are unused.
        """
        value = self._value
        if isinstance(value, bytes):
            return value
        return bytes(value)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire bytes payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire bytes content without any length constraint.

        Note:
            For new implementations that need length control, use write_with_length().
            This method is maintained for backwards compatibility and is equivalent
            to write_with_length(writer, None).
        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write bytes payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Writes either the entire buffer or a slice of it truncated to
        content_length, in a single writer.write() call.
        """
        if content_length is not None:
            await writer.write(self._value[:content_length])
        else:
            await writer.write(self._value)
class StringPayload(BytesPayload):
    """Payload for ``str`` values, encoded to bytes at construction time.

    Encoding resolution order:
      * an explicit *encoding* wins; *content_type* then defaults to
        ``text/plain; charset=<encoding>``;
      * otherwise the ``charset`` parameter of *content_type* is used,
        falling back to UTF-8;
      * with neither given, UTF-8 and ``text/plain; charset=utf-8``.
    """

    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        if encoding is not None:
            real_encoding = encoding
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            parsed = parse_mimetype(content_type)
            real_encoding = parsed.parameters.get("charset", "utf-8")
        else:
            real_encoding = "utf-8"
            content_type = "text/plain; charset=utf-8"

        encoded = value.encode(real_encoding)
        super().__init__(
            encoded,
            *args,
            encoding=real_encoding,
            content_type=content_type,
            **kwargs,
        )
class StringIOPayload(StringPayload):
    """Payload for text streams such as ``io.StringIO``.

    The stream is read to the end once, at construction, and the text is
    then handled exactly like a ``StringPayload``.
    """

    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        text = value.read()
        super().__init__(text, *args, **kwargs)
class IOBasePayload(Payload):
    """Payload backed by a file-like object.

    Content is streamed in chunks; blocking reads run in the loop's default
    executor. The initial stream position is remembered so a seekable stream
    can be re-sent (e.g. on 307/308 redirects).
    """

    _value: io.IOBase
    # _consumed = False (inherited) - File can be re-read from the same position
    _start_position: int | None = None
    # Set once _close() has scheduled closing of the file object. Tracked
    # separately from _consumed: unseekable streams are marked consumed after
    # their first read but still own a handle that must be released.
    _closed: bool = False
    # _autoclose = False (inherited) - Has file handle that needs explicit closing

    def __init__(
        self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
    ) -> None:
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _set_or_restore_start_position(self) -> None:
        """Set or restore the start position of the file-like object."""
        if self._start_position is None:
            try:
                self._start_position = self._value.tell()
            except (OSError, AttributeError):
                self._consumed = True  # Cannot seek, mark as consumed
            return
        try:
            self._value.seek(self._start_position)
        except (OSError, AttributeError):
            # Failed to seek back - mark as consumed since we've already read
            self._consumed = True

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """
        Read the file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, DEFAULT_CHUNK_SIZE will be used as the default chunk size.
                A limit of 0 (or less) reads nothing.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size cannot be determined)
            - The first chunk of bytes read from the file object

        This method is optimized to perform both size calculation and initial read
        in a single operation, which is executed in a single executor job to minimize
        context switches and file operations when streaming content.
        """
        self._set_or_restore_start_position()
        size = self.size  # Call size only once since it does I/O
        # Only None means "no limit"; an explicit 0 must not fall back to a
        # full chunk, and a negative value must not turn into read(-n).
        limit = (
            DEFAULT_CHUNK_SIZE
            if remaining_content_len is None
            else max(remaining_content_len, 0)
        )
        return size, self._value.read(
            min(
                DEFAULT_CHUNK_SIZE,
                size or DEFAULT_CHUNK_SIZE,
                limit,
            )
        )

    def _read(self, remaining_content_len: int | None) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, DEFAULT_CHUNK_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object, respecting the
            remaining_content_len limit if specified.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made.
        """
        return self._value.read(remaining_content_len or DEFAULT_CHUNK_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> int | None:
        """
        Size of the payload in bytes.

        Returns the total size of the payload content from the initial position.
        This ensures consistent Content-Length for requests, including 307/308 redirects
        where the same payload instance is reused.

        Returns None if the size cannot be determined (e.g., for unseekable streams).
        """
        try:
            # Store the start position on first access so that, once the
            # stream has been read to EOF, the size is still computed from
            # the original position rather than from EOF (which would be 0).
            if self._start_position is None:
                self._start_position = self._value.tell()

            # Return the total size from the start position
            return os.fstat(self._value.fileno()).st_size - self._start_position
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire file content without any length constraint.
        It delegates to write_with_length() with no length limit for implementation
        consistency.

        Note:
            For new implementations that need length control, use write_with_length() directly.
            This method is maintained for backwards compatibility with existing code.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Behaviour:
            1. File reads run in the default executor to avoid blocking the event loop.
            2. Content is read and written in chunks of at most DEFAULT_CHUNK_SIZE.
            3. Writing stops when the stream is exhausted, when the known size
               has been written, or when content_length has been reached.
            4. The file object is NOT closed here; release it via close().
        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length

        # Get initial data and available length
        available_len, chunk = await loop.run_in_executor(
            None, self._read_and_available_len, remaining_content_len
        )
        # Process data chunks until done
        while chunk:
            chunk_len = len(chunk)

            # Write data with or without length constraint
            if remaining_content_len is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_content_len])
                remaining_content_len -= chunk_len

            total_written_len += chunk_len

            # Check if we're done writing
            if self._should_stop_writing(
                available_len, total_written_len, remaining_content_len
            ):
                return

            # Read next chunk
            chunk = await loop.run_in_executor(
                None,
                self._read,
                (
                    min(DEFAULT_CHUNK_SIZE, remaining_content_len)
                    if remaining_content_len is not None
                    else DEFAULT_CHUNK_SIZE
                ),
            )

    def _should_stop_writing(
        self,
        available_len: int | None,
        total_written_len: int,
        remaining_content_len: int | None,
    ) -> bool:
        """
        Determine if we should stop writing data.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Remaining bytes to be written for content-length limited responses

        Returns:
            True if we should stop writing data, based on either:
            - Having written all available data (when size is known)
            - Having written all requested content (when content-length is specified)
        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards
        compatibility. Use the async close() method instead.

        Closing is scheduled at most once, independently of whether the
        payload was already marked consumed (unseekable streams are marked
        consumed after the first read but still need their handle closed).

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError.
        """
        if self._closed:
            return
        # Resolve the loop first so a call outside an event loop raises
        # without marking the payload as closed.
        loop = asyncio.get_running_loop()
        self._closed = True
        self._consumed = True  # Mark as consumed to prevent further writes
        # Schedule file closing without awaiting to prevent cancellation issues
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        return self._read_all().decode(encoding, errors)

    def _read_all(self) -> bytes:
        """Read the entire file-like object and return its content as bytes."""
        self._set_or_restore_start_position()
        # Use readlines() to ensure we get all content
        return b"".join(self._value.readlines())

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Reads the entire content from the remembered start position. The file
        reading is performed in an executor to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._read_all)
class TextIOPayload(IOBasePayload):
    """Payload backed by a text stream; chunks are encoded before writing.

    NOTE(review): the inherited ``size`` subtracts ``tell()`` from the byte
    size reported by ``fstat``; for text streams ``tell()`` is an opaque
    cookie and characters may encode to several bytes, so the size may be
    approximate - confirm before relying on it for Content-Length.
    """

    _value: io.TextIOBase
    # _autoclose = False (inherited) - Has text file handle that needs explicit closing

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        # Same resolution rules as StringPayload: explicit encoding, else the
        # content type's charset, else UTF-8.
        if encoding is None:
            if content_type is None:
                encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding

        super().__init__(
            value,
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """
        Read the text file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how much to read in this operation
                (used as a character count for the read). If None, DEFAULT_CHUNK_SIZE
                is used. A limit of 0 (or less) reads nothing.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size cannot be determined)
            - The first chunk read from the file object, encoded using the payload's encoding

        Performs the size calculation and the initial read in a single call so
        that it can run as one executor job.
        """
        self._set_or_restore_start_position()
        size = self.size
        # Only None means "no limit"; 0 must not fall back to a full chunk and
        # a negative value must not become read(-n) (read everything).
        limit = (
            DEFAULT_CHUNK_SIZE
            if remaining_content_len is None
            else max(remaining_content_len, 0)
        )
        chunk = self._value.read(
            min(
                DEFAULT_CHUNK_SIZE,
                size or DEFAULT_CHUNK_SIZE,
                limit,
            )
        )
        return size, chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def _read(self, remaining_content_len: int | None) -> bytes:
        """
        Read a chunk of data from the text file-like object.

        Args:
            remaining_content_len: Optional maximum to read (used as a character
                count). If None, DEFAULT_CHUNK_SIZE will be used.

        Returns:
            The chunk encoded with the payload's encoding (UTF-8 if none is set).
            Because one character can encode to several bytes, the result may be
            longer than remaining_content_len; the caller slices it.
        """
        chunk = self._value.read(remaining_content_len or DEFAULT_CHUNK_SIZE)
        return chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        The stream already yields ``str``, so *encoding* and *errors* are ignored.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Reads the whole text from the remembered start position and encodes it
        with the instance encoding (or *encoding* if none is set), applying
        *errors*. The read runs in an executor to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()

        # Use instance encoding if available, otherwise use parameter
        actual_encoding = self._encoding or encoding

        def _read_and_encode() -> bytes:
            self._set_or_restore_start_position()
            # read() with no argument returns everything up to EOF
            return self._value.read().encode(actual_encoding, errors)

        return await loop.run_in_executor(None, _read_and_encode)
class BytesIOPayload(IOBasePayload):
    """Payload backed by an in-memory ``io.BytesIO`` buffer.

    The size (buffer length minus the current position) is computed once at
    construction. Writing runs on the event loop thread, yielding between
    chunks instead of using an executor.
    """

    _value: io.BytesIO
    _size: int  # Always initialized in __init__
    _autoclose = True  # BytesIO is in-memory, safe to auto-close

    def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)
        # Release the getbuffer() view deterministically: while the view
        # exists the BytesIO cannot be resized or closed, and without
        # reference counting (e.g. on PyPy) a temporary view could otherwise
        # linger until the next GC cycle.
        with self._value.getbuffer() as buffer:
            total_len = len(buffer)
        self._size = total_len - self._value.tell()

    @property
    def size(self) -> int:
        """Size of the payload in bytes.

        Returns the number of bytes in the BytesIO buffer that will be transmitted.
        This is calculated once during initialization for efficiency.
        """
        return self._size

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write BytesIO payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        Behaviour:
            1. Restores the remembered start position, then reads in chunks of
               at most DEFAULT_CHUNK_SIZE (never more than the remaining limit).
            2. Yields to the event loop between chunks so a large buffer does
               not monopolise the loop.
            3. A content_length of 0 (or less) writes nothing.
            4. The buffer is NOT closed here.
        """
        self._set_or_restore_start_position()
        remaining_bytes = content_length
        is_first_chunk = True
        while remaining_bytes is None or remaining_bytes > 0:
            read_size = (
                DEFAULT_CHUNK_SIZE
                if remaining_bytes is None
                else min(DEFAULT_CHUNK_SIZE, remaining_bytes)
            )
            chunk = self._value.read(read_size)
            if not chunk:
                return
            if not is_first_chunk:
                # Avoid blocking the event loop if a large BytesIO was passed.
                await asyncio.sleep(0)
            is_first_chunk = False
            await writer.write(chunk)
            if remaining_bytes is not None:
                remaining_bytes -= len(chunk)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Reads the buffer from the remembered start position to the end.
        *encoding* and *errors* are unused.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def close(self) -> None:
        """
        Close the BytesIO payload.

        This does nothing since BytesIO is in-memory and does not require explicit closing.
        """
class BufferedReaderPayload(IOBasePayload):
    """Payload whose value is an ``io.BufferedIOBase`` binary stream."""

    _value: io.BufferedIOBase
    # _autoclose = False (inherited) - Has buffered file handle that needs explicit closing

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Read the stream from its remembered start position and decode it.

        WARNING: performs blocking I/O; avoid calling it on the event loop.
        """
        self._set_or_restore_start_position()
        raw = self._value.read()
        return raw.decode(encoding, errors)
class JsonPayload(BytesPayload):
    """Payload that serializes *value* with a ``str``-returning JSON encoder.

    The text produced by *dumps* is encoded with *encoding* and sent with
    *content_type* (``application/json`` by default).
    """

    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        body = dumps(value).encode(encoding)
        super().__init__(
            body,
            *args,
            content_type=content_type,
            encoding=encoding,
            **kwargs,
        )
class JsonBytesPayload(BytesPayload):
    """JSON payload built with an encoder that already returns ``bytes``.

    Suitable for encoders such as orjson: the output of *dumps* is used as the
    body directly, skipping the str -> bytes encode step.
    """

    def __init__(
        self,
        value: Any,
        dumps: JSONBytesEncoder,
        content_type: str = "application/json",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        serialized = dumps(value)
        super().__init__(serialized, *args, content_type=content_type, **kwargs)
class AsyncIterablePayload(Payload):
    """Payload whose body is produced by an async iterable of byte chunks.

    The underlying iterator is drained at most once:

    * ``write()`` / ``write_with_length()`` stream chunks straight to the
      writer without keeping them, and then mark the payload as consumed.
    * ``as_bytes()`` drains the iterator into ``_cached_chunks``, so later
      writes and ``decode()`` can replay the content.
    """

    # Live iterator over ``_value``; reset to None once it has been exhausted.
    _iter: AsyncIterator[bytes] | None = None
    _value: AsyncIterable[bytes]
    # Chunks collected by as_bytes(); None until the content has been cached.
    _cached_chunks: list[bytes] | None = None
    # _consumed stays False to allow reuse with cached content
    _autoclose = True  # Iterator doesn't need explicit closing

    def __init__(self, value: AsyncIterable[bytes], *args: Any, **kwargs: Any) -> None:
        """Wrap ``value``.

        The content type defaults to ``application/octet-stream``.

        Raises:
            TypeError: if ``value`` is not an ``AsyncIterable``.
        """
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                f"got {type(value)!r}"
            )

        kwargs.setdefault("content_type", "application/octet-stream")
        super().__init__(value, *args, **kwargs)
        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Write the whole payload to ``writer`` with no length limit.

        Kept for backwards compatibility; new code that needs a length bound
        should call ``write_with_length()`` directly.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """Write at most ``content_length`` bytes to ``writer``.

        ``None`` means no limit.

        Cached chunks, if present, are replayed.  Otherwise the live iterator
        is streamed chunk by chunk.  Streaming does not populate the cache;
        only ``as_bytes()`` does.
        """
        if self._cached_chunks is not None:
            budget = content_length
            for piece in self._cached_chunks:
                if budget is None:
                    await writer.write(piece)
                    continue
                if budget <= 0:
                    break
                await writer.write(piece[:budget])
                budget -= len(piece)
            return

        # Exhausted iterator and nothing cached: there is nothing to send.
        if self._iter is None:
            return

        budget = content_length
        try:
            while True:
                piece = await anext(self._iter)
                if budget is None:
                    await writer.write(piece)
                elif budget > 0:
                    await writer.write(piece[:budget])
                    budget -= len(piece)
                # Once the limit is hit, keep draining the iterator anyway:
                # some sources (e.g. file-backed generators) only release
                # their handle when fully iterated.
        except StopAsyncIteration:
            self._iter = None
            self._consumed = True  # streamed without caching: not replayable

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the cached content as text.

        Raises:
            TypeError: if ``as_bytes()`` has not cached the content yet.
        """
        if self._cached_chunks is None:
            raise TypeError("Unable to decode - content not cached. Call as_bytes() first.")
        return b"".join(self._cached_chunks).decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """Return the full content as bytes, caching the chunks for reuse.

        Returns ``b""`` if the iterator was already drained by a streaming
        write and nothing was cached.  ``encoding`` and ``errors`` are
        unused.
        """
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks)

        if self._iter is None:
            return b""

        collected: list[bytes] = [piece async for piece in self._iter]

        self._iter = None
        self._cached_chunks = collected
        # _consumed is deliberately left False so the cache can be replayed.
        return b"".join(collected)
class StreamReaderPayload(AsyncIterablePayload):
    """Async-iterable payload fed by a ``StreamReader``.

    The body is taken from ``StreamReader.iter_any()``.
    """

    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        chunk_source = value.iter_any()
        super().__init__(chunk_source, *args, **kwargs)
# Module-wide registry used by get_payload() and register_payload().
# NOTE(review): registration order looks significant. More specific classes
# (io.StringIO before io.TextIOBase; io.BytesIO and io.BufferedReader before
# io.IOBase) are registered ahead of their bases. Presumably PayloadRegistry
# tries factories in registration order within an Order bucket; confirm
# before reordering these lines.
PAYLOAD_REGISTRY = PayloadRegistry()
PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
PAYLOAD_REGISTRY.register(StringPayload, str)
PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO)
PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase)
PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO)
PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader)
# try_last gives more specialized async iterables (such as
# multipart.BodyPartReaderPayload) a chance to override this default
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)