Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_response.py: 24%
442 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-26 06:16 +0000
1import asyncio
2import collections.abc
3import datetime
4import enum
5import json
6import math
7import time
8import warnings
9from concurrent.futures import Executor
10from http import HTTPStatus
11from typing import (
12 TYPE_CHECKING,
13 Any,
14 Dict,
15 Iterator,
16 MutableMapping,
17 Optional,
18 Union,
19 cast,
20)
22from multidict import CIMultiDict, istr
24from . import hdrs, payload
25from .abc import AbstractStreamWriter
26from .compression_utils import ZLibCompressor
27from .helpers import (
28 ETAG_ANY,
29 QUOTED_ETAG_RE,
30 CookieMixin,
31 ETag,
32 HeadersMixin,
33 must_be_empty_body,
34 parse_http_date,
35 populate_with_cookies,
36 rfc822_formatted_time,
37 sentinel,
38 should_remove_content_length,
39 validate_etag_value,
40)
41from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11
42from .payload import Payload
43from .typedefs import JSONEncoder, LooseHeaders
45__all__ = ("ContentCoding", "StreamResponse", "Response", "json_response")
48if TYPE_CHECKING:
49 from .web_request import BaseRequest
51 BaseClass = MutableMapping[str, Any]
52else:
53 BaseClass = collections.abc.MutableMapping
class ContentCoding(enum.Enum):
    """Content codings this module can apply to a response body.

    Each member's value is the token written to the Content-Encoding
    header.  Member order is significant: ``StreamResponse`` iterates the
    enum in definition order when matching against Accept-Encoding, so the
    first listed coding that matches is the one used.

    Further registered codings (not supported here) are listed at:
    https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding
    """

    deflate = "deflate"
    gzip = "gzip"
    identity = "identity"
66############################################################
67# HTTP Response classes
68############################################################
class StreamResponse(BaseClass, HeadersMixin, CookieMixin):
    """HTTP response whose body is written incrementally via a payload writer.

    Lifecycle: configure status/headers, ``await prepare(request)`` to send
    the status line and headers, ``await write(data)`` any number of times,
    then ``await write_eof()``.

    The instance is also a mutable mapping backed by ``self._state`` so
    arbitrary per-response data can be attached with ``resp[key] = value``.
    """

    __slots__ = (
        "_length_check",
        "_body",
        "_keep_alive",
        "_chunked",
        "_compression",
        "_compression_force",
        "_req",
        "_payload_writer",
        "_eof_sent",
        "_must_be_empty_body",
        "_body_length",
        "_state",
        "_headers",
        "_status",
        "_reason",
        "_cookies",
        "__weakref__",
    )

    def __init__(
        self,
        *,
        status: int = 200,
        reason: Optional[str] = None,
        headers: Optional[LooseHeaders] = None,
    ) -> None:
        """Create an unprepared response.

        :param status: HTTP status code.
        :param reason: reason phrase; derived from *status* when ``None``.
        :param headers: initial headers, copied into a new ``CIMultiDict``.
        """
        super().__init__()
        self._length_check = True
        self._body = None
        self._keep_alive: Optional[bool] = None
        self._chunked = False
        self._compression = False
        self._compression_force: Optional[ContentCoding] = None

        self._req: Optional[BaseRequest] = None
        self._payload_writer: Optional[AbstractStreamWriter] = None
        self._eof_sent = False
        self._must_be_empty_body: Optional[bool] = None
        self._body_length = 0
        self._state: Dict[str, Any] = {}

        if headers is not None:
            self._headers: CIMultiDict[str] = CIMultiDict(headers)
        else:
            self._headers = CIMultiDict()

        self.set_status(status, reason)

    @property
    def prepared(self) -> bool:
        """True while a payload writer is attached (set in ``_start``)."""
        return self._payload_writer is not None

    @property
    def task(self) -> "Optional[asyncio.Task[None]]":
        """Task of the bound request, or ``None`` when no request is bound."""
        if self._req:
            return self._req.task
        else:
            return None

    @property
    def status(self) -> int:
        return self._status

    @property
    def chunked(self) -> bool:
        return self._chunked

    @property
    def compression(self) -> bool:
        return self._compression

    @property
    def reason(self) -> str:
        return self._reason

    def set_status(
        self,
        status: int,
        reason: Optional[str] = None,
    ) -> None:
        """Set the status code and reason phrase.

        When *reason* is ``None`` the standard phrase for the status is used,
        or an empty string if ``HTTPStatus`` does not know the code.
        """
        assert (
            not self.prepared
        ), "Cannot change the response status code after the headers have been sent"
        self._status = int(status)
        if reason is None:
            try:
                reason = HTTPStatus(self._status).phrase
            except ValueError:
                reason = ""
        self._reason = reason

    @property
    def keep_alive(self) -> Optional[bool]:
        return self._keep_alive

    def force_close(self) -> None:
        """Mark the response as not keep-alive."""
        self._keep_alive = False

    @property
    def body_length(self) -> int:
        """Writer's ``output_size`` recorded by ``write_eof``; 0 before that."""
        return self._body_length

    def enable_chunked_encoding(self) -> None:
        """Enables automatic chunked transfer encoding.

        :raises RuntimeError: if a Content-Length header is already set.
        """
        # Validate first so that a rejected call leaves the response
        # unchanged instead of half-switched into chunked mode.
        if hdrs.CONTENT_LENGTH in self._headers:
            raise RuntimeError(
                "You can't enable chunked encoding when a content length is set"
            )
        self._chunked = True

    def enable_compression(self, force: Optional[ContentCoding] = None) -> None:
        """Enables response compression encoding.

        :param force: coding to use regardless of the request's
            Accept-Encoding header; ``None`` lets ``_start_compression``
            pick one from that header.
        """
        self._compression = True
        self._compression_force = force

    @property
    def headers(self) -> "CIMultiDict[str]":
        return self._headers

    @property
    def content_length(self) -> Optional[int]:
        # Just a placeholder for adding setter
        return super().content_length

    @content_length.setter
    def content_length(self, value: Optional[int]) -> None:
        # ``None`` removes the header; any other value is coerced with int().
        if value is not None:
            value = int(value)
            if self._chunked:
                raise RuntimeError(
                    "You can't set content length when chunked encoding is enable"
                )
            self._headers[hdrs.CONTENT_LENGTH] = str(value)
        else:
            self._headers.pop(hdrs.CONTENT_LENGTH, None)

    @property
    def content_type(self) -> str:
        # Just a placeholder for adding setter
        return super().content_type

    @content_type.setter
    def content_type(self, value: str) -> None:
        self.content_type  # read header values if needed
        self._content_type = str(value)
        self._generate_content_type_header()

    @property
    def charset(self) -> Optional[str]:
        # Just a placeholder for adding setter
        return super().charset

    @charset.setter
    def charset(self, value: Optional[str]) -> None:
        ctype = self.content_type  # read header values if needed
        if ctype == "application/octet-stream":
            raise RuntimeError(
                "Setting charset for application/octet-stream "
                "doesn't make sense, setup content_type first"
            )
        assert self._content_dict is not None
        if value is None:
            self._content_dict.pop("charset", None)
        else:
            self._content_dict["charset"] = str(value).lower()
        self._generate_content_type_header()

    @property
    def last_modified(self) -> Optional[datetime.datetime]:
        """The value of Last-Modified HTTP header, or None.

        This header is represented as a `datetime` object.
        """
        return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED))

    @last_modified.setter
    def last_modified(
        self, value: Optional[Union[int, float, datetime.datetime, str]]
    ) -> None:
        # None removes the header, numbers are POSIX timestamps (rounded up
        # to a whole second), datetimes are rendered via utctimetuple(), and
        # strings are stored verbatim.
        if value is None:
            self._headers.pop(hdrs.LAST_MODIFIED, None)
        elif isinstance(value, (int, float)):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(math.ceil(value))
            )
        elif isinstance(value, datetime.datetime):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", value.utctimetuple()
            )
        elif isinstance(value, str):
            self._headers[hdrs.LAST_MODIFIED] = value
        else:
            # Previously an unsupported type was silently ignored, leaving
            # the header untouched and hiding the caller's mistake.
            raise TypeError(
                f"Unsupported type for last_modified: {type(value).__name__}"
            )

    @property
    def etag(self) -> Optional[ETag]:
        """Parsed ETag header, or ``None`` if absent or not a valid quoted tag."""
        quoted_value = self._headers.get(hdrs.ETAG)
        if not quoted_value:
            return None
        elif quoted_value == ETAG_ANY:
            return ETag(value=ETAG_ANY)
        match = QUOTED_ETAG_RE.fullmatch(quoted_value)
        if not match:
            return None
        is_weak, value = match.group(1, 2)
        return ETag(
            is_weak=bool(is_weak),
            value=value,
        )

    @etag.setter
    def etag(self, value: Optional[Union[ETag, str]]) -> None:
        # None removes the header; ETAG_ANY is stored as-is; other values
        # are validated and wrapped in quotes (with a W/ prefix when weak).
        if value is None:
            self._headers.pop(hdrs.ETAG, None)
        elif (isinstance(value, str) and value == ETAG_ANY) or (
            isinstance(value, ETag) and value.value == ETAG_ANY
        ):
            self._headers[hdrs.ETAG] = ETAG_ANY
        elif isinstance(value, str):
            validate_etag_value(value)
            self._headers[hdrs.ETAG] = f'"{value}"'
        elif isinstance(value, ETag) and isinstance(value.value, str):  # type: ignore[redundant-expr]
            validate_etag_value(value.value)
            hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"'
            self._headers[hdrs.ETAG] = hdr_value
        else:
            raise ValueError(
                f"Unsupported etag type: {type(value)}. "
                f"etag must be str, ETag or None"
            )

    def _generate_content_type_header(
        self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE
    ) -> None:
        """Rebuild the Content-Type header from the cached type and params."""
        assert self._content_dict is not None
        assert self._content_type is not None
        params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items())
        if params:
            ctype = self._content_type + "; " + params
        else:
            ctype = self._content_type
        self._headers[CONTENT_TYPE] = ctype

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Turn on writer-level compression for *coding* (no-op for identity)."""
        if coding != ContentCoding.identity:
            assert self._payload_writer is not None
            self._headers[hdrs.CONTENT_ENCODING] = coding.value
            self._payload_writer.enable_compression(coding.value)
            # Compressed payload may have different content length,
            # remove the header
            self._headers.popall(hdrs.CONTENT_LENGTH, None)

    async def _start_compression(self, request: "BaseRequest") -> None:
        """Pick a coding (forced, or first match in Accept-Encoding) and start it."""
        if self._compression_force:
            await self._do_start_compression(self._compression_force)
        else:
            accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()
            # Plain substring match, tried in ContentCoding definition order.
            for coding in ContentCoding:
                if coding.value in accept_encoding:
                    await self._do_start_compression(coding)
                    return

    async def prepare(self, request: "BaseRequest") -> Optional[AbstractStreamWriter]:
        """Send the status line and headers for *request*.

        Returns ``None`` if EOF was already sent, the existing writer if the
        response is already prepared, otherwise the writer obtained by
        starting the response.
        """
        if self._eof_sent:
            return None
        if self._payload_writer is not None:
            return self._payload_writer
        self._must_be_empty_body = must_be_empty_body(request.method, self.status)
        return await self._start(request)

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Bind to *request*, finalise headers, run its prepare hook, send headers."""
        self._req = request
        writer = self._payload_writer = request._payload_writer

        await self._prepare_headers()
        await request._prepare_hook(self)
        await self._write_headers()

        return writer

    async def _prepare_headers(self) -> None:
        """Compute framing, default and Connection headers before sending."""
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        keep_alive = self._keep_alive
        if keep_alive is None:
            keep_alive = request.keep_alive
        self._keep_alive = keep_alive

        version = request.version

        headers = self._headers
        populate_with_cookies(headers, self.cookies)

        if self._compression:
            await self._start_compression(request)

        if self._chunked:
            if version != HttpVersion11:
                raise RuntimeError(
                    "Using chunked encoding is forbidden "
                    "for HTTP/{0.major}.{0.minor}".format(request.version)
                )
            if not self._must_be_empty_body:
                writer.enable_chunking()
                headers[hdrs.TRANSFER_ENCODING] = "chunked"
            if hdrs.CONTENT_LENGTH in headers:
                del headers[hdrs.CONTENT_LENGTH]
        elif self._length_check:
            writer.length = self.content_length
            if writer.length is None:
                # Unknown length: chunk on HTTP/1.1+, otherwise the local
                # keep_alive flag is cleared for the Connection header below.
                if version >= HttpVersion11:
                    if not self._must_be_empty_body:
                        writer.enable_chunking()
                        headers[hdrs.TRANSFER_ENCODING] = "chunked"
                elif not self._must_be_empty_body:
                    keep_alive = False

        # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2
        # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4
        if self._must_be_empty_body:
            if hdrs.CONTENT_LENGTH in headers and should_remove_content_length(
                request.method, self.status
            ):
                del headers[hdrs.CONTENT_LENGTH]
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-10
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-13
            if hdrs.TRANSFER_ENCODING in headers:
                del headers[hdrs.TRANSFER_ENCODING]
        else:
            headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream")
        headers.setdefault(hdrs.DATE, rfc822_formatted_time())
        headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE)

        # connection header
        if hdrs.CONNECTION not in headers:
            if keep_alive:
                if version == HttpVersion10:
                    headers[hdrs.CONNECTION] = "keep-alive"
            else:
                if version == HttpVersion11:
                    headers[hdrs.CONNECTION] = "close"

    async def _write_headers(self) -> None:
        """Format the status line and hand it plus the headers to the writer."""
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        # status line
        version = request.version
        status_line = "HTTP/{}.{} {} {}".format(
            version[0], version[1], self._status, self._reason
        )
        await writer.write_headers(status_line, self._headers)

    async def write(self, data: bytes) -> None:
        """Write a chunk of body data.

        :raises RuntimeError: if called after ``write_eof()`` or before
            ``prepare()``.
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            raise RuntimeError("Cannot call write() after write_eof()")
        if self._payload_writer is None:
            raise RuntimeError("Cannot call write() before prepare()")

        await self._payload_writer.write(data)

    async def drain(self) -> None:
        """Deprecated: drain the payload writer; use ``await resp.write()``."""
        assert not self._eof_sent, "EOF has already been sent"
        assert self._payload_writer is not None, "Response has not been started"
        warnings.warn(
            "drain method is deprecated, use await resp.write()",
            DeprecationWarning,
            stacklevel=2,
        )
        await self._payload_writer.drain()

    async def write_eof(self, data: bytes = b"") -> None:
        """Write optional final *data*, finish the body and detach the writer.

        Calling it again after EOF has been sent is a no-op.
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            return

        assert self._payload_writer is not None, "Response has not been started"

        await self._payload_writer.write_eof(data)
        self._eof_sent = True
        self._req = None
        # Capture the size before dropping the writer reference.
        self._body_length = self._payload_writer.output_size
        self._payload_writer = None

    def __repr__(self) -> str:
        if self._eof_sent:
            info = "eof"
        elif self.prepared:
            assert self._req is not None
            info = f"{self._req.method} {self._req.path} "
        else:
            info = "not prepared"
        return f"<{self.__class__.__name__} {self.reason} {info}>"

    # MutableMapping interface, backed by the per-response ``_state`` dict.

    def __getitem__(self, key: str) -> Any:
        return self._state[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._state[key] = value

    def __delitem__(self, key: str) -> None:
        del self._state[key]

    def __len__(self) -> int:
        return len(self._state)

    def __iter__(self) -> Iterator[str]:
        return iter(self._state)

    # Identity-based hashing and equality: two responses are equal only if
    # they are the same object, regardless of their mapping contents.

    def __hash__(self) -> int:
        return hash(id(self))

    def __eq__(self, other: object) -> bool:
        return self is other
class Response(StreamResponse):
    """Response whose complete body is known before it is sent.

    The body is either raw bytes or a ``Payload`` looked up in
    ``payload.PAYLOAD_REGISTRY``.  Content-Length is derived from the body
    and cannot be assigned directly.
    """

    __slots__ = (
        "_body_payload",
        "_compressed_body",
        "_zlib_executor_size",
        "_zlib_executor",
    )

    def __init__(
        self,
        *,
        body: Any = None,
        status: int = 200,
        reason: Optional[str] = None,
        text: Optional[str] = None,
        headers: Optional[LooseHeaders] = None,
        content_type: Optional[str] = None,
        charset: Optional[str] = None,
        zlib_executor_size: Optional[int] = None,
        zlib_executor: Optional[Executor] = None,
    ) -> None:
        """Create a response from *body* or *text* (mutually exclusive).

        :param body: bytes-like body or an object convertible to a Payload.
        :param text: string body, encoded with *charset* (default utf-8).
        :param headers: initial headers; copied, never modified in place.
        :param content_type: media type without a charset parameter.
        :param charset: charset appended to the Content-Type header.
        :param zlib_executor_size: passed to ``ZLibCompressor`` as
            ``max_sync_chunk_size`` when the body is compressed.
        :param zlib_executor: executor passed to ``ZLibCompressor``.
        :raises ValueError: if both *body* and *text* are given, if
            *content_type* contains "charset", or if a Content-Type header
            is combined with *content_type*/*charset*.
        :raises TypeError: if *text* is not a ``str`` (fast path only).
        """
        if body is not None and text is not None:
            raise ValueError("body and text are not allowed together")

        # Always work on a private copy: Content-Type may be written into
        # ``real_headers`` below, and that must never mutate a CIMultiDict
        # instance owned by the caller.
        real_headers: CIMultiDict[str] = (
            CIMultiDict() if headers is None else CIMultiDict(headers)
        )

        if content_type is not None and "charset" in content_type:
            raise ValueError("charset must not be in content_type argument")

        if text is not None:
            if hdrs.CONTENT_TYPE in real_headers:
                if content_type or charset:
                    raise ValueError(
                        "passing both Content-Type header and "
                        "content_type or charset params "
                        "is forbidden"
                    )
            else:
                # fast path for filling headers
                if not isinstance(text, str):
                    raise TypeError("text argument must be str (%r)" % type(text))
                if content_type is None:
                    content_type = "text/plain"
                if charset is None:
                    charset = "utf-8"
                real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset
                body = text.encode(charset)
                text = None
        else:
            if hdrs.CONTENT_TYPE in real_headers:
                if content_type is not None or charset is not None:
                    raise ValueError(
                        "passing both Content-Type header and "
                        "content_type or charset params "
                        "is forbidden"
                    )
            else:
                if content_type is not None:
                    if charset is not None:
                        content_type += "; charset=" + charset
                    real_headers[hdrs.CONTENT_TYPE] = content_type

        super().__init__(status=status, reason=reason, headers=real_headers)

        if text is not None:
            self.text = text
        else:
            self.body = body

        self._compressed_body: Optional[bytes] = None
        self._zlib_executor_size = zlib_executor_size
        self._zlib_executor = zlib_executor

    @property
    def body(self) -> Optional[Union[bytes, Payload]]:
        return self._body

    @body.setter
    def body(self, body: bytes) -> None:
        # None and bytes/bytearray are stored directly; anything else is
        # resolved to a Payload, whose content type and headers are copied
        # in without overriding headers that are already set.
        if body is None:
            self._body: Optional[bytes] = None
            self._body_payload: bool = False
        elif isinstance(body, (bytes, bytearray)):
            self._body = body
            self._body_payload = False
        else:
            try:
                self._body = body = payload.PAYLOAD_REGISTRY.get(body)
            except payload.LookupError:
                raise ValueError("Unsupported body type %r" % type(body))

            self._body_payload = True

            headers = self._headers

            # set content-type
            if hdrs.CONTENT_TYPE not in headers:
                headers[hdrs.CONTENT_TYPE] = body.content_type

            # copy payload headers
            if body.headers:
                for key, value in body.headers.items():
                    if key not in headers:
                        headers[key] = value

        # Any previously compressed copy no longer matches the new body.
        self._compressed_body = None

    @property
    def text(self) -> Optional[str]:
        """Body decoded with the response charset (utf-8 if unset), or None."""
        if self._body is None:
            return None
        return self._body.decode(self.charset or "utf-8")

    @text.setter
    def text(self, text: str) -> None:
        assert isinstance(text, str), "text argument must be str (%r)" % type(text)

        # Give text bodies a sensible type/charset when none was chosen.
        if self.content_type == "application/octet-stream":
            self.content_type = "text/plain"
        if self.charset is None:
            self.charset = "utf-8"

        self._body = text.encode(self.charset)
        self._body_payload = False
        self._compressed_body = None

    @property
    def content_length(self) -> Optional[int]:
        """Length to advertise, or ``None`` when it cannot be known up front."""
        if self._chunked:
            return None

        if hdrs.CONTENT_LENGTH in self._headers:
            return super().content_length

        if self._compressed_body is not None:
            # Return length of the compressed body
            return len(self._compressed_body)
        elif self._body_payload:
            # A payload without content length, or a compressed payload
            return None
        elif self._body is not None:
            return len(self._body)
        else:
            return 0

    @content_length.setter
    def content_length(self, value: Optional[int]) -> None:
        raise RuntimeError("Content length is set automatically")

    async def write_eof(self, data: bytes = b"") -> None:
        """Send the stored body (compressed copy if present) and finish.

        *data* must be empty; the body always comes from the response itself.
        """
        if self._eof_sent:
            return
        if self._compressed_body is None:
            body: Optional[Union[bytes, Payload]] = self._body
        else:
            body = self._compressed_body
        assert not data, f"data arg is not supported, got {data!r}"
        assert self._req is not None
        assert self._payload_writer is not None
        if body is not None:
            if self._must_be_empty_body:
                # The body is not sent for responses that must be empty.
                await super().write_eof()
            elif self._body_payload:
                # Local name chosen to avoid shadowing the ``payload`` module.
                payload_body = cast(Payload, body)
                await payload_body.write(self._payload_writer)
                await super().write_eof()
            else:
                await super().write_eof(cast(bytes, body))
        else:
            await super().write_eof()

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Settle the Content-Length header, then start the response."""
        if should_remove_content_length(request.method, self.status):
            if hdrs.CONTENT_LENGTH in self._headers:
                del self._headers[hdrs.CONTENT_LENGTH]
        elif not self._chunked and hdrs.CONTENT_LENGTH not in self._headers:
            if self._body_payload:
                size = cast(Payload, self._body).size
                if size is not None:
                    self._headers[hdrs.CONTENT_LENGTH] = str(size)
            else:
                body_len = len(self._body) if self._body else 0
                # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7
                # A zero length is omitted for 304 and HEAD responses.
                if body_len or (
                    self.status != 304 and request.method.upper() != hdrs.METH_HEAD
                ):
                    self._headers[hdrs.CONTENT_LENGTH] = str(body_len)

        return await super()._start(request)

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Compress the in-memory body up front instead of while streaming.

        Payload bodies and chunked responses fall back to the writer-level
        compression of the base class.
        """
        if self._body_payload or self._chunked:
            return await super()._do_start_compression(coding)

        if coding != ContentCoding.identity:
            # Instead of using _payload_writer.enable_compression,
            # compress the whole body
            compressor = ZLibCompressor(
                encoding=str(coding.value),
                max_sync_chunk_size=self._zlib_executor_size,
                executor=self._zlib_executor,
            )
            assert self._body is not None
            if self._zlib_executor_size is None and len(self._body) > 1024 * 1024:
                warnings.warn(
                    "Synchronous compression of large response bodies "
                    f"({len(self._body)} bytes) might block the async event loop. "
                    "Consider providing a custom value to zlib_executor_size/"
                    "zlib_executor response properties or disabling compression on it."
                )
            self._compressed_body = (
                await compressor.compress(self._body) + compressor.flush()
            )
            assert self._compressed_body is not None

            self._headers[hdrs.CONTENT_ENCODING] = coding.value
            self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body))
def json_response(
    data: Any = sentinel,
    *,
    text: Optional[str] = None,
    body: Optional[bytes] = None,
    status: int = 200,
    reason: Optional[str] = None,
    headers: Optional[LooseHeaders] = None,
    content_type: str = "application/json",
    dumps: JSONEncoder = json.dumps,
) -> Response:
    """Build a ``Response`` with a JSON content type.

    When *data* is supplied it is serialised with *dumps* and used as the
    response text; in that case a truthy *text* or *body* is rejected.
    Without *data*, *text* and *body* are forwarded unchanged.

    :raises ValueError: if *data* is combined with a truthy *text* or *body*.
    """
    data_given = data is not sentinel
    if data_given and (text or body):
        raise ValueError("only one of data, text, or body should be specified")
    if data_given:
        text = dumps(data)

    return Response(
        text=text,
        body=body,
        status=status,
        reason=reason,
        headers=headers,
        content_type=content_type,
    )