Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_response.py: 29%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import datetime
3import enum
4import json
5import math
6import time
7import warnings
8from collections.abc import Iterator, MutableMapping
9from concurrent.futures import Executor
10from http import HTTPStatus
11from typing import TYPE_CHECKING, Any, Optional, TypeVar, Union, cast, overload
13from multidict import CIMultiDict, istr
15from . import hdrs, payload
16from .abc import AbstractStreamWriter
17from .compression_utils import MAX_SYNC_CHUNK_SIZE, ZLibCompressor
18from .helpers import (
19 ETAG_ANY,
20 QUOTED_ETAG_RE,
21 CookieMixin,
22 ETag,
23 HeadersMixin,
24 ResponseKey,
25 must_be_empty_body,
26 parse_http_date,
27 populate_with_cookies,
28 rfc822_formatted_time,
29 sentinel,
30 should_remove_content_length,
31 validate_etag_value,
32)
33from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11
34from .payload import Payload
35from .typedefs import JSONBytesEncoder, JSONEncoder, LooseHeaders
# Default reason phrase for every status code known to http.HTTPStatus,
# keyed by the integer status value (e.g. 404 -> "Not Found").
REASON_PHRASES = {http_status.value: http_status.phrase for http_status in HTTPStatus}
# Names exported by ``from <this module> import *``.
__all__ = (
    "ContentCoding",
    "StreamResponse",
    "Response",
    "json_response",
    "json_bytes_response",
)


if TYPE_CHECKING:
    # Imported for annotations only; it is never executed at runtime.
    from .web_request import BaseRequest


# Generic value type used by the ResponseKey-typed item accessors.
_T = TypeVar("_T")
# TODO(py311): Convert to StrEnum for wider use
class ContentCoding(enum.Enum):
    """Content codings supported for response compression.

    Each member's value is the token used in the Content-Encoding header.
    """

    # The content codings that we have support for.
    #
    # Additional registered codings are listed at:
    # https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding
    deflate = "deflate"
    gzip = "gzip"
    identity = "identity"
# Maps each coding's header token (e.g. "gzip") to its ContentCoding member,
# in enum definition order.
CONTENT_CODINGS = {coding.value: coding for coding in ContentCoding}

############################################################
# HTTP Response classes
############################################################
class StreamResponse(
    MutableMapping[str | ResponseKey[Any], Any], HeadersMixin, CookieMixin
):
    """HTTP response whose headers and body are sent through a stream writer.

    The object is also a mutable mapping: item access, assignment, deletion,
    iteration and ``len()`` all operate on a per-response state dictionary
    (``self._state``) keyed by ``str`` or ``ResponseKey``.

    Lifecycle as implemented here: configure status and headers, call
    ``prepare()`` to bind the response to a request and emit the headers,
    call ``write()`` as needed, then ``write_eof()``.
    """

    _body: None | bytes | bytearray | Payload
    # When true, _prepare_headers() takes the writer length from
    # content_length (noted there as "Disabled for WebSockets").
    _length_check = True
    _body = None
    # None means "not decided"; _prepare_headers() then uses request.keep_alive.
    _keep_alive: bool | None = None
    _chunked: bool = False
    _compression: bool = False
    _compression_strategy: int | None = None
    _compression_force: ContentCoding | None = None
    # Request this response is bound to: set in _start(), cleared in write_eof().
    _req: Optional["BaseRequest"] = None
    _payload_writer: AbstractStreamWriter | None = None
    _eof_sent: bool = False
    _must_be_empty_body: bool | None = None
    # Copied from the writer's ``output_size`` in write_eof().
    _body_length = 0
    # When true, _write_headers() calls writer.send_headers() right away.
    _send_headers_immediately = True

    def __init__(
        self,
        *,
        status: int = 200,
        reason: str | None = None,
        headers: LooseHeaders | None = None,
        _real_headers: CIMultiDict[str] | None = None,
    ) -> None:
        """Initialize a new stream response object.

        _real_headers is an internal parameter used to pass a pre-populated
        headers object. It is used by the `Response` class to avoid copying
        the headers when creating a new response object. It is not intended
        to be used by external code.

        Raises ValueError if *reason* contains a CR or LF character.
        """
        self._state: dict[str | ResponseKey[Any], Any] = {}

        # _real_headers wins over headers and is stored without copying.
        if _real_headers is not None:
            self._headers = _real_headers
        elif headers is not None:
            self._headers: CIMultiDict[str] = CIMultiDict(headers)
        else:
            self._headers = CIMultiDict()

        self._set_status(status, reason)

    @property
    def prepared(self) -> bool:
        """True once EOF was sent or a payload writer is attached."""
        return self._eof_sent or self._payload_writer is not None

    @property
    def task(self) -> "asyncio.Task[None] | None":
        """Task of the bound request, or None when no request is bound."""
        if self._req:
            return self._req.task
        else:
            return None

    @property
    def status(self) -> int:
        """The response status code."""
        return self._status

    @property
    def chunked(self) -> bool:
        """Whether chunked transfer encoding has been enabled."""
        return self._chunked

    @property
    def compression(self) -> bool:
        """Whether compression has been enabled."""
        return self._compression

    @property
    def reason(self) -> str:
        """The response reason phrase."""
        return self._reason

    def set_status(
        self,
        status: int,
        reason: str | None = None,
    ) -> None:
        """Set the status code and reason; only allowed before prepare().

        Raises ValueError if *reason* contains a CR or LF character.
        """
        assert (
            not self.prepared
        ), "Cannot change the response status code after the headers have been sent"
        self._set_status(status, reason)

    def _set_status(self, status: int, reason: str | None) -> None:
        """Store status and reason without checking the prepared state.

        A missing reason falls back to REASON_PHRASES, or to an empty string
        for a status that is not listed there.
        """
        self._status = status
        if reason is None:
            reason = REASON_PHRASES.get(self._status, "")
        elif "\r" in reason or "\n" in reason:
            # The reason is written into the status line, so line breaks
            # must never reach it.
            raise ValueError("Reason cannot contain \\r or \\n")
        self._reason = reason

    @property
    def keep_alive(self) -> bool | None:
        """Keep-alive flag; None until it is decided."""
        return self._keep_alive

    def force_close(self) -> None:
        """Set the keep-alive flag to False."""
        self._keep_alive = False

    @property
    def body_length(self) -> int:
        """Writer output size recorded by write_eof(); 0 before that."""
        return self._body_length

    def enable_chunked_encoding(self) -> None:
        """Enables automatic chunked transfer encoding.

        Raises RuntimeError if a Content-Length header is already set.
        """
        if hdrs.CONTENT_LENGTH in self._headers:
            raise RuntimeError(
                "You can't enable chunked encoding when a content length is set"
            )
        self._chunked = True

    def enable_compression(
        self,
        force: ContentCoding | None = None,
        strategy: int | None = None,
    ) -> None:
        """Enables response compression encoding.

        *force* selects a coding regardless of the request's Accept-Encoding;
        *strategy* is passed on to the writer when compression starts.
        """
        # Don't enable compression if content is already encoded.
        # This prevents double compression and provides a safe, predictable behavior
        # without breaking existing code that may call enable_compression() on
        # responses that already have Content-Encoding set (e.g., FileResponse
        # serving pre-compressed files).
        if hdrs.CONTENT_ENCODING in self._headers:
            return
        self._compression = True
        self._compression_force = force
        self._compression_strategy = strategy

    @property
    def headers(self) -> "CIMultiDict[str]":
        """The mutable response headers."""
        return self._headers

    @property
    def content_length(self) -> int | None:
        # Just a placeholder for adding setter
        return super().content_length

    @content_length.setter
    def content_length(self, value: int | None) -> None:
        """Set or, with None, remove the Content-Length header.

        Raises RuntimeError if chunked encoding is enabled and *value* is
        not None.
        """
        if value is not None:
            value = int(value)
            if self._chunked:
                raise RuntimeError(
                    "You can't set content length when chunked encoding is enable"
                )
            self._headers[hdrs.CONTENT_LENGTH] = str(value)
        else:
            self._headers.pop(hdrs.CONTENT_LENGTH, None)

    @property
    def content_type(self) -> str:
        # Just a placeholder for adding setter
        return super().content_type

    @content_type.setter
    def content_type(self, value: str) -> None:
        """Replace the media type and regenerate the Content-Type header."""
        self.content_type  # read header values if needed
        self._content_type = str(value)
        self._generate_content_type_header()

    @property
    def charset(self) -> str | None:
        # Just a placeholder for adding setter
        return super().charset

    @charset.setter
    def charset(self, value: str | None) -> None:
        """Set (lower-cased) or, with None, remove the charset parameter.

        Raises RuntimeError while the content type is
        application/octet-stream.
        """
        ctype = self.content_type  # read header values if needed
        if ctype == "application/octet-stream":
            raise RuntimeError(
                "Setting charset for application/octet-stream "
                "doesn't make sense, setup content_type first"
            )
        assert self._content_dict is not None
        if value is None:
            self._content_dict.pop("charset", None)
        else:
            self._content_dict["charset"] = str(value).lower()
        self._generate_content_type_header()

    @property
    def last_modified(self) -> datetime.datetime | None:
        """The value of Last-Modified HTTP header, or None.

        This header is represented as a `datetime` object.
        """
        return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED))

    @last_modified.setter
    def last_modified(
        self, value: int | float | datetime.datetime | str | None
    ) -> None:
        """Set or, with None, remove the Last-Modified header.

        Numbers are treated as POSIX timestamps (rounded up to a whole
        second), datetimes are formatted from their UTC time tuple, and
        strings are stored unchanged. Any other type raises TypeError.
        """
        if value is None:
            self._headers.pop(hdrs.LAST_MODIFIED, None)
        elif isinstance(value, (int, float)):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(math.ceil(value))
            )
        elif isinstance(value, datetime.datetime):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", value.utctimetuple()
            )
        elif isinstance(value, str):
            self._headers[hdrs.LAST_MODIFIED] = value
        else:
            msg = f"Unsupported type for last_modified: {type(value).__name__}"  # type: ignore[unreachable]
            raise TypeError(msg)

    @property
    def etag(self) -> ETag | None:
        """Parsed ETag header, or None if absent, empty or malformed."""
        quoted_value = self._headers.get(hdrs.ETAG)
        if not quoted_value:
            return None
        elif quoted_value == ETAG_ANY:
            return ETag(value=ETAG_ANY)
        match = QUOTED_ETAG_RE.fullmatch(quoted_value)
        if not match:
            return None
        is_weak, value = match.group(1, 2)
        return ETag(
            is_weak=bool(is_weak),
            value=value,
        )

    @etag.setter
    def etag(self, value: ETag | str | None) -> None:
        """Set or, with None, remove the ETag header.

        A plain string is validated and written as a quoted strong tag. An
        ETag instance is validated and written with a ``W/`` prefix when it
        is weak. ETAG_ANY, as a string or an ETag value, is written
        unquoted. Any other type raises ValueError.
        """
        if value is None:
            self._headers.pop(hdrs.ETAG, None)
        elif (isinstance(value, str) and value == ETAG_ANY) or (
            isinstance(value, ETag) and value.value == ETAG_ANY
        ):
            self._headers[hdrs.ETAG] = ETAG_ANY
        elif isinstance(value, str):
            validate_etag_value(value)
            self._headers[hdrs.ETAG] = f'"{value}"'
        elif isinstance(value, ETag) and isinstance(value.value, str):  # type: ignore[redundant-expr]
            validate_etag_value(value.value)
            hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"'
            self._headers[hdrs.ETAG] = hdr_value
        else:
            raise ValueError(
                f"Unsupported etag type: {type(value)}. "
                f"etag must be str, ETag or None"
            )

    def _generate_content_type_header(
        self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE
    ) -> None:
        """Rebuild the Content-Type header from the stored type and parameters."""
        assert self._content_dict is not None
        assert self._content_type is not None
        params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items())
        if params:
            ctype = self._content_type + "; " + params
        else:
            ctype = self._content_type
        self._headers[CONTENT_TYPE] = ctype

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Turn on writer-level compression for *coding*.

        Does nothing for the identity coding.
        """
        if coding is ContentCoding.identity:
            return
        assert self._payload_writer is not None
        self._headers[hdrs.CONTENT_ENCODING] = coding.value
        self._payload_writer.enable_compression(
            coding.value, self._compression_strategy
        )
        # Compressed payload may have different content length,
        # remove the header
        self._headers.popall(hdrs.CONTENT_LENGTH, None)

    async def _start_compression(self, request: "BaseRequest") -> None:
        """Pick a coding and start compression.

        A forced coding is used if set. Otherwise the first entry of
        CONTENT_CODINGS whose token occurs in the request's Accept-Encoding
        header is used; if none occurs, nothing is compressed.
        """
        if self._compression_force:
            await self._do_start_compression(self._compression_force)
            return
        # Encoding comparisons should be case-insensitive
        # https://www.rfc-editor.org/rfc/rfc9110#section-8.4.1
        accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()
        # NOTE(review): this is a substring test on the raw header value, so
        # q-values (including q=0) are not interpreted here.
        for value, coding in CONTENT_CODINGS.items():
            if value in accept_encoding:
                await self._do_start_compression(coding)
                return

    async def prepare(self, request: "BaseRequest") -> AbstractStreamWriter | None:
        """Bind the response to *request* and send the headers.

        Returns None if EOF was already sent, the existing writer if the
        response is already prepared, and otherwise the writer obtained by
        starting the response.
        """
        if self._eof_sent:
            return None
        if self._payload_writer is not None:
            return self._payload_writer
        self._must_be_empty_body = must_be_empty_body(request.method, self.status)
        return await self._start(request)

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Attach the request's writer, prepare the headers and write them."""
        self._req = request
        writer = self._payload_writer = request._payload_writer

        await self._prepare_headers()
        # The request's hook runs after header preparation but before the
        # headers are handed to the writer.
        await request._prepare_hook(self)
        await self._write_headers()

        return writer

    async def _prepare_headers(self) -> None:
        """Finalize the header set before it is written.

        Resolves keep-alive, adds cookies, starts compression, chooses
        between chunked and length-delimited framing, drops framing headers
        for responses that must have an empty body, fills in default
        Content-Type, Date and Server headers, and sets the Connection
        header when it is not already present.

        Raises RuntimeError if chunked encoding was enabled for a request
        that is not HTTP/1.1.
        """
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        keep_alive = self._keep_alive
        if keep_alive is None:
            keep_alive = request.keep_alive
        self._keep_alive = keep_alive

        version = request.version

        headers = self._headers
        if self._cookies:
            populate_with_cookies(headers, self._cookies)

        if self._compression:
            await self._start_compression(request)

        if self._chunked:
            if version != HttpVersion11:
                raise RuntimeError(
                    "Using chunked encoding is forbidden "
                    f"for HTTP/{request.version.major}.{request.version.minor}"
                )
            if not self._must_be_empty_body:
                writer.enable_chunking()
                headers[hdrs.TRANSFER_ENCODING] = "chunked"
        elif self._length_check:  # Disabled for WebSockets
            writer.length = self.content_length
            if writer.length is None:
                # Unknown length: HTTP/1.1 and later fall back to chunked
                # framing; older versions get no keep-alive header below.
                if version >= HttpVersion11:
                    if not self._must_be_empty_body:
                        writer.enable_chunking()
                        headers[hdrs.TRANSFER_ENCODING] = "chunked"
                elif not self._must_be_empty_body:
                    # NOTE(review): only the local flag changes here;
                    # self._keep_alive was assigned above and keeps its
                    # earlier value - confirm that this is intended.
                    keep_alive = False

        # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2
        # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4
        if self._must_be_empty_body:
            if hdrs.CONTENT_LENGTH in headers and should_remove_content_length(
                request.method, self.status
            ):
                del headers[hdrs.CONTENT_LENGTH]
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-10
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-13
            if hdrs.TRANSFER_ENCODING in headers:
                del headers[hdrs.TRANSFER_ENCODING]
        elif (writer.length if self._length_check else self.content_length) != 0:
            # https://www.rfc-editor.org/rfc/rfc9110#section-8.3-5
            headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream")
        headers.setdefault(hdrs.DATE, rfc822_formatted_time())
        headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE)

        # connection header
        if hdrs.CONNECTION not in headers:
            if keep_alive:
                if version == HttpVersion10:
                    headers[hdrs.CONNECTION] = "keep-alive"
            elif version == HttpVersion11:
                headers[hdrs.CONNECTION] = "close"

    async def _write_headers(self) -> None:
        """Write the status line and headers to the payload writer."""
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        # status line
        version = request.version
        status_line = f"HTTP/{version[0]}.{version[1]} {self._status} {self._reason}"
        await writer.write_headers(status_line, self._headers)

        # Send headers immediately if not opted into buffering
        if self._send_headers_immediately:
            writer.send_headers()

    async def write(
        self, data: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
    ) -> None:
        """Write a chunk of body data.

        Raises RuntimeError when called after write_eof() or before
        prepare().
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            raise RuntimeError("Cannot call write() after write_eof()")
        if self._payload_writer is None:
            raise RuntimeError("Cannot call write() before prepare()")

        await self._payload_writer.write(data)

    async def drain(self) -> None:
        """Deprecated: drain the payload writer; emits a DeprecationWarning."""
        assert not self._eof_sent, "EOF has already been sent"
        assert self._payload_writer is not None, "Response has not been started"
        warnings.warn(
            "drain method is deprecated, use await resp.write()",
            DeprecationWarning,
            stacklevel=2,
        )
        await self._payload_writer.drain()

    async def write_eof(self, data: bytes = b"") -> None:
        """Write optional final *data* and finish the response.

        Calling it again after EOF was sent is a no-op. Afterwards the
        request and writer references are dropped and the writer's output
        size is recorded as the body length.
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            return

        assert self._payload_writer is not None, "Response has not been started"

        await self._payload_writer.write_eof(data)
        self._eof_sent = True
        self._req = None
        # Read the size before the writer reference is cleared.
        self._body_length = self._payload_writer.output_size
        self._payload_writer = None

    def __repr__(self) -> str:
        """Show class name, reason and either "eof", the request line or "not prepared"."""
        if self._eof_sent:
            info = "eof"
        elif self.prepared:
            assert self._req is not None
            info = f"{self._req.method} {self._req.path} "
        else:
            info = "not prepared"
        return f"<{self.__class__.__name__} {self.reason} {info}>"

    @overload  # type: ignore[override]
    def __getitem__(self, key: ResponseKey[_T]) -> _T: ...

    @overload
    def __getitem__(self, key: str) -> Any: ...

    def __getitem__(self, key: str | ResponseKey[_T]) -> Any:
        """Return the state value stored under *key*."""
        return self._state[key]

    @overload  # type: ignore[override]
    def __setitem__(self, key: ResponseKey[_T], value: _T) -> None: ...

    @overload
    def __setitem__(self, key: str, value: Any) -> None: ...

    def __setitem__(self, key: str | ResponseKey[_T], value: Any) -> None:
        """Store *value* in the state dictionary under *key*."""
        self._state[key] = value

    def __delitem__(self, key: str | ResponseKey[_T]) -> None:
        """Remove *key* from the state dictionary."""
        del self._state[key]

    def __len__(self) -> int:
        """Number of entries in the state dictionary."""
        return len(self._state)

    def __iter__(self) -> Iterator[str | ResponseKey[Any]]:
        """Iterate over the keys of the state dictionary."""
        return iter(self._state)

    def __hash__(self) -> int:
        """Hash by object identity."""
        return hash(id(self))

    def __eq__(self, other: object) -> bool:
        """Responses compare equal only to themselves."""
        return self is other

    def __bool__(self) -> bool:
        """Always true, even when the state mapping is empty."""
        return True
class Response(StreamResponse):
    """Response whose whole body is supplied up front.

    The body may be bytes, a bytearray, text, or any object the payload
    registry can wrap. Content-Length is computed automatically and cannot
    be assigned.
    """

    # Body compressed ahead of time by _do_start_compression(); reset to
    # None whenever the body or text is reassigned.
    _compressed_body: bytes | None = None
    _send_headers_immediately = False

    def __init__(
        self,
        *,
        body: Any = None,
        status: int = 200,
        reason: str | None = None,
        text: str | None = None,
        headers: LooseHeaders | None = None,
        content_type: str | None = None,
        charset: str | None = None,
        zlib_executor_size: int = MAX_SYNC_CHUNK_SIZE,
        zlib_executor: Executor | None = None,
    ) -> None:
        """Create a response from *body* or *text*.

        *zlib_executor_size* and *zlib_executor* are passed to the
        compressor when the body is compressed as a whole.

        Raises:
            ValueError: if both *body* and *text* are given, if
                *content_type* contains "charset", or if a Content-Type
                header is combined with *content_type* or *charset*.
            TypeError: if *text* is not a str.
        """
        if body is not None and text is not None:
            raise ValueError("body and text are not allowed together")

        if headers is None:
            real_headers: CIMultiDict[str] = CIMultiDict()
        else:
            real_headers = CIMultiDict(headers)

        if content_type is not None and "charset" in content_type:
            raise ValueError("charset must not be in content_type argument")

        if text is not None:
            if hdrs.CONTENT_TYPE in real_headers:
                if content_type or charset:
                    raise ValueError(
                        "passing both Content-Type header and "
                        "content_type or charset params "
                        "is forbidden"
                    )
            else:
                # fast path for filling headers
                if not isinstance(text, str):
                    raise TypeError("text argument must be str (%r)" % type(text))
                if content_type is None:
                    content_type = "text/plain"
                if charset is None:
                    charset = "utf-8"
                real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset
                body = text.encode(charset)
                # The text is already encoded, so the body path is taken below.
                text = None
        elif hdrs.CONTENT_TYPE in real_headers:
            if content_type is not None or charset is not None:
                raise ValueError(
                    "passing both Content-Type header and "
                    "content_type or charset params "
                    "is forbidden"
                )
        elif content_type is not None:
            if charset is not None:
                content_type += "; charset=" + charset
            real_headers[hdrs.CONTENT_TYPE] = content_type

        super().__init__(status=status, reason=reason, _real_headers=real_headers)

        if text is not None:
            self.text = text
        else:
            self.body = body

        self._zlib_executor_size = zlib_executor_size
        self._zlib_executor = zlib_executor

    @property
    def body(self) -> bytes | bytearray | Payload | None:
        """The stored body, without any compression applied."""
        return self._body

    @body.setter
    def body(self, body: Any) -> None:
        """Replace the body.

        None, bytes and bytearray are stored unchanged. Anything else is
        looked up in the payload registry; the payload's content type and
        headers are copied in where the response does not already define
        them. Raises ValueError for a type the registry does not support.
        """
        if body is None:
            self._body = None
        elif isinstance(body, (bytes, bytearray)):
            self._body = body
        else:
            try:
                self._body = body = payload.PAYLOAD_REGISTRY.get(body)
            except payload.LookupError:
                raise ValueError("Unsupported body type %r" % type(body))

            headers = self._headers

            # set content-type
            if hdrs.CONTENT_TYPE not in headers:
                headers[hdrs.CONTENT_TYPE] = body.content_type

            # copy payload headers
            if body.headers:
                for key, value in body.headers.items():
                    if key not in headers:
                        headers[key] = value

        # Any previously compressed copy belongs to the old body.
        self._compressed_body = None

    @property
    def text(self) -> str | None:
        """The body decoded with the response charset (UTF-8 if unset), or None."""
        if self._body is None:
            return None
        # Note: When _body is a Payload (e.g. FilePayload), this may do blocking I/O
        # This is generally safe as most common payloads (BytesPayload, StringPayload)
        # don't do blocking I/O, but be careful with file-based payloads
        return self._body.decode(self.charset or "utf-8")

    @text.setter
    def text(self, text: str) -> None:
        """Replace the body with *text* encoded using the response charset.

        An application/octet-stream content type becomes text/plain and a
        missing charset becomes utf-8 before encoding.
        """
        assert isinstance(text, str), "text argument must be str (%r)" % type(text)

        if self.content_type == "application/octet-stream":
            self.content_type = "text/plain"
        if self.charset is None:
            self.charset = "utf-8"

        self._body = text.encode(self.charset)
        self._compressed_body = None

    @property
    def content_length(self) -> int | None:
        """Length of the body that will be sent, or None when unknown.

        None for chunked responses and Payload bodies. Otherwise an explicit
        Content-Length header wins, then the compressed body length, then
        the plain body length, and 0 when there is no body.
        """
        if self._chunked:
            return None

        if hdrs.CONTENT_LENGTH in self._headers:
            return int(self._headers[hdrs.CONTENT_LENGTH])

        if self._compressed_body is not None:
            # Return length of the compressed body
            return len(self._compressed_body)
        elif isinstance(self._body, Payload):
            # A payload without content length, or a compressed payload
            return None
        elif self._body is not None:
            return len(self._body)
        else:
            return 0

    @content_length.setter
    def content_length(self, value: int | None) -> None:
        """Always raises RuntimeError: the length is derived from the body."""
        raise RuntimeError("Content length is set automatically")

    async def write_eof(self, data: bytes = b"") -> None:
        """Send the stored body and finish the response.

        *data* must be empty. Nothing is sent as body when there is no body
        or the response must be empty. A Payload body writes itself to the
        payload writer and is then closed.
        """
        if self._eof_sent:
            return
        if self._compressed_body is None:
            body = self._body
        else:
            body = self._compressed_body
        assert not data, f"data arg is not supported, got {data!r}"
        assert self._req is not None
        assert self._payload_writer is not None
        if body is None or self._must_be_empty_body:
            await super().write_eof()
        elif isinstance(self._body, Payload):
            await self._body.write(self._payload_writer)
            await self._body.close()
            await super().write_eof()
        else:
            await super().write_eof(cast(bytes, body))

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Fix up Content-Length, then start the response as the base class does."""
        if hdrs.CONTENT_LENGTH in self._headers:
            if should_remove_content_length(request.method, self.status):
                del self._headers[hdrs.CONTENT_LENGTH]
        elif not self._chunked:
            if isinstance(self._body, Payload):
                if (size := self._body.size) is not None:
                    self._headers[hdrs.CONTENT_LENGTH] = str(size)
            else:
                # Kept as an integer throughout; a missing or empty body
                # counts as length 0.
                body_len = len(self._body) if self._body else 0
                # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7
                # A zero length is only advertised when the response is
                # neither a 304 nor an answer to a HEAD-like method.
                if body_len or (
                    self.status != 304 and request.method not in hdrs.METH_HEAD_ALL
                ):
                    self._headers[hdrs.CONTENT_LENGTH] = str(body_len)

        return await super()._start(request)

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Compress the whole in-memory body instead of streaming compression.

        Chunked responses and Payload bodies use the base-class streaming
        path. The identity coding does nothing. Otherwise the compressed
        bytes are stored and Content-Encoding and Content-Length are set to
        match them.
        """
        if self._chunked or isinstance(self._body, Payload):
            return await super()._do_start_compression(coding)
        if coding is ContentCoding.identity:
            return
        # Instead of using _payload_writer.enable_compression,
        # compress the whole body
        compressor = ZLibCompressor(
            encoding=coding.value,
            max_sync_chunk_size=self._zlib_executor_size,
            executor=self._zlib_executor,
        )
        assert self._body is not None
        self._compressed_body = (
            await compressor.compress(self._body) + compressor.flush()
        )
        self._headers[hdrs.CONTENT_ENCODING] = coding.value
        self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body))
def json_response(
    data: Any = sentinel,
    *,
    text: str | None = None,
    body: bytes | None = None,
    status: int = 200,
    reason: str | None = None,
    headers: LooseHeaders | None = None,
    content_type: str = "application/json",
    dumps: JSONEncoder = json.dumps,
) -> Response:
    """Build a Response carrying JSON.

    When *data* is given it is serialized with *dumps* and sent as the
    response text; otherwise *text* or *body* is passed through unchanged.
    The remaining arguments are forwarded to Response.

    Raises ValueError if *data* is combined with a non-empty *text* or
    *body*.
    """
    if data is not sentinel:
        # NOTE(review): this is a truthiness test, so an empty text or body
        # given together with data is silently replaced rather than
        # rejected - confirm whether that is intended.
        if text or body:
            raise ValueError("only one of data, text, or body should be specified")
        else:
            text = dumps(data)
    return Response(
        text=text,
        body=body,
        status=status,
        reason=reason,
        headers=headers,
        content_type=content_type,
    )
def json_bytes_response(
    data: Any = sentinel,
    *,
    dumps: JSONBytesEncoder,
    body: bytes | None = None,
    status: int = 200,
    reason: str | None = None,
    headers: LooseHeaders | None = None,
    content_type: str = "application/json",
) -> Response:
    """Create a JSON response using a bytes-returning encoder.

    Use this when your JSON encoder (like orjson) returns bytes
    instead of str, avoiding the encode/decode overhead.

    When *data* is given it is serialized with *dumps* and used as the
    response body; otherwise *body* is passed through unchanged. The
    remaining arguments are forwarded to Response.

    Raises ValueError if both *data* and *body* are given.
    """
    if data is not sentinel:
        if body is not None:
            raise ValueError("only one of data or body should be specified")
        else:
            body = dumps(data)
    return Response(
        body=body,
        status=status,
        reason=reason,
        headers=headers,
        content_type=content_type,
    )