Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_response.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

444 statements  

1import asyncio 

2import collections.abc 

3import datetime 

4import enum 

5import json 

6import math 

7import time 

8import warnings 

9from concurrent.futures import Executor 

10from http import HTTPStatus 

11from typing import ( 

12 TYPE_CHECKING, 

13 Any, 

14 Dict, 

15 Iterator, 

16 MutableMapping, 

17 Optional, 

18 Union, 

19 cast, 

20) 

21 

22from multidict import CIMultiDict, istr 

23 

24from . import hdrs, payload 

25from .abc import AbstractStreamWriter 

26from .compression_utils import ZLibCompressor 

27from .helpers import ( 

28 ETAG_ANY, 

29 QUOTED_ETAG_RE, 

30 CookieMixin, 

31 ETag, 

32 HeadersMixin, 

33 must_be_empty_body, 

34 parse_http_date, 

35 populate_with_cookies, 

36 rfc822_formatted_time, 

37 sentinel, 

38 should_remove_content_length, 

39 validate_etag_value, 

40) 

41from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11 

42from .payload import Payload 

43from .typedefs import JSONEncoder, LooseHeaders 

44 

45REASON_PHRASES = {http_status.value: http_status.phrase for http_status in HTTPStatus} 

46LARGE_BODY_SIZE = 1024**2 

47 

48__all__ = ("ContentCoding", "StreamResponse", "Response", "json_response") 

49 

50 

51if TYPE_CHECKING: 

52 from .web_request import BaseRequest 

53 

54 BaseClass = MutableMapping[str, Any] 

55else: 

56 BaseClass = collections.abc.MutableMapping 

57 

58 

59# TODO(py311): Convert to StrEnum for wider use 

60class ContentCoding(enum.Enum): 

61 # The content codings that we have support for. 

62 # 

63 # Additional registered codings are listed at: 

64 # https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding 

65 deflate = "deflate" 

66 gzip = "gzip" 

67 identity = "identity" 

68 

69 

70CONTENT_CODINGS = {coding.value: coding for coding in ContentCoding} 

71 

72############################################################ 

73# HTTP Response classes 

74############################################################ 

75 

76 

77class StreamResponse(BaseClass, HeadersMixin, CookieMixin): 

78 

79 _body: Union[None, bytes, bytearray, Payload] 

80 _length_check = True 

81 _body = None 

82 _keep_alive: Optional[bool] = None 

83 _chunked: bool = False 

84 _compression: bool = False 

85 _compression_strategy: Optional[int] = None 

86 _compression_force: Optional[ContentCoding] = None 

87 _req: Optional["BaseRequest"] = None 

88 _payload_writer: Optional[AbstractStreamWriter] = None 

89 _eof_sent: bool = False 

90 _must_be_empty_body: Optional[bool] = None 

91 _body_length = 0 

92 

93 def __init__( 

94 self, 

95 *, 

96 status: int = 200, 

97 reason: Optional[str] = None, 

98 headers: Optional[LooseHeaders] = None, 

99 _real_headers: Optional[CIMultiDict[str]] = None, 

100 ) -> None: 

101 """Initialize a new stream response object. 

102 

103 _real_headers is an internal parameter used to pass a pre-populated 

104 headers object. It is used by the `Response` class to avoid copying 

105 the headers when creating a new response object. It is not intended 

106 to be used by external code. 

107 """ 

108 self._state: Dict[str, Any] = {} 

109 

110 if _real_headers is not None: 

111 self._headers = _real_headers 

112 elif headers is not None: 

113 self._headers: CIMultiDict[str] = CIMultiDict(headers) 

114 else: 

115 self._headers = CIMultiDict() 

116 

117 self._set_status(status, reason) 

118 

119 @property 

120 def prepared(self) -> bool: 

121 return self._eof_sent or self._payload_writer is not None 

122 

123 @property 

124 def task(self) -> "Optional[asyncio.Task[None]]": 

125 if self._req: 

126 return self._req.task 

127 else: 

128 return None 

129 

130 @property 

131 def status(self) -> int: 

132 return self._status 

133 

134 @property 

135 def chunked(self) -> bool: 

136 return self._chunked 

137 

138 @property 

139 def compression(self) -> bool: 

140 return self._compression 

141 

142 @property 

143 def reason(self) -> str: 

144 return self._reason 

145 

146 def set_status( 

147 self, 

148 status: int, 

149 reason: Optional[str] = None, 

150 ) -> None: 

151 assert ( 

152 not self.prepared 

153 ), "Cannot change the response status code after the headers have been sent" 

154 self._set_status(status, reason) 

155 

156 def _set_status(self, status: int, reason: Optional[str]) -> None: 

157 self._status = status 

158 if reason is None: 

159 reason = REASON_PHRASES.get(self._status, "") 

160 elif "\n" in reason: 

161 raise ValueError("Reason cannot contain \\n") 

162 self._reason = reason 

163 

164 @property 

165 def keep_alive(self) -> Optional[bool]: 

166 return self._keep_alive 

167 

168 def force_close(self) -> None: 

169 self._keep_alive = False 

170 

171 @property 

172 def body_length(self) -> int: 

173 return self._body_length 

174 

175 def enable_chunked_encoding(self) -> None: 

176 """Enables automatic chunked transfer encoding.""" 

177 if hdrs.CONTENT_LENGTH in self._headers: 

178 raise RuntimeError( 

179 "You can't enable chunked encoding when a content length is set" 

180 ) 

181 self._chunked = True 

182 

183 def enable_compression( 

184 self, 

185 force: Optional[ContentCoding] = None, 

186 strategy: Optional[int] = None, 

187 ) -> None: 

188 """Enables response compression encoding.""" 

189 self._compression = True 

190 self._compression_force = force 

191 self._compression_strategy = strategy 

192 

193 @property 

194 def headers(self) -> "CIMultiDict[str]": 

195 return self._headers 

196 

197 @property 

198 def content_length(self) -> Optional[int]: 

199 # Just a placeholder for adding setter 

200 return super().content_length 

201 

202 @content_length.setter 

203 def content_length(self, value: Optional[int]) -> None: 

204 if value is not None: 

205 value = int(value) 

206 if self._chunked: 

207 raise RuntimeError( 

208 "You can't set content length when chunked encoding is enable" 

209 ) 

210 self._headers[hdrs.CONTENT_LENGTH] = str(value) 

211 else: 

212 self._headers.pop(hdrs.CONTENT_LENGTH, None) 

213 

214 @property 

215 def content_type(self) -> str: 

216 # Just a placeholder for adding setter 

217 return super().content_type 

218 

219 @content_type.setter 

220 def content_type(self, value: str) -> None: 

221 self.content_type # read header values if needed 

222 self._content_type = str(value) 

223 self._generate_content_type_header() 

224 

225 @property 

226 def charset(self) -> Optional[str]: 

227 # Just a placeholder for adding setter 

228 return super().charset 

229 

230 @charset.setter 

231 def charset(self, value: Optional[str]) -> None: 

232 ctype = self.content_type # read header values if needed 

233 if ctype == "application/octet-stream": 

234 raise RuntimeError( 

235 "Setting charset for application/octet-stream " 

236 "doesn't make sense, setup content_type first" 

237 ) 

238 assert self._content_dict is not None 

239 if value is None: 

240 self._content_dict.pop("charset", None) 

241 else: 

242 self._content_dict["charset"] = str(value).lower() 

243 self._generate_content_type_header() 

244 

245 @property 

246 def last_modified(self) -> Optional[datetime.datetime]: 

247 """The value of Last-Modified HTTP header, or None. 

248 

249 This header is represented as a `datetime` object. 

250 """ 

251 return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED)) 

252 

253 @last_modified.setter 

254 def last_modified( 

255 self, value: Optional[Union[int, float, datetime.datetime, str]] 

256 ) -> None: 

257 if value is None: 

258 self._headers.pop(hdrs.LAST_MODIFIED, None) 

259 elif isinstance(value, (int, float)): 

260 self._headers[hdrs.LAST_MODIFIED] = time.strftime( 

261 "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(math.ceil(value)) 

262 ) 

263 elif isinstance(value, datetime.datetime): 

264 self._headers[hdrs.LAST_MODIFIED] = time.strftime( 

265 "%a, %d %b %Y %H:%M:%S GMT", value.utctimetuple() 

266 ) 

267 elif isinstance(value, str): 

268 self._headers[hdrs.LAST_MODIFIED] = value 

269 else: 

270 msg = f"Unsupported type for last_modified: {type(value).__name__}" # type: ignore[unreachable] 

271 raise TypeError(msg) 

272 

273 @property 

274 def etag(self) -> Optional[ETag]: 

275 quoted_value = self._headers.get(hdrs.ETAG) 

276 if not quoted_value: 

277 return None 

278 elif quoted_value == ETAG_ANY: 

279 return ETag(value=ETAG_ANY) 

280 match = QUOTED_ETAG_RE.fullmatch(quoted_value) 

281 if not match: 

282 return None 

283 is_weak, value = match.group(1, 2) 

284 return ETag( 

285 is_weak=bool(is_weak), 

286 value=value, 

287 ) 

288 

289 @etag.setter 

290 def etag(self, value: Optional[Union[ETag, str]]) -> None: 

291 if value is None: 

292 self._headers.pop(hdrs.ETAG, None) 

293 elif (isinstance(value, str) and value == ETAG_ANY) or ( 

294 isinstance(value, ETag) and value.value == ETAG_ANY 

295 ): 

296 self._headers[hdrs.ETAG] = ETAG_ANY 

297 elif isinstance(value, str): 

298 validate_etag_value(value) 

299 self._headers[hdrs.ETAG] = f'"{value}"' 

300 elif isinstance(value, ETag) and isinstance(value.value, str): # type: ignore[redundant-expr] 

301 validate_etag_value(value.value) 

302 hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"' 

303 self._headers[hdrs.ETAG] = hdr_value 

304 else: 

305 raise ValueError( 

306 f"Unsupported etag type: {type(value)}. " 

307 f"etag must be str, ETag or None" 

308 ) 

309 

310 def _generate_content_type_header( 

311 self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE 

312 ) -> None: 

313 assert self._content_dict is not None 

314 assert self._content_type is not None 

315 params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items()) 

316 if params: 

317 ctype = self._content_type + "; " + params 

318 else: 

319 ctype = self._content_type 

320 self._headers[CONTENT_TYPE] = ctype 

321 

322 async def _do_start_compression(self, coding: ContentCoding) -> None: 

323 if coding is ContentCoding.identity: 

324 return 

325 assert self._payload_writer is not None 

326 self._headers[hdrs.CONTENT_ENCODING] = coding.value 

327 self._payload_writer.enable_compression( 

328 coding.value, self._compression_strategy 

329 ) 

330 # Compressed payload may have different content length, 

331 # remove the header 

332 self._headers.popall(hdrs.CONTENT_LENGTH, None) 

333 

334 async def _start_compression(self, request: "BaseRequest") -> None: 

335 if self._compression_force: 

336 await self._do_start_compression(self._compression_force) 

337 return 

338 # Encoding comparisons should be case-insensitive 

339 # https://www.rfc-editor.org/rfc/rfc9110#section-8.4.1 

340 accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower() 

341 for value, coding in CONTENT_CODINGS.items(): 

342 if value in accept_encoding: 

343 await self._do_start_compression(coding) 

344 return 

345 

346 async def prepare(self, request: "BaseRequest") -> Optional[AbstractStreamWriter]: 

347 if self._eof_sent: 

348 return None 

349 if self._payload_writer is not None: 

350 return self._payload_writer 

351 self._must_be_empty_body = must_be_empty_body(request.method, self.status) 

352 return await self._start(request) 

353 

354 async def _start(self, request: "BaseRequest") -> AbstractStreamWriter: 

355 self._req = request 

356 writer = self._payload_writer = request._payload_writer 

357 

358 await self._prepare_headers() 

359 await request._prepare_hook(self) 

360 await self._write_headers() 

361 

362 return writer 

363 

364 async def _prepare_headers(self) -> None: 

365 request = self._req 

366 assert request is not None 

367 writer = self._payload_writer 

368 assert writer is not None 

369 keep_alive = self._keep_alive 

370 if keep_alive is None: 

371 keep_alive = request.keep_alive 

372 self._keep_alive = keep_alive 

373 

374 version = request.version 

375 

376 headers = self._headers 

377 if self._cookies: 

378 populate_with_cookies(headers, self._cookies) 

379 

380 if self._compression: 

381 await self._start_compression(request) 

382 

383 if self._chunked: 

384 if version != HttpVersion11: 

385 raise RuntimeError( 

386 "Using chunked encoding is forbidden " 

387 "for HTTP/{0.major}.{0.minor}".format(request.version) 

388 ) 

389 if not self._must_be_empty_body: 

390 writer.enable_chunking() 

391 headers[hdrs.TRANSFER_ENCODING] = "chunked" 

392 elif self._length_check: # Disabled for WebSockets 

393 writer.length = self.content_length 

394 if writer.length is None: 

395 if version >= HttpVersion11: 

396 if not self._must_be_empty_body: 

397 writer.enable_chunking() 

398 headers[hdrs.TRANSFER_ENCODING] = "chunked" 

399 elif not self._must_be_empty_body: 

400 keep_alive = False 

401 

402 # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2 

403 # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4 

404 if self._must_be_empty_body: 

405 if hdrs.CONTENT_LENGTH in headers and should_remove_content_length( 

406 request.method, self.status 

407 ): 

408 del headers[hdrs.CONTENT_LENGTH] 

409 # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-10 

410 # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-13 

411 if hdrs.TRANSFER_ENCODING in headers: 

412 del headers[hdrs.TRANSFER_ENCODING] 

413 elif (writer.length if self._length_check else self.content_length) != 0: 

414 # https://www.rfc-editor.org/rfc/rfc9110#section-8.3-5 

415 headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream") 

416 headers.setdefault(hdrs.DATE, rfc822_formatted_time()) 

417 headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE) 

418 

419 # connection header 

420 if hdrs.CONNECTION not in headers: 

421 if keep_alive: 

422 if version == HttpVersion10: 

423 headers[hdrs.CONNECTION] = "keep-alive" 

424 elif version == HttpVersion11: 

425 headers[hdrs.CONNECTION] = "close" 

426 

427 async def _write_headers(self) -> None: 

428 request = self._req 

429 assert request is not None 

430 writer = self._payload_writer 

431 assert writer is not None 

432 # status line 

433 version = request.version 

434 status_line = f"HTTP/{version[0]}.{version[1]} {self._status} {self._reason}" 

435 await writer.write_headers(status_line, self._headers) 

436 

437 async def write( 

438 self, data: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"] 

439 ) -> None: 

440 assert isinstance( 

441 data, (bytes, bytearray, memoryview) 

442 ), "data argument must be byte-ish (%r)" % type(data) 

443 

444 if self._eof_sent: 

445 raise RuntimeError("Cannot call write() after write_eof()") 

446 if self._payload_writer is None: 

447 raise RuntimeError("Cannot call write() before prepare()") 

448 

449 await self._payload_writer.write(data) 

450 

451 async def drain(self) -> None: 

452 assert not self._eof_sent, "EOF has already been sent" 

453 assert self._payload_writer is not None, "Response has not been started" 

454 warnings.warn( 

455 "drain method is deprecated, use await resp.write()", 

456 DeprecationWarning, 

457 stacklevel=2, 

458 ) 

459 await self._payload_writer.drain() 

460 

461 async def write_eof(self, data: bytes = b"") -> None: 

462 assert isinstance( 

463 data, (bytes, bytearray, memoryview) 

464 ), "data argument must be byte-ish (%r)" % type(data) 

465 

466 if self._eof_sent: 

467 return 

468 

469 assert self._payload_writer is not None, "Response has not been started" 

470 

471 await self._payload_writer.write_eof(data) 

472 self._eof_sent = True 

473 self._req = None 

474 self._body_length = self._payload_writer.output_size 

475 self._payload_writer = None 

476 

477 def __repr__(self) -> str: 

478 if self._eof_sent: 

479 info = "eof" 

480 elif self.prepared: 

481 assert self._req is not None 

482 info = f"{self._req.method} {self._req.path} " 

483 else: 

484 info = "not prepared" 

485 return f"<{self.__class__.__name__} {self.reason} {info}>" 

486 

487 def __getitem__(self, key: str) -> Any: 

488 return self._state[key] 

489 

490 def __setitem__(self, key: str, value: Any) -> None: 

491 self._state[key] = value 

492 

493 def __delitem__(self, key: str) -> None: 

494 del self._state[key] 

495 

496 def __len__(self) -> int: 

497 return len(self._state) 

498 

499 def __iter__(self) -> Iterator[str]: 

500 return iter(self._state) 

501 

502 def __hash__(self) -> int: 

503 return hash(id(self)) 

504 

505 def __eq__(self, other: object) -> bool: 

506 return self is other 

507 

508 def __bool__(self) -> bool: 

509 return True 

510 

511 

512class Response(StreamResponse): 

513 

514 _compressed_body: Optional[bytes] = None 

515 

516 def __init__( 

517 self, 

518 *, 

519 body: Any = None, 

520 status: int = 200, 

521 reason: Optional[str] = None, 

522 text: Optional[str] = None, 

523 headers: Optional[LooseHeaders] = None, 

524 content_type: Optional[str] = None, 

525 charset: Optional[str] = None, 

526 zlib_executor_size: Optional[int] = None, 

527 zlib_executor: Optional[Executor] = None, 

528 ) -> None: 

529 if body is not None and text is not None: 

530 raise ValueError("body and text are not allowed together") 

531 

532 if headers is None: 

533 real_headers: CIMultiDict[str] = CIMultiDict() 

534 else: 

535 real_headers = CIMultiDict(headers) 

536 

537 if content_type is not None and "charset" in content_type: 

538 raise ValueError("charset must not be in content_type argument") 

539 

540 if text is not None: 

541 if hdrs.CONTENT_TYPE in real_headers: 

542 if content_type or charset: 

543 raise ValueError( 

544 "passing both Content-Type header and " 

545 "content_type or charset params " 

546 "is forbidden" 

547 ) 

548 else: 

549 # fast path for filling headers 

550 if not isinstance(text, str): 

551 raise TypeError("text argument must be str (%r)" % type(text)) 

552 if content_type is None: 

553 content_type = "text/plain" 

554 if charset is None: 

555 charset = "utf-8" 

556 real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset 

557 body = text.encode(charset) 

558 text = None 

559 elif hdrs.CONTENT_TYPE in real_headers: 

560 if content_type is not None or charset is not None: 

561 raise ValueError( 

562 "passing both Content-Type header and " 

563 "content_type or charset params " 

564 "is forbidden" 

565 ) 

566 elif content_type is not None: 

567 if charset is not None: 

568 content_type += "; charset=" + charset 

569 real_headers[hdrs.CONTENT_TYPE] = content_type 

570 

571 super().__init__(status=status, reason=reason, _real_headers=real_headers) 

572 

573 if text is not None: 

574 self.text = text 

575 else: 

576 self.body = body 

577 

578 self._zlib_executor_size = zlib_executor_size 

579 self._zlib_executor = zlib_executor 

580 

581 @property 

582 def body(self) -> Optional[Union[bytes, bytearray, Payload]]: 

583 return self._body 

584 

585 @body.setter 

586 def body(self, body: Any) -> None: 

587 if body is None: 

588 self._body = None 

589 elif isinstance(body, (bytes, bytearray)): 

590 self._body = body 

591 else: 

592 try: 

593 self._body = body = payload.PAYLOAD_REGISTRY.get(body) 

594 except payload.LookupError: 

595 raise ValueError("Unsupported body type %r" % type(body)) 

596 

597 headers = self._headers 

598 

599 # set content-type 

600 if hdrs.CONTENT_TYPE not in headers: 

601 headers[hdrs.CONTENT_TYPE] = body.content_type 

602 

603 # copy payload headers 

604 if body.headers: 

605 for key, value in body.headers.items(): 

606 if key not in headers: 

607 headers[key] = value 

608 

609 self._compressed_body = None 

610 

611 @property 

612 def text(self) -> Optional[str]: 

613 if self._body is None: 

614 return None 

615 return self._body.decode(self.charset or "utf-8") 

616 

617 @text.setter 

618 def text(self, text: str) -> None: 

619 assert isinstance(text, str), "text argument must be str (%r)" % type(text) 

620 

621 if self.content_type == "application/octet-stream": 

622 self.content_type = "text/plain" 

623 if self.charset is None: 

624 self.charset = "utf-8" 

625 

626 self._body = text.encode(self.charset) 

627 self._compressed_body = None 

628 

629 @property 

630 def content_length(self) -> Optional[int]: 

631 if self._chunked: 

632 return None 

633 

634 if hdrs.CONTENT_LENGTH in self._headers: 

635 return int(self._headers[hdrs.CONTENT_LENGTH]) 

636 

637 if self._compressed_body is not None: 

638 # Return length of the compressed body 

639 return len(self._compressed_body) 

640 elif isinstance(self._body, Payload): 

641 # A payload without content length, or a compressed payload 

642 return None 

643 elif self._body is not None: 

644 return len(self._body) 

645 else: 

646 return 0 

647 

648 @content_length.setter 

649 def content_length(self, value: Optional[int]) -> None: 

650 raise RuntimeError("Content length is set automatically") 

651 

652 async def write_eof(self, data: bytes = b"") -> None: 

653 if self._eof_sent: 

654 return 

655 if self._compressed_body is None: 

656 body = self._body 

657 else: 

658 body = self._compressed_body 

659 assert not data, f"data arg is not supported, got {data!r}" 

660 assert self._req is not None 

661 assert self._payload_writer is not None 

662 if body is None or self._must_be_empty_body: 

663 await super().write_eof() 

664 elif isinstance(self._body, Payload): 

665 await self._body.write(self._payload_writer) 

666 await super().write_eof() 

667 else: 

668 await super().write_eof(cast(bytes, body)) 

669 

670 async def _start(self, request: "BaseRequest") -> AbstractStreamWriter: 

671 if hdrs.CONTENT_LENGTH in self._headers: 

672 if should_remove_content_length(request.method, self.status): 

673 del self._headers[hdrs.CONTENT_LENGTH] 

674 elif not self._chunked: 

675 if isinstance(self._body, Payload): 

676 if self._body.size is not None: 

677 self._headers[hdrs.CONTENT_LENGTH] = str(self._body.size) 

678 else: 

679 body_len = len(self._body) if self._body else "0" 

680 # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7 

681 if body_len != "0" or ( 

682 self.status != 304 and request.method not in hdrs.METH_HEAD_ALL 

683 ): 

684 self._headers[hdrs.CONTENT_LENGTH] = str(body_len) 

685 

686 return await super()._start(request) 

687 

688 async def _do_start_compression(self, coding: ContentCoding) -> None: 

689 if self._chunked or isinstance(self._body, Payload): 

690 return await super()._do_start_compression(coding) 

691 if coding is ContentCoding.identity: 

692 return 

693 # Instead of using _payload_writer.enable_compression, 

694 # compress the whole body 

695 compressor = ZLibCompressor( 

696 encoding=coding.value, 

697 max_sync_chunk_size=self._zlib_executor_size, 

698 executor=self._zlib_executor, 

699 ) 

700 assert self._body is not None 

701 if self._zlib_executor_size is None and len(self._body) > LARGE_BODY_SIZE: 

702 warnings.warn( 

703 "Synchronous compression of large response bodies " 

704 f"({len(self._body)} bytes) might block the async event loop. " 

705 "Consider providing a custom value to zlib_executor_size/" 

706 "zlib_executor response properties or disabling compression on it." 

707 ) 

708 self._compressed_body = ( 

709 await compressor.compress(self._body) + compressor.flush() 

710 ) 

711 self._headers[hdrs.CONTENT_ENCODING] = coding.value 

712 self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body)) 

713 

714 

715def json_response( 

716 data: Any = sentinel, 

717 *, 

718 text: Optional[str] = None, 

719 body: Optional[bytes] = None, 

720 status: int = 200, 

721 reason: Optional[str] = None, 

722 headers: Optional[LooseHeaders] = None, 

723 content_type: str = "application/json", 

724 dumps: JSONEncoder = json.dumps, 

725) -> Response: 

726 if data is not sentinel: 

727 if text or body: 

728 raise ValueError("only one of data, text, or body should be specified") 

729 else: 

730 text = dumps(data) 

731 return Response( 

732 text=text, 

733 body=body, 

734 status=status, 

735 reason=reason, 

736 headers=headers, 

737 content_type=content_type, 

738 )