Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/payload.py: 50%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import enum
3import io
4import json
5import mimetypes
6import os
7import sys
8import warnings
9from abc import ABC, abstractmethod
10from collections.abc import AsyncIterable, AsyncIterator, Iterable
11from itertools import chain
12from typing import IO, Any, Final, TextIO
14from multidict import CIMultiDict
16from . import hdrs
17from .abc import AbstractStreamWriter
18from .helpers import (
19 _SENTINEL,
20 content_disposition_header,
21 guess_filename,
22 parse_mimetype,
23 sentinel,
24)
25from .streams import StreamReader
26from .typedefs import JSONBytesEncoder, JSONEncoder
# Public names exported by this module.
__all__ = (
    "PAYLOAD_REGISTRY",
    "get_payload",
    "payload_type",
    "Payload",
    "BytesPayload",
    "StringPayload",
    "IOBasePayload",
    "BytesIOPayload",
    "BufferedReaderPayload",
    "TextIOPayload",
    "StringIOPayload",
    "JsonPayload",
    "JsonBytesPayload",
    "AsyncIterablePayload",
)
# Raw bytes bodies larger than this trigger a ResourceWarning suggesting
# io.BytesIO instead (see BytesPayload.__init__).
TOO_LARGE_BYTES_BODY: Final[int] = 2**20  # 1 MB
# Default chunk size for streaming reads from file-like payloads.
READ_SIZE: Final[int] = 2**16  # 64 KB
# Strong references to in-flight executor futures that close file handles,
# so they are not garbage collected before completing.
_CLOSE_FUTURES: set[asyncio.Future[None]] = set()
class LookupError(Exception):
    """Raised when no payload factory is found for the given data type.

    NOTE: this shadows the builtin ``LookupError`` name within this module;
    the name is part of the public API and cannot be changed.
    """
class Order(str, enum.Enum):
    """Registration priority for payload factories in :class:`PayloadRegistry`."""

    normal = "normal"
    try_first = "try_first"
    try_last = "try_last"
def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload":
    """Build a :class:`Payload` for *data* via the global ``PAYLOAD_REGISTRY``."""
    return PAYLOAD_REGISTRY.get(data, *args, **kwargs)
def register_payload(
    factory: type["Payload"], type: Any, *, order: Order = Order.normal
) -> None:
    """Register *factory* for *type* in the global ``PAYLOAD_REGISTRY``."""
    PAYLOAD_REGISTRY.register(factory, type, order=order)
class payload_type:
    """Class decorator that registers a Payload factory for the given type.

    Example::

        @payload_type(bytes)
        class MyPayload(Payload): ...
    """

    def __init__(self, type: Any, *, order: Order = Order.normal) -> None:
        self.type = type
        self.order = order

    def __call__(self, factory: type["Payload"]) -> type["Payload"]:
        # Register and return the class unchanged so it remains usable directly.
        register_payload(factory, self.type, order=self.order)
        return factory
# Aliases used by the registry below: a payload factory class and a
# (factory, registered-type) pair.
PayloadType = type["Payload"]
_PayloadRegistryItem = tuple[PayloadType, Any]
class PayloadRegistry:
    """Payload registry.

    Maps data types to Payload factories, honoring a three-tier lookup
    order (try_first, normal, try_last).

    note: we need zope.interface for more efficient adapter search
    """

    __slots__ = ("_first", "_normal", "_last", "_normal_lookup")

    def __init__(self) -> None:
        self._first: list[_PayloadRegistryItem] = []
        self._normal: list[_PayloadRegistryItem] = []
        self._last: list[_PayloadRegistryItem] = []
        # Exact-type index into the "normal" tier for O(1) lookups.
        self._normal_lookup: dict[Any, PayloadType] = {}

    def get(
        self,
        data: Any,
        *args: Any,
        _CHAIN: "type[chain[_PayloadRegistryItem]]" = chain,
        **kwargs: Any,
    ) -> "Payload":
        """Return a Payload wrapping *data*, or raise LookupError."""
        # Highest-priority factories are always scanned first.
        for factory, type_ in self._first:
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)
        # Exact-type fast path avoids the linear isinstance scan below.
        exact_factory = self._normal_lookup.get(type(data))
        if exact_factory is not None:
            return exact_factory(data, *args, **kwargs)
        # Bail early if it's already a Payload.
        if isinstance(data, Payload):
            return data
        # Slow path: linear isinstance search over normal then last tiers.
        for factory, type_ in _CHAIN(self._normal, self._last):
            if isinstance(data, type_):
                return factory(data, *args, **kwargs)
        raise LookupError()

    def register(
        self, factory: PayloadType, type: Any, *, order: Order = Order.normal
    ) -> None:
        """Register *factory* for *type* in the tier selected by *order*."""
        if order is Order.try_first:
            self._first.append((factory, type))
            return
        if order is Order.try_last:
            self._last.append((factory, type))
            return
        if order is not Order.normal:
            raise ValueError(f"Unsupported order {order!r}")
        self._normal.append((factory, type))
        # Keep the exact-type fast-lookup index in sync with the normal tier.
        types = type if isinstance(type, Iterable) else (type,)
        for t in types:
            self._normal_lookup[t] = factory
class Payload(ABC):
    """Abstract base class for HTTP body payloads.

    Subclasses adapt a concrete value (bytes, strings, file objects,
    async iterables, ...) to the AbstractStreamWriter interface.  The
    constructor resolves the Content-Type header from, in priority order:
    the explicit ``content_type`` argument, a guess from ``filename``,
    or the class-level default.
    """

    _default_content_type: str = "application/octet-stream"
    _size: int | None = None
    _consumed: bool = False  # Default: payload has not been consumed yet
    _autoclose: bool = False  # Default: assume resource needs explicit closing

    def __init__(
        self,
        value: Any,
        headers: (
            CIMultiDict[str] | dict[str, str] | Iterable[tuple[str, str]] | None
        ) = None,
        content_type: None | str | _SENTINEL = sentinel,
        filename: str | None = None,
        encoding: str | None = None,
        **kwargs: Any,
    ) -> None:
        self._encoding = encoding
        self._filename = filename
        self._headers = CIMultiDict[str]()
        self._value = value
        # Content-Type resolution: explicit argument wins outright.
        if content_type is not sentinel and content_type is not None:
            assert isinstance(content_type, str)
            self._headers[hdrs.CONTENT_TYPE] = content_type
        elif self._filename is not None:
            # guess_file_type is the replacement API introduced in 3.13;
            # fall back to guess_type on older interpreters.
            if sys.version_info >= (3, 13):
                guesser = mimetypes.guess_file_type
            else:
                guesser = mimetypes.guess_type
            content_type = guesser(self._filename)[0]
            if content_type is None:
                content_type = self._default_content_type
            self._headers[hdrs.CONTENT_TYPE] = content_type
        else:
            self._headers[hdrs.CONTENT_TYPE] = self._default_content_type
        # User-supplied headers are applied last.
        if headers:
            self._headers.update(headers)

    @property
    def size(self) -> int | None:
        """Size of the payload in bytes.

        Returns the number of bytes that will be transmitted when the payload
        is written. For string payloads, this is the size after encoding to bytes,
        not the length of the string.
        """
        return self._size

    @property
    def filename(self) -> str | None:
        """Filename of the payload."""
        return self._filename

    @property
    def headers(self) -> CIMultiDict[str]:
        """Custom item headers"""
        return self._headers

    @property
    def _binary_headers(self) -> bytes:
        # Wire-format rendering of the headers, terminated by a blank line.
        return (
            "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode(
                "utf-8"
            )
            + b"\r\n"
        )

    @property
    def encoding(self) -> str | None:
        """Payload encoding"""
        return self._encoding

    @property
    def content_type(self) -> str:
        """Content type"""
        return self._headers[hdrs.CONTENT_TYPE]

    @property
    def consumed(self) -> bool:
        """Whether the payload has been consumed and cannot be reused."""
        return self._consumed

    @property
    def autoclose(self) -> bool:
        """
        Whether the payload can close itself automatically.

        Returns True if the payload has no file handles or resources that need
        explicit closing. If False, callers must await close() to release resources.
        """
        return self._autoclose

    def set_content_disposition(
        self,
        disptype: str,
        quote_fields: bool = True,
        _charset: str = "utf-8",
        **params: str,
    ) -> None:
        """Sets ``Content-Disposition`` header."""
        self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header(
            disptype, quote_fields=quote_fields, _charset=_charset, params=params
        )

    @abstractmethod
    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        This is named decode() to allow compatibility with bytes objects.
        """

    @abstractmethod
    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This is a legacy method that writes the entire payload without length constraints.

        Important:
            For new implementations, use write_with_length() instead of this method.
            This method is maintained for backwards compatibility and will eventually
            delegate to write_with_length(writer, None) in all implementations.

        All payload subclasses must override this method for backwards compatibility,
        but new code should use write_with_length for more flexibility and control.
        """

    # write_with_length is new in aiohttp 3.12
    # it should be overridden by subclasses
    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method allows writing payload content with a specific length constraint,
        which is particularly useful for HTTP responses with Content-Length header.

        Note:
            This is the base implementation that provides backwards compatibility
            for subclasses that don't override this method. Specific payload types
            should override this method to implement proper length-constrained writing.
        """
        # Backwards compatibility for subclasses that don't override this method
        # and for the default implementation
        await self.write(writer)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This is a convenience method that calls decode() and encodes the result
        to bytes using the specified encoding.
        """
        # Use instance encoding if available, otherwise use parameter
        actual_encoding = self._encoding or encoding
        return self.decode(actual_encoding, errors).encode(actual_encoding)

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards compatibility with code that
        needs to clean up payloads synchronously. In the future, we will
        drop this method and only support the async close() method.

        WARNING: This method must be safe to call from within the event loop
        without blocking. Subclasses should not perform any blocking I/O here.

        WARNING: This method must be called from within an event loop for
        certain payload types (e.g., IOBasePayload). Calling it outside an
        event loop may raise RuntimeError.
        """
        # This is a no-op by default, but subclasses can override it
        # for non-blocking cleanup operations.

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.

        In the future, this will be the only close method supported.
        """
        self._close()
class BytesPayload(Payload):
    """Payload backed by an in-memory bytes-like value."""

    _value: bytes
    # _consumed = False (inherited) - Bytes are immutable and can be reused
    _autoclose = True  # No file handle, just bytes in memory

    def __init__(
        self, value: bytes | bytearray | memoryview, *args: Any, **kwargs: Any
    ) -> None:
        kwargs.setdefault("content_type", "application/octet-stream")

        super().__init__(value, *args, **kwargs)

        # Determine the byte size up front; memoryview needs nbytes rather
        # than len() to account for multi-byte item formats.
        if isinstance(value, memoryview):
            byte_size = value.nbytes
        elif isinstance(value, (bytes, bytearray)):
            byte_size = len(value)
        else:
            raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")
        self._size = byte_size

        if byte_size > TOO_LARGE_BYTES_BODY:
            warnings.warn(
                "Sending a large body directly with raw bytes might"
                " lock the event loop. You should probably pass an "
                "io.BytesIO object instead",
                ResourceWarning,
                source=self,
            )

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the stored bytes to str."""
        return self._value.decode(encoding, errors)

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        Returns the raw stored content directly; the encoding arguments are
        irrelevant for an already-bytes payload.
        """
        return self._value

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire bytes payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        Equivalent to write_with_length(writer, None); kept for backwards
        compatibility.
        """
        await writer.write(self._value)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write bytes payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        A single write of either the full value or a prefix slice of it.
        """
        data = self._value if content_length is None else self._value[:content_length]
        await writer.write(data)
class StringPayload(BytesPayload):
    """Payload for str values, encoded to bytes with a derived charset."""

    def __init__(
        self,
        value: str,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        # Resolve the real encoding and content type from whichever of the
        # two was supplied; default both to UTF-8 text/plain.
        if encoding is not None:
            real_encoding = encoding
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding
        elif content_type is not None:
            real_encoding = parse_mimetype(content_type).parameters.get(
                "charset", "utf-8"
            )
        else:
            real_encoding = "utf-8"
            content_type = "text/plain; charset=utf-8"

        super().__init__(
            value.encode(real_encoding),
            encoding=real_encoding,
            content_type=content_type,
            *args,
            **kwargs,
        )
class StringIOPayload(StringPayload):
    """Payload for an in-memory text stream (``io.StringIO``-like).

    NOTE: the stream is drained eagerly at construction time via ``read()``.
    """

    def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None:
        super().__init__(value.read(), *args, **kwargs)
class IOBasePayload(Payload):
    """Payload backed by a binary file-like object.

    Reads are dispatched to the default executor so the event loop is not
    blocked; the initial stream position is remembered so the same payload
    instance can be replayed (e.g. on 307/308 redirects).
    """

    _value: io.IOBase
    # _consumed = False (inherited) - File can be re-read from the same position
    _start_position: int | None = None
    # _autoclose = False (inherited) - Has file handle that needs explicit closing

    def __init__(
        self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any
    ) -> None:
        if "filename" not in kwargs:
            kwargs["filename"] = guess_filename(value)

        super().__init__(value, *args, **kwargs)

        # Only set Content-Disposition if a filename is known and the caller
        # has not already provided the header.
        if self._filename is not None and disposition is not None:
            if hdrs.CONTENT_DISPOSITION not in self.headers:
                self.set_content_disposition(disposition, filename=self._filename)

    def _set_or_restore_start_position(self) -> None:
        """Set or restore the start position of the file-like object."""
        if self._start_position is None:
            try:
                self._start_position = self._value.tell()
            except (OSError, AttributeError):
                self._consumed = True  # Cannot seek, mark as consumed
                return
        try:
            self._value.seek(self._start_position)
        except (OSError, AttributeError):
            # Failed to seek back - mark as consumed since we've already read
            self._consumed = True

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """
        Read the file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size cannot be determined)
            - The first chunk of bytes read from the file object

        This method is optimized to perform both size calculation and initial read
        in a single operation, which is executed in a single executor job to minimize
        context switches and file operations when streaming content.
        """
        self._set_or_restore_start_position()
        size = self.size  # Call size only once since it does I/O
        return size, self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )

    def _read(self, remaining_content_len: int | None) -> bytes:
        """
        Read a chunk of data from the file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object, respecting the
            remaining_content_len limit if specified.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made.
        """
        return self._value.read(remaining_content_len or READ_SIZE)  # type: ignore[no-any-return]

    @property
    def size(self) -> int | None:
        """
        Size of the payload in bytes.

        Returns the total size of the payload content from the initial position.
        This ensures consistent Content-Length for requests, including 307/308 redirects
        where the same payload instance is reused.

        Returns None if the size cannot be determined (e.g., for unseekable streams).
        """
        try:
            # Store the start position on first access.
            # This is critical when the same payload instance is reused (e.g., 307/308
            # redirects). Without storing the initial position, after the payload is
            # read once, the file position would be at EOF, which would cause the
            # size calculation to return 0 (file_size - EOF position).
            # By storing the start position, we ensure the size calculation always
            # returns the correct total size for any subsequent use.
            if self._start_position is None:
                self._start_position = self._value.tell()

            # Return the total size from the start position
            # This ensures Content-Length is correct even after reading
            return os.fstat(self._value.fileno()).st_size - self._start_position
        except (AttributeError, OSError):
            return None

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire file-like payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method writes the entire file content without any length constraint.
        It delegates to write_with_length() with no length limit for implementation
        consistency.

        Note:
            For new implementations that need length control, use write_with_length() directly.
            This method is maintained for backwards compatibility with existing code.
        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write file-like payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This method implements optimized streaming of file content with length constraints:

        1. File reading is performed in a thread pool to avoid blocking the event loop
        2. Content is read and written in chunks to maintain memory efficiency
        3. Writing stops when either:
           - All available file content has been written (when size is known)
           - The specified content_length has been reached
        4. File resources are properly closed even if the operation is cancelled

        The implementation carefully handles both known-size and unknown-size payloads,
        as well as constrained and unconstrained content lengths.
        """
        loop = asyncio.get_running_loop()
        total_written_len = 0
        remaining_content_len = content_length

        # Get initial data and available length
        available_len, chunk = await loop.run_in_executor(
            None, self._read_and_available_len, remaining_content_len
        )
        # Process data chunks until done
        while chunk:
            chunk_len = len(chunk)

            # Write data with or without length constraint
            if remaining_content_len is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_content_len])
                remaining_content_len -= chunk_len

            total_written_len += chunk_len

            # Check if we're done writing
            if self._should_stop_writing(
                available_len, total_written_len, remaining_content_len
            ):
                return

            # Read next chunk
            chunk = await loop.run_in_executor(
                None,
                self._read,
                (
                    min(READ_SIZE, remaining_content_len)
                    if remaining_content_len is not None
                    else READ_SIZE
                ),
            )

    def _should_stop_writing(
        self,
        available_len: int | None,
        total_written_len: int,
        remaining_content_len: int | None,
    ) -> bool:
        """
        Determine if we should stop writing data.

        Args:
            available_len: Known size of the payload if available (None if unknown)
            total_written_len: Number of bytes already written
            remaining_content_len: Remaining bytes to be written for content-length limited responses

        Returns:
            True if we should stop writing data, based on either:
            - Having written all available data (when size is known)
            - Having written all requested content (when content-length is specified)
        """
        return (available_len is not None and total_written_len >= available_len) or (
            remaining_content_len is not None and remaining_content_len <= 0
        )

    def _close(self) -> None:
        """
        Async safe synchronous close operations for backwards compatibility.

        This method exists only for backwards
        compatibility. Use the async close() method instead.

        WARNING: This method MUST be called from within an event loop.
        Calling it outside an event loop will raise RuntimeError.
        """
        # Skip if already consumed
        if self._consumed:
            return
        self._consumed = True  # Mark as consumed to prevent further writes
        # Schedule file closing without awaiting to prevent cancellation issues
        loop = asyncio.get_running_loop()
        close_future = loop.run_in_executor(None, self._value.close)
        # Hold a strong reference to the future to prevent it from being
        # garbage collected before it completes.
        _CLOSE_FUTURES.add(close_future)
        close_future.add_done_callback(_CLOSE_FUTURES.remove)

    async def close(self) -> None:
        """
        Close the payload if it holds any resources.

        IMPORTANT: This method must not await anything that might not finish
        immediately, as it may be called during cleanup/cancellation. Schedule
        any long-running operations without awaiting them.
        """
        self._close()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        return self._read_all().decode(encoding, errors)

    def _read_all(self) -> bytes:
        """Read the entire file-like object and return its content as bytes."""
        self._set_or_restore_start_position()
        # Use readlines() to ensure we get all content
        return b"".join(self._value.readlines())

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire file content and returns it as bytes.
        It is equivalent to reading the file-like object directly.
        The file reading is performed in an executor to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._read_all)
class TextIOPayload(IOBasePayload):
    """Payload backed by a text-mode file-like object.

    Text chunks are encoded to bytes (using the payload's encoding, or the
    stream default when none was given) before being written out.
    """

    _value: io.TextIOBase
    # _autoclose = False (inherited) - Has text file handle that needs explicit closing

    def __init__(
        self,
        value: TextIO,
        *args: Any,
        encoding: str | None = None,
        content_type: str | None = None,
        **kwargs: Any,
    ) -> None:
        # Resolve encoding/content type the same way StringPayload does:
        # explicit encoding wins; otherwise derive charset from content_type;
        # otherwise default to UTF-8 text/plain.
        if encoding is None:
            if content_type is None:
                encoding = "utf-8"
                content_type = "text/plain; charset=utf-8"
            else:
                mimetype = parse_mimetype(content_type)
                encoding = mimetype.parameters.get("charset", "utf-8")
        else:
            if content_type is None:
                content_type = "text/plain; charset=%s" % encoding

        super().__init__(
            value,
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )

    def _read_and_available_len(
        self, remaining_content_len: int | None
    ) -> tuple[int | None, bytes]:
        """
        Read the text file-like object and return both its total size and the first chunk.

        Args:
            remaining_content_len: Optional limit on how many bytes to read in this operation.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A tuple containing:
            - The total size of the remaining unread content (None if size cannot be determined)
            - The first chunk of bytes read from the file object, encoded using the payload's encoding

        This method is optimized to perform both size calculation and initial read
        in a single operation, which is executed in a single executor job to minimize
        context switches and file operations when streaming content.

        Note:
            TextIOPayload handles encoding of the text content before writing it
            to the stream. If no encoding is specified, UTF-8 is used as the default.
        """
        self._set_or_restore_start_position()
        size = self.size
        # NOTE: size/remaining limits are in bytes but read() counts
        # characters here; the encoded chunk may differ in length.
        chunk = self._value.read(
            min(READ_SIZE, size or READ_SIZE, remaining_content_len or READ_SIZE)
        )
        return size, chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def _read(self, remaining_content_len: int | None) -> bytes:
        """
        Read a chunk of data from the text file-like object.

        Args:
            remaining_content_len: Optional maximum number of bytes to read.
                If None, READ_SIZE will be used as the default chunk size.

        Returns:
            A chunk of bytes read from the file object and encoded using the payload's
            encoding. The data is automatically converted from text to bytes.

        This method is used for subsequent reads during streaming after
        the initial _read_and_available_len call has been made. It properly
        handles text encoding, converting the text content to bytes using
        the specified encoding (or UTF-8 if none was provided).
        """
        chunk = self._value.read(remaining_content_len or READ_SIZE)
        return chunk.encode(self._encoding) if self._encoding else chunk.encode()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Return string representation of the value.

        NOTE: the ``encoding``/``errors`` arguments are unused here; the
        underlying text stream already yields str.

        WARNING: This method does blocking I/O and should not be called in the event loop.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire text file content and returns it as bytes.
        It encodes the text content using the specified encoding.
        The file reading is performed in an executor to avoid blocking the event loop.
        """
        loop = asyncio.get_running_loop()

        # Use instance encoding if available, otherwise use parameter
        actual_encoding = self._encoding or encoding

        def _read_and_encode() -> bytes:
            self._set_or_restore_start_position()
            # TextIO read() always returns the full content
            return self._value.read().encode(actual_encoding, errors)

        return await loop.run_in_executor(None, _read_and_encode)
class BytesIOPayload(IOBasePayload):
    """Payload backed by an in-memory ``io.BytesIO`` buffer.

    Unlike other IOBasePayload subclasses it is safe to auto-close and its
    size is known exactly at construction time.
    """

    _value: io.BytesIO
    _size: int  # Always initialized in __init__
    _autoclose = True  # BytesIO is in-memory, safe to auto-close

    def __init__(self, value: io.BytesIO, *args: Any, **kwargs: Any) -> None:
        super().__init__(value, *args, **kwargs)
        # Calculate size once during initialization; subtract tell() so a
        # buffer handed over mid-stream reports only the unread remainder.
        self._size = len(self._value.getbuffer()) - self._value.tell()

    @property
    def size(self) -> int:
        """Size of the payload in bytes.

        Returns the number of bytes in the BytesIO buffer that will be transmitted.
        This is calculated once during initialization for efficiency.
        """
        return self._size

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Rewind to the start position and decode the buffer contents."""
        self._set_or_restore_start_position()
        return self._value.read().decode(encoding, errors)

    async def write(self, writer: AbstractStreamWriter) -> None:
        return await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write BytesIO payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation is specifically optimized for BytesIO objects:

        1. Reads content in chunks to maintain memory efficiency
        2. Yields control back to the event loop periodically to prevent blocking
           when dealing with large BytesIO objects
        3. Respects content_length constraints when specified

        The periodic yielding to the event loop is important for maintaining
        responsiveness when processing large in-memory buffers.
        """
        self._set_or_restore_start_position()
        loop_count = 0
        remaining_bytes = content_length
        while chunk := self._value.read(READ_SIZE):
            if loop_count > 0:
                # Avoid blocking the event loop
                # if they pass a large BytesIO object
                # and we are not in the first iteration
                # of the loop
                await asyncio.sleep(0)
            if remaining_bytes is None:
                await writer.write(chunk)
            else:
                await writer.write(chunk[:remaining_bytes])
                # Subtract the full chunk length; a negative remainder simply
                # means the final slice above was truncated and we are done.
                remaining_bytes -= len(chunk)
                if remaining_bytes <= 0:
                    return
            loop_count += 1

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire BytesIO content and returns it as bytes.
        It is equivalent to accessing the _value attribute directly.
        """
        self._set_or_restore_start_position()
        return self._value.read()

    async def close(self) -> None:
        """
        Close the BytesIO payload.

        This does nothing since BytesIO is in-memory and does not require explicit closing.
        """
class BufferedReaderPayload(IOBasePayload):
    """Payload for buffered binary readers (e.g. files opened in ``"rb"`` mode)."""

    _value: io.BufferedIOBase
    # _autoclose = False (inherited) - Has buffered file handle that needs explicit closing

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Read the whole buffer from the start position and decode it to str.

        WARNING: performs blocking I/O; do not call from the event loop.
        """
        self._set_or_restore_start_position()
        raw = self._value.read()
        return raw.decode(encoding, errors)
class JsonPayload(BytesPayload):
    """Payload that serializes a Python object to JSON text and encodes it."""

    def __init__(
        self,
        value: Any,
        encoding: str = "utf-8",
        content_type: str = "application/json",
        dumps: JSONEncoder = json.dumps,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # Serialize eagerly so errors surface at construction time.
        serialized = dumps(value).encode(encoding)
        super().__init__(
            serialized,
            content_type=content_type,
            encoding=encoding,
            *args,
            **kwargs,
        )
class JsonBytesPayload(BytesPayload):
    """JSON payload for encoders that return bytes directly.

    Use this when your JSON encoder (like orjson) returns bytes
    instead of str, avoiding the encode/decode overhead.
    """

    def __init__(
        self,
        value: Any,
        dumps: JSONBytesEncoder,
        content_type: str = "application/json",
        *args: Any,
        **kwargs: Any,
    ) -> None:
        # The encoder already yields bytes; no text encoding step is needed.
        encoded = dumps(value)
        super().__init__(encoded, content_type=content_type, *args, **kwargs)
class AsyncIterablePayload(Payload):
    """Payload that streams an ``AsyncIterable[bytes]`` to a writer.

    The source iterator can only be consumed once; ``as_bytes()`` caches
    the chunks it reads so the payload can be written again afterwards
    from the cache.
    """

    # Live iterator over the source; set to None once exhausted.
    _iter: AsyncIterator[bytes] | None = None
    _value: AsyncIterable[bytes]
    # Chunks captured by as_bytes() for later reuse; None until cached.
    _cached_chunks: list[bytes] | None = None
    # _consumed stays False to allow reuse with cached content
    _autoclose = True  # Iterator doesn't need explicit closing

    def __init__(self, value: AsyncIterable[bytes], *args: Any, **kwargs: Any) -> None:
        """Wrap *value*; raises ``TypeError`` if it is not an ``AsyncIterable``."""
        if not isinstance(value, AsyncIterable):
            raise TypeError(
                "value argument must support "
                "collections.abc.AsyncIterable interface, "
                f"got {type(value)!r}"
            )

        # Default the content type only when the caller did not choose one.
        if "content_type" not in kwargs:
            kwargs["content_type"] = "application/octet-stream"

        super().__init__(value, *args, **kwargs)

        # Obtain the single-use iterator eagerly so all write paths share it.
        self._iter = value.__aiter__()

    async def write(self, writer: AbstractStreamWriter) -> None:
        """
        Write the entire async iterable payload to the writer stream.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing

        This method iterates through the async iterable and writes each chunk
        to the writer without any length constraint.

        Note:
            For new implementations that need length control, use write_with_length() directly.
            This method is maintained for backwards compatibility with existing code.

        """
        await self.write_with_length(writer, None)

    async def write_with_length(
        self, writer: AbstractStreamWriter, content_length: int | None
    ) -> None:
        """
        Write async iterable payload with a specific content length constraint.

        Args:
            writer: An AbstractStreamWriter instance that handles the actual writing
            content_length: Maximum number of bytes to write (None for unlimited)

        This implementation handles streaming of async iterable content with length constraints:

        1. If cached chunks are available, writes from them
        2. Otherwise iterates through the async iterable one chunk at a time
        3. Respects content_length constraints when specified
        4. Does NOT generate cache - that's done by as_bytes()

        """
        # If we have cached chunks, use them
        if self._cached_chunks is not None:
            remaining_bytes = content_length
            for chunk in self._cached_chunks:
                if remaining_bytes is None:
                    await writer.write(chunk)
                elif remaining_bytes > 0:
                    # May write a truncated chunk; remaining_bytes is decremented
                    # by the full chunk length so the next iteration breaks out.
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                else:
                    break
            return

        # If iterator is exhausted and we don't have cached chunks, nothing to write
        if self._iter is None:
            return

        # Stream from the iterator
        remaining_bytes = content_length

        try:
            while True:
                chunk = await anext(self._iter)
                if remaining_bytes is None:
                    await writer.write(chunk)
                # If we have a content length limit
                elif remaining_bytes > 0:
                    await writer.write(chunk[:remaining_bytes])
                    remaining_bytes -= len(chunk)
                # We still want to exhaust the iterator even
                # if we have reached the content length limit
                # since the file handle may not get closed by
                # the iterator if we don't do this
        except StopAsyncIteration:
            # Iterator is exhausted
            self._iter = None
            self._consumed = True  # Mark as consumed when streamed without caching

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Decode the payload content as a string if cached chunks are available."""
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks).decode(encoding, errors)
        raise TypeError("Unable to decode - content not cached. Call as_bytes() first.")

    async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> bytes:
        """
        Return bytes representation of the value.

        This method reads the entire async iterable content and returns it as bytes.
        It generates and caches the chunks for future reuse.
        """
        # If we have cached chunks, return them joined
        if self._cached_chunks is not None:
            return b"".join(self._cached_chunks)

        # If iterator is exhausted and no cache, return empty
        if self._iter is None:
            return b""

        # Read all chunks and cache them
        chunks: list[bytes] = []
        async for chunk in self._iter:
            chunks.append(chunk)

        # Iterator is exhausted, cache the chunks
        self._iter = None
        self._cached_chunks = chunks
        # Keep _consumed as False to allow reuse with cached chunks

        return b"".join(chunks)
class StreamReaderPayload(AsyncIterablePayload):
    """Adapt a ``StreamReader`` to the async-iterable payload interface."""

    def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None:
        # Expose the reader's content through its iter_any() async iterator.
        chunks = value.iter_any()
        super().__init__(chunks, *args, **kwargs)
# Global registry mapping Python value types to payload factories.
# More specific types are registered before more general ones
# (e.g. io.BytesIO before io.IOBase).
PAYLOAD_REGISTRY = PayloadRegistry()
PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
PAYLOAD_REGISTRY.register(StringPayload, str)
PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO)
PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase)
PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO)
PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader)
# try_last for giving a chance to more specialized async iterables like
# multipart.BodyPartReaderPayload override the default
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)