Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_response.py: 25% (433 statements)
coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

import asyncio
import collections.abc
import datetime
import enum
import json
import math
import time
import warnings
from concurrent.futures import Executor
from http import HTTPStatus
from http.cookies import Morsel
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterator,
    MutableMapping,
    Optional,
    Union,
    cast,
)

from multidict import CIMultiDict, istr

from . import hdrs, payload
from .abc import AbstractStreamWriter
from .compression_utils import ZLibCompressor
from .helpers import (
    ETAG_ANY,
    PY_38,
    QUOTED_ETAG_RE,
    CookieMixin,
    ETag,
    HeadersMixin,
    parse_http_date,
    populate_with_cookies,
    rfc822_formatted_time,
    sentinel,
    validate_etag_value,
)
from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11
from .payload import Payload
from .typedefs import JSONEncoder, LooseHeaders

__all__ = ("ContentCoding", "StreamResponse", "Response", "json_response")


if TYPE_CHECKING:  # pragma: no cover
    from .web_request import BaseRequest

    BaseClass = MutableMapping[str, Any]
else:
    BaseClass = collections.abc.MutableMapping


if not PY_38:
    # allow samesite to be used in python < 3.8
    # already permitted in python 3.8, see https://bugs.python.org/issue29613
    Morsel._reserved["samesite"] = "SameSite"  # type: ignore[attr-defined]


class ContentCoding(enum.Enum):
    # The content codings that we have support for.
    #
    # Additional registered codings are listed at:
    # https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding
    deflate = "deflate"
    gzip = "gzip"
    identity = "identity"


############################################################
# HTTP Response classes
############################################################


class StreamResponse(BaseClass, HeadersMixin, CookieMixin):
    __slots__ = (
        "_length_check",
        "_body",
        "_keep_alive",
        "_chunked",
        "_compression",
        "_compression_force",
        "_req",
        "_payload_writer",
        "_eof_sent",
        "_body_length",
        "_state",
        "_headers",
        "_status",
        "_reason",
        "_cookies",
        "__weakref__",
    )

    def __init__(
        self,
        *,
        status: int = 200,
        reason: Optional[str] = None,
        headers: Optional[LooseHeaders] = None,
    ) -> None:
        super().__init__()
        self._length_check = True
        self._body = None
        self._keep_alive: Optional[bool] = None
        self._chunked = False
        self._compression = False
        self._compression_force: Optional[ContentCoding] = None

        self._req: Optional[BaseRequest] = None
        self._payload_writer: Optional[AbstractStreamWriter] = None
        self._eof_sent = False
        self._body_length = 0
        self._state: Dict[str, Any] = {}

        if headers is not None:
            self._headers: CIMultiDict[str] = CIMultiDict(headers)
        else:
            self._headers = CIMultiDict()

        self.set_status(status, reason)

    @property
    def prepared(self) -> bool:
        return self._payload_writer is not None

    @property
    def task(self) -> "Optional[asyncio.Task[None]]":
        if self._req:
            return self._req.task
        else:
            return None

    @property
    def status(self) -> int:
        return self._status

    @property
    def chunked(self) -> bool:
        return self._chunked

    @property
    def compression(self) -> bool:
        return self._compression

    @property
    def reason(self) -> str:
        return self._reason

    def set_status(
        self,
        status: int,
        reason: Optional[str] = None,
    ) -> None:
        assert not self.prepared, (
            "Cannot change the response status code after the headers have been sent"
        )
        self._status = int(status)
        if reason is None:
            try:
                reason = HTTPStatus(self._status).phrase
            except ValueError:
                reason = ""
        self._reason = reason

    @property
    def keep_alive(self) -> Optional[bool]:
        return self._keep_alive

    def force_close(self) -> None:
        self._keep_alive = False

    @property
    def body_length(self) -> int:
        return self._body_length

    def enable_chunked_encoding(self) -> None:
        """Enables automatic chunked transfer encoding."""
        self._chunked = True

        if hdrs.CONTENT_LENGTH in self._headers:
            raise RuntimeError(
                "You can't enable chunked encoding when a content length is set"
            )

    def enable_compression(self, force: Optional[ContentCoding] = None) -> None:
        """Enables response compression encoding."""
        # Backwards compatibility for when force was a bool <0.17.
        self._compression = True
        self._compression_force = force
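
    # Hedged usage sketch (not part of the original module): in a request handler,
    # compression can either be forced to a specific coding or left to negotiation
    # against the request's Accept-Encoding header.
    #
    #     resp = StreamResponse()
    #     resp.enable_compression(ContentCoding.gzip)  # force gzip for this response
    #     resp.enable_compression()                    # or negotiate via Accept-Encoding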

    @property
    def headers(self) -> "CIMultiDict[str]":
        return self._headers

    @property
    def content_length(self) -> Optional[int]:
        # Just a placeholder for adding setter
        return super().content_length

    @content_length.setter
    def content_length(self, value: Optional[int]) -> None:
        if value is not None:
            value = int(value)
            if self._chunked:
                raise RuntimeError(
                    "You can't set content length when chunked encoding is enabled"
                )
            self._headers[hdrs.CONTENT_LENGTH] = str(value)
        else:
            self._headers.pop(hdrs.CONTENT_LENGTH, None)

    @property
    def content_type(self) -> str:
        # Just a placeholder for adding setter
        return super().content_type

    @content_type.setter
    def content_type(self, value: str) -> None:
        self.content_type  # read header values if needed
        self._content_type = str(value)
        self._generate_content_type_header()

    @property
    def charset(self) -> Optional[str]:
        # Just a placeholder for adding setter
        return super().charset

    @charset.setter
    def charset(self, value: Optional[str]) -> None:
        ctype = self.content_type  # read header values if needed
        if ctype == "application/octet-stream":
            raise RuntimeError(
                "Setting charset for application/octet-stream "
                "doesn't make sense, set up content_type first"
            )
        assert self._content_dict is not None
        if value is None:
            self._content_dict.pop("charset", None)
        else:
            self._content_dict["charset"] = str(value).lower()
        self._generate_content_type_header()

    @property
    def last_modified(self) -> Optional[datetime.datetime]:
        """The value of Last-Modified HTTP header, or None.

        This header is represented as a `datetime` object.
        """
        return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED))

    @last_modified.setter
    def last_modified(
        self, value: Optional[Union[int, float, datetime.datetime, str]]
    ) -> None:
        if value is None:
            self._headers.pop(hdrs.LAST_MODIFIED, None)
        elif isinstance(value, (int, float)):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(math.ceil(value))
            )
        elif isinstance(value, datetime.datetime):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", value.utctimetuple()
            )
        elif isinstance(value, str):
            self._headers[hdrs.LAST_MODIFIED] = value
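
    # Hedged sketch of the value types accepted by the setter above (illustrative only):
    #
    #     resp.last_modified = 1686120720                            # POSIX timestamp
    #     resp.last_modified = datetime.datetime(2023, 6, 7, 6, 52)  # datetime object
    #     resp.last_modified = "Wed, 07 Jun 2023 06:52:00 GMT"       # preformatted string
    #     resp.last_modified = None                                  # removes the header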

    @property
    def etag(self) -> Optional[ETag]:
        quoted_value = self._headers.get(hdrs.ETAG)
        if not quoted_value:
            return None
        elif quoted_value == ETAG_ANY:
            return ETag(value=ETAG_ANY)
        match = QUOTED_ETAG_RE.fullmatch(quoted_value)
        if not match:
            return None
        is_weak, value = match.group(1, 2)
        return ETag(
            is_weak=bool(is_weak),
            value=value,
        )

    @etag.setter
    def etag(self, value: Optional[Union[ETag, str]]) -> None:
        if value is None:
            self._headers.pop(hdrs.ETAG, None)
        elif (isinstance(value, str) and value == ETAG_ANY) or (
            isinstance(value, ETag) and value.value == ETAG_ANY
        ):
            self._headers[hdrs.ETAG] = ETAG_ANY
        elif isinstance(value, str):
            validate_etag_value(value)
            self._headers[hdrs.ETAG] = f'"{value}"'
        elif isinstance(value, ETag) and isinstance(value.value, str):
            validate_etag_value(value.value)
            hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"'
            self._headers[hdrs.ETAG] = hdr_value
        else:
            raise ValueError(
                f"Unsupported etag type: {type(value)}. "
                f"etag must be str, ETag or None"
            )
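
    # Hedged sketch of setting the ETag header via the setter above (illustrative only):
    #
    #     resp.etag = "686897696a7c876b7e"                             # ETag: "686897696a7c876b7e"
    #     resp.etag = ETag(value="686897696a7c876b7e", is_weak=True)   # ETag: W/"686897696a7c876b7e"
    #     resp.etag = None                                             # removes the header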

    def _generate_content_type_header(
        self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE
    ) -> None:
        assert self._content_dict is not None
        assert self._content_type is not None
        params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items())
        if params:
            ctype = self._content_type + "; " + params
        else:
            ctype = self._content_type
        self._headers[CONTENT_TYPE] = ctype

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        if coding != ContentCoding.identity:
            assert self._payload_writer is not None
            self._headers[hdrs.CONTENT_ENCODING] = coding.value
            self._payload_writer.enable_compression(coding.value)
            # Compressed payload may have different content length,
            # remove the header
            self._headers.popall(hdrs.CONTENT_LENGTH, None)

    async def _start_compression(self, request: "BaseRequest") -> None:
        if self._compression_force:
            await self._do_start_compression(self._compression_force)
        else:
            accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()
            for coding in ContentCoding:
                if coding.value in accept_encoding:
                    await self._do_start_compression(coding)
                    return

    async def prepare(self, request: "BaseRequest") -> Optional[AbstractStreamWriter]:
        if self._eof_sent:
            return None
        if self._payload_writer is not None:
            return self._payload_writer

        return await self._start(request)

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        self._req = request
        writer = self._payload_writer = request._payload_writer

        await self._prepare_headers()
        await request._prepare_hook(self)
        await self._write_headers()

        return writer

    async def _prepare_headers(self) -> None:
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        keep_alive = self._keep_alive
        if keep_alive is None:
            keep_alive = request.keep_alive
        self._keep_alive = keep_alive

        version = request.version

        headers = self._headers
        populate_with_cookies(headers, self.cookies)

        if self._compression:
            await self._start_compression(request)

        if self._chunked:
            if version != HttpVersion11:
                raise RuntimeError(
                    "Using chunked encoding is forbidden "
                    "for HTTP/{0.major}.{0.minor}".format(request.version)
                )
            writer.enable_chunking()
            headers[hdrs.TRANSFER_ENCODING] = "chunked"
            if hdrs.CONTENT_LENGTH in headers:
                del headers[hdrs.CONTENT_LENGTH]
        elif self._length_check:
            writer.length = self.content_length
            if writer.length is None:
                if version >= HttpVersion11 and self.status != 204:
                    writer.enable_chunking()
                    headers[hdrs.TRANSFER_ENCODING] = "chunked"
                    if hdrs.CONTENT_LENGTH in headers:
                        del headers[hdrs.CONTENT_LENGTH]
                else:
                    keep_alive = False
            # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2
            # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4
            elif version >= HttpVersion11 and self.status in (100, 101, 102, 103, 204):
                del headers[hdrs.CONTENT_LENGTH]

        if self.status not in (204, 304):
            headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream")
        headers.setdefault(hdrs.DATE, rfc822_formatted_time())
        headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE)

        # connection header
        if hdrs.CONNECTION not in headers:
            if keep_alive:
                if version == HttpVersion10:
                    headers[hdrs.CONNECTION] = "keep-alive"
            else:
                if version == HttpVersion11:
                    headers[hdrs.CONNECTION] = "close"

    async def _write_headers(self) -> None:
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        # status line
        version = request.version
        status_line = "HTTP/{}.{} {} {}".format(
            version[0], version[1], self._status, self._reason
        )
        await writer.write_headers(status_line, self._headers)

    async def write(self, data: bytes) -> None:
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            raise RuntimeError("Cannot call write() after write_eof()")
        if self._payload_writer is None:
            raise RuntimeError("Cannot call write() before prepare()")

        await self._payload_writer.write(data)

    async def drain(self) -> None:
        assert not self._eof_sent, "EOF has already been sent"
        assert self._payload_writer is not None, "Response has not been started"
        warnings.warn(
            "drain method is deprecated, use await resp.write()",
            DeprecationWarning,
            stacklevel=2,
        )
        await self._payload_writer.drain()

    async def write_eof(self, data: bytes = b"") -> None:
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            return

        assert self._payload_writer is not None, "Response has not been started"

        await self._payload_writer.write_eof(data)
        self._eof_sent = True
        self._req = None
        self._body_length = self._payload_writer.output_size
        self._payload_writer = None
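
    # Hedged streaming sketch, assuming an aiohttp request handler (illustrative only):
    #
    #     async def handler(request: "BaseRequest") -> "StreamResponse":
    #         resp = StreamResponse()
    #         resp.enable_chunked_encoding()
    #         await resp.prepare(request)        # sends status line and headers
    #         await resp.write(b"chunk")         # body data goes out as it is written
    #         await resp.write_eof()             # finalizes the response
    #         return resp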

    def __repr__(self) -> str:
        if self._eof_sent:
            info = "eof"
        elif self.prepared:
            assert self._req is not None
            info = f"{self._req.method} {self._req.path} "
        else:
            info = "not prepared"
        return f"<{self.__class__.__name__} {self.reason} {info}>"

    def __getitem__(self, key: str) -> Any:
        return self._state[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._state[key] = value

    def __delitem__(self, key: str) -> None:
        del self._state[key]

    def __len__(self) -> int:
        return len(self._state)

    def __iter__(self) -> Iterator[str]:
        return iter(self._state)

    def __hash__(self) -> int:
        return hash(id(self))

    def __eq__(self, other: object) -> bool:
        return self is other


class Response(StreamResponse):
    __slots__ = (
        "_body_payload",
        "_compressed_body",
        "_zlib_executor_size",
        "_zlib_executor",
    )

    def __init__(
        self,
        *,
        body: Any = None,
        status: int = 200,
        reason: Optional[str] = None,
        text: Optional[str] = None,
        headers: Optional[LooseHeaders] = None,
        content_type: Optional[str] = None,
        charset: Optional[str] = None,
        zlib_executor_size: Optional[int] = None,
        zlib_executor: Optional[Executor] = None,
    ) -> None:
        if body is not None and text is not None:
            raise ValueError("body and text are not allowed together")

        if headers is None:
            real_headers: CIMultiDict[str] = CIMultiDict()
        elif not isinstance(headers, CIMultiDict):
            real_headers = CIMultiDict(headers)
        else:
            real_headers = headers  # = cast('CIMultiDict[str]', headers)

        if content_type is not None and "charset" in content_type:
            raise ValueError("charset must not be in content_type argument")

        if text is not None:
            if hdrs.CONTENT_TYPE in real_headers:
                if content_type or charset:
                    raise ValueError(
                        "passing both Content-Type header and "
                        "content_type or charset params "
                        "is forbidden"
                    )
            else:
                # fast path for filling headers
                if not isinstance(text, str):
                    raise TypeError("text argument must be str (%r)" % type(text))
                if content_type is None:
                    content_type = "text/plain"
                if charset is None:
                    charset = "utf-8"
                real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset
                body = text.encode(charset)
                text = None
        else:
            if hdrs.CONTENT_TYPE in real_headers:
                if content_type is not None or charset is not None:
                    raise ValueError(
                        "passing both Content-Type header and "
                        "content_type or charset params "
                        "is forbidden"
                    )
            else:
                if content_type is not None:
                    if charset is not None:
                        content_type += "; charset=" + charset
                    real_headers[hdrs.CONTENT_TYPE] = content_type

        super().__init__(status=status, reason=reason, headers=real_headers)

        if text is not None:
            self.text = text
        else:
            self.body = body

        self._compressed_body: Optional[bytes] = None
        self._zlib_executor_size = zlib_executor_size
        self._zlib_executor = zlib_executor
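
    # Hedged construction sketch for Response (illustrative only):
    #
    #     Response(text="hello")                        # text/plain; charset=utf-8
    #     Response(body=b"\x89PNG...", content_type="image/png")
    #     Response(status=204, reason="No Content")     # no body or text required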

    @property
    def body(self) -> Optional[Union[bytes, Payload]]:
        return self._body

    @body.setter
    def body(self, body: bytes) -> None:
        if body is None:
            self._body: Optional[bytes] = None
            self._body_payload: bool = False
        elif isinstance(body, (bytes, bytearray)):
            self._body = body
            self._body_payload = False
        else:
            try:
                self._body = body = payload.PAYLOAD_REGISTRY.get(body)
            except payload.LookupError:
                raise ValueError("Unsupported body type %r" % type(body))

            self._body_payload = True

            headers = self._headers

            # set content-type
            if hdrs.CONTENT_TYPE not in headers:
                headers[hdrs.CONTENT_TYPE] = body.content_type

            # copy payload headers
            if body.headers:
                for key, value in body.headers.items():
                    if key not in headers:
                        headers[key] = value

        self._compressed_body = None

    @property
    def text(self) -> Optional[str]:
        if self._body is None:
            return None
        return self._body.decode(self.charset or "utf-8")

    @text.setter
    def text(self, text: str) -> None:
        assert text is None or isinstance(
            text, str
        ), "text argument must be str (%r)" % type(text)

        if self.content_type == "application/octet-stream":
            self.content_type = "text/plain"
        if self.charset is None:
            self.charset = "utf-8"

        self._body = text.encode(self.charset)
        self._body_payload = False
        self._compressed_body = None

    @property
    def content_length(self) -> Optional[int]:
        if self._chunked:
            return None

        if hdrs.CONTENT_LENGTH in self._headers:
            return super().content_length

        if self._compressed_body is not None:
            # Return length of the compressed body
            return len(self._compressed_body)
        elif self._body_payload:
            # A payload without content length, or a compressed payload
            return None
        elif self._body is not None:
            return len(self._body)
        else:
            return 0

    @content_length.setter
    def content_length(self, value: Optional[int]) -> None:
        raise RuntimeError("Content length is set automatically")

    async def write_eof(self, data: bytes = b"") -> None:
        if self._eof_sent:
            return
        if self._compressed_body is None:
            body: Optional[Union[bytes, Payload]] = self._body
        else:
            body = self._compressed_body
        assert not data, f"data arg is not supported, got {data!r}"
        assert self._req is not None
        assert self._payload_writer is not None
        if body is not None:
            if self._req._method == hdrs.METH_HEAD or self._status in [204, 304]:
                await super().write_eof()
            elif self._body_payload:
                payload = cast(Payload, body)
                await payload.write(self._payload_writer)
                await super().write_eof()
            else:
                await super().write_eof(cast(bytes, body))
        else:
            await super().write_eof()

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        if not self._chunked and hdrs.CONTENT_LENGTH not in self._headers:
            if self._body_payload:
                size = cast(Payload, self._body).size
                if size is not None:
                    self._headers[hdrs.CONTENT_LENGTH] = str(size)
            else:
                body_len = len(self._body) if self._body else "0"
                self._headers[hdrs.CONTENT_LENGTH] = str(body_len)

        return await super()._start(request)

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        if self._body_payload or self._chunked:
            return await super()._do_start_compression(coding)

        if coding != ContentCoding.identity:
            # Instead of using _payload_writer.enable_compression,
            # compress the whole body
            compressor = ZLibCompressor(
                encoding=str(coding.value),
                max_sync_chunk_size=self._zlib_executor_size,
                executor=self._zlib_executor,
            )
            assert self._body is not None
            if self._zlib_executor_size is None and len(self._body) > 1024 * 1024:
                warnings.warn(
                    "Synchronous compression of large response bodies "
                    f"({len(self._body)} bytes) might block the async event loop. "
                    "Consider providing a custom value to zlib_executor_size/"
                    "zlib_executor response properties or disabling compression on it."
                )
            self._compressed_body = (
                await compressor.compress(self._body) + compressor.flush()
            )
            assert self._compressed_body is not None

            self._headers[hdrs.CONTENT_ENCODING] = coding.value
            self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body))


def json_response(
    data: Any = sentinel,
    *,
    text: Optional[str] = None,
    body: Optional[bytes] = None,
    status: int = 200,
    reason: Optional[str] = None,
    headers: Optional[LooseHeaders] = None,
    content_type: str = "application/json",
    dumps: JSONEncoder = json.dumps,
) -> Response:
    if data is not sentinel:
        if text or body:
            raise ValueError("only one of data, text, or body should be specified")
        else:
            text = dumps(data)
    return Response(
        text=text,
        body=body,
        status=status,
        reason=reason,
        headers=headers,
        content_type=content_type,
    )
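
# Hedged usage sketch for json_response, assuming an aiohttp handler context
# (illustrative only):
#
#     async def handler(request: "BaseRequest") -> Response:
#         # data is serialized with the ``dumps`` callable (json.dumps by default)
#         return json_response({"status": "ok"}, status=200)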