Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_response.py: 24%

442 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-09 06:47 +0000

1import asyncio 

2import collections.abc 

3import datetime 

4import enum 

5import json 

6import math 

7import time 

8import warnings 

9from concurrent.futures import Executor 

10from http import HTTPStatus 

11from typing import ( 

12 TYPE_CHECKING, 

13 Any, 

14 Dict, 

15 Iterator, 

16 MutableMapping, 

17 Optional, 

18 Union, 

19 cast, 

20) 

21 

22from multidict import CIMultiDict, istr 

23 

24from . import hdrs, payload 

25from .abc import AbstractStreamWriter 

26from .compression_utils import ZLibCompressor 

27from .helpers import ( 

28 ETAG_ANY, 

29 QUOTED_ETAG_RE, 

30 CookieMixin, 

31 ETag, 

32 HeadersMixin, 

33 must_be_empty_body, 

34 parse_http_date, 

35 populate_with_cookies, 

36 rfc822_formatted_time, 

37 sentinel, 

38 should_remove_content_length, 

39 validate_etag_value, 

40) 

41from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11 

42from .payload import Payload 

43from .typedefs import JSONEncoder, LooseHeaders 

44 

45__all__ = ("ContentCoding", "StreamResponse", "Response", "json_response") 

46 

47 

48if TYPE_CHECKING: 

49 from .web_request import BaseRequest 

50 

51 BaseClass = MutableMapping[str, Any] 

52else: 

53 BaseClass = collections.abc.MutableMapping 

54 

55 

class ContentCoding(enum.Enum):
    """Content codings that responses in this module can be encoded with.

    Each member's value is the literal token used for the coding.
    Members are declared in the order deflate, gzip, identity; code that
    iterates over the enum sees them in that order.
    """

    # Only the codings supported here are listed.  The full registry of
    # content codings is maintained by IANA:
    # https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding
    deflate = "deflate"
    gzip = "gzip"
    identity = "identity"

64 

65 

66############################################################ 

67# HTTP Response classes 

68############################################################ 

69 

70 

class StreamResponse(BaseClass, HeadersMixin, CookieMixin):
    """HTTP response whose body is written incrementally.

    The response carries a status, a reason phrase and a case-insensitive
    header multidict.  Once ``prepare()`` has attached the request's payload
    writer, data is sent with ``write()`` and finished with ``write_eof()``.

    The instance also behaves as a mutable mapping; items are stored in a
    private per-response dict (``_state``) and are unrelated to the headers.
    """

    __slots__ = (
        "_length_check",
        "_body",
        "_keep_alive",
        "_chunked",
        "_compression",
        "_compression_force",
        "_req",
        "_payload_writer",
        "_eof_sent",
        "_must_be_empty_body",
        "_body_length",
        "_state",
        "_headers",
        "_status",
        "_reason",
        "_cookies",
        "__weakref__",
    )

    def __init__(
        self,
        *,
        status: int = 200,
        reason: Optional[str] = None,
        headers: Optional[LooseHeaders] = None,
    ) -> None:
        """Create an unprepared response.

        :param status: status code, passed through ``set_status``.
        :param reason: reason phrase; when ``None`` it is derived from
            ``status`` by ``set_status``.
        :param headers: initial headers; they are copied into a new
            ``CIMultiDict`` owned by the response.
        """
        super().__init__()
        self._length_check = True
        self._body = None
        # None means "not decided yet"; resolved from the request in
        # _prepare_headers().
        self._keep_alive: Optional[bool] = None
        self._chunked = False
        self._compression = False
        self._compression_force: Optional[ContentCoding] = None

        self._req: Optional[BaseRequest] = None
        self._payload_writer: Optional[AbstractStreamWriter] = None
        self._eof_sent = False
        self._must_be_empty_body: Optional[bool] = None
        self._body_length = 0
        # Backing store for the MutableMapping interface.
        self._state: Dict[str, Any] = {}

        if headers is not None:
            self._headers: CIMultiDict[str] = CIMultiDict(headers)
        else:
            self._headers = CIMultiDict()

        self.set_status(status, reason)

    @property
    def prepared(self) -> bool:
        """True while a payload writer is attached to the response."""
        return self._payload_writer is not None

    @property
    def task(self) -> "Optional[asyncio.Task[None]]":
        """Task of the request being answered, or None without a request."""
        if self._req:
            return self._req.task
        else:
            return None

    @property
    def status(self) -> int:
        """Status code of the response."""
        return self._status

    @property
    def chunked(self) -> bool:
        """Whether chunked transfer encoding has been enabled."""
        return self._chunked

    @property
    def compression(self) -> bool:
        """Whether compression has been enabled."""
        return self._compression

    @property
    def reason(self) -> str:
        """Reason phrase of the response."""
        return self._reason

    def set_status(
        self,
        status: int,
        reason: Optional[str] = None,
    ) -> None:
        """Set the status code and reason phrase.

        When ``reason`` is None the standard phrase for the status code is
        used, or an empty string if ``HTTPStatus`` does not know the code.
        Must not be called while the response is prepared (checked with an
        ``assert``).
        """
        assert (
            not self.prepared
        ), "Cannot change the response status code after the headers have been sent"
        self._status = int(status)
        if reason is None:
            try:
                reason = HTTPStatus(self._status).phrase
            except ValueError:
                # Non-standard status code: no canonical phrase exists.
                reason = ""
        self._reason = reason

    @property
    def keep_alive(self) -> Optional[bool]:
        """Keep-alive decision, or None if it has not been made yet."""
        return self._keep_alive

    def force_close(self) -> None:
        """Mark the response as not keep-alive."""
        self._keep_alive = False

    @property
    def body_length(self) -> int:
        """Writer's output size recorded by ``write_eof()`` (0 before that)."""
        return self._body_length

    def enable_chunked_encoding(self) -> None:
        """Enables automatic chunked transfer encoding.

        :raises RuntimeError: if a Content-Length header is already set.
        """
        # Validate first so a rejected call leaves the response untouched;
        # previously the chunked flag was flipped before the check and stayed
        # set even though the call failed.
        if hdrs.CONTENT_LENGTH in self._headers:
            raise RuntimeError(
                "You can't enable chunked encoding when a content length is set"
            )
        self._chunked = True

    def enable_compression(self, force: Optional[ContentCoding] = None) -> None:
        """Enables response compression encoding.

        :param force: coding to use unconditionally; when None the coding is
            chosen from the request's Accept-Encoding header at prepare time.
        """
        self._compression = True
        self._compression_force = force

    @property
    def headers(self) -> "CIMultiDict[str]":
        """The response headers (the live multidict, not a copy)."""
        return self._headers

    @property
    def content_length(self) -> Optional[int]:
        # Just a placeholder for adding setter
        return super().content_length

    @content_length.setter
    def content_length(self, value: Optional[int]) -> None:
        """Set or, with None, remove the Content-Length header.

        :raises RuntimeError: if a length is given while chunked encoding is
            enabled.
        """
        if value is not None:
            value = int(value)
            if self._chunked:
                raise RuntimeError(
                    "You can't set content length when chunked encoding is enable"
                )
            self._headers[hdrs.CONTENT_LENGTH] = str(value)
        else:
            self._headers.pop(hdrs.CONTENT_LENGTH, None)

    @property
    def content_type(self) -> str:
        # Just a placeholder for adding setter
        return super().content_type

    @content_type.setter
    def content_type(self, value: str) -> None:
        """Replace the media type and regenerate the Content-Type header."""
        self.content_type  # read header values if needed
        self._content_type = str(value)
        self._generate_content_type_header()

    @property
    def charset(self) -> Optional[str]:
        # Just a placeholder for adding setter
        return super().charset

    @charset.setter
    def charset(self, value: Optional[str]) -> None:
        """Set (lower-cased) or, with None, drop the charset parameter.

        :raises RuntimeError: if the content type is
            ``application/octet-stream``.
        """
        ctype = self.content_type  # read header values if needed
        if ctype == "application/octet-stream":
            raise RuntimeError(
                "Setting charset for application/octet-stream "
                "doesn't make sense, setup content_type first"
            )
        assert self._content_dict is not None
        if value is None:
            self._content_dict.pop("charset", None)
        else:
            self._content_dict["charset"] = str(value).lower()
        self._generate_content_type_header()

    @property
    def last_modified(self) -> Optional[datetime.datetime]:
        """The value of Last-Modified HTTP header, or None.

        This header is represented as a `datetime` object.
        """
        return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED))

    @last_modified.setter
    def last_modified(
        self, value: Optional[Union[int, float, datetime.datetime, str]]
    ) -> None:
        """Set or, with None, remove the Last-Modified header.

        Numbers are treated as POSIX timestamps (rounded up to a whole
        second), ``datetime`` objects are formatted from their UTC time tuple
        and strings are stored verbatim.

        :raises TypeError: if ``value`` is of any other type.
        """
        if value is None:
            self._headers.pop(hdrs.LAST_MODIFIED, None)
        elif isinstance(value, (int, float)):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(math.ceil(value))
            )
        elif isinstance(value, datetime.datetime):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", value.utctimetuple()
            )
        elif isinstance(value, str):
            self._headers[hdrs.LAST_MODIFIED] = value
        else:
            # An unsupported value used to be ignored silently, leaving the
            # header at its previous value; report the mistake instead.
            raise TypeError(
                f"Unsupported type for last_modified: {type(value).__name__}"
            )

    @property
    def etag(self) -> Optional[ETag]:
        """Parsed ETag header, or None if it is missing or malformed."""
        quoted_value = self._headers.get(hdrs.ETAG)
        if not quoted_value:
            return None
        elif quoted_value == ETAG_ANY:
            return ETag(value=ETAG_ANY)
        match = QUOTED_ETAG_RE.fullmatch(quoted_value)
        if not match:
            return None
        is_weak, value = match.group(1, 2)
        return ETag(
            is_weak=bool(is_weak),
            value=value,
        )

    @etag.setter
    def etag(self, value: Optional[Union[ETag, str]]) -> None:
        """Set or, with None, remove the ETag header.

        A plain string is validated and emitted as a quoted strong tag; an
        ``ETag`` is emitted with a ``W/`` prefix when it is weak.  The
        ``ETAG_ANY`` value is written as is.

        :raises ValueError: if ``value`` is not a str, an ``ETag`` or None.
        """
        if value is None:
            self._headers.pop(hdrs.ETAG, None)
        elif (isinstance(value, str) and value == ETAG_ANY) or (
            isinstance(value, ETag) and value.value == ETAG_ANY
        ):
            self._headers[hdrs.ETAG] = ETAG_ANY
        elif isinstance(value, str):
            validate_etag_value(value)
            self._headers[hdrs.ETAG] = f'"{value}"'
        elif isinstance(value, ETag) and isinstance(value.value, str):  # type: ignore[redundant-expr]
            validate_etag_value(value.value)
            hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"'
            self._headers[hdrs.ETAG] = hdr_value
        else:
            raise ValueError(
                f"Unsupported etag type: {type(value)}. "
                f"etag must be str, ETag or None"
            )

    def _generate_content_type_header(
        self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE
    ) -> None:
        """Rebuild the Content-Type header from the media type and params."""
        assert self._content_dict is not None
        assert self._content_type is not None
        params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items())
        if params:
            ctype = self._content_type + "; " + params
        else:
            ctype = self._content_type
        self._headers[CONTENT_TYPE] = ctype

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Turn on writer-level compression for any coding but identity."""
        if coding != ContentCoding.identity:
            assert self._payload_writer is not None
            self._headers[hdrs.CONTENT_ENCODING] = coding.value
            self._payload_writer.enable_compression(coding.value)
            # Compressed payload may have different content length,
            # remove the header
            self._headers.popall(hdrs.CONTENT_LENGTH, None)

    async def _start_compression(self, request: "BaseRequest") -> None:
        """Pick a coding (forced, or from Accept-Encoding) and start it.

        Without a forced coding, the first ``ContentCoding`` member whose
        token occurs as a substring of the lower-cased Accept-Encoding header
        is used; if none occurs, nothing is started.
        """
        if self._compression_force:
            await self._do_start_compression(self._compression_force)
        else:
            # Encoding comparisons should be case-insensitive
            # https://www.rfc-editor.org/rfc/rfc9110#section-8.4.1
            accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()
            for coding in ContentCoding:
                if coding.value in accept_encoding:
                    await self._do_start_compression(coding)
                    return

    async def prepare(self, request: "BaseRequest") -> Optional[AbstractStreamWriter]:
        """Attach the response to ``request`` and send the headers.

        Returns None if EOF was already sent, the existing writer if the
        response is already prepared, and the newly attached writer otherwise.
        """
        if self._eof_sent:
            return None
        if self._payload_writer is not None:
            return self._payload_writer
        self._must_be_empty_body = must_be_empty_body(request.method, self.status)
        return await self._start(request)

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Bind the request and its writer, then prepare and write headers."""
        self._req = request
        writer = self._payload_writer = request._payload_writer

        await self._prepare_headers()
        await request._prepare_hook(self)
        await self._write_headers()

        return writer

    async def _prepare_headers(self) -> None:
        """Finalize the header set before it is written.

        Resolves keep-alive, adds cookies, starts compression, decides
        between chunked and length-delimited framing, and fills in the
        default Content-Type, Date, Server and Connection headers.

        :raises RuntimeError: if chunked encoding was enabled but the request
            is not HTTP/1.1.
        """
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        keep_alive = self._keep_alive
        if keep_alive is None:
            keep_alive = request.keep_alive
        self._keep_alive = keep_alive

        version = request.version

        headers = self._headers
        populate_with_cookies(headers, self.cookies)

        if self._compression:
            await self._start_compression(request)

        if self._chunked:
            if version != HttpVersion11:
                raise RuntimeError(
                    "Using chunked encoding is forbidden "
                    "for HTTP/{0.major}.{0.minor}".format(request.version)
                )
            if not self._must_be_empty_body:
                writer.enable_chunking()
                headers[hdrs.TRANSFER_ENCODING] = "chunked"
            if hdrs.CONTENT_LENGTH in headers:
                del headers[hdrs.CONTENT_LENGTH]
        elif self._length_check:
            writer.length = self.content_length
            if writer.length is None:
                # Unknown length: chunk on HTTP/1.1+, otherwise the body can
                # only be delimited by closing the connection.
                if version >= HttpVersion11:
                    if not self._must_be_empty_body:
                        writer.enable_chunking()
                        headers[hdrs.TRANSFER_ENCODING] = "chunked"
                elif not self._must_be_empty_body:
                    keep_alive = False

        # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2
        # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4
        if self._must_be_empty_body:
            if hdrs.CONTENT_LENGTH in headers and should_remove_content_length(
                request.method, self.status
            ):
                del headers[hdrs.CONTENT_LENGTH]
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-10
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-13
            if hdrs.TRANSFER_ENCODING in headers:
                del headers[hdrs.TRANSFER_ENCODING]
        else:
            headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream")
        headers.setdefault(hdrs.DATE, rfc822_formatted_time())
        headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE)

        # connection header
        if hdrs.CONNECTION not in headers:
            if keep_alive:
                if version == HttpVersion10:
                    headers[hdrs.CONNECTION] = "keep-alive"
            else:
                if version == HttpVersion11:
                    headers[hdrs.CONNECTION] = "close"

    async def _write_headers(self) -> None:
        """Send the status line and the headers through the writer."""
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        # status line
        version = request.version
        status_line = "HTTP/{}.{} {} {}".format(
            version[0], version[1], self._status, self._reason
        )
        await writer.write_headers(status_line, self._headers)

    async def write(self, data: bytes) -> None:
        """Write a chunk of body data.

        :raises RuntimeError: if called after ``write_eof()`` or before
            ``prepare()``.
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            raise RuntimeError("Cannot call write() after write_eof()")
        if self._payload_writer is None:
            raise RuntimeError("Cannot call write() before prepare()")

        await self._payload_writer.write(data)

    async def drain(self) -> None:
        """Deprecated: emit a DeprecationWarning and drain the writer."""
        assert not self._eof_sent, "EOF has already been sent"
        assert self._payload_writer is not None, "Response has not been started"
        warnings.warn(
            "drain method is deprecated, use await resp.write()",
            DeprecationWarning,
            stacklevel=2,
        )
        await self._payload_writer.drain()

    async def write_eof(self, data: bytes = b"") -> None:
        """Finish the response, optionally sending a last chunk.

        A second call is a no-op.  Afterwards the request and writer
        references are dropped and ``body_length`` holds the writer's output
        size.
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            return

        assert self._payload_writer is not None, "Response has not been started"

        await self._payload_writer.write_eof(data)
        self._eof_sent = True
        self._req = None
        self._body_length = self._payload_writer.output_size
        self._payload_writer = None

    def __repr__(self) -> str:
        if self._eof_sent:
            info = "eof"
        elif self.prepared:
            assert self._req is not None
            info = f"{self._req.method} {self._req.path} "
        else:
            info = "not prepared"
        return f"<{self.__class__.__name__} {self.reason} {info}>"

    # MutableMapping interface, backed by the per-response state dict.

    def __getitem__(self, key: str) -> Any:
        return self._state[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._state[key] = value

    def __delitem__(self, key: str) -> None:
        del self._state[key]

    def __len__(self) -> int:
        return len(self._state)

    def __iter__(self) -> Iterator[str]:
        return iter(self._state)

    # Responses compare and hash by identity, not by mapping content.

    def __hash__(self) -> int:
        return hash(id(self))

    def __eq__(self, other: object) -> bool:
        return self is other

498 

499 

class Response(StreamResponse):
    """Response whose whole body is known before it is sent.

    The body is either raw bytes, text (encoded on assignment) or an object
    resolved through the payload registry.  Content-Length is derived
    automatically, and a bytes body is compressed in one piece rather than
    through the stream writer.
    """

    __slots__ = (
        "_body_payload",
        "_compressed_body",
        "_zlib_executor_size",
        "_zlib_executor",
    )

    def __init__(
        self,
        *,
        body: Any = None,
        status: int = 200,
        reason: Optional[str] = None,
        text: Optional[str] = None,
        headers: Optional[LooseHeaders] = None,
        content_type: Optional[str] = None,
        charset: Optional[str] = None,
        zlib_executor_size: Optional[int] = None,
        zlib_executor: Optional[Executor] = None,
    ) -> None:
        """Build a response from either ``body`` or ``text``.

        :raises ValueError: if both ``body`` and ``text`` are given, if
            ``content_type`` contains ``charset``, or if a Content-Type
            header is combined with ``content_type``/``charset`` arguments.
        :raises TypeError: if ``text`` is not a str on the fast path (no
            Content-Type header supplied).
        """
        if text is not None and body is not None:
            raise ValueError("body and text are not allowed together")

        hdr_map: CIMultiDict[str]
        if isinstance(headers, CIMultiDict):
            hdr_map = headers  # = cast('CIMultiDict[str]', headers)
        elif headers is None:
            hdr_map = CIMultiDict()
        else:
            hdr_map = CIMultiDict(headers)

        if content_type is not None and "charset" in content_type:
            raise ValueError("charset must not be in content_type argument")

        conflict_msg = (
            "passing both Content-Type header and "
            "content_type or charset params "
            "is forbidden"
        )
        ctype_preset = hdrs.CONTENT_TYPE in hdr_map

        if text is None:
            if ctype_preset:
                if content_type is not None or charset is not None:
                    raise ValueError(conflict_msg)
            elif content_type is not None:
                header_value = content_type
                if charset is not None:
                    header_value = content_type + "; charset=" + charset
                hdr_map[hdrs.CONTENT_TYPE] = header_value
        elif ctype_preset:
            if content_type or charset:
                raise ValueError(conflict_msg)
        else:
            # Fast path: write the header directly and hand the encoded
            # bytes to the body setter instead of going through ``text``.
            if not isinstance(text, str):
                raise TypeError("text argument must be str (%r)" % type(text))
            media_type = "text/plain" if content_type is None else content_type
            encoding = "utf-8" if charset is None else charset
            hdr_map[hdrs.CONTENT_TYPE] = media_type + "; charset=" + encoding
            body = text.encode(encoding)
            text = None

        super().__init__(status=status, reason=reason, headers=hdr_map)

        if text is None:
            self.body = body
        else:
            self.text = text

        self._compressed_body: Optional[bytes] = None
        self._zlib_executor_size = zlib_executor_size
        self._zlib_executor = zlib_executor

    @property
    def body(self) -> Optional[Union[bytes, Payload]]:
        """The stored body: bytes, a payload object, or None."""
        return self._body

    @body.setter
    def body(self, body: bytes) -> None:
        """Store a new body and discard any cached compressed form.

        None, ``bytes`` and ``bytearray`` are kept as they are.  Anything
        else is looked up in the payload registry; the payload's content type
        and headers are then copied into the response headers where those
        headers are not already present.

        :raises ValueError: if the registry has no payload for the object.
        """
        if body is None or isinstance(body, (bytes, bytearray)):
            self._body: Optional[bytes] = body
            self._body_payload: bool = False
        else:
            try:
                wrapped = payload.PAYLOAD_REGISTRY.get(body)
            except payload.LookupError:
                raise ValueError("Unsupported body type %r" % type(body))
            self._body = wrapped
            self._body_payload = True

            target = self._headers

            # Explicitly set response headers win over the payload's own.
            if hdrs.CONTENT_TYPE not in target:
                target[hdrs.CONTENT_TYPE] = wrapped.content_type

            if wrapped.headers:
                for name, header_value in wrapped.headers.items():
                    if name not in target:
                        target[name] = header_value

        self._compressed_body = None

    @property
    def text(self) -> Optional[str]:
        """Body decoded with the response charset (utf-8 when unset)."""
        raw = self._body
        return None if raw is None else raw.decode(self.charset or "utf-8")

    @text.setter
    def text(self, text: str) -> None:
        """Encode ``text`` into the body, defaulting type and charset.

        A content type of ``application/octet-stream`` is replaced with
        ``text/plain`` and a missing charset with ``utf-8``.
        """
        assert isinstance(text, str), "text argument must be str (%r)" % type(text)

        if self.content_type == "application/octet-stream":
            self.content_type = "text/plain"
        if self.charset is None:
            self.charset = "utf-8"

        encoded = text.encode(self.charset)
        self._body = encoded
        self._body_payload = False
        self._compressed_body = None

    @property
    def content_length(self) -> Optional[int]:
        """Length of the body as it will be sent, if it can be known.

        None for chunked responses and for payload bodies without an explicit
        Content-Length header; an explicit header takes precedence over the
        computed size.
        """
        if self._chunked:
            return None

        if hdrs.CONTENT_LENGTH in self._headers:
            return super().content_length

        if self._compressed_body is not None:
            # Size of the compressed representation.
            return len(self._compressed_body)
        if self._body_payload:
            # A payload without content length, or a compressed payload
            return None
        if self._body is None:
            return 0
        return len(self._body)

    @content_length.setter
    def content_length(self, value: Optional[int]) -> None:
        """Always raises RuntimeError: the length cannot be set manually."""
        raise RuntimeError("Content length is set automatically")

    async def write_eof(self, data: bytes = b"") -> None:
        """Send the stored body and finish the response.

        ``data`` must be empty (checked with an ``assert``).  Nothing is
        written when there is no body or the response must be empty.
        """
        if self._eof_sent:
            return

        compressed = self._compressed_body
        outgoing: Optional[Union[bytes, Payload]] = (
            self._body if compressed is None else compressed
        )

        assert not data, f"data arg is not supported, got {data!r}"
        assert self._req is not None
        assert self._payload_writer is not None

        if outgoing is None or self._must_be_empty_body:
            await super().write_eof()
            return

        if self._body_payload:
            # Payload objects write themselves to the stream.
            await cast(Payload, outgoing).write(self._payload_writer)
            await super().write_eof()
            return

        await super().write_eof(cast(bytes, outgoing))

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Settle the Content-Length header, then start as the base class."""
        header_map = self._headers
        if should_remove_content_length(request.method, self.status):
            if hdrs.CONTENT_LENGTH in header_map:
                del header_map[hdrs.CONTENT_LENGTH]
        elif not self._chunked and hdrs.CONTENT_LENGTH not in header_map:
            if self._body_payload:
                payload_size = cast(Payload, self._body).size
                if payload_size is not None:
                    header_map[hdrs.CONTENT_LENGTH] = str(payload_size)
            else:
                length = len(self._body) if self._body else 0
                # A zero length is left out for 304 and HEAD responses.
                # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7
                if length or (
                    self.status != 304 and request.method.upper() != hdrs.METH_HEAD
                ):
                    header_map[hdrs.CONTENT_LENGTH] = str(length)

        return await super()._start(request)

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Compress a bytes body up front and fix the related headers.

        Payload bodies and chunked responses are delegated to the base
        class's writer-level compression.  The identity coding does nothing.
        """
        if self._body_payload or self._chunked:
            return await super()._do_start_compression(coding)

        if coding == ContentCoding.identity:
            return

        # The complete body is available, so it is compressed here instead
        # of through _payload_writer.enable_compression.
        compressor = ZLibCompressor(
            encoding=str(coding.value),
            max_sync_chunk_size=self._zlib_executor_size,
            executor=self._zlib_executor,
        )
        assert self._body is not None
        raw = self._body
        if self._zlib_executor_size is None and len(raw) > 1024 * 1024:
            warnings.warn(
                "Synchronous compression of large response bodies "
                f"({len(raw)} bytes) might block the async event loop. "
                "Consider providing a custom value to zlib_executor_size/"
                "zlib_executor response properties or disabling compression on it."
            )
        head = await compressor.compress(raw)
        self._compressed_body = head + compressor.flush()
        assert self._compressed_body is not None

        self._headers[hdrs.CONTENT_ENCODING] = coding.value
        self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body))

722 

723 

def json_response(
    data: Any = sentinel,
    *,
    text: Optional[str] = None,
    body: Optional[bytes] = None,
    status: int = 200,
    reason: Optional[str] = None,
    headers: Optional[LooseHeaders] = None,
    content_type: str = "application/json",
    dumps: JSONEncoder = json.dumps,
) -> Response:
    """Create a ``Response`` with a JSON content type.

    When ``data`` is supplied it is serialized with ``dumps`` and used as the
    response text; otherwise ``text`` or ``body`` is passed through as given.

    :raises ValueError: if ``data`` is supplied together with a truthy
        ``text`` or ``body``.
    """
    has_data = data is not sentinel
    if has_data and (text or body):
        raise ValueError("only one of data, text, or body should be specified")
    if has_data:
        text = dumps(data)

    return Response(
        text=text,
        body=body,
        status=status,
        reason=reason,
        headers=headers,
        content_type=content_type,
    )