Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_response.py: 24%
442 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-09 06:47 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-09 06:47 +0000
1import asyncio
2import collections.abc
3import datetime
4import enum
5import json
6import math
7import time
8import warnings
9from concurrent.futures import Executor
10from http import HTTPStatus
11from typing import (
12 TYPE_CHECKING,
13 Any,
14 Dict,
15 Iterator,
16 MutableMapping,
17 Optional,
18 Union,
19 cast,
20)
22from multidict import CIMultiDict, istr
24from . import hdrs, payload
25from .abc import AbstractStreamWriter
26from .compression_utils import ZLibCompressor
27from .helpers import (
28 ETAG_ANY,
29 QUOTED_ETAG_RE,
30 CookieMixin,
31 ETag,
32 HeadersMixin,
33 must_be_empty_body,
34 parse_http_date,
35 populate_with_cookies,
36 rfc822_formatted_time,
37 sentinel,
38 should_remove_content_length,
39 validate_etag_value,
40)
41from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11
42from .payload import Payload
43from .typedefs import JSONEncoder, LooseHeaders
45__all__ = ("ContentCoding", "StreamResponse", "Response", "json_response")
48if TYPE_CHECKING:
49 from .web_request import BaseRequest
51 BaseClass = MutableMapping[str, Any]
52else:
53 BaseClass = collections.abc.MutableMapping
56class ContentCoding(enum.Enum):
57 # The content codings that we have support for.
58 #
59 # Additional registered codings are listed at:
60 # https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding
61 deflate = "deflate"
62 gzip = "gzip"
63 identity = "identity"
66############################################################
67# HTTP Response classes
68############################################################
71class StreamResponse(BaseClass, HeadersMixin, CookieMixin):
72 __slots__ = (
73 "_length_check",
74 "_body",
75 "_keep_alive",
76 "_chunked",
77 "_compression",
78 "_compression_force",
79 "_req",
80 "_payload_writer",
81 "_eof_sent",
82 "_must_be_empty_body",
83 "_body_length",
84 "_state",
85 "_headers",
86 "_status",
87 "_reason",
88 "_cookies",
89 "__weakref__",
90 )
92 def __init__(
93 self,
94 *,
95 status: int = 200,
96 reason: Optional[str] = None,
97 headers: Optional[LooseHeaders] = None,
98 ) -> None:
99 super().__init__()
100 self._length_check = True
101 self._body = None
102 self._keep_alive: Optional[bool] = None
103 self._chunked = False
104 self._compression = False
105 self._compression_force: Optional[ContentCoding] = None
107 self._req: Optional[BaseRequest] = None
108 self._payload_writer: Optional[AbstractStreamWriter] = None
109 self._eof_sent = False
110 self._must_be_empty_body: Optional[bool] = None
111 self._body_length = 0
112 self._state: Dict[str, Any] = {}
114 if headers is not None:
115 self._headers: CIMultiDict[str] = CIMultiDict(headers)
116 else:
117 self._headers = CIMultiDict()
119 self.set_status(status, reason)
121 @property
122 def prepared(self) -> bool:
123 return self._payload_writer is not None
125 @property
126 def task(self) -> "Optional[asyncio.Task[None]]":
127 if self._req:
128 return self._req.task
129 else:
130 return None
132 @property
133 def status(self) -> int:
134 return self._status
136 @property
137 def chunked(self) -> bool:
138 return self._chunked
140 @property
141 def compression(self) -> bool:
142 return self._compression
144 @property
145 def reason(self) -> str:
146 return self._reason
148 def set_status(
149 self,
150 status: int,
151 reason: Optional[str] = None,
152 ) -> None:
153 assert not self.prepared, (
154 "Cannot change the response status code after " "the headers have been sent"
155 )
156 self._status = int(status)
157 if reason is None:
158 try:
159 reason = HTTPStatus(self._status).phrase
160 except ValueError:
161 reason = ""
162 self._reason = reason
164 @property
165 def keep_alive(self) -> Optional[bool]:
166 return self._keep_alive
168 def force_close(self) -> None:
169 self._keep_alive = False
171 @property
172 def body_length(self) -> int:
173 return self._body_length
175 def enable_chunked_encoding(self) -> None:
176 """Enables automatic chunked transfer encoding."""
177 self._chunked = True
179 if hdrs.CONTENT_LENGTH in self._headers:
180 raise RuntimeError(
181 "You can't enable chunked encoding when " "a content length is set"
182 )
184 def enable_compression(self, force: Optional[ContentCoding] = None) -> None:
185 """Enables response compression encoding."""
186 self._compression = True
187 self._compression_force = force
189 @property
190 def headers(self) -> "CIMultiDict[str]":
191 return self._headers
193 @property
194 def content_length(self) -> Optional[int]:
195 # Just a placeholder for adding setter
196 return super().content_length
198 @content_length.setter
199 def content_length(self, value: Optional[int]) -> None:
200 if value is not None:
201 value = int(value)
202 if self._chunked:
203 raise RuntimeError(
204 "You can't set content length when " "chunked encoding is enable"
205 )
206 self._headers[hdrs.CONTENT_LENGTH] = str(value)
207 else:
208 self._headers.pop(hdrs.CONTENT_LENGTH, None)
210 @property
211 def content_type(self) -> str:
212 # Just a placeholder for adding setter
213 return super().content_type
215 @content_type.setter
216 def content_type(self, value: str) -> None:
217 self.content_type # read header values if needed
218 self._content_type = str(value)
219 self._generate_content_type_header()
221 @property
222 def charset(self) -> Optional[str]:
223 # Just a placeholder for adding setter
224 return super().charset
226 @charset.setter
227 def charset(self, value: Optional[str]) -> None:
228 ctype = self.content_type # read header values if needed
229 if ctype == "application/octet-stream":
230 raise RuntimeError(
231 "Setting charset for application/octet-stream "
232 "doesn't make sense, setup content_type first"
233 )
234 assert self._content_dict is not None
235 if value is None:
236 self._content_dict.pop("charset", None)
237 else:
238 self._content_dict["charset"] = str(value).lower()
239 self._generate_content_type_header()
241 @property
242 def last_modified(self) -> Optional[datetime.datetime]:
243 """The value of Last-Modified HTTP header, or None.
245 This header is represented as a `datetime` object.
246 """
247 return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED))
249 @last_modified.setter
250 def last_modified(
251 self, value: Optional[Union[int, float, datetime.datetime, str]]
252 ) -> None:
253 if value is None:
254 self._headers.pop(hdrs.LAST_MODIFIED, None)
255 elif isinstance(value, (int, float)):
256 self._headers[hdrs.LAST_MODIFIED] = time.strftime(
257 "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(math.ceil(value))
258 )
259 elif isinstance(value, datetime.datetime):
260 self._headers[hdrs.LAST_MODIFIED] = time.strftime(
261 "%a, %d %b %Y %H:%M:%S GMT", value.utctimetuple()
262 )
263 elif isinstance(value, str):
264 self._headers[hdrs.LAST_MODIFIED] = value
266 @property
267 def etag(self) -> Optional[ETag]:
268 quoted_value = self._headers.get(hdrs.ETAG)
269 if not quoted_value:
270 return None
271 elif quoted_value == ETAG_ANY:
272 return ETag(value=ETAG_ANY)
273 match = QUOTED_ETAG_RE.fullmatch(quoted_value)
274 if not match:
275 return None
276 is_weak, value = match.group(1, 2)
277 return ETag(
278 is_weak=bool(is_weak),
279 value=value,
280 )
282 @etag.setter
283 def etag(self, value: Optional[Union[ETag, str]]) -> None:
284 if value is None:
285 self._headers.pop(hdrs.ETAG, None)
286 elif (isinstance(value, str) and value == ETAG_ANY) or (
287 isinstance(value, ETag) and value.value == ETAG_ANY
288 ):
289 self._headers[hdrs.ETAG] = ETAG_ANY
290 elif isinstance(value, str):
291 validate_etag_value(value)
292 self._headers[hdrs.ETAG] = f'"{value}"'
293 elif isinstance(value, ETag) and isinstance(value.value, str): # type: ignore[redundant-expr]
294 validate_etag_value(value.value)
295 hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"'
296 self._headers[hdrs.ETAG] = hdr_value
297 else:
298 raise ValueError(
299 f"Unsupported etag type: {type(value)}. "
300 f"etag must be str, ETag or None"
301 )
303 def _generate_content_type_header(
304 self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE
305 ) -> None:
306 assert self._content_dict is not None
307 assert self._content_type is not None
308 params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items())
309 if params:
310 ctype = self._content_type + "; " + params
311 else:
312 ctype = self._content_type
313 self._headers[CONTENT_TYPE] = ctype
315 async def _do_start_compression(self, coding: ContentCoding) -> None:
316 if coding != ContentCoding.identity:
317 assert self._payload_writer is not None
318 self._headers[hdrs.CONTENT_ENCODING] = coding.value
319 self._payload_writer.enable_compression(coding.value)
320 # Compressed payload may have different content length,
321 # remove the header
322 self._headers.popall(hdrs.CONTENT_LENGTH, None)
324 async def _start_compression(self, request: "BaseRequest") -> None:
325 if self._compression_force:
326 await self._do_start_compression(self._compression_force)
327 else:
328 # Encoding comparisons should be case-insensitive
329 # https://www.rfc-editor.org/rfc/rfc9110#section-8.4.1
330 accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()
331 for coding in ContentCoding:
332 if coding.value in accept_encoding:
333 await self._do_start_compression(coding)
334 return
336 async def prepare(self, request: "BaseRequest") -> Optional[AbstractStreamWriter]:
337 if self._eof_sent:
338 return None
339 if self._payload_writer is not None:
340 return self._payload_writer
341 self._must_be_empty_body = must_be_empty_body(request.method, self.status)
342 return await self._start(request)
344 async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
345 self._req = request
346 writer = self._payload_writer = request._payload_writer
348 await self._prepare_headers()
349 await request._prepare_hook(self)
350 await self._write_headers()
352 return writer
354 async def _prepare_headers(self) -> None:
355 request = self._req
356 assert request is not None
357 writer = self._payload_writer
358 assert writer is not None
359 keep_alive = self._keep_alive
360 if keep_alive is None:
361 keep_alive = request.keep_alive
362 self._keep_alive = keep_alive
364 version = request.version
366 headers = self._headers
367 populate_with_cookies(headers, self.cookies)
369 if self._compression:
370 await self._start_compression(request)
372 if self._chunked:
373 if version != HttpVersion11:
374 raise RuntimeError(
375 "Using chunked encoding is forbidden "
376 "for HTTP/{0.major}.{0.minor}".format(request.version)
377 )
378 if not self._must_be_empty_body:
379 writer.enable_chunking()
380 headers[hdrs.TRANSFER_ENCODING] = "chunked"
381 if hdrs.CONTENT_LENGTH in headers:
382 del headers[hdrs.CONTENT_LENGTH]
383 elif self._length_check:
384 writer.length = self.content_length
385 if writer.length is None:
386 if version >= HttpVersion11:
387 if not self._must_be_empty_body:
388 writer.enable_chunking()
389 headers[hdrs.TRANSFER_ENCODING] = "chunked"
390 elif not self._must_be_empty_body:
391 keep_alive = False
393 # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2
394 # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4
395 if self._must_be_empty_body:
396 if hdrs.CONTENT_LENGTH in headers and should_remove_content_length(
397 request.method, self.status
398 ):
399 del headers[hdrs.CONTENT_LENGTH]
400 # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-10
401 # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-13
402 if hdrs.TRANSFER_ENCODING in headers:
403 del headers[hdrs.TRANSFER_ENCODING]
404 else:
405 headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream")
406 headers.setdefault(hdrs.DATE, rfc822_formatted_time())
407 headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE)
409 # connection header
410 if hdrs.CONNECTION not in headers:
411 if keep_alive:
412 if version == HttpVersion10:
413 headers[hdrs.CONNECTION] = "keep-alive"
414 else:
415 if version == HttpVersion11:
416 headers[hdrs.CONNECTION] = "close"
418 async def _write_headers(self) -> None:
419 request = self._req
420 assert request is not None
421 writer = self._payload_writer
422 assert writer is not None
423 # status line
424 version = request.version
425 status_line = "HTTP/{}.{} {} {}".format(
426 version[0], version[1], self._status, self._reason
427 )
428 await writer.write_headers(status_line, self._headers)
430 async def write(self, data: bytes) -> None:
431 assert isinstance(
432 data, (bytes, bytearray, memoryview)
433 ), "data argument must be byte-ish (%r)" % type(data)
435 if self._eof_sent:
436 raise RuntimeError("Cannot call write() after write_eof()")
437 if self._payload_writer is None:
438 raise RuntimeError("Cannot call write() before prepare()")
440 await self._payload_writer.write(data)
442 async def drain(self) -> None:
443 assert not self._eof_sent, "EOF has already been sent"
444 assert self._payload_writer is not None, "Response has not been started"
445 warnings.warn(
446 "drain method is deprecated, use await resp.write()",
447 DeprecationWarning,
448 stacklevel=2,
449 )
450 await self._payload_writer.drain()
452 async def write_eof(self, data: bytes = b"") -> None:
453 assert isinstance(
454 data, (bytes, bytearray, memoryview)
455 ), "data argument must be byte-ish (%r)" % type(data)
457 if self._eof_sent:
458 return
460 assert self._payload_writer is not None, "Response has not been started"
462 await self._payload_writer.write_eof(data)
463 self._eof_sent = True
464 self._req = None
465 self._body_length = self._payload_writer.output_size
466 self._payload_writer = None
468 def __repr__(self) -> str:
469 if self._eof_sent:
470 info = "eof"
471 elif self.prepared:
472 assert self._req is not None
473 info = f"{self._req.method} {self._req.path} "
474 else:
475 info = "not prepared"
476 return f"<{self.__class__.__name__} {self.reason} {info}>"
478 def __getitem__(self, key: str) -> Any:
479 return self._state[key]
481 def __setitem__(self, key: str, value: Any) -> None:
482 self._state[key] = value
484 def __delitem__(self, key: str) -> None:
485 del self._state[key]
487 def __len__(self) -> int:
488 return len(self._state)
490 def __iter__(self) -> Iterator[str]:
491 return iter(self._state)
493 def __hash__(self) -> int:
494 return hash(id(self))
496 def __eq__(self, other: object) -> bool:
497 return self is other
500class Response(StreamResponse):
501 __slots__ = (
502 "_body_payload",
503 "_compressed_body",
504 "_zlib_executor_size",
505 "_zlib_executor",
506 )
508 def __init__(
509 self,
510 *,
511 body: Any = None,
512 status: int = 200,
513 reason: Optional[str] = None,
514 text: Optional[str] = None,
515 headers: Optional[LooseHeaders] = None,
516 content_type: Optional[str] = None,
517 charset: Optional[str] = None,
518 zlib_executor_size: Optional[int] = None,
519 zlib_executor: Optional[Executor] = None,
520 ) -> None:
521 if body is not None and text is not None:
522 raise ValueError("body and text are not allowed together")
524 if headers is None:
525 real_headers: CIMultiDict[str] = CIMultiDict()
526 elif not isinstance(headers, CIMultiDict):
527 real_headers = CIMultiDict(headers)
528 else:
529 real_headers = headers # = cast('CIMultiDict[str]', headers)
531 if content_type is not None and "charset" in content_type:
532 raise ValueError("charset must not be in content_type " "argument")
534 if text is not None:
535 if hdrs.CONTENT_TYPE in real_headers:
536 if content_type or charset:
537 raise ValueError(
538 "passing both Content-Type header and "
539 "content_type or charset params "
540 "is forbidden"
541 )
542 else:
543 # fast path for filling headers
544 if not isinstance(text, str):
545 raise TypeError("text argument must be str (%r)" % type(text))
546 if content_type is None:
547 content_type = "text/plain"
548 if charset is None:
549 charset = "utf-8"
550 real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset
551 body = text.encode(charset)
552 text = None
553 else:
554 if hdrs.CONTENT_TYPE in real_headers:
555 if content_type is not None or charset is not None:
556 raise ValueError(
557 "passing both Content-Type header and "
558 "content_type or charset params "
559 "is forbidden"
560 )
561 else:
562 if content_type is not None:
563 if charset is not None:
564 content_type += "; charset=" + charset
565 real_headers[hdrs.CONTENT_TYPE] = content_type
567 super().__init__(status=status, reason=reason, headers=real_headers)
569 if text is not None:
570 self.text = text
571 else:
572 self.body = body
574 self._compressed_body: Optional[bytes] = None
575 self._zlib_executor_size = zlib_executor_size
576 self._zlib_executor = zlib_executor
578 @property
579 def body(self) -> Optional[Union[bytes, Payload]]:
580 return self._body
582 @body.setter
583 def body(self, body: bytes) -> None:
584 if body is None:
585 self._body: Optional[bytes] = None
586 self._body_payload: bool = False
587 elif isinstance(body, (bytes, bytearray)):
588 self._body = body
589 self._body_payload = False
590 else:
591 try:
592 self._body = body = payload.PAYLOAD_REGISTRY.get(body)
593 except payload.LookupError:
594 raise ValueError("Unsupported body type %r" % type(body))
596 self._body_payload = True
598 headers = self._headers
600 # set content-type
601 if hdrs.CONTENT_TYPE not in headers:
602 headers[hdrs.CONTENT_TYPE] = body.content_type
604 # copy payload headers
605 if body.headers:
606 for key, value in body.headers.items():
607 if key not in headers:
608 headers[key] = value
610 self._compressed_body = None
612 @property
613 def text(self) -> Optional[str]:
614 if self._body is None:
615 return None
616 return self._body.decode(self.charset or "utf-8")
618 @text.setter
619 def text(self, text: str) -> None:
620 assert isinstance(text, str), "text argument must be str (%r)" % type(text)
622 if self.content_type == "application/octet-stream":
623 self.content_type = "text/plain"
624 if self.charset is None:
625 self.charset = "utf-8"
627 self._body = text.encode(self.charset)
628 self._body_payload = False
629 self._compressed_body = None
631 @property
632 def content_length(self) -> Optional[int]:
633 if self._chunked:
634 return None
636 if hdrs.CONTENT_LENGTH in self._headers:
637 return super().content_length
639 if self._compressed_body is not None:
640 # Return length of the compressed body
641 return len(self._compressed_body)
642 elif self._body_payload:
643 # A payload without content length, or a compressed payload
644 return None
645 elif self._body is not None:
646 return len(self._body)
647 else:
648 return 0
650 @content_length.setter
651 def content_length(self, value: Optional[int]) -> None:
652 raise RuntimeError("Content length is set automatically")
654 async def write_eof(self, data: bytes = b"") -> None:
655 if self._eof_sent:
656 return
657 if self._compressed_body is None:
658 body: Optional[Union[bytes, Payload]] = self._body
659 else:
660 body = self._compressed_body
661 assert not data, f"data arg is not supported, got {data!r}"
662 assert self._req is not None
663 assert self._payload_writer is not None
664 if body is not None:
665 if self._must_be_empty_body:
666 await super().write_eof()
667 elif self._body_payload:
668 payload = cast(Payload, body)
669 await payload.write(self._payload_writer)
670 await super().write_eof()
671 else:
672 await super().write_eof(cast(bytes, body))
673 else:
674 await super().write_eof()
676 async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
677 if should_remove_content_length(request.method, self.status):
678 if hdrs.CONTENT_LENGTH in self._headers:
679 del self._headers[hdrs.CONTENT_LENGTH]
680 elif not self._chunked and hdrs.CONTENT_LENGTH not in self._headers:
681 if self._body_payload:
682 size = cast(Payload, self._body).size
683 if size is not None:
684 self._headers[hdrs.CONTENT_LENGTH] = str(size)
685 else:
686 body_len = len(self._body) if self._body else "0"
687 # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7
688 if body_len != "0" or (
689 self.status != 304 and request.method.upper() != hdrs.METH_HEAD
690 ):
691 self._headers[hdrs.CONTENT_LENGTH] = str(body_len)
693 return await super()._start(request)
695 async def _do_start_compression(self, coding: ContentCoding) -> None:
696 if self._body_payload or self._chunked:
697 return await super()._do_start_compression(coding)
699 if coding != ContentCoding.identity:
700 # Instead of using _payload_writer.enable_compression,
701 # compress the whole body
702 compressor = ZLibCompressor(
703 encoding=str(coding.value),
704 max_sync_chunk_size=self._zlib_executor_size,
705 executor=self._zlib_executor,
706 )
707 assert self._body is not None
708 if self._zlib_executor_size is None and len(self._body) > 1024 * 1024:
709 warnings.warn(
710 "Synchronous compression of large response bodies "
711 f"({len(self._body)} bytes) might block the async event loop. "
712 "Consider providing a custom value to zlib_executor_size/"
713 "zlib_executor response properties or disabling compression on it."
714 )
715 self._compressed_body = (
716 await compressor.compress(self._body) + compressor.flush()
717 )
718 assert self._compressed_body is not None
720 self._headers[hdrs.CONTENT_ENCODING] = coding.value
721 self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body))
724def json_response(
725 data: Any = sentinel,
726 *,
727 text: Optional[str] = None,
728 body: Optional[bytes] = None,
729 status: int = 200,
730 reason: Optional[str] = None,
731 headers: Optional[LooseHeaders] = None,
732 content_type: str = "application/json",
733 dumps: JSONEncoder = json.dumps,
734) -> Response:
735 if data is not sentinel:
736 if text or body:
737 raise ValueError("only one of data, text, or body should be specified")
738 else:
739 text = dumps(data)
740 return Response(
741 text=text,
742 body=body,
743 status=status,
744 reason=reason,
745 headers=headers,
746 content_type=content_type,
747 )