Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_response.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import datetime
3import enum
4import json
5import math
6import time
7import warnings
8from collections.abc import Iterator, MutableMapping
9from concurrent.futures import Executor
10from http import HTTPStatus
11from typing import TYPE_CHECKING, Any, Optional, TypeVar, Union, cast, overload
13from multidict import CIMultiDict, istr
15from . import hdrs, payload
16from .abc import AbstractStreamWriter
17from .compression_utils import ZLibCompressor
18from .helpers import (
19 ETAG_ANY,
20 QUOTED_ETAG_RE,
21 CookieMixin,
22 ETag,
23 HeadersMixin,
24 ResponseKey,
25 must_be_empty_body,
26 parse_http_date,
27 populate_with_cookies,
28 rfc822_formatted_time,
29 sentinel,
30 should_remove_content_length,
31 validate_etag_value,
32)
33from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11
34from .payload import Payload
35from .typedefs import JSONBytesEncoder, JSONEncoder, LooseHeaders
# Default reason phrase for every status code known to http.HTTPStatus,
# keyed by the integer code (used when a response gives no explicit reason).
REASON_PHRASES = {member.value: member.phrase for member in HTTPStatus}
# Size threshold in bytes (1 MiB); Response warns before compressing a larger
# body synchronously.
LARGE_BODY_SIZE = 1024 * 1024
# Public names exported by this module.
__all__ = (
    "ContentCoding",
    "StreamResponse",
    "Response",
    "json_response",
    "json_bytes_response",
)


if TYPE_CHECKING:
    # Needed for annotations only, so it is not imported at runtime.
    from .web_request import BaseRequest


# Value type carried by a ResponseKey in the typed mapping overloads.
_T = TypeVar("_T")
# TODO(py311): Convert to StrEnum for wider use
class ContentCoding(enum.Enum):
    """Content codings this module knows how to apply to a response body.

    Other registered codings are listed at:
    https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding
    """

    deflate = "deflate"
    gzip = "gzip"
    identity = "identity"


# Lookup table from the coding token to its enum member, in declaration
# order (deflate, gzip, identity).
CONTENT_CODINGS = dict((member.value, member) for member in ContentCoding)
69############################################################
70# HTTP Response classes
71############################################################
class StreamResponse(
    MutableMapping[str | ResponseKey[Any], Any], HeadersMixin, CookieMixin
):
    """Response whose headers and body go out through a payload writer.

    Status, reason, headers and transfer options are configured on the
    instance.  ``prepare()`` binds the response to a request and writes the
    headers, ``write()`` sends body data and ``write_eof()`` finishes it.

    The object is also a mutable mapping, backed by ``_state``, so arbitrary
    per-response data can be attached with ``resp[key] = value``.
    """

    # Class-level defaults; instances overwrite them as the response is
    # configured, prepared and finished.
    _body: None | bytes | bytearray | Payload
    _body = None
    # Consulted in _prepare_headers(); disabled for WebSockets.
    _length_check = True
    _keep_alive: bool | None = None
    _chunked: bool = False
    _compression: bool = False
    _compression_strategy: int | None = None
    _compression_force: ContentCoding | None = None
    _req: Optional["BaseRequest"] = None
    _payload_writer: AbstractStreamWriter | None = None
    _eof_sent: bool = False
    _must_be_empty_body: bool | None = None
    _body_length = 0
    # When true, _write_headers() calls writer.send_headers() right away.
    _send_headers_immediately = True

    def __init__(
        self,
        *,
        status: int = 200,
        reason: str | None = None,
        headers: LooseHeaders | None = None,
        _real_headers: CIMultiDict[str] | None = None,
    ) -> None:
        """Initialize a new stream response object.

        ``_real_headers`` is internal: a ready-made headers object that is
        adopted as-is (no copy).  ``Response`` uses it to avoid copying the
        headers a second time.  External code should pass ``headers``, which
        is copied into a fresh ``CIMultiDict``.  When ``_real_headers`` is
        given, ``headers`` is ignored.
        """
        self._state: dict[str | ResponseKey[Any], Any] = {}

        if _real_headers is None:
            _real_headers = CIMultiDict() if headers is None else CIMultiDict(headers)
        self._headers: CIMultiDict[str] = _real_headers

        self._set_status(status, reason)

    @property
    def prepared(self) -> bool:
        """Whether EOF was already sent or a payload writer is attached."""
        return self._eof_sent or (self._payload_writer is not None)

    @property
    def task(self) -> "asyncio.Task[None] | None":
        """Task of the bound request, or None when no request is bound."""
        req = self._req
        return req.task if req else None

    @property
    def status(self) -> int:
        """Status code of the response."""
        return self._status

    @property
    def chunked(self) -> bool:
        """Whether chunked transfer encoding has been enabled."""
        return self._chunked

    @property
    def compression(self) -> bool:
        """Whether compression has been enabled."""
        return self._compression

    @property
    def reason(self) -> str:
        """Reason phrase of the response."""
        return self._reason

    def set_status(
        self,
        status: int,
        reason: str | None = None,
    ) -> None:
        """Change status and reason; only allowed before the response is prepared."""
        assert (
            not self.prepared
        ), "Cannot change the response status code after the headers have been sent"
        self._set_status(status, reason)

    def _set_status(self, status: int, reason: str | None) -> None:
        """Store status and reason without checking the prepared state.

        A missing reason is looked up in REASON_PHRASES (empty string for an
        unknown code).  An explicit reason containing CR or LF is rejected
        with ValueError; the status has already been stored at that point.
        """
        self._status = status
        if reason is None:
            self._reason = REASON_PHRASES.get(status, "")
            return
        if "\r" in reason or "\n" in reason:
            raise ValueError("Reason cannot contain \\r or \\n")
        self._reason = reason

    @property
    def keep_alive(self) -> bool | None:
        """Keep-alive flag; None until set explicitly or resolved while preparing."""
        return self._keep_alive

    def force_close(self) -> None:
        """Set the keep-alive flag to False."""
        self._keep_alive = False

    @property
    def body_length(self) -> int:
        """Writer's output size recorded by write_eof() (0 before that)."""
        return self._body_length

    def enable_chunked_encoding(self) -> None:
        """Enables automatic chunked transfer encoding.

        Raises RuntimeError if a Content-Length header is already set.
        """
        if hdrs.CONTENT_LENGTH in self._headers:
            raise RuntimeError(
                "You can't enable chunked encoding when a content length is set"
            )
        self._chunked = True

    def enable_compression(
        self,
        force: ContentCoding | None = None,
        strategy: int | None = None,
    ) -> None:
        """Enables response compression encoding.

        Does nothing when a Content-Encoding header is already present.  That
        avoids double compression and keeps callers working when they enable
        compression on an already encoded response (e.g. FileResponse serving
        pre-compressed files).
        """
        if hdrs.CONTENT_ENCODING in self._headers:
            return
        self._compression = True
        self._compression_force = force
        self._compression_strategy = strategy

    @property
    def headers(self) -> "CIMultiDict[str]":
        """The mutable response headers."""
        return self._headers

    @property
    def content_length(self) -> int | None:
        """Inherited getter, redefined here only so a setter can be attached."""
        return super().content_length

    @content_length.setter
    def content_length(self, value: int | None) -> None:
        """Set or (with None) remove the Content-Length header.

        Raises RuntimeError when chunked encoding is enabled.
        """
        if value is None:
            self._headers.pop(hdrs.CONTENT_LENGTH, None)
            return
        # Convert first so a bad value fails before the chunked check.
        length = int(value)
        if self._chunked:
            raise RuntimeError(
                "You can't set content length when chunked encoding is enable"
            )
        self._headers[hdrs.CONTENT_LENGTH] = str(length)

    @property
    def content_type(self) -> str:
        """Inherited getter, redefined here only so a setter can be attached."""
        return super().content_type

    @content_type.setter
    def content_type(self, value: str) -> None:
        """Replace the media type and regenerate the Content-Type header."""
        self.content_type  # read header values if needed
        self._content_type = str(value)
        self._generate_content_type_header()

    @property
    def charset(self) -> str | None:
        """Inherited getter, redefined here only so a setter can be attached."""
        return super().charset

    @charset.setter
    def charset(self, value: str | None) -> None:
        """Set (lower-cased) or remove the charset parameter of Content-Type.

        Raises RuntimeError while the content type is still
        application/octet-stream.
        """
        current_type = self.content_type  # read header values if needed
        if current_type == "application/octet-stream":
            raise RuntimeError(
                "Setting charset for application/octet-stream "
                "doesn't make sense, setup content_type first"
            )
        assert self._content_dict is not None
        if value is None:
            self._content_dict.pop("charset", None)
        else:
            self._content_dict["charset"] = str(value).lower()
        self._generate_content_type_header()

    @property
    def last_modified(self) -> datetime.datetime | None:
        """The value of Last-Modified HTTP header, or None.

        This header is represented as a `datetime` object.
        """
        return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED))

    @last_modified.setter
    def last_modified(
        self, value: int | float | datetime.datetime | str | None
    ) -> None:
        """Set or (with None) remove the Last-Modified header.

        Numbers are treated as a timestamp (rounded up to a whole second),
        datetimes are formatted from their UTC time tuple, and strings are
        stored verbatim.  Any other type raises TypeError.
        """
        if value is None:
            self._headers.pop(hdrs.LAST_MODIFIED, None)
            return

        http_date_format = "%a, %d %b %Y %H:%M:%S GMT"
        if isinstance(value, (int, float)):
            stamp = time.strftime(http_date_format, time.gmtime(math.ceil(value)))
        elif isinstance(value, datetime.datetime):
            stamp = time.strftime(http_date_format, value.utctimetuple())
        elif isinstance(value, str):
            stamp = value
        else:
            msg = f"Unsupported type for last_modified: {type(value).__name__}"  # type: ignore[unreachable]
            raise TypeError(msg)
        self._headers[hdrs.LAST_MODIFIED] = stamp

    @property
    def etag(self) -> ETag | None:
        """Parsed ETag header, or None when it is absent, empty or malformed."""
        raw = self._headers.get(hdrs.ETAG)
        if not raw:
            return None
        if raw == ETAG_ANY:
            return ETag(value=ETAG_ANY)
        parsed = QUOTED_ETAG_RE.fullmatch(raw)
        if parsed is None:
            return None
        weak_marker, tag = parsed.group(1, 2)
        return ETag(is_weak=bool(weak_marker), value=tag)

    @etag.setter
    def etag(self, value: ETag | str | None) -> None:
        """Set or (with None) remove the ETag header.

        A plain string becomes a strong, quoted tag.  An ``ETag`` keeps its
        weak flag.  The ETAG_ANY value is stored unquoted.  Values are checked
        with validate_etag_value(); unsupported types raise ValueError.
        """
        if value is None:
            self._headers.pop(hdrs.ETAG, None)
            return

        if isinstance(value, str):
            if value == ETAG_ANY:
                header_value = ETAG_ANY
            else:
                validate_etag_value(value)
                header_value = f'"{value}"'
        elif isinstance(value, ETag) and value.value == ETAG_ANY:
            header_value = ETAG_ANY
        elif isinstance(value, ETag) and isinstance(value.value, str):  # type: ignore[redundant-expr]
            validate_etag_value(value.value)
            if value.is_weak:
                header_value = f'W/"{value.value}"'
            else:
                header_value = f'"{value.value}"'
        else:
            raise ValueError(
                f"Unsupported etag type: {type(value)}. "
                f"etag must be str, ETag or None"
            )
        self._headers[hdrs.ETAG] = header_value

    def _generate_content_type_header(
        self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE
    ) -> None:
        """Rebuild the Content-Type header from the media type and its parameters."""
        assert self._content_dict is not None
        assert self._content_type is not None
        params = "; ".join(
            f"{name}={val}" for name, val in self._content_dict.items()
        )
        header_value = self._content_type
        if params:
            header_value = header_value + "; " + params
        self._headers[CONTENT_TYPE] = header_value

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Turn on writer-level compression for ``coding`` (identity is a no-op)."""
        if coding is ContentCoding.identity:
            return
        assert self._payload_writer is not None
        self._headers[hdrs.CONTENT_ENCODING] = coding.value
        self._payload_writer.enable_compression(
            coding.value, self._compression_strategy
        )
        # The compressed payload may differ in size, so any declared
        # Content-Length is dropped.
        self._headers.popall(hdrs.CONTENT_LENGTH, None)

    async def _start_compression(self, request: "BaseRequest") -> None:
        """Choose a coding and start compression.

        A forced coding wins.  Otherwise the first entry of CONTENT_CODINGS
        whose token occurs in the lower-cased Accept-Encoding header is used;
        with no match nothing is started.
        """
        forced = self._compression_force
        if forced:
            await self._do_start_compression(forced)
            return
        # Encoding comparisons should be case-insensitive
        # https://www.rfc-editor.org/rfc/rfc9110#section-8.4.1
        accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()
        chosen = next(
            (
                coding
                for token, coding in CONTENT_CODINGS.items()
                if token in accept_encoding
            ),
            None,
        )
        if chosen is not None:
            await self._do_start_compression(chosen)

    async def prepare(self, request: "BaseRequest") -> AbstractStreamWriter | None:
        """Bind the response to ``request`` and send the headers.

        Returns None when EOF was already sent, the existing writer when the
        response is already prepared, and the newly attached writer otherwise.
        """
        if self._eof_sent:
            return None
        existing_writer = self._payload_writer
        if existing_writer is not None:
            return existing_writer
        self._must_be_empty_body = must_be_empty_body(request.method, self.status)
        return await self._start(request)

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Attach the request and its writer, then prepare and write the headers."""
        self._req = request
        writer = request._payload_writer
        self._payload_writer = writer

        await self._prepare_headers()
        # Runs after the headers are computed and before they are written.
        await request._prepare_hook(self)
        await self._write_headers()

        return writer

    async def _prepare_headers(self) -> None:
        """Fill in cookies, compression, framing and default headers.

        Must be called with a request and payload writer already attached.
        Raises RuntimeError if chunked encoding was requested for a request
        whose version is not HTTP/1.1.
        """
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None

        # Resolve keep-alive: an explicit setting wins, otherwise follow
        # the request.
        keep_alive = self._keep_alive
        if keep_alive is None:
            keep_alive = request.keep_alive
        self._keep_alive = keep_alive

        version = request.version

        out_headers = self._headers
        if self._cookies:
            populate_with_cookies(out_headers, self._cookies)

        if self._compression:
            await self._start_compression(request)

        if self._chunked:
            if version != HttpVersion11:
                raise RuntimeError(
                    "Using chunked encoding is forbidden "
                    f"for HTTP/{request.version.major}.{request.version.minor}"
                )
            if not self._must_be_empty_body:
                writer.enable_chunking()
                out_headers[hdrs.TRANSFER_ENCODING] = "chunked"
        elif self._length_check:  # Disabled for WebSockets
            writer.length = self.content_length
            if writer.length is None:
                if version >= HttpVersion11:
                    # Unknown length on HTTP/1.1+: fall back to chunking.
                    if not self._must_be_empty_body:
                        writer.enable_chunking()
                        out_headers[hdrs.TRANSFER_ENCODING] = "chunked"
                elif not self._must_be_empty_body:
                    # NOTE(review): only the local flag is lowered here;
                    # self._keep_alive was stored above and is not updated.
                    keep_alive = False

        # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2
        # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4
        if self._must_be_empty_body:
            if hdrs.CONTENT_LENGTH in out_headers and should_remove_content_length(
                request.method, self.status
            ):
                del out_headers[hdrs.CONTENT_LENGTH]
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-10
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-13
            if hdrs.TRANSFER_ENCODING in out_headers:
                del out_headers[hdrs.TRANSFER_ENCODING]
        elif (writer.length if self._length_check else self.content_length) != 0:
            # https://www.rfc-editor.org/rfc/rfc9110#section-8.3-5
            out_headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream")
        out_headers.setdefault(hdrs.DATE, rfc822_formatted_time())
        out_headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE)

        # Connection header: only added when the caller did not set one.
        if hdrs.CONNECTION not in out_headers:
            if keep_alive:
                if version == HttpVersion10:
                    out_headers[hdrs.CONNECTION] = "keep-alive"
            elif version == HttpVersion11:
                out_headers[hdrs.CONNECTION] = "close"

    async def _write_headers(self) -> None:
        """Write the status line and headers to the payload writer."""
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        # status line
        http_version = request.version
        status_line = (
            f"HTTP/{http_version[0]}.{http_version[1]} {self._status} {self._reason}"
        )
        await writer.write_headers(status_line, self._headers)

        # Send headers immediately if not opted into buffering
        if self._send_headers_immediately:
            writer.send_headers()

    async def write(
        self, data: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
    ) -> None:
        """Write a chunk of body data.

        Raises RuntimeError when called before prepare() or after write_eof().
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            raise RuntimeError("Cannot call write() after write_eof()")
        writer = self._payload_writer
        if writer is None:
            raise RuntimeError("Cannot call write() before prepare()")

        await writer.write(data)

    async def drain(self) -> None:
        """Deprecated: emit a DeprecationWarning, then drain the payload writer."""
        assert not self._eof_sent, "EOF has already been sent"
        assert self._payload_writer is not None, "Response has not been started"
        warnings.warn(
            "drain method is deprecated, use await resp.write()",
            DeprecationWarning,
            stacklevel=2,
        )
        await self._payload_writer.drain()

    async def write_eof(self, data: bytes = b"") -> None:
        """Finish the response, optionally sending a final chunk.

        Calling it again after EOF was sent is a no-op.  On success the
        request and writer are detached and the writer's output size is
        kept in ``body_length``.
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            return

        writer = self._payload_writer
        assert writer is not None, "Response has not been started"

        await writer.write_eof(data)
        self._eof_sent = True
        self._req = None
        self._body_length = writer.output_size
        self._payload_writer = None

    def __repr__(self) -> str:
        if self._eof_sent:
            info = "eof"
        elif self.prepared:
            req = self._req
            assert req is not None
            info = f"{req.method} {req.path} "
        else:
            info = "not prepared"
        return f"<{self.__class__.__name__} {self.reason} {info}>"

    # --- MutableMapping interface, backed by self._state ---

    @overload  # type: ignore[override]
    def __getitem__(self, key: ResponseKey[_T]) -> _T: ...

    @overload
    def __getitem__(self, key: str) -> Any: ...

    def __getitem__(self, key: str | ResponseKey[_T]) -> Any:
        return self._state[key]

    @overload  # type: ignore[override]
    def __setitem__(self, key: ResponseKey[_T], value: _T) -> None: ...

    @overload
    def __setitem__(self, key: str, value: Any) -> None: ...

    def __setitem__(self, key: str | ResponseKey[_T], value: Any) -> None:
        self._state[key] = value

    def __delitem__(self, key: str | ResponseKey[_T]) -> None:
        del self._state[key]

    def __len__(self) -> int:
        return len(self._state)

    def __iter__(self) -> Iterator[str | ResponseKey[Any]]:
        return iter(self._state)

    # Identity semantics: responses hash and compare by object identity,
    # not by mapping content.

    def __hash__(self) -> int:
        return hash(id(self))

    def __eq__(self, other: object) -> bool:
        return self is other

    def __bool__(self) -> bool:
        # Always truthy, even though __len__ may report an empty mapping.
        return True
class Response(StreamResponse):
    """Response whose whole body is known up front.

    The body is bytes, a bytearray, a ``Payload``, or anything the payload
    registry can wrap.  Content-Length is derived automatically, and
    non-chunked byte bodies are compressed in one piece instead of through
    the writer.
    """

    _compressed_body: bytes | None = None
    # Headers are not flushed right after being written (see
    # StreamResponse._write_headers).
    _send_headers_immediately = False

    def __init__(
        self,
        *,
        body: Any = None,
        status: int = 200,
        reason: str | None = None,
        text: str | None = None,
        headers: LooseHeaders | None = None,
        content_type: str | None = None,
        charset: str | None = None,
        zlib_executor_size: int | None = None,
        zlib_executor: Executor | None = None,
    ) -> None:
        """Create a response from either ``body`` or ``text``.

        Raises ValueError when both ``body`` and ``text`` are given, when
        ``content_type`` contains a charset, or when a Content-Type header
        is combined with the ``content_type``/``charset`` arguments.
        Raises TypeError when ``text`` is not a str on the fast path.
        """
        if body is not None and text is not None:
            raise ValueError("body and text are not allowed together")

        real_headers: CIMultiDict[str] = (
            CIMultiDict() if headers is None else CIMultiDict(headers)
        )

        if content_type is not None and "charset" in content_type:
            raise ValueError("charset must not be in content_type argument")

        conflict_message = (
            "passing both Content-Type header and "
            "content_type or charset params "
            "is forbidden"
        )
        header_has_ctype = hdrs.CONTENT_TYPE in real_headers

        if text is not None and header_has_ctype:
            # The text setter will encode using the header's charset later.
            if content_type or charset:
                raise ValueError(conflict_message)
        elif text is not None:
            # fast path for filling headers
            if not isinstance(text, str):
                raise TypeError("text argument must be str (%r)" % type(text))
            if content_type is None:
                content_type = "text/plain"
            if charset is None:
                charset = "utf-8"
            real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset
            body = text.encode(charset)
            text = None
        elif header_has_ctype:
            if content_type is not None or charset is not None:
                raise ValueError(conflict_message)
        elif content_type is not None:
            if charset is not None:
                content_type += "; charset=" + charset
            real_headers[hdrs.CONTENT_TYPE] = content_type

        # real_headers is already a private copy, so hand it over as-is.
        super().__init__(status=status, reason=reason, _real_headers=real_headers)

        if text is None:
            self.body = body
        else:
            self.text = text

        self._zlib_executor_size = zlib_executor_size
        self._zlib_executor = zlib_executor

    @property
    def body(self) -> bytes | bytearray | Payload | None:
        """The response body as stored (bytes, bytearray, Payload or None)."""
        return self._body

    @body.setter
    def body(self, body: Any) -> None:
        """Replace the body and discard any cached compressed copy.

        None, bytes and bytearray are stored directly.  Anything else is
        wrapped through ``payload.PAYLOAD_REGISTRY``; the payload's content
        type and headers are copied in without overriding headers that are
        already set.  Raises ValueError for unsupported types.
        """
        if body is None or isinstance(body, (bytes, bytearray)):
            self._body = body
        else:
            try:
                wrapped = payload.PAYLOAD_REGISTRY.get(body)
            except payload.LookupError:
                raise ValueError("Unsupported body type %r" % type(body))
            self._body = wrapped

            resp_headers = self._headers

            # set content-type
            if hdrs.CONTENT_TYPE not in resp_headers:
                resp_headers[hdrs.CONTENT_TYPE] = wrapped.content_type

            # copy payload headers
            if wrapped.headers:
                for name, header_value in wrapped.headers.items():
                    if name not in resp_headers:
                        resp_headers[name] = header_value

        self._compressed_body = None

    @property
    def text(self) -> str | None:
        """Body decoded with the response charset (utf-8 by default), or None."""
        current = self._body
        if current is None:
            return None
        # Note: When _body is a Payload (e.g. FilePayload), this may do blocking I/O
        # This is generally safe as most common payloads (BytesPayload, StringPayload)
        # don't do blocking I/O, but be careful with file-based payloads
        return current.decode(self.charset or "utf-8")

    @text.setter
    def text(self, text: str) -> None:
        """Encode ``text`` into the body, defaulting to text/plain and utf-8."""
        assert isinstance(text, str), "text argument must be str (%r)" % type(text)

        if self.content_type == "application/octet-stream":
            self.content_type = "text/plain"
        if self.charset is None:
            self.charset = "utf-8"

        self._body = text.encode(self.charset)
        self._compressed_body = None

    @property
    def content_length(self) -> int | None:
        """Length that will be declared for the body, or None if unknown.

        Order: None when chunked; an explicit Content-Length header; the
        compressed body; None for a Payload; the raw body; 0 for no body.
        """
        if self._chunked:
            return None

        if hdrs.CONTENT_LENGTH in self._headers:
            return int(self._headers[hdrs.CONTENT_LENGTH])

        if self._compressed_body is not None:
            # Return length of the compressed body
            return len(self._compressed_body)
        if isinstance(self._body, Payload):
            # A payload without content length, or a compressed payload
            return None
        if self._body is not None:
            return len(self._body)
        return 0

    @content_length.setter
    def content_length(self, value: int | None) -> None:
        """Always raises RuntimeError: the length is derived from the body."""
        raise RuntimeError("Content length is set automatically")

    async def write_eof(self, data: bytes = b"") -> None:
        """Send the stored body (compressed copy if present) and finish.

        ``data`` must be empty.  Calling again after EOF was sent is a no-op.
        """
        if self._eof_sent:
            return
        compressed = self._compressed_body
        body = self._body if compressed is None else compressed
        assert not data, f"data arg is not supported, got {data!r}"
        assert self._req is not None
        assert self._payload_writer is not None
        if body is None or self._must_be_empty_body:
            await super().write_eof()
        elif isinstance(self._body, Payload):
            # The payload writes itself to the writer, then is closed.
            await self._body.write(self._payload_writer)
            await self._body.close()
            await super().write_eof()
        else:
            await super().write_eof(cast(bytes, body))

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Settle the Content-Length header, then start like StreamResponse."""
        resp_headers = self._headers
        if hdrs.CONTENT_LENGTH in resp_headers:
            if should_remove_content_length(request.method, self.status):
                del resp_headers[hdrs.CONTENT_LENGTH]
        elif not self._chunked:
            if isinstance(self._body, Payload):
                size = self._body.size
                if size is not None:
                    resp_headers[hdrs.CONTENT_LENGTH] = str(size)
            else:
                body_len = len(self._body) if self._body else 0
                # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7
                # A zero length is announced unless the response is a 304
                # or answers a HEAD-type request.
                if body_len or (
                    self.status != 304 and request.method not in hdrs.METH_HEAD_ALL
                ):
                    resp_headers[hdrs.CONTENT_LENGTH] = str(body_len)

        return await super()._start(request)

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Compress the whole body up front when it is a plain byte body.

        Chunked responses and Payload bodies use the writer-level
        compression of the parent class.  The identity coding is a no-op.
        """
        if self._chunked or isinstance(self._body, Payload):
            return await super()._do_start_compression(coding)
        if coding is ContentCoding.identity:
            return
        # Instead of using _payload_writer.enable_compression,
        # compress the whole body
        compressor = ZLibCompressor(
            encoding=coding.value,
            max_sync_chunk_size=self._zlib_executor_size,
            executor=self._zlib_executor,
        )
        raw_body = self._body
        assert raw_body is not None
        if self._zlib_executor_size is None and len(raw_body) > LARGE_BODY_SIZE:
            warnings.warn(
                "Synchronous compression of large response bodies "
                f"({len(raw_body)} bytes) might block the async event loop. "
                "Consider providing a custom value to zlib_executor_size/"
                "zlib_executor response properties or disabling compression on it."
            )
        compressed_head = await compressor.compress(raw_body)
        self._compressed_body = compressed_head + compressor.flush()
        self._headers[hdrs.CONTENT_ENCODING] = coding.value
        self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body))
def json_response(
    data: Any = sentinel,
    *,
    text: str | None = None,
    body: bytes | None = None,
    status: int = 200,
    reason: str | None = None,
    headers: LooseHeaders | None = None,
    content_type: str = "application/json",
    dumps: JSONEncoder = json.dumps,
) -> Response:
    """Create a ``Response`` carrying JSON.

    Exactly one source of content should be used: ``data`` (serialized with
    ``dumps`` into the response text), a pre-serialized ``text``, or a raw
    ``body``.

    Raises:
        ValueError: if ``data`` is combined with ``text`` or ``body``.
    """
    if data is not sentinel:
        # Test for None rather than truthiness: an empty text/body is still
        # an explicitly supplied argument. The old truthiness check dropped
        # text="" silently and let body=b"" fail later inside Response with
        # an unrelated message. This matches json_bytes_response.
        if text is not None or body is not None:
            raise ValueError("only one of data, text, or body should be specified")
        text = dumps(data)
    return Response(
        text=text,
        body=body,
        status=status,
        reason=reason,
        headers=headers,
        content_type=content_type,
    )
def json_bytes_response(
    data: Any = sentinel,
    *,
    dumps: JSONBytesEncoder,
    body: bytes | None = None,
    status: int = 200,
    reason: str | None = None,
    headers: LooseHeaders | None = None,
    content_type: str = "application/json",
) -> Response:
    """Create a JSON response using a bytes-returning encoder.

    Intended for encoders (like orjson) that return bytes rather than str,
    so the result is used as the body directly with no encode/decode step.

    Raises:
        ValueError: if both ``data`` and ``body`` are supplied.
    """
    data_given = data is not sentinel
    if data_given and body is not None:
        raise ValueError("only one of data or body should be specified")
    if data_given:
        body = dumps(data)
    return Response(
        body=body,
        status=status,
        reason=reason,
        headers=headers,
        content_type=content_type,
    )