Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_response.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import datetime
3import enum
4import json
5import math
6import time
7import warnings
8from collections.abc import Iterator, MutableMapping
9from concurrent.futures import Executor
10from http import HTTPStatus
11from typing import TYPE_CHECKING, Any, Optional, TypeVar, Union, cast, overload
13from multidict import CIMultiDict, istr
15from . import hdrs, payload
16from .abc import AbstractStreamWriter
17from .compression_utils import MAX_SYNC_CHUNK_SIZE, ZLibCompressor
18from .helpers import (
19 ETAG_ANY,
20 QUOTED_ETAG_RE,
21 CookieMixin,
22 ETag,
23 HeadersMixin,
24 ResponseKey,
25 must_be_empty_body,
26 parse_http_date,
27 populate_with_cookies,
28 rfc822_formatted_time,
29 sentinel,
30 should_remove_content_length,
31 validate_etag_value,
32)
33from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11
34from .payload import Payload
35from .typedefs import JSONBytesEncoder, JSONEncoder, LooseHeaders
37REASON_PHRASES = {http_status.value: http_status.phrase for http_status in HTTPStatus}
39__all__ = (
40 "ContentCoding",
41 "StreamResponse",
42 "Response",
43 "json_response",
44 "json_bytes_response",
45)
48if TYPE_CHECKING:
49 from .web_request import BaseRequest
52_T = TypeVar("_T")
55# TODO(py311): Convert to StrEnum for wider use
56class ContentCoding(enum.Enum):
57 # The content codings that we have support for.
58 #
59 # Additional registered codings are listed at:
60 # https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding
61 deflate = "deflate"
62 gzip = "gzip"
63 identity = "identity"
66CONTENT_CODINGS = {coding.value: coding for coding in ContentCoding}
68############################################################
69# HTTP Response classes
70############################################################
73class StreamResponse(
74 MutableMapping[str | ResponseKey[Any], Any], HeadersMixin, CookieMixin
75):
77 _body: None | bytes | bytearray | Payload
78 _length_check = True
79 _body = None
80 _keep_alive: bool | None = None
81 _chunked: bool = False
82 _compression: bool = False
83 _compression_strategy: int | None = None
84 _compression_force: ContentCoding | None = None
85 _req: Optional["BaseRequest"] = None
86 _payload_writer: AbstractStreamWriter | None = None
87 _eof_sent: bool = False
88 _must_be_empty_body: bool | None = None
89 _body_length = 0
90 _send_headers_immediately = True
92 def __init__(
93 self,
94 *,
95 status: int = 200,
96 reason: str | None = None,
97 headers: LooseHeaders | None = None,
98 _real_headers: CIMultiDict[str] | None = None,
99 ) -> None:
100 """Initialize a new stream response object.
102 _real_headers is an internal parameter used to pass a pre-populated
103 headers object. It is used by the `Response` class to avoid copying
104 the headers when creating a new response object. It is not intended
105 to be used by external code.
106 """
107 self._state: dict[str | ResponseKey[Any], Any] = {}
109 if _real_headers is not None:
110 self._headers = _real_headers
111 elif headers is not None:
112 self._headers: CIMultiDict[str] = CIMultiDict(headers)
113 else:
114 self._headers = CIMultiDict()
116 self._set_status(status, reason)
118 @property
119 def prepared(self) -> bool:
120 return self._eof_sent or self._payload_writer is not None
122 @property
123 def task(self) -> "asyncio.Task[None] | None":
124 if self._req:
125 return self._req.task
126 else:
127 return None
129 @property
130 def status(self) -> int:
131 return self._status
133 @property
134 def chunked(self) -> bool:
135 return self._chunked
137 @property
138 def compression(self) -> bool:
139 return self._compression
141 @property
142 def reason(self) -> str:
143 return self._reason
145 def set_status(
146 self,
147 status: int,
148 reason: str | None = None,
149 ) -> None:
150 assert (
151 not self.prepared
152 ), "Cannot change the response status code after the headers have been sent"
153 self._set_status(status, reason)
155 def _set_status(self, status: int, reason: str | None) -> None:
156 self._status = status
157 if reason is None:
158 reason = REASON_PHRASES.get(self._status, "")
159 elif "\r" in reason or "\n" in reason:
160 raise ValueError("Reason cannot contain \\r or \\n")
161 self._reason = reason
163 @property
164 def keep_alive(self) -> bool | None:
165 return self._keep_alive
167 def force_close(self) -> None:
168 self._keep_alive = False
170 @property
171 def body_length(self) -> int:
172 return self._body_length
174 def enable_chunked_encoding(self) -> None:
175 """Enables automatic chunked transfer encoding."""
176 if hdrs.CONTENT_LENGTH in self._headers:
177 raise RuntimeError(
178 "You can't enable chunked encoding when a content length is set"
179 )
180 self._chunked = True
182 def enable_compression(
183 self,
184 force: ContentCoding | None = None,
185 strategy: int | None = None,
186 ) -> None:
187 """Enables response compression encoding."""
188 # Don't enable compression if content is already encoded.
189 # This prevents double compression and provides a safe, predictable behavior
190 # without breaking existing code that may call enable_compression() on
191 # responses that already have Content-Encoding set (e.g., FileResponse
192 # serving pre-compressed files).
193 if hdrs.CONTENT_ENCODING in self._headers:
194 return
195 self._compression = True
196 self._compression_force = force
197 self._compression_strategy = strategy
199 @property
200 def headers(self) -> "CIMultiDict[str]":
201 return self._headers
203 @property
204 def content_length(self) -> int | None:
205 # Just a placeholder for adding setter
206 return super().content_length
208 @content_length.setter
209 def content_length(self, value: int | None) -> None:
210 if value is not None:
211 value = int(value)
212 if self._chunked:
213 raise RuntimeError(
214 "You can't set content length when chunked encoding is enable"
215 )
216 self._headers[hdrs.CONTENT_LENGTH] = str(value)
217 else:
218 self._headers.pop(hdrs.CONTENT_LENGTH, None)
220 @property
221 def content_type(self) -> str:
222 # Just a placeholder for adding setter
223 return super().content_type
225 @content_type.setter
226 def content_type(self, value: str) -> None:
227 self.content_type # read header values if needed
228 self._content_type = str(value)
229 self._generate_content_type_header()
231 @property
232 def charset(self) -> str | None:
233 # Just a placeholder for adding setter
234 return super().charset
236 @charset.setter
237 def charset(self, value: str | None) -> None:
238 ctype = self.content_type # read header values if needed
239 if ctype == "application/octet-stream":
240 raise RuntimeError(
241 "Setting charset for application/octet-stream "
242 "doesn't make sense, setup content_type first"
243 )
244 assert self._content_dict is not None
245 if value is None:
246 self._content_dict.pop("charset", None)
247 else:
248 self._content_dict["charset"] = str(value).lower()
249 self._generate_content_type_header()
251 @property
252 def last_modified(self) -> datetime.datetime | None:
253 """The value of Last-Modified HTTP header, or None.
255 This header is represented as a `datetime` object.
256 """
257 return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED))
259 @last_modified.setter
260 def last_modified(
261 self, value: int | float | datetime.datetime | str | None
262 ) -> None:
263 if value is None:
264 self._headers.pop(hdrs.LAST_MODIFIED, None)
265 elif isinstance(value, (int, float)):
266 self._headers[hdrs.LAST_MODIFIED] = time.strftime(
267 "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(math.ceil(value))
268 )
269 elif isinstance(value, datetime.datetime):
270 self._headers[hdrs.LAST_MODIFIED] = time.strftime(
271 "%a, %d %b %Y %H:%M:%S GMT", value.utctimetuple()
272 )
273 elif isinstance(value, str):
274 self._headers[hdrs.LAST_MODIFIED] = value
275 else:
276 msg = f"Unsupported type for last_modified: {type(value).__name__}" # type: ignore[unreachable]
277 raise TypeError(msg)
279 @property
280 def etag(self) -> ETag | None:
281 quoted_value = self._headers.get(hdrs.ETAG)
282 if not quoted_value:
283 return None
284 elif quoted_value == ETAG_ANY:
285 return ETag(value=ETAG_ANY)
286 match = QUOTED_ETAG_RE.fullmatch(quoted_value)
287 if not match:
288 return None
289 is_weak, value = match.group(1, 2)
290 return ETag(
291 is_weak=bool(is_weak),
292 value=value,
293 )
295 @etag.setter
296 def etag(self, value: ETag | str | None) -> None:
297 if value is None:
298 self._headers.pop(hdrs.ETAG, None)
299 elif (isinstance(value, str) and value == ETAG_ANY) or (
300 isinstance(value, ETag) and value.value == ETAG_ANY
301 ):
302 self._headers[hdrs.ETAG] = ETAG_ANY
303 elif isinstance(value, str):
304 validate_etag_value(value)
305 self._headers[hdrs.ETAG] = f'"{value}"'
306 elif isinstance(value, ETag) and isinstance(value.value, str): # type: ignore[redundant-expr]
307 validate_etag_value(value.value)
308 hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"'
309 self._headers[hdrs.ETAG] = hdr_value
310 else:
311 raise ValueError(
312 f"Unsupported etag type: {type(value)}. "
313 f"etag must be str, ETag or None"
314 )
316 def _generate_content_type_header(
317 self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE
318 ) -> None:
319 assert self._content_dict is not None
320 assert self._content_type is not None
321 params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items())
322 if params:
323 ctype = self._content_type + "; " + params
324 else:
325 ctype = self._content_type
326 self._headers[CONTENT_TYPE] = ctype
328 async def _do_start_compression(self, coding: ContentCoding) -> None:
329 if coding is ContentCoding.identity:
330 return
331 assert self._payload_writer is not None
332 self._headers[hdrs.CONTENT_ENCODING] = coding.value
333 self._payload_writer.enable_compression(
334 coding.value, self._compression_strategy
335 )
336 # Compressed payload may have different content length,
337 # remove the header
338 self._headers.popall(hdrs.CONTENT_LENGTH, None)
340 async def _start_compression(self, request: "BaseRequest") -> None:
341 if self._compression_force:
342 await self._do_start_compression(self._compression_force)
343 return
344 # Encoding comparisons should be case-insensitive
345 # https://www.rfc-editor.org/rfc/rfc9110#section-8.4.1
346 accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()
347 for value, coding in CONTENT_CODINGS.items():
348 if value in accept_encoding:
349 await self._do_start_compression(coding)
350 return
352 async def prepare(self, request: "BaseRequest") -> AbstractStreamWriter | None:
353 if self._eof_sent:
354 return None
355 if self._payload_writer is not None:
356 return self._payload_writer
357 self._must_be_empty_body = must_be_empty_body(request.method, self.status)
358 return await self._start(request)
360 async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
361 self._req = request
362 writer = self._payload_writer = request._payload_writer
364 await self._prepare_headers()
365 await request._prepare_hook(self)
366 await self._write_headers()
368 return writer
370 async def _prepare_headers(self) -> None:
371 request = self._req
372 assert request is not None
373 writer = self._payload_writer
374 assert writer is not None
375 keep_alive = self._keep_alive
376 if keep_alive is None:
377 keep_alive = request.keep_alive
378 self._keep_alive = keep_alive
380 version = request.version
382 headers = self._headers
383 if self._cookies:
384 populate_with_cookies(headers, self._cookies)
386 if self._compression:
387 await self._start_compression(request)
389 if self._chunked:
390 if version != HttpVersion11:
391 raise RuntimeError(
392 "Using chunked encoding is forbidden "
393 f"for HTTP/{request.version.major}.{request.version.minor}"
394 )
395 if not self._must_be_empty_body:
396 writer.enable_chunking()
397 headers[hdrs.TRANSFER_ENCODING] = "chunked"
398 elif self._length_check: # Disabled for WebSockets
399 writer.length = self.content_length
400 if writer.length is None:
401 if version >= HttpVersion11:
402 if not self._must_be_empty_body:
403 writer.enable_chunking()
404 headers[hdrs.TRANSFER_ENCODING] = "chunked"
405 elif not self._must_be_empty_body:
406 keep_alive = False
408 # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2
409 # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4
410 if self._must_be_empty_body:
411 if hdrs.CONTENT_LENGTH in headers and should_remove_content_length(
412 request.method, self.status
413 ):
414 del headers[hdrs.CONTENT_LENGTH]
415 # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-10
416 # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-13
417 if hdrs.TRANSFER_ENCODING in headers:
418 del headers[hdrs.TRANSFER_ENCODING]
419 elif (writer.length if self._length_check else self.content_length) != 0:
420 # https://www.rfc-editor.org/rfc/rfc9110#section-8.3-5
421 headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream")
422 headers.setdefault(hdrs.DATE, rfc822_formatted_time())
423 headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE)
425 # connection header
426 if hdrs.CONNECTION not in headers:
427 if keep_alive:
428 if version == HttpVersion10:
429 headers[hdrs.CONNECTION] = "keep-alive"
430 elif version == HttpVersion11:
431 headers[hdrs.CONNECTION] = "close"
433 async def _write_headers(self) -> None:
434 request = self._req
435 assert request is not None
436 writer = self._payload_writer
437 assert writer is not None
438 # status line
439 version = request.version
440 status_line = f"HTTP/{version[0]}.{version[1]} {self._status} {self._reason}"
441 await writer.write_headers(status_line, self._headers)
443 # Send headers immediately if not opted into buffering
444 if self._send_headers_immediately:
445 writer.send_headers()
447 async def write(
448 self, data: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
449 ) -> None:
450 assert isinstance(
451 data, (bytes, bytearray, memoryview)
452 ), "data argument must be byte-ish (%r)" % type(data)
454 if self._eof_sent:
455 raise RuntimeError("Cannot call write() after write_eof()")
456 if self._payload_writer is None:
457 raise RuntimeError("Cannot call write() before prepare()")
459 await self._payload_writer.write(data)
461 async def drain(self) -> None:
462 assert not self._eof_sent, "EOF has already been sent"
463 assert self._payload_writer is not None, "Response has not been started"
464 warnings.warn(
465 "drain method is deprecated, use await resp.write()",
466 DeprecationWarning,
467 stacklevel=2,
468 )
469 await self._payload_writer.drain()
471 async def write_eof(self, data: bytes = b"") -> None:
472 assert isinstance(
473 data, (bytes, bytearray, memoryview)
474 ), "data argument must be byte-ish (%r)" % type(data)
476 if self._eof_sent:
477 return
479 assert self._payload_writer is not None, "Response has not been started"
481 await self._payload_writer.write_eof(data)
482 self._eof_sent = True
483 self._req = None
484 self._body_length = self._payload_writer.output_size
485 self._payload_writer = None
487 def __repr__(self) -> str:
488 if self._eof_sent:
489 info = "eof"
490 elif self.prepared:
491 assert self._req is not None
492 info = f"{self._req.method} {self._req.path} "
493 else:
494 info = "not prepared"
495 return f"<{self.__class__.__name__} {self.reason} {info}>"
497 @overload # type: ignore[override]
498 def __getitem__(self, key: ResponseKey[_T]) -> _T: ...
500 @overload
501 def __getitem__(self, key: str) -> Any: ...
503 def __getitem__(self, key: str | ResponseKey[_T]) -> Any:
504 return self._state[key]
506 @overload # type: ignore[override]
507 def __setitem__(self, key: ResponseKey[_T], value: _T) -> None: ...
509 @overload
510 def __setitem__(self, key: str, value: Any) -> None: ...
512 def __setitem__(self, key: str | ResponseKey[_T], value: Any) -> None:
513 self._state[key] = value
515 def __delitem__(self, key: str | ResponseKey[_T]) -> None:
516 del self._state[key]
518 def __len__(self) -> int:
519 return len(self._state)
521 def __iter__(self) -> Iterator[str | ResponseKey[Any]]:
522 return iter(self._state)
524 def __hash__(self) -> int:
525 return hash(id(self))
527 def __eq__(self, other: object) -> bool:
528 return self is other
530 def __bool__(self) -> bool:
531 return True
534class Response(StreamResponse):
536 _compressed_body: bytes | None = None
537 _send_headers_immediately = False
539 def __init__(
540 self,
541 *,
542 body: Any = None,
543 status: int = 200,
544 reason: str | None = None,
545 text: str | None = None,
546 headers: LooseHeaders | None = None,
547 content_type: str | None = None,
548 charset: str | None = None,
549 zlib_executor_size: int = MAX_SYNC_CHUNK_SIZE,
550 zlib_executor: Executor | None = None,
551 ) -> None:
552 if body is not None and text is not None:
553 raise ValueError("body and text are not allowed together")
555 if headers is None:
556 real_headers: CIMultiDict[str] = CIMultiDict()
557 else:
558 real_headers = CIMultiDict(headers)
560 if content_type is not None and "charset" in content_type:
561 raise ValueError("charset must not be in content_type argument")
563 if text is not None:
564 if hdrs.CONTENT_TYPE in real_headers:
565 if content_type or charset:
566 raise ValueError(
567 "passing both Content-Type header and "
568 "content_type or charset params "
569 "is forbidden"
570 )
571 else:
572 # fast path for filling headers
573 if not isinstance(text, str):
574 raise TypeError("text argument must be str (%r)" % type(text))
575 if content_type is None:
576 content_type = "text/plain"
577 if charset is None:
578 charset = "utf-8"
579 real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset
580 body = text.encode(charset)
581 text = None
582 elif hdrs.CONTENT_TYPE in real_headers:
583 if content_type is not None or charset is not None:
584 raise ValueError(
585 "passing both Content-Type header and "
586 "content_type or charset params "
587 "is forbidden"
588 )
589 elif content_type is not None:
590 if charset is not None:
591 content_type += "; charset=" + charset
592 real_headers[hdrs.CONTENT_TYPE] = content_type
594 super().__init__(status=status, reason=reason, _real_headers=real_headers)
596 if text is not None:
597 self.text = text
598 else:
599 self.body = body
601 self._zlib_executor_size = zlib_executor_size
602 self._zlib_executor = zlib_executor
604 @property
605 def body(self) -> bytes | bytearray | Payload | None:
606 return self._body
608 @body.setter
609 def body(self, body: Any) -> None:
610 if body is None:
611 self._body = None
612 elif isinstance(body, (bytes, bytearray)):
613 self._body = body
614 else:
615 try:
616 self._body = body = payload.PAYLOAD_REGISTRY.get(body)
617 except payload.LookupError:
618 raise ValueError("Unsupported body type %r" % type(body))
620 headers = self._headers
622 # set content-type
623 if hdrs.CONTENT_TYPE not in headers:
624 headers[hdrs.CONTENT_TYPE] = body.content_type
626 # copy payload headers
627 if body.headers:
628 for key, value in body.headers.items():
629 if key not in headers:
630 headers[key] = value
632 self._compressed_body = None
634 @property
635 def text(self) -> str | None:
636 if self._body is None:
637 return None
638 # Note: When _body is a Payload (e.g. FilePayload), this may do blocking I/O
639 # This is generally safe as most common payloads (BytesPayload, StringPayload)
640 # don't do blocking I/O, but be careful with file-based payloads
641 return self._body.decode(self.charset or "utf-8")
643 @text.setter
644 def text(self, text: str) -> None:
645 assert isinstance(text, str), "text argument must be str (%r)" % type(text)
647 if self.content_type == "application/octet-stream":
648 self.content_type = "text/plain"
649 if self.charset is None:
650 self.charset = "utf-8"
652 self._body = text.encode(self.charset)
653 self._compressed_body = None
655 @property
656 def content_length(self) -> int | None:
657 if self._chunked:
658 return None
660 if hdrs.CONTENT_LENGTH in self._headers:
661 return int(self._headers[hdrs.CONTENT_LENGTH])
663 if self._compressed_body is not None:
664 # Return length of the compressed body
665 return len(self._compressed_body)
666 elif isinstance(self._body, Payload):
667 # A payload without content length, or a compressed payload
668 return None
669 elif self._body is not None:
670 return len(self._body)
671 else:
672 return 0
674 @content_length.setter
675 def content_length(self, value: int | None) -> None:
676 raise RuntimeError("Content length is set automatically")
678 async def write_eof(self, data: bytes = b"") -> None:
679 if self._eof_sent:
680 return
681 if self._compressed_body is None:
682 body = self._body
683 else:
684 body = self._compressed_body
685 assert not data, f"data arg is not supported, got {data!r}"
686 assert self._req is not None
687 assert self._payload_writer is not None
688 if body is None or self._must_be_empty_body:
689 await super().write_eof()
690 elif isinstance(self._body, Payload):
691 try:
692 await self._body.write(self._payload_writer)
693 finally:
694 await self._body.close()
695 await super().write_eof()
696 else:
697 await super().write_eof(cast(bytes, body))
699 async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
700 if hdrs.CONTENT_LENGTH in self._headers:
701 if should_remove_content_length(request.method, self.status):
702 del self._headers[hdrs.CONTENT_LENGTH]
703 elif not self._chunked:
704 if isinstance(self._body, Payload):
705 if (size := self._body.size) is not None:
706 self._headers[hdrs.CONTENT_LENGTH] = str(size)
707 else:
708 body_len = len(self._body) if self._body else "0"
709 # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7
710 if body_len != "0" or (
711 self.status != 304 and request.method not in hdrs.METH_HEAD_ALL
712 ):
713 self._headers[hdrs.CONTENT_LENGTH] = str(body_len)
715 return await super()._start(request)
717 async def _do_start_compression(self, coding: ContentCoding) -> None:
718 if self._chunked or isinstance(self._body, Payload):
719 return await super()._do_start_compression(coding)
720 if coding is ContentCoding.identity:
721 return
722 # Instead of using _payload_writer.enable_compression,
723 # compress the whole body
724 compressor = ZLibCompressor(
725 encoding=coding.value,
726 max_sync_chunk_size=self._zlib_executor_size,
727 executor=self._zlib_executor,
728 )
729 assert self._body is not None
730 self._compressed_body = (
731 await compressor.compress(self._body) + compressor.flush()
732 )
733 self._headers[hdrs.CONTENT_ENCODING] = coding.value
734 self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body))
737def json_response(
738 data: Any = sentinel,
739 *,
740 text: str | None = None,
741 body: bytes | None = None,
742 status: int = 200,
743 reason: str | None = None,
744 headers: LooseHeaders | None = None,
745 content_type: str = "application/json",
746 dumps: JSONEncoder = json.dumps,
747) -> Response:
748 if data is not sentinel:
749 if text or body:
750 raise ValueError("only one of data, text, or body should be specified")
751 else:
752 text = dumps(data)
753 return Response(
754 text=text,
755 body=body,
756 status=status,
757 reason=reason,
758 headers=headers,
759 content_type=content_type,
760 )
763def json_bytes_response(
764 data: Any = sentinel,
765 *,
766 dumps: JSONBytesEncoder,
767 body: bytes | None = None,
768 status: int = 200,
769 reason: str | None = None,
770 headers: LooseHeaders | None = None,
771 content_type: str = "application/json",
772) -> Response:
773 """Create a JSON response using a bytes-returning encoder.
775 Use this when your JSON encoder (like orjson) returns bytes
776 instead of str, avoiding the encode/decode overhead.
777 """
778 if data is not sentinel:
779 if body is not None:
780 raise ValueError("only one of data or body should be specified")
781 else:
782 body = dumps(data)
783 return Response(
784 body=body,
785 status=status,
786 reason=reason,
787 headers=headers,
788 content_type=content_type,
789 )