Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_response.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

461 statements  

1import asyncio 

2import datetime 

3import enum 

4import json 

5import math 

6import time 

7import warnings 

8from collections.abc import Iterator, MutableMapping 

9from concurrent.futures import Executor 

10from http import HTTPStatus 

11from typing import TYPE_CHECKING, Any, Optional, TypeVar, Union, cast, overload 

12 

13from multidict import CIMultiDict, istr 

14 

15from . import hdrs, payload 

16from .abc import AbstractStreamWriter 

17from .compression_utils import MAX_SYNC_CHUNK_SIZE, ZLibCompressor 

18from .helpers import ( 

19 ETAG_ANY, 

20 QUOTED_ETAG_RE, 

21 CookieMixin, 

22 ETag, 

23 HeadersMixin, 

24 ResponseKey, 

25 must_be_empty_body, 

26 parse_http_date, 

27 populate_with_cookies, 

28 rfc822_formatted_time, 

29 sentinel, 

30 should_remove_content_length, 

31 validate_etag_value, 

32) 

33from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11 

34from .payload import Payload 

35from .typedefs import JSONBytesEncoder, JSONEncoder, LooseHeaders 

36 

# Numeric status code -> standard reason phrase, for every member of
# ``http.HTTPStatus``.
REASON_PHRASES = {int(member): member.phrase for member in HTTPStatus}

38 

# Names exported by ``from <this module> import *``.
__all__ = (
    "ContentCoding",
    "StreamResponse",
    "Response",
    "json_response",
    "json_bytes_response",
)


if TYPE_CHECKING:
    # Only needed by type checkers; annotations in this module refer to it
    # through the string "BaseRequest".
    from .web_request import BaseRequest


# Value type bound to a ``ResponseKey`` in the typed ``__getitem__`` /
# ``__setitem__`` overloads of ``StreamResponse``.
_T = TypeVar("_T")

53 

54 

# TODO(py311): Convert to StrEnum for wider use
class ContentCoding(enum.Enum):
    """The content codings that we have support for.

    Additional registered codings are listed at:
    https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding
    """

    deflate = "deflate"
    gzip = "gzip"
    identity = "identity"


# Wire token -> enum member. Insertion order follows the declaration order of
# the members above.
CONTENT_CODINGS = {member.value: member for member in ContentCoding}

67 

68############################################################ 

69# HTTP Response classes 

70############################################################ 

71 

72 

class StreamResponse(
    MutableMapping[str | ResponseKey[Any], Any], HeadersMixin, CookieMixin
):
    """HTTP response whose body is written incrementally.

    Headers live in a ``CIMultiDict`` (``self._headers``). ``prepare()``
    attaches the request's payload writer, finalizes the headers and writes
    them; ``write()`` and ``write_eof()`` then push body data through that
    writer.

    The mutable-mapping interface (``resp[key]``) is backed by the private
    ``self._state`` dict and is independent of the HTTP headers.
    """

    _body: None | bytes | bytearray | Payload
    # Consulted in _prepare_headers(); per the inline comment there it is
    # disabled for WebSockets.
    _length_check = True
    _body = None
    # None means "not decided yet": _prepare_headers() then falls back to
    # request.keep_alive.
    _keep_alive: bool | None = None
    _chunked: bool = False
    _compression: bool = False
    _compression_strategy: int | None = None
    _compression_force: ContentCoding | None = None
    _req: Optional["BaseRequest"] = None
    _payload_writer: AbstractStreamWriter | None = None
    _eof_sent: bool = False
    _must_be_empty_body: bool | None = None
    # Filled from the writer's output_size in write_eof().
    _body_length = 0
    # When true, _write_headers() calls writer.send_headers() right after
    # writer.write_headers().
    _send_headers_immediately = True

    def __init__(
        self,
        *,
        status: int = 200,
        reason: str | None = None,
        headers: LooseHeaders | None = None,
        _real_headers: CIMultiDict[str] | None = None,
    ) -> None:
        """Initialize a new stream response object.

        _real_headers is an internal parameter used to pass a pre-populated
        headers object. It is used by the `Response` class to avoid copying
        the headers when creating a new response object. It is not intended
        to be used by external code.
        """
        # Backing store for the MutableMapping interface.
        self._state: dict[str | ResponseKey[Any], Any] = {}

        # _real_headers is adopted as-is (no copy); ``headers`` is copied.
        if _real_headers is not None:
            self._headers = _real_headers
        elif headers is not None:
            self._headers: CIMultiDict[str] = CIMultiDict(headers)
        else:
            self._headers = CIMultiDict()

        self._set_status(status, reason)

    @property
    def prepared(self) -> bool:
        """True once a payload writer is attached or EOF has been sent."""
        return self._eof_sent or self._payload_writer is not None

    @property
    def task(self) -> "asyncio.Task[None] | None":
        """The ``task`` of the attached request, or None without a request."""
        if self._req:
            return self._req.task
        else:
            return None

    @property
    def status(self) -> int:
        """The status code stored by ``_set_status()``."""
        return self._status

    @property
    def chunked(self) -> bool:
        """Whether ``enable_chunked_encoding()`` has been called."""
        return self._chunked

    @property
    def compression(self) -> bool:
        """Whether ``enable_compression()`` has switched compression on."""
        return self._compression

    @property
    def reason(self) -> str:
        """The reason phrase stored by ``_set_status()``."""
        return self._reason

    def set_status(
        self,
        status: int,
        reason: str | None = None,
    ) -> None:
        """Change status and reason; only allowed before the response is prepared."""
        assert (
            not self.prepared
        ), "Cannot change the response status code after the headers have been sent"
        self._set_status(status, reason)

    def _set_status(self, status: int, reason: str | None) -> None:
        """Store status and reason without the ``prepared`` check.

        A missing reason is looked up in REASON_PHRASES ("" when the code is
        unknown). An explicit reason containing CR or LF raises ValueError.
        """
        self._status = status
        if reason is None:
            reason = REASON_PHRASES.get(self._status, "")
        elif "\r" in reason or "\n" in reason:
            raise ValueError("Reason cannot contain \\r or \\n")
        self._reason = reason

    @property
    def keep_alive(self) -> bool | None:
        """Stored keep-alive flag; None until decided."""
        return self._keep_alive

    def force_close(self) -> None:
        """Set the keep-alive flag to False."""
        self._keep_alive = False

    @property
    def body_length(self) -> int:
        """The writer's ``output_size`` as captured in ``write_eof()`` (0 before)."""
        return self._body_length

    def enable_chunked_encoding(self) -> None:
        """Enables automatic chunked transfer encoding."""
        if hdrs.CONTENT_LENGTH in self._headers:
            raise RuntimeError(
                "You can't enable chunked encoding when a content length is set"
            )
        self._chunked = True

    def enable_compression(
        self,
        force: ContentCoding | None = None,
        strategy: int | None = None,
    ) -> None:
        """Enables response compression encoding."""
        # Don't enable compression if content is already encoded.
        # This prevents double compression and provides a safe, predictable behavior
        # without breaking existing code that may call enable_compression() on
        # responses that already have Content-Encoding set (e.g., FileResponse
        # serving pre-compressed files).
        if hdrs.CONTENT_ENCODING in self._headers:
            return
        self._compression = True
        self._compression_force = force
        self._compression_strategy = strategy

    @property
    def headers(self) -> "CIMultiDict[str]":
        """The live header multidict (not a copy)."""
        return self._headers

    @property
    def content_length(self) -> int | None:
        # Just a placeholder for adding setter
        return super().content_length

    @content_length.setter
    def content_length(self, value: int | None) -> None:
        """Set or (with None) remove the Content-Length header.

        Raises RuntimeError when a non-None value is given while chunked
        encoding is enabled.
        """
        if value is not None:
            value = int(value)
            if self._chunked:
                raise RuntimeError(
                    "You can't set content length when chunked encoding is enable"
                )
            self._headers[hdrs.CONTENT_LENGTH] = str(value)
        else:
            self._headers.pop(hdrs.CONTENT_LENGTH, None)

    @property
    def content_type(self) -> str:
        # Just a placeholder for adding setter
        return super().content_type

    @content_type.setter
    def content_type(self, value: str) -> None:
        """Replace the media type and regenerate the Content-Type header."""
        self.content_type  # read header values if needed
        self._content_type = str(value)
        self._generate_content_type_header()

    @property
    def charset(self) -> str | None:
        # Just a placeholder for adding setter
        return super().charset

    @charset.setter
    def charset(self, value: str | None) -> None:
        """Set (lower-cased) or remove the charset parameter of Content-Type.

        Raises RuntimeError while the content type is
        "application/octet-stream".
        """
        ctype = self.content_type  # read header values if needed
        if ctype == "application/octet-stream":
            raise RuntimeError(
                "Setting charset for application/octet-stream "
                "doesn't make sense, setup content_type first"
            )
        assert self._content_dict is not None
        if value is None:
            self._content_dict.pop("charset", None)
        else:
            self._content_dict["charset"] = str(value).lower()
        self._generate_content_type_header()

    @property
    def last_modified(self) -> datetime.datetime | None:
        """The value of Last-Modified HTTP header, or None.

        This header is represented as a `datetime` object.
        """
        return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED))

    @last_modified.setter
    def last_modified(
        self, value: int | float | datetime.datetime | str | None
    ) -> None:
        """Set or (with None) remove the Last-Modified header.

        Numbers are rounded up with ``math.ceil`` and formatted via
        ``time.gmtime``; datetimes are formatted from ``utctimetuple()``;
        strings are stored unchanged. Any other type raises TypeError.
        """
        if value is None:
            self._headers.pop(hdrs.LAST_MODIFIED, None)
        elif isinstance(value, (int, float)):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(math.ceil(value))
            )
        elif isinstance(value, datetime.datetime):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", value.utctimetuple()
            )
        elif isinstance(value, str):
            self._headers[hdrs.LAST_MODIFIED] = value
        else:
            msg = f"Unsupported type for last_modified: {type(value).__name__}"  # type: ignore[unreachable]
            raise TypeError(msg)

    @property
    def etag(self) -> ETag | None:
        """Parsed ETag header, or None when it is absent or fails QUOTED_ETAG_RE."""
        quoted_value = self._headers.get(hdrs.ETAG)
        if not quoted_value:
            return None
        elif quoted_value == ETAG_ANY:
            return ETag(value=ETAG_ANY)
        match = QUOTED_ETAG_RE.fullmatch(quoted_value)
        if not match:
            return None
        # Group 1 is truthy for a weak tag, group 2 is the tag value.
        is_weak, value = match.group(1, 2)
        return ETag(
            is_weak=bool(is_weak),
            value=value,
        )

    @etag.setter
    def etag(self, value: ETag | str | None) -> None:
        """Set or (with None) remove the ETag header.

        ETAG_ANY is stored verbatim. Other values are checked with
        ``validate_etag_value`` and wrapped in double quotes, with a ``W/``
        prefix for a weak ``ETag``. Unsupported types raise ValueError.
        """
        if value is None:
            self._headers.pop(hdrs.ETAG, None)
        elif (isinstance(value, str) and value == ETAG_ANY) or (
            isinstance(value, ETag) and value.value == ETAG_ANY
        ):
            self._headers[hdrs.ETAG] = ETAG_ANY
        elif isinstance(value, str):
            validate_etag_value(value)
            self._headers[hdrs.ETAG] = f'"{value}"'
        elif isinstance(value, ETag) and isinstance(value.value, str):  # type: ignore[redundant-expr]
            validate_etag_value(value.value)
            hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"'
            self._headers[hdrs.ETAG] = hdr_value
        else:
            raise ValueError(
                f"Unsupported etag type: {type(value)}. "
                f"etag must be str, ETag or None"
            )

    def _generate_content_type_header(
        self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE
    ) -> None:
        """Rebuild Content-Type from ``_content_type`` plus ``_content_dict`` params."""
        assert self._content_dict is not None
        assert self._content_type is not None
        params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items())
        if params:
            ctype = self._content_type + "; " + params
        else:
            ctype = self._content_type
        self._headers[CONTENT_TYPE] = ctype

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Enable ``coding`` on the payload writer; ``identity`` is a no-op."""
        if coding is ContentCoding.identity:
            return
        assert self._payload_writer is not None
        self._headers[hdrs.CONTENT_ENCODING] = coding.value
        self._payload_writer.enable_compression(
            coding.value, self._compression_strategy
        )
        # Compressed payload may have different content length,
        # remove the header
        self._headers.popall(hdrs.CONTENT_LENGTH, None)

    async def _start_compression(self, request: "BaseRequest") -> None:
        """Pick a coding (forced, else from Accept-Encoding) and start it."""
        if self._compression_force:
            await self._do_start_compression(self._compression_force)
            return
        # Encoding comparisons should be case-insensitive
        # https://www.rfc-editor.org/rfc/rfc9110#section-8.4.1
        accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()
        # Substring test against the whole header value; the first entry of
        # CONTENT_CODINGS that matches wins.
        # NOTE(review): q-values are not parsed here, so e.g. "gzip;q=0"
        # would still match -- confirm this is acceptable.
        for value, coding in CONTENT_CODINGS.items():
            if value in accept_encoding:
                await self._do_start_compression(coding)
                return

    async def prepare(self, request: "BaseRequest") -> AbstractStreamWriter | None:
        """Attach to ``request`` and write the headers.

        Returns None when EOF was already sent, the existing writer when the
        response is already prepared, and otherwise the writer from
        ``_start()``.
        """
        if self._eof_sent:
            return None
        if self._payload_writer is not None:
            return self._payload_writer
        self._must_be_empty_body = must_be_empty_body(request.method, self.status)
        return await self._start(request)

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Bind request and writer, then prepare headers, run the hook, write headers."""
        self._req = request
        writer = self._payload_writer = request._payload_writer

        await self._prepare_headers()
        # The hook runs after header preparation but before they are written.
        await request._prepare_hook(self)
        await self._write_headers()

        return writer

    async def _prepare_headers(self) -> None:
        """Finalize headers: cookies, compression, framing, defaults, Connection."""
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        keep_alive = self._keep_alive
        if keep_alive is None:
            keep_alive = request.keep_alive
        self._keep_alive = keep_alive

        version = request.version

        headers = self._headers
        if self._cookies:
            populate_with_cookies(headers, self._cookies)

        if self._compression:
            await self._start_compression(request)

        if self._chunked:
            if version != HttpVersion11:
                raise RuntimeError(
                    "Using chunked encoding is forbidden "
                    f"for HTTP/{request.version.major}.{request.version.minor}"
                )
            if not self._must_be_empty_body:
                writer.enable_chunking()
                headers[hdrs.TRANSFER_ENCODING] = "chunked"
        elif self._length_check:  # Disabled for WebSockets
            writer.length = self.content_length
            if writer.length is None:
                # Unknown length: chunk on HTTP/1.1+, otherwise drop
                # keep-alive for this response.
                if version >= HttpVersion11:
                    if not self._must_be_empty_body:
                        writer.enable_chunking()
                        headers[hdrs.TRANSFER_ENCODING] = "chunked"
                elif not self._must_be_empty_body:
                    # NOTE(review): only the local is updated; self._keep_alive
                    # keeps the value assigned above -- confirm intended.
                    keep_alive = False

        # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2
        # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4
        if self._must_be_empty_body:
            if hdrs.CONTENT_LENGTH in headers and should_remove_content_length(
                request.method, self.status
            ):
                del headers[hdrs.CONTENT_LENGTH]
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-10
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-13
            if hdrs.TRANSFER_ENCODING in headers:
                del headers[hdrs.TRANSFER_ENCODING]
        elif (writer.length if self._length_check else self.content_length) != 0:
            # https://www.rfc-editor.org/rfc/rfc9110#section-8.3-5
            headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream")
        headers.setdefault(hdrs.DATE, rfc822_formatted_time())
        headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE)

        # connection header
        # Only written when it differs from what this code treats as the
        # protocol default: "keep-alive" on HTTP/1.0, "close" on HTTP/1.1.
        if hdrs.CONNECTION not in headers:
            if keep_alive:
                if version == HttpVersion10:
                    headers[hdrs.CONNECTION] = "keep-alive"
            elif version == HttpVersion11:
                headers[hdrs.CONNECTION] = "close"

    async def _write_headers(self) -> None:
        """Write the status line and headers through the payload writer."""
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        # status line
        version = request.version
        status_line = f"HTTP/{version[0]}.{version[1]} {self._status} {self._reason}"
        await writer.write_headers(status_line, self._headers)

        # Send headers immediately if not opted into buffering
        if self._send_headers_immediately:
            writer.send_headers()

    async def write(
        self, data: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
    ) -> None:
        """Write a chunk of body data.

        Raises RuntimeError when called before ``prepare()`` or after
        ``write_eof()``.
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            raise RuntimeError("Cannot call write() after write_eof()")
        if self._payload_writer is None:
            raise RuntimeError("Cannot call write() before prepare()")

        await self._payload_writer.write(data)

    async def drain(self) -> None:
        """Deprecated: emit a DeprecationWarning, then drain the payload writer."""
        assert not self._eof_sent, "EOF has already been sent"
        assert self._payload_writer is not None, "Response has not been started"
        warnings.warn(
            "drain method is deprecated, use await resp.write()",
            DeprecationWarning,
            stacklevel=2,
        )
        await self._payload_writer.drain()

    async def write_eof(self, data: bytes = b"") -> None:
        """Finish the response, optionally writing a final chunk.

        A repeated call after EOF returns without doing anything.
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            return

        assert self._payload_writer is not None, "Response has not been started"

        await self._payload_writer.write_eof(data)
        self._eof_sent = True
        # Drop the request and writer references, capturing the byte count
        # from the writer first.
        self._req = None
        self._body_length = self._payload_writer.output_size
        self._payload_writer = None

    def __repr__(self) -> str:
        if self._eof_sent:
            info = "eof"
        elif self.prepared:
            assert self._req is not None
            info = f"{self._req.method} {self._req.path} "
        else:
            info = "not prepared"
        return f"<{self.__class__.__name__} {self.reason} {info}>"

    # --- MutableMapping interface, backed by self._state ---

    @overload  # type: ignore[override]
    def __getitem__(self, key: ResponseKey[_T]) -> _T: ...

    @overload
    def __getitem__(self, key: str) -> Any: ...

    def __getitem__(self, key: str | ResponseKey[_T]) -> Any:
        return self._state[key]

    @overload  # type: ignore[override]
    def __setitem__(self, key: ResponseKey[_T], value: _T) -> None: ...

    @overload
    def __setitem__(self, key: str, value: Any) -> None: ...

    def __setitem__(self, key: str | ResponseKey[_T], value: Any) -> None:
        self._state[key] = value

    def __delitem__(self, key: str | ResponseKey[_T]) -> None:
        del self._state[key]

    def __len__(self) -> int:
        return len(self._state)

    def __iter__(self) -> Iterator[str | ResponseKey[Any]]:
        return iter(self._state)

    # Hashing and equality are identity-based rather than content-based.
    def __hash__(self) -> int:
        return hash(id(self))

    def __eq__(self, other: object) -> bool:
        return self is other

    def __bool__(self) -> bool:
        # Without this, truthiness would fall back to __len__ and a response
        # with empty state would be falsy.
        return True

532 

533 

class Response(StreamResponse):
    """Response whose body is known up front.

    The body is ``None``, ``bytes``/``bytearray``, or a ``Payload`` obtained
    from ``payload.PAYLOAD_REGISTRY``. Content-Length is derived from the
    body, and headers are not flushed immediately after being written
    (``_send_headers_immediately`` is False).
    """

    _compressed_body: bytes | None = None
    _send_headers_immediately = False

    def __init__(
        self,
        *,
        body: Any = None,
        status: int = 200,
        reason: str | None = None,
        text: str | None = None,
        headers: LooseHeaders | None = None,
        content_type: str | None = None,
        charset: str | None = None,
        zlib_executor_size: int = MAX_SYNC_CHUNK_SIZE,
        zlib_executor: Executor | None = None,
    ) -> None:
        """Build the response from either ``body`` or ``text`` (never both).

        Raises ValueError when both ``body`` and ``text`` are given, when
        ``content_type`` contains "charset", or when a Content-Type header is
        combined with the ``content_type``/``charset`` arguments. Raises
        TypeError when ``text`` is not a ``str`` on the fast path.
        """
        if body is not None and text is not None:
            raise ValueError("body and text are not allowed together")

        real_headers: CIMultiDict[str] = (
            CIMultiDict() if headers is None else CIMultiDict(headers)
        )

        if content_type is not None and "charset" in content_type:
            raise ValueError("charset must not be in content_type argument")

        header_has_ctype = hdrs.CONTENT_TYPE in real_headers

        if text is not None:
            if header_has_ctype:
                if content_type or charset:
                    raise ValueError(
                        "passing both Content-Type header and "
                        "content_type or charset params "
                        "is forbidden"
                    )
            else:
                # fast path for filling headers: encode here so the ``text``
                # property setter is not needed below
                if not isinstance(text, str):
                    raise TypeError("text argument must be str (%r)" % type(text))
                content_type = "text/plain" if content_type is None else content_type
                charset = "utf-8" if charset is None else charset
                real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset
                body = text.encode(charset)
                text = None
        elif header_has_ctype:
            if content_type is not None or charset is not None:
                raise ValueError(
                    "passing both Content-Type header and "
                    "content_type or charset params "
                    "is forbidden"
                )
        elif content_type is not None:
            if charset is not None:
                content_type += "; charset=" + charset
            real_headers[hdrs.CONTENT_TYPE] = content_type

        super().__init__(status=status, reason=reason, _real_headers=real_headers)

        # ``text`` is still set only when a Content-Type header was supplied;
        # the property setter then encodes it.
        if text is None:
            self.body = body
        else:
            self.text = text

        self._zlib_executor_size = zlib_executor_size
        self._zlib_executor = zlib_executor

    @property
    def body(self) -> bytes | bytearray | Payload | None:
        """The stored body object."""
        return self._body

    @body.setter
    def body(self, body: Any) -> None:
        """Store ``body``, wrapping anything but None/bytes/bytearray in a Payload.

        Raises ValueError when the payload registry has no match.
        """
        if body is None or isinstance(body, (bytes, bytearray)):
            self._body = body
        else:
            try:
                wrapped = payload.PAYLOAD_REGISTRY.get(body)
            except payload.LookupError:
                raise ValueError("Unsupported body type %r" % type(body))
            self._body = wrapped

            response_headers = self._headers

            # set content-type
            if hdrs.CONTENT_TYPE not in response_headers:
                response_headers[hdrs.CONTENT_TYPE] = wrapped.content_type

            # copy payload headers without overriding ones already present
            if wrapped.headers:
                for name, header_value in wrapped.headers.items():
                    if name not in response_headers:
                        response_headers[name] = header_value

        # A previously compressed body no longer matches the new one.
        self._compressed_body = None

    @property
    def text(self) -> str | None:
        """Body decoded with the response charset (UTF-8 when none is set)."""
        if self._body is None:
            return None
        # Note: When _body is a Payload (e.g. FilePayload), this may do blocking I/O
        # This is generally safe as most common payloads (BytesPayload, StringPayload)
        # don't do blocking I/O, but be careful with file-based payloads
        encoding = self.charset or "utf-8"
        return self._body.decode(encoding)

    @text.setter
    def text(self, text: str) -> None:
        """Encode ``text`` into the body, defaulting content type and charset."""
        assert isinstance(text, str), "text argument must be str (%r)" % type(text)

        if self.content_type == "application/octet-stream":
            self.content_type = "text/plain"
        if self.charset is None:
            self.charset = "utf-8"

        self._body = text.encode(self.charset)
        self._compressed_body = None

    @property
    def content_length(self) -> int | None:
        """Body length in bytes, or None when it cannot be known up front."""
        if self._chunked:
            return None

        if hdrs.CONTENT_LENGTH in self._headers:
            return int(self._headers[hdrs.CONTENT_LENGTH])

        if self._compressed_body is not None:
            # Return length of the compressed body
            return len(self._compressed_body)
        if isinstance(self._body, Payload):
            # A payload without content length, or a compressed payload
            return None
        if self._body is not None:
            return len(self._body)
        return 0

    @content_length.setter
    def content_length(self, value: int | None) -> None:
        """Always raises RuntimeError: the length is derived from the body."""
        raise RuntimeError("Content length is set automatically")

    async def write_eof(self, data: bytes = b"") -> None:
        """Send the stored body (compressed variant if present) and finish.

        ``data`` must be empty; the body comes from the response itself.
        """
        if self._eof_sent:
            return
        body = self._body if self._compressed_body is None else self._compressed_body
        assert not data, f"data arg is not supported, got {data!r}"
        assert self._req is not None
        assert self._payload_writer is not None
        if body is None or self._must_be_empty_body:
            await super().write_eof()
        elif isinstance(self._body, Payload):
            # The payload streams itself through the writer.
            await self._body.write(self._payload_writer)
            await self._body.close()
            await super().write_eof()
        else:
            await super().write_eof(cast(bytes, body))

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Settle the Content-Length header, then defer to the base ``_start()``."""
        response_headers = self._headers
        if hdrs.CONTENT_LENGTH in response_headers:
            if should_remove_content_length(request.method, self.status):
                del response_headers[hdrs.CONTENT_LENGTH]
        elif not self._chunked:
            current = self._body
            if isinstance(current, Payload):
                payload_size = current.size
                if payload_size is not None:
                    response_headers[hdrs.CONTENT_LENGTH] = str(payload_size)
            elif current:
                response_headers[hdrs.CONTENT_LENGTH] = str(len(current))
            elif self.status != 304 and request.method not in hdrs.METH_HEAD_ALL:
                # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7
                response_headers[hdrs.CONTENT_LENGTH] = "0"

        return await super()._start(request)

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Compress the whole in-memory body at once.

        Chunked responses and ``Payload`` bodies use the base class's
        streaming compression instead.
        """
        if self._chunked or isinstance(self._body, Payload):
            return await super()._do_start_compression(coding)
        if coding is ContentCoding.identity:
            return
        # Instead of using _payload_writer.enable_compression,
        # compress the whole body
        zlib_compressor = ZLibCompressor(
            encoding=coding.value,
            max_sync_chunk_size=self._zlib_executor_size,
            executor=self._zlib_executor,
        )
        assert self._body is not None
        leading = await zlib_compressor.compress(self._body)
        self._compressed_body = leading + zlib_compressor.flush()
        self._headers[hdrs.CONTENT_ENCODING] = coding.value
        self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body))

733 

734 

def json_response(
    data: Any = sentinel,
    *,
    text: str | None = None,
    body: bytes | None = None,
    status: int = 200,
    reason: str | None = None,
    headers: LooseHeaders | None = None,
    content_type: str = "application/json",
    dumps: JSONEncoder = json.dumps,
) -> Response:
    """Build a ``Response`` whose text is ``dumps(data)``.

    When ``data`` is omitted, ``text`` or ``body`` is passed through as
    given. Raises ValueError when ``data`` is combined with a truthy
    ``text`` or ``body``.
    """
    data_given = data is not sentinel
    if data_given and (text or body):
        raise ValueError("only one of data, text, or body should be specified")
    if data_given:
        text = dumps(data)
    return Response(
        text=text,
        body=body,
        status=status,
        reason=reason,
        headers=headers,
        content_type=content_type,
    )

759 

760 

def json_bytes_response(
    data: Any = sentinel,
    *,
    dumps: JSONBytesEncoder,
    body: bytes | None = None,
    status: int = 200,
    reason: str | None = None,
    headers: LooseHeaders | None = None,
    content_type: str = "application/json",
) -> Response:
    """Create a JSON response using a bytes-returning encoder.

    Use this when your JSON encoder (like orjson) returns bytes
    instead of str, avoiding the encode/decode overhead.

    Raises ValueError when both ``data`` and ``body`` are supplied.
    """
    data_given = data is not sentinel
    if data_given and body is not None:
        raise ValueError("only one of data or body should be specified")
    if data_given:
        # The encoder's bytes output is used directly as the response body.
        body = dumps(data)
    return Response(
        body=body,
        status=status,
        reason=reason,
        headers=headers,
        content_type=content_type,
    )