Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_response.py: 24%

442 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-26 06:16 +0000

1import asyncio 

2import collections.abc 

3import datetime 

4import enum 

5import json 

6import math 

7import time 

8import warnings 

9from concurrent.futures import Executor 

10from http import HTTPStatus 

11from typing import ( 

12 TYPE_CHECKING, 

13 Any, 

14 Dict, 

15 Iterator, 

16 MutableMapping, 

17 Optional, 

18 Union, 

19 cast, 

20) 

21 

22from multidict import CIMultiDict, istr 

23 

24from . import hdrs, payload 

25from .abc import AbstractStreamWriter 

26from .compression_utils import ZLibCompressor 

27from .helpers import ( 

28 ETAG_ANY, 

29 QUOTED_ETAG_RE, 

30 CookieMixin, 

31 ETag, 

32 HeadersMixin, 

33 must_be_empty_body, 

34 parse_http_date, 

35 populate_with_cookies, 

36 rfc822_formatted_time, 

37 sentinel, 

38 should_remove_content_length, 

39 validate_etag_value, 

40) 

41from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11 

42from .payload import Payload 

43from .typedefs import JSONEncoder, LooseHeaders 

44 

# Names exported by ``from <this module> import *``.
__all__ = ("ContentCoding", "StreamResponse", "Response", "json_response")


if TYPE_CHECKING:
    # Only imported while type checking; at runtime the name is used solely
    # inside annotations.
    from .web_request import BaseRequest

    # Type checkers see the response as a mapping with str keys / Any values.
    BaseClass = MutableMapping[str, Any]
else:
    # At runtime the unparametrised ABC is used as the base class.
    BaseClass = collections.abc.MutableMapping

54 

55 

class ContentCoding(enum.Enum):
    """Content codings supported for response bodies.

    Other registered codings are listed at:
    https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding

    Member order matters to code that iterates over the enum, so keep it.
    """

    deflate = "deflate"
    gzip = "gzip"
    identity = "identity"

64 

65 

66############################################################ 

67# HTTP Response classes 

68############################################################ 

69 

70 

class StreamResponse(BaseClass, HeadersMixin, CookieMixin):
    """HTTP response whose headers and body go through a payload writer.

    Status, reason and headers are configured first.  ``prepare()`` binds the
    response to a request and writes the status line and headers; afterwards
    ``write()`` / ``write_eof()`` send the body.  The object is also a mutable
    mapping backed by a per-response ``_state`` dict.
    """

    __slots__ = (
        "_length_check",
        "_body",
        "_keep_alive",
        "_chunked",
        "_compression",
        "_compression_force",
        "_req",
        "_payload_writer",
        "_eof_sent",
        "_must_be_empty_body",
        "_body_length",
        "_state",
        "_headers",
        "_status",
        "_reason",
        "_cookies",
        "__weakref__",
    )

    def __init__(
        self,
        *,
        status: int = 200,
        reason: Optional[str] = None,
        headers: Optional[LooseHeaders] = None,
    ) -> None:
        """Create an unprepared response.

        ``headers`` (if given) is copied into a fresh ``CIMultiDict``;
        ``status`` and ``reason`` are applied through ``set_status()``.
        """
        super().__init__()
        self._length_check = True
        self._body = None
        self._keep_alive: Optional[bool] = None
        self._chunked = False
        self._compression = False
        self._compression_force: Optional[ContentCoding] = None

        self._req: Optional[BaseRequest] = None
        self._payload_writer: Optional[AbstractStreamWriter] = None
        self._eof_sent = False
        self._must_be_empty_body: Optional[bool] = None
        self._body_length = 0
        self._state: Dict[str, Any] = {}

        if headers is not None:
            self._headers: CIMultiDict[str] = CIMultiDict(headers)
        else:
            self._headers = CIMultiDict()

        self.set_status(status, reason)

    @property
    def prepared(self) -> bool:
        """True while a payload writer is attached to the response."""
        return self._payload_writer is not None

    @property
    def task(self) -> "Optional[asyncio.Task[None]]":
        """Task of the bound request, or None when no request is bound."""
        if self._req:
            return self._req.task
        else:
            return None

    @property
    def status(self) -> int:
        """Numeric HTTP status code."""
        return self._status

    @property
    def chunked(self) -> bool:
        """Whether chunked transfer encoding has been enabled."""
        return self._chunked

    @property
    def compression(self) -> bool:
        """Whether compression has been enabled."""
        return self._compression

    @property
    def reason(self) -> str:
        """Reason phrase sent in the status line."""
        return self._reason

    def set_status(
        self,
        status: int,
        reason: Optional[str] = None,
    ) -> None:
        """Set the status code and reason phrase.

        When ``reason`` is None the phrase is looked up in ``HTTPStatus``;
        codes unknown to ``HTTPStatus`` get an empty reason.
        """
        assert (
            not self.prepared
        ), "Cannot change the response status code after the headers have been sent"
        self._status = int(status)
        if reason is None:
            try:
                reason = HTTPStatus(self._status).phrase
            except ValueError:
                reason = ""
        self._reason = reason

    @property
    def keep_alive(self) -> Optional[bool]:
        """Keep-alive flag; None until it is forced or derived in prepare."""
        return self._keep_alive

    def force_close(self) -> None:
        """Disable keep-alive for this response."""
        self._keep_alive = False

    @property
    def body_length(self) -> int:
        """Writer output size recorded by ``write_eof()`` (0 before that)."""
        return self._body_length

    def enable_chunked_encoding(self) -> None:
        """Enables automatic chunked transfer encoding.

        Raises:
            RuntimeError: if a Content-Length header is already set.
        """
        # Validate before mutating so a rejected call does not leave the
        # response flagged as chunked.
        if hdrs.CONTENT_LENGTH in self._headers:
            raise RuntimeError(
                "You can't enable chunked encoding when a content length is set"
            )
        self._chunked = True

    def enable_compression(self, force: Optional[ContentCoding] = None) -> None:
        """Enables response compression encoding.

        ``force`` pins the coding to use; when it is None the coding is
        picked from the request's Accept-Encoding header during prepare.
        """
        self._compression = True
        self._compression_force = force

    @property
    def headers(self) -> "CIMultiDict[str]":
        """The mutable header multidict of this response."""
        return self._headers

    @property
    def content_length(self) -> Optional[int]:
        # Just a placeholder for adding setter
        return super().content_length

    @content_length.setter
    def content_length(self, value: Optional[int]) -> None:
        """Set or (with None) remove the Content-Length header.

        Raises:
            RuntimeError: when a length is given while chunked encoding is on.
        """
        if value is not None:
            value = int(value)
            if self._chunked:
                raise RuntimeError(
                    "You can't set content length when chunked encoding is enable"
                )
            self._headers[hdrs.CONTENT_LENGTH] = str(value)
        else:
            self._headers.pop(hdrs.CONTENT_LENGTH, None)

    @property
    def content_type(self) -> str:
        # Just a placeholder for adding setter
        return super().content_type

    @content_type.setter
    def content_type(self, value: str) -> None:
        """Replace the media type and regenerate the Content-Type header."""
        self.content_type  # read header values if needed
        self._content_type = str(value)
        self._generate_content_type_header()

    @property
    def charset(self) -> Optional[str]:
        # Just a placeholder for adding setter
        return super().charset

    @charset.setter
    def charset(self, value: Optional[str]) -> None:
        """Set (lower-cased) or remove the charset parameter of Content-Type.

        Raises:
            RuntimeError: if the content type is application/octet-stream.
        """
        ctype = self.content_type  # read header values if needed
        if ctype == "application/octet-stream":
            raise RuntimeError(
                "Setting charset for application/octet-stream "
                "doesn't make sense, setup content_type first"
            )
        assert self._content_dict is not None
        if value is None:
            self._content_dict.pop("charset", None)
        else:
            self._content_dict["charset"] = str(value).lower()
        self._generate_content_type_header()

    @property
    def last_modified(self) -> Optional[datetime.datetime]:
        """The value of Last-Modified HTTP header, or None.

        This header is represented as a `datetime` object.
        """
        return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED))

    @last_modified.setter
    def last_modified(
        self, value: Optional[Union[int, float, datetime.datetime, str]]
    ) -> None:
        """Set or (with None) remove the Last-Modified header.

        Numbers are treated as a timestamp (rounded up to whole seconds),
        datetimes are formatted from their UTC time tuple, and strings are
        stored verbatim.

        Raises:
            TypeError: for any other value type.
        """
        http_date_format = "%a, %d %b %Y %H:%M:%S GMT"
        if value is None:
            self._headers.pop(hdrs.LAST_MODIFIED, None)
        elif isinstance(value, (int, float)):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                http_date_format, time.gmtime(math.ceil(value))
            )
        elif isinstance(value, datetime.datetime):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                http_date_format, value.utctimetuple()
            )
        elif isinstance(value, str):
            self._headers[hdrs.LAST_MODIFIED] = value
        else:
            # Previously an unsupported value was dropped without a trace,
            # leaving the header unchanged; fail loudly instead.
            raise TypeError(
                f"Unsupported type for last_modified: {type(value).__name__}"
            )

    @property
    def etag(self) -> Optional[ETag]:
        """Parsed ETag header, or None when absent or not a valid quoted tag."""
        quoted_value = self._headers.get(hdrs.ETAG)
        if not quoted_value:
            return None
        elif quoted_value == ETAG_ANY:
            return ETag(value=ETAG_ANY)
        match = QUOTED_ETAG_RE.fullmatch(quoted_value)
        if not match:
            return None
        is_weak, value = match.group(1, 2)
        return ETag(
            is_weak=bool(is_weak),
            value=value,
        )

    @etag.setter
    def etag(self, value: Optional[Union[ETag, str]]) -> None:
        """Set or (with None) remove the ETag header.

        Plain strings become strong quoted tags; ``ETag`` instances honour
        ``is_weak``.  ``ETAG_ANY`` is stored unquoted.

        Raises:
            ValueError: if ``value`` is not a str, ETag or None.
        """
        if value is None:
            self._headers.pop(hdrs.ETAG, None)
        elif (isinstance(value, str) and value == ETAG_ANY) or (
            isinstance(value, ETag) and value.value == ETAG_ANY
        ):
            self._headers[hdrs.ETAG] = ETAG_ANY
        elif isinstance(value, str):
            validate_etag_value(value)
            self._headers[hdrs.ETAG] = f'"{value}"'
        elif isinstance(value, ETag) and isinstance(value.value, str):  # type: ignore[redundant-expr]
            validate_etag_value(value.value)
            hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"'
            self._headers[hdrs.ETAG] = hdr_value
        else:
            raise ValueError(
                f"Unsupported etag type: {type(value)}. "
                f"etag must be str, ETag or None"
            )

    def _generate_content_type_header(
        self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE
    ) -> None:
        """Rebuild the Content-Type header from the media type and its params."""
        assert self._content_dict is not None
        assert self._content_type is not None
        params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items())
        if params:
            ctype = self._content_type + "; " + params
        else:
            ctype = self._content_type
        self._headers[CONTENT_TYPE] = ctype

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Turn on writer-level compression for any coding except identity."""
        if coding != ContentCoding.identity:
            assert self._payload_writer is not None
            self._headers[hdrs.CONTENT_ENCODING] = coding.value
            self._payload_writer.enable_compression(coding.value)
            # Compressed payload may have different content length,
            # remove the header
            self._headers.popall(hdrs.CONTENT_LENGTH, None)

    async def _start_compression(self, request: "BaseRequest") -> None:
        """Start compression with the forced coding or one the client lists.

        Without a forced coding, the first ``ContentCoding`` member whose
        value occurs in the lower-cased Accept-Encoding header is used.
        """
        if self._compression_force:
            await self._do_start_compression(self._compression_force)
        else:
            accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()
            for coding in ContentCoding:
                if coding.value in accept_encoding:
                    await self._do_start_compression(coding)
                    return

    async def prepare(self, request: "BaseRequest") -> Optional[AbstractStreamWriter]:
        """Bind the response to ``request`` and send status line and headers.

        Returns None if EOF was already sent, the existing writer if the
        response is already prepared, otherwise the newly attached writer.
        """
        if self._eof_sent:
            return None
        if self._payload_writer is not None:
            return self._payload_writer
        self._must_be_empty_body = must_be_empty_body(request.method, self.status)
        return await self._start(request)

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Attach the request's writer, then prepare and write the headers."""
        self._req = request
        writer = self._payload_writer = request._payload_writer

        await self._prepare_headers()
        await request._prepare_hook(self)
        await self._write_headers()

        return writer

    async def _prepare_headers(self) -> None:
        """Finalise the header set before it is written.

        Adds cookies, starts compression, decides between chunked and
        length-delimited framing, fills in default Content-Type / Date /
        Server headers and sets the Connection header when it is missing.

        Raises:
            RuntimeError: if chunked encoding was requested for a request
                whose version is not HTTP/1.1.
        """
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        keep_alive = self._keep_alive
        if keep_alive is None:
            keep_alive = request.keep_alive
        self._keep_alive = keep_alive

        version = request.version

        headers = self._headers
        populate_with_cookies(headers, self.cookies)

        if self._compression:
            await self._start_compression(request)

        if self._chunked:
            if version != HttpVersion11:
                raise RuntimeError(
                    "Using chunked encoding is forbidden "
                    "for HTTP/{0.major}.{0.minor}".format(request.version)
                )
            if not self._must_be_empty_body:
                writer.enable_chunking()
                headers[hdrs.TRANSFER_ENCODING] = "chunked"
            if hdrs.CONTENT_LENGTH in headers:
                del headers[hdrs.CONTENT_LENGTH]
        elif self._length_check:
            writer.length = self.content_length
            if writer.length is None:
                # Unknown length: chunk on HTTP/1.1+, otherwise the body can
                # only be delimited by closing the connection.
                if version >= HttpVersion11:
                    if not self._must_be_empty_body:
                        writer.enable_chunking()
                        headers[hdrs.TRANSFER_ENCODING] = "chunked"
                elif not self._must_be_empty_body:
                    keep_alive = False

        # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2
        # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4
        if self._must_be_empty_body:
            if hdrs.CONTENT_LENGTH in headers and should_remove_content_length(
                request.method, self.status
            ):
                del headers[hdrs.CONTENT_LENGTH]
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-10
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-13
            if hdrs.TRANSFER_ENCODING in headers:
                del headers[hdrs.TRANSFER_ENCODING]
        else:
            headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream")
        headers.setdefault(hdrs.DATE, rfc822_formatted_time())
        headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE)

        # connection header
        if hdrs.CONNECTION not in headers:
            if keep_alive:
                if version == HttpVersion10:
                    headers[hdrs.CONNECTION] = "keep-alive"
            else:
                if version == HttpVersion11:
                    headers[hdrs.CONNECTION] = "close"

    async def _write_headers(self) -> None:
        """Write the status line and the current headers to the writer."""
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        # status line
        version = request.version
        status_line = "HTTP/{}.{} {} {}".format(
            version[0], version[1], self._status, self._reason
        )
        await writer.write_headers(status_line, self._headers)

    async def write(self, data: bytes) -> None:
        """Write a piece of the body.

        Raises:
            RuntimeError: if called after ``write_eof()`` or before
                ``prepare()``.
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            raise RuntimeError("Cannot call write() after write_eof()")
        if self._payload_writer is None:
            raise RuntimeError("Cannot call write() before prepare()")

        await self._payload_writer.write(data)

    async def drain(self) -> None:
        """Deprecated: emit a DeprecationWarning and drain the writer."""
        assert not self._eof_sent, "EOF has already been sent"
        assert self._payload_writer is not None, "Response has not been started"
        warnings.warn(
            "drain method is deprecated, use await resp.write()",
            DeprecationWarning,
            stacklevel=2,
        )
        await self._payload_writer.drain()

    async def write_eof(self, data: bytes = b"") -> None:
        """Finish the response, optionally writing a last piece of data.

        A second call is a no-op.  Afterwards the writer's output size is
        kept in ``body_length`` and the request and writer are released.
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            return

        assert self._payload_writer is not None, "Response has not been started"

        await self._payload_writer.write_eof(data)
        self._eof_sent = True
        self._req = None
        self._body_length = self._payload_writer.output_size
        self._payload_writer = None

    def __repr__(self) -> str:
        if self._eof_sent:
            info = "eof"
        elif self.prepared:
            assert self._req is not None
            info = f"{self._req.method} {self._req.path} "
        else:
            info = "not prepared"
        return f"<{self.__class__.__name__} {self.reason} {info}>"

    # Mapping protocol: all items live in the per-response ``_state`` dict.

    def __getitem__(self, key: str) -> Any:
        return self._state[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._state[key] = value

    def __delitem__(self, key: str) -> None:
        del self._state[key]

    def __len__(self) -> int:
        return len(self._state)

    def __iter__(self) -> Iterator[str]:
        return iter(self._state)

    # Responses hash and compare by identity, not by mapping content.

    def __hash__(self) -> int:
        return hash(id(self))

    def __eq__(self, other: object) -> bool:
        return self is other

498 

class Response(StreamResponse):
    """Response whose complete body is known before it is sent.

    The body is either raw bytes, text (encoded with the response charset)
    or an object resolved to a ``Payload`` through the payload registry.
    Content-Length is derived automatically and cannot be assigned.
    """

    __slots__ = (
        "_body_payload",
        "_compressed_body",
        "_zlib_executor_size",
        "_zlib_executor",
    )

    def __init__(
        self,
        *,
        body: Any = None,
        status: int = 200,
        reason: Optional[str] = None,
        text: Optional[str] = None,
        headers: Optional[LooseHeaders] = None,
        content_type: Optional[str] = None,
        charset: Optional[str] = None,
        zlib_executor_size: Optional[int] = None,
        zlib_executor: Optional[Executor] = None,
    ) -> None:
        """Create a response from ``body`` or ``text`` (not both).

        Raises:
            ValueError: if both ``body`` and ``text`` are given, if
                ``content_type`` contains "charset", or if a Content-Type
                header is combined with ``content_type`` / ``charset``.
            TypeError: if ``text`` is not a str (checked when no
                Content-Type header was supplied).
        """
        if body is not None and text is not None:
            raise ValueError("body and text are not allowed together")

        # Always work on a private copy: Content-Type may be written into
        # this mapping below and a caller-supplied CIMultiDict must not be
        # modified as a side effect of constructing a response.
        if headers is None:
            real_headers: CIMultiDict[str] = CIMultiDict()
        else:
            real_headers = CIMultiDict(headers)

        if content_type is not None and "charset" in content_type:
            raise ValueError("charset must not be in content_type argument")

        if text is not None:
            if hdrs.CONTENT_TYPE in real_headers:
                if content_type or charset:
                    raise ValueError(
                        "passing both Content-Type header and "
                        "content_type or charset params "
                        "is forbidden"
                    )
            else:
                # fast path for filling headers
                if not isinstance(text, str):
                    raise TypeError("text argument must be str (%r)" % type(text))
                if content_type is None:
                    content_type = "text/plain"
                if charset is None:
                    charset = "utf-8"
                real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset
                body = text.encode(charset)
                text = None
        else:
            if hdrs.CONTENT_TYPE in real_headers:
                if content_type is not None or charset is not None:
                    raise ValueError(
                        "passing both Content-Type header and "
                        "content_type or charset params "
                        "is forbidden"
                    )
            else:
                if content_type is not None:
                    if charset is not None:
                        content_type += "; charset=" + charset
                    real_headers[hdrs.CONTENT_TYPE] = content_type

        super().__init__(status=status, reason=reason, headers=real_headers)

        if text is not None:
            self.text = text
        else:
            self.body = body

        self._compressed_body: Optional[bytes] = None
        self._zlib_executor_size = zlib_executor_size
        self._zlib_executor = zlib_executor

    @property
    def body(self) -> Optional[Union[bytes, Payload]]:
        """The stored body: bytes, a ``Payload`` or None."""
        return self._body

    @body.setter
    def body(self, body: bytes) -> None:
        """Store the body and invalidate any cached compressed body.

        None and bytes/bytearray are stored as-is.  Anything else is looked
        up in ``payload.PAYLOAD_REGISTRY``; the resulting payload supplies
        Content-Type and any of its headers that are not already set.

        Raises:
            ValueError: if the registry has no payload for the body type.
        """
        if body is None:
            self._body: Optional[bytes] = None
            self._body_payload: bool = False
        elif isinstance(body, (bytes, bytearray)):
            self._body = body
            self._body_payload = False
        else:
            try:
                self._body = body = payload.PAYLOAD_REGISTRY.get(body)
            except payload.LookupError:
                raise ValueError("Unsupported body type %r" % type(body))

            self._body_payload = True

            headers = self._headers

            # set content-type
            if hdrs.CONTENT_TYPE not in headers:
                headers[hdrs.CONTENT_TYPE] = body.content_type

            # copy payload headers
            if body.headers:
                for key, value in body.headers.items():
                    if key not in headers:
                        headers[key] = value

        self._compressed_body = None

    @property
    def text(self) -> Optional[str]:
        """Body decoded with the response charset (utf-8 if none), or None."""
        if self._body is None:
            return None
        return self._body.decode(self.charset or "utf-8")

    @text.setter
    def text(self, text: str) -> None:
        """Encode ``text`` into the body.

        A content type of application/octet-stream is switched to
        text/plain and a missing charset defaults to utf-8.
        """
        assert isinstance(text, str), "text argument must be str (%r)" % type(text)

        if self.content_type == "application/octet-stream":
            self.content_type = "text/plain"
        if self.charset is None:
            self.charset = "utf-8"

        self._body = text.encode(self.charset)
        self._body_payload = False
        self._compressed_body = None

    @property
    def content_length(self) -> Optional[int]:
        """Length of the body as it will be sent, or None when unknown.

        None for chunked responses and payload bodies without an explicit
        Content-Length header; an explicit header wins otherwise.
        """
        if self._chunked:
            return None

        if hdrs.CONTENT_LENGTH in self._headers:
            return super().content_length

        if self._compressed_body is not None:
            # Return length of the compressed body
            return len(self._compressed_body)
        elif self._body_payload:
            # A payload without content length, or a compressed payload
            return None
        elif self._body is not None:
            return len(self._body)
        else:
            return 0

    @content_length.setter
    def content_length(self, value: Optional[int]) -> None:
        raise RuntimeError("Content length is set automatically")

    async def write_eof(self, data: bytes = b"") -> None:
        """Send the stored body (compressed variant if present) and finish.

        ``data`` must be empty; the body comes from the response itself.
        Nothing is written as body when the response must have an empty body.
        """
        if self._eof_sent:
            return
        if self._compressed_body is None:
            body: Optional[Union[bytes, Payload]] = self._body
        else:
            body = self._compressed_body
        assert not data, f"data arg is not supported, got {data!r}"
        assert self._req is not None
        assert self._payload_writer is not None
        if body is not None:
            if self._must_be_empty_body:
                await super().write_eof()
            elif self._body_payload:
                # Named so it does not shadow the imported ``payload`` module.
                body_payload = cast(Payload, body)
                await body_payload.write(self._payload_writer)
                await super().write_eof()
            else:
                await super().write_eof(cast(bytes, body))
        else:
            await super().write_eof()

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Settle the Content-Length header, then start like the base class."""
        if should_remove_content_length(request.method, self.status):
            if hdrs.CONTENT_LENGTH in self._headers:
                del self._headers[hdrs.CONTENT_LENGTH]
        elif not self._chunked and hdrs.CONTENT_LENGTH not in self._headers:
            if self._body_payload:
                size = cast(Payload, self._body).size
                if size is not None:
                    self._headers[hdrs.CONTENT_LENGTH] = str(size)
            else:
                body_len = len(self._body) if self._body else 0
                # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7
                # A zero length is only announced when the response is
                # neither a 304 nor an answer to HEAD.
                if body_len or (
                    self.status != 304 and request.method.upper() != hdrs.METH_HEAD
                ):
                    self._headers[hdrs.CONTENT_LENGTH] = str(body_len)

        return await super()._start(request)

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Compress the whole in-memory body up front.

        Payload bodies and chunked responses fall back to the writer-level
        compression of the base class.  For the identity coding nothing is
        done.  A warning is emitted for bodies above 1 MiB when no
        ``zlib_executor_size`` was configured.
        """
        if self._body_payload or self._chunked:
            return await super()._do_start_compression(coding)

        if coding != ContentCoding.identity:
            # Instead of using _payload_writer.enable_compression,
            # compress the whole body
            compressor = ZLibCompressor(
                encoding=str(coding.value),
                max_sync_chunk_size=self._zlib_executor_size,
                executor=self._zlib_executor,
            )
            assert self._body is not None
            if self._zlib_executor_size is None and len(self._body) > 1024 * 1024:
                warnings.warn(
                    "Synchronous compression of large response bodies "
                    f"({len(self._body)} bytes) might block the async event loop. "
                    "Consider providing a custom value to zlib_executor_size/"
                    "zlib_executor response properties or disabling compression on it."
                )
            self._compressed_body = (
                await compressor.compress(self._body) + compressor.flush()
            )
            assert self._compressed_body is not None

            self._headers[hdrs.CONTENT_ENCODING] = coding.value
            self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body))

721 

722 

def json_response(
    data: Any = sentinel,
    *,
    text: Optional[str] = None,
    body: Optional[bytes] = None,
    status: int = 200,
    reason: Optional[str] = None,
    headers: Optional[LooseHeaders] = None,
    content_type: str = "application/json",
    dumps: JSONEncoder = json.dumps,
) -> Response:
    """Build a ``Response`` with a JSON content type.

    When ``data`` is supplied it is serialised with ``dumps`` and used as the
    response text; otherwise ``text`` / ``body`` are passed through as given.

    Raises:
        ValueError: if ``data`` is supplied together with a truthy ``text``
            or ``body``.
    """
    data_given = data is not sentinel
    if data_given and (text or body):
        raise ValueError("only one of data, text, or body should be specified")
    if data_given:
        text = dumps(data)

    response = Response(
        text=text,
        body=body,
        status=status,
        reason=reason,
        headers=headers,
        content_type=content_type,
    )
    return response