Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_response.py: 28%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import collections.abc
3import datetime
4import enum
5import json
6import math
7import time
8import warnings
9from concurrent.futures import Executor
10from http import HTTPStatus
11from typing import (
12 TYPE_CHECKING,
13 Any,
14 Dict,
15 Iterator,
16 MutableMapping,
17 Optional,
18 Union,
19 cast,
20)
22from multidict import CIMultiDict, istr
24from . import hdrs, payload
25from .abc import AbstractStreamWriter
26from .compression_utils import ZLibCompressor
27from .helpers import (
28 ETAG_ANY,
29 QUOTED_ETAG_RE,
30 CookieMixin,
31 ETag,
32 HeadersMixin,
33 must_be_empty_body,
34 parse_http_date,
35 populate_with_cookies,
36 rfc822_formatted_time,
37 sentinel,
38 should_remove_content_length,
39 validate_etag_value,
40)
41from .http import SERVER_SOFTWARE, HttpVersion10, HttpVersion11
42from .payload import Payload
43from .typedefs import JSONEncoder, LooseHeaders
# Numeric status code -> standard reason phrase, taken from the stdlib
# ``HTTPStatus`` registry. Used as the default reason when none is supplied.
REASON_PHRASES = {member.value: member.phrase for member in HTTPStatus}
# Size threshold (1 MiB) above which in-loop compression of a response body
# triggers a warning.
LARGE_BODY_SIZE = 1024 * 1024
# Names exported by a star-import of this module.
__all__ = ("ContentCoding", "StreamResponse", "Response", "json_response")
# ``BaseClass`` is the mapping base that ``StreamResponse`` inherits from.
# At runtime it is the plain ABC; type checkers see the parametrized
# ``MutableMapping[str, Any]`` and the ``BaseRequest`` import, which is only
# needed for annotations.
if not TYPE_CHECKING:
    BaseClass = collections.abc.MutableMapping
else:
    from .web_request import BaseRequest

    BaseClass = MutableMapping[str, Any]
# TODO(py311): Convert to StrEnum for wider use
class ContentCoding(enum.Enum):
    """Content codings this module can apply to a response body.

    Each member's value is the token written to the ``Content-Encoding``
    header. Declaration order matters: ``CONTENT_CODINGS`` is built by
    iterating this enum, and ``StreamResponse._start_compression`` picks the
    first coding found in the request's ``Accept-Encoding`` header.
    """

    # The content codings that we have support for.
    #
    # Additional registered codings are listed at:
    # https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding
    deflate = "deflate"
    gzip = "gzip"
    identity = "identity"
# Header token -> ``ContentCoding`` member, in enum declaration order.
CONTENT_CODINGS = {member.value: member for member in ContentCoding}
72############################################################
73# HTTP Response classes
74############################################################
class StreamResponse(BaseClass, HeadersMixin, CookieMixin):
    """HTTP response whose body is written incrementally.

    The object also behaves as a mutable mapping over a private ``_state``
    dict (see ``__getitem__`` and friends), so arbitrary per-response data
    can be stored on it.

    Life cycle: configure status/headers, ``await prepare(request)`` to send
    the status line and headers, ``await write(...)`` any number of times,
    then ``await write_eof()``.
    """

    _body: Union[None, bytes, bytearray, Payload]
    # When true, ``_prepare_headers`` derives the writer's length from
    # ``content_length``.
    _length_check = True
    _body = None
    _keep_alive: Optional[bool] = None
    _chunked: bool = False
    _compression: bool = False
    _compression_strategy: Optional[int] = None
    _compression_force: Optional[ContentCoding] = None
    _req: Optional["BaseRequest"] = None
    _payload_writer: Optional[AbstractStreamWriter] = None
    _eof_sent: bool = False
    _must_be_empty_body: Optional[bool] = None
    _body_length = 0

    def __init__(
        self,
        *,
        status: int = 200,
        reason: Optional[str] = None,
        headers: Optional[LooseHeaders] = None,
        _real_headers: Optional[CIMultiDict[str]] = None,
    ) -> None:
        """Initialize a new stream response object.

        _real_headers is an internal parameter used to pass a pre-populated
        headers object. It is used by the `Response` class to avoid copying
        the headers when creating a new response object. It is not intended
        to be used by external code.
        """
        self._state: Dict[str, Any] = {}

        if _real_headers is not None:
            self._headers = _real_headers
        elif headers is not None:
            self._headers: CIMultiDict[str] = CIMultiDict(headers)
        else:
            self._headers = CIMultiDict()

        self._set_status(status, reason)

    @property
    def prepared(self) -> bool:
        """True once headers have been started or EOF has been sent."""
        return self._eof_sent or self._payload_writer is not None

    @property
    def task(self) -> "Optional[asyncio.Task[None]]":
        """The task of the associated request, or None if there is no request."""
        if self._req:
            return self._req.task
        else:
            return None

    @property
    def status(self) -> int:
        """The HTTP status code."""
        return self._status

    @property
    def chunked(self) -> bool:
        """Whether chunked transfer encoding has been enabled."""
        return self._chunked

    @property
    def compression(self) -> bool:
        """Whether compression has been enabled."""
        return self._compression

    @property
    def reason(self) -> str:
        """The reason phrase used in the status line."""
        return self._reason

    def set_status(
        self,
        status: int,
        reason: Optional[str] = None,
    ) -> None:
        """Change the status code and reason before the response is prepared.

        Raises ValueError if *reason* contains a carriage return or line feed.
        """
        assert (
            not self.prepared
        ), "Cannot change the response status code after the headers have been sent"
        self._set_status(status, reason)

    def _set_status(self, status: int, reason: Optional[str]) -> None:
        """Store *status* and *reason* without checking the prepared state.

        When *reason* is None the standard phrase for *status* is used, or an
        empty string if the code is not in ``REASON_PHRASES``.

        Raises ValueError if *reason* contains ``\\n`` or ``\\r``.
        """
        self._status = status
        if reason is None:
            reason = REASON_PHRASES.get(self._status, "")
        elif "\n" in reason:
            raise ValueError("Reason cannot contain \\n")
        elif "\r" in reason:
            # The reason is interpolated verbatim into the status line by
            # ``_write_headers``; a bare CR must be rejected just like LF so a
            # caller-supplied reason cannot break the line framing.
            raise ValueError("Reason cannot contain \\r")
        self._reason = reason

    @property
    def keep_alive(self) -> Optional[bool]:
        """Keep-alive decision, or None if it has not been determined yet."""
        return self._keep_alive

    def force_close(self) -> None:
        """Disable keep-alive for this response."""
        self._keep_alive = False

    @property
    def body_length(self) -> int:
        """Writer's ``output_size`` recorded by ``write_eof`` (0 before that)."""
        return self._body_length

    def enable_chunked_encoding(self) -> None:
        """Enables automatic chunked transfer encoding.

        Raises RuntimeError if a Content-Length header is already set.
        """
        if hdrs.CONTENT_LENGTH in self._headers:
            raise RuntimeError(
                "You can't enable chunked encoding when a content length is set"
            )
        self._chunked = True

    def enable_compression(
        self,
        force: Optional[ContentCoding] = None,
        strategy: Optional[int] = None,
    ) -> None:
        """Enables response compression encoding.

        *force* selects a coding regardless of the request's Accept-Encoding;
        *strategy* is passed through to the payload writer's compressor.
        """
        self._compression = True
        self._compression_force = force
        self._compression_strategy = strategy

    @property
    def headers(self) -> "CIMultiDict[str]":
        """The mutable, case-insensitive response headers."""
        return self._headers

    @property
    def content_length(self) -> Optional[int]:
        # Just a placeholder for adding setter
        return super().content_length

    @content_length.setter
    def content_length(self, value: Optional[int]) -> None:
        """Set or (with None) remove the Content-Length header.

        Raises RuntimeError if chunked encoding is enabled.
        """
        if value is not None:
            value = int(value)
            if self._chunked:
                raise RuntimeError(
                    "You can't set content length when chunked encoding is enable"
                )
            self._headers[hdrs.CONTENT_LENGTH] = str(value)
        else:
            self._headers.pop(hdrs.CONTENT_LENGTH, None)

    @property
    def content_type(self) -> str:
        # Just a placeholder for adding setter
        return super().content_type

    @content_type.setter
    def content_type(self, value: str) -> None:
        self.content_type  # read header values if needed
        self._content_type = str(value)
        self._generate_content_type_header()

    @property
    def charset(self) -> Optional[str]:
        # Just a placeholder for adding setter
        return super().charset

    @charset.setter
    def charset(self, value: Optional[str]) -> None:
        """Set (lower-cased) or, with None, remove the charset parameter.

        Raises RuntimeError if the content type is application/octet-stream.
        """
        ctype = self.content_type  # read header values if needed
        if ctype == "application/octet-stream":
            raise RuntimeError(
                "Setting charset for application/octet-stream "
                "doesn't make sense, setup content_type first"
            )
        assert self._content_dict is not None
        if value is None:
            self._content_dict.pop("charset", None)
        else:
            self._content_dict["charset"] = str(value).lower()
        self._generate_content_type_header()

    @property
    def last_modified(self) -> Optional[datetime.datetime]:
        """The value of Last-Modified HTTP header, or None.

        This header is represented as a `datetime` object.
        """
        return parse_http_date(self._headers.get(hdrs.LAST_MODIFIED))

    @last_modified.setter
    def last_modified(
        self, value: Optional[Union[int, float, datetime.datetime, str]]
    ) -> None:
        """Set the Last-Modified header.

        None removes the header; numbers are treated as a timestamp (rounded
        up to a whole second) and formatted as GMT; datetimes are formatted
        from their UTC time tuple; strings are stored verbatim.

        Raises TypeError for any other type.
        """
        if value is None:
            self._headers.pop(hdrs.LAST_MODIFIED, None)
        elif isinstance(value, (int, float)):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(math.ceil(value))
            )
        elif isinstance(value, datetime.datetime):
            self._headers[hdrs.LAST_MODIFIED] = time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", value.utctimetuple()
            )
        elif isinstance(value, str):
            self._headers[hdrs.LAST_MODIFIED] = value
        else:
            msg = f"Unsupported type for last_modified: {type(value).__name__}"  # type: ignore[unreachable]
            raise TypeError(msg)

    @property
    def etag(self) -> Optional[ETag]:
        """The parsed ETag header, or None if absent or not well-formed."""
        quoted_value = self._headers.get(hdrs.ETAG)
        if not quoted_value:
            return None
        elif quoted_value == ETAG_ANY:
            return ETag(value=ETAG_ANY)
        match = QUOTED_ETAG_RE.fullmatch(quoted_value)
        if not match:
            return None
        is_weak, value = match.group(1, 2)
        return ETag(
            is_weak=bool(is_weak),
            value=value,
        )

    @etag.setter
    def etag(self, value: Optional[Union[ETag, str]]) -> None:
        """Set the ETag header.

        None removes the header. ``ETAG_ANY`` (as a str or as an ETag value)
        is stored as-is. Other strings are validated and quoted; ETag objects
        are validated and written with a ``W/`` prefix when weak.

        Raises ValueError for unsupported types.
        """
        if value is None:
            self._headers.pop(hdrs.ETAG, None)
        elif (isinstance(value, str) and value == ETAG_ANY) or (
            isinstance(value, ETag) and value.value == ETAG_ANY
        ):
            self._headers[hdrs.ETAG] = ETAG_ANY
        elif isinstance(value, str):
            validate_etag_value(value)
            self._headers[hdrs.ETAG] = f'"{value}"'
        elif isinstance(value, ETag) and isinstance(value.value, str):  # type: ignore[redundant-expr]
            validate_etag_value(value.value)
            hdr_value = f'W/"{value.value}"' if value.is_weak else f'"{value.value}"'
            self._headers[hdrs.ETAG] = hdr_value
        else:
            raise ValueError(
                f"Unsupported etag type: {type(value)}. "
                f"etag must be str, ETag or None"
            )

    def _generate_content_type_header(
        self, CONTENT_TYPE: istr = hdrs.CONTENT_TYPE
    ) -> None:
        """Rebuild the Content-Type header from the type and its parameters."""
        assert self._content_dict is not None
        assert self._content_type is not None
        params = "; ".join(f"{k}={v}" for k, v in self._content_dict.items())
        if params:
            ctype = self._content_type + "; " + params
        else:
            ctype = self._content_type
        self._headers[CONTENT_TYPE] = ctype

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Turn on streaming compression with *coding* (no-op for identity)."""
        if coding is ContentCoding.identity:
            return
        assert self._payload_writer is not None
        self._headers[hdrs.CONTENT_ENCODING] = coding.value
        self._payload_writer.enable_compression(
            coding.value, self._compression_strategy
        )
        # Compressed payload may have different content length,
        # remove the header
        self._headers.popall(hdrs.CONTENT_LENGTH, None)

    async def _start_compression(self, request: "BaseRequest") -> None:
        """Pick a coding (forced, else from Accept-Encoding) and start it."""
        if self._compression_force:
            await self._do_start_compression(self._compression_force)
            return
        # Encoding comparisons should be case-insensitive
        # https://www.rfc-editor.org/rfc/rfc9110#section-8.4.1
        accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()
        # First supported coding (in CONTENT_CODINGS order) that appears as a
        # substring of the header wins.
        for value, coding in CONTENT_CODINGS.items():
            if value in accept_encoding:
                await self._do_start_compression(coding)
                return

    async def prepare(self, request: "BaseRequest") -> Optional[AbstractStreamWriter]:
        """Send the status line and headers for *request*.

        Returns None if EOF was already sent, the existing writer if already
        prepared, otherwise the writer obtained while starting the response.
        """
        if self._eof_sent:
            return None
        if self._payload_writer is not None:
            return self._payload_writer
        self._must_be_empty_body = must_be_empty_body(request.method, self.status)
        return await self._start(request)

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Bind to *request*, finalize headers, run the hook and write headers."""
        self._req = request
        writer = self._payload_writer = request._payload_writer

        await self._prepare_headers()
        await request._prepare_hook(self)
        await self._write_headers()

        return writer

    async def _prepare_headers(self) -> None:
        """Fill in cookies, compression, framing and default headers."""
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        keep_alive = self._keep_alive
        if keep_alive is None:
            keep_alive = request.keep_alive
        self._keep_alive = keep_alive

        version = request.version

        headers = self._headers
        if self._cookies:
            populate_with_cookies(headers, self._cookies)

        if self._compression:
            await self._start_compression(request)

        if self._chunked:
            if version != HttpVersion11:
                raise RuntimeError(
                    "Using chunked encoding is forbidden "
                    "for HTTP/{0.major}.{0.minor}".format(request.version)
                )
            if not self._must_be_empty_body:
                writer.enable_chunking()
                headers[hdrs.TRANSFER_ENCODING] = "chunked"
        elif self._length_check:  # Disabled for WebSockets
            writer.length = self.content_length
            if writer.length is None:
                if version >= HttpVersion11:
                    if not self._must_be_empty_body:
                        writer.enable_chunking()
                        headers[hdrs.TRANSFER_ENCODING] = "chunked"
                elif not self._must_be_empty_body:
                    # NOTE(review): only the local is downgraded here;
                    # ``self._keep_alive`` was already stored above. Confirm
                    # that is intended.
                    keep_alive = False

        # HTTP 1.1: https://tools.ietf.org/html/rfc7230#section-3.3.2
        # HTTP 1.0: https://tools.ietf.org/html/rfc1945#section-10.4
        if self._must_be_empty_body:
            if hdrs.CONTENT_LENGTH in headers and should_remove_content_length(
                request.method, self.status
            ):
                del headers[hdrs.CONTENT_LENGTH]
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-10
            # https://datatracker.ietf.org/doc/html/rfc9112#section-6.1-13
            if hdrs.TRANSFER_ENCODING in headers:
                del headers[hdrs.TRANSFER_ENCODING]
        elif (writer.length if self._length_check else self.content_length) != 0:
            # https://www.rfc-editor.org/rfc/rfc9110#section-8.3-5
            headers.setdefault(hdrs.CONTENT_TYPE, "application/octet-stream")
        headers.setdefault(hdrs.DATE, rfc822_formatted_time())
        headers.setdefault(hdrs.SERVER, SERVER_SOFTWARE)

        # connection header
        if hdrs.CONNECTION not in headers:
            if keep_alive:
                if version == HttpVersion10:
                    headers[hdrs.CONNECTION] = "keep-alive"
            elif version == HttpVersion11:
                headers[hdrs.CONNECTION] = "close"

    async def _write_headers(self) -> None:
        """Write the status line and headers through the payload writer."""
        request = self._req
        assert request is not None
        writer = self._payload_writer
        assert writer is not None
        # status line
        version = request.version
        status_line = f"HTTP/{version[0]}.{version[1]} {self._status} {self._reason}"
        await writer.write_headers(status_line, self._headers)

    async def write(
        self, data: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
    ) -> None:
        """Write a chunk of body data.

        Raises RuntimeError if called before ``prepare()`` or after
        ``write_eof()``.
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            raise RuntimeError("Cannot call write() after write_eof()")
        if self._payload_writer is None:
            raise RuntimeError("Cannot call write() before prepare()")

        await self._payload_writer.write(data)

    async def drain(self) -> None:
        """Deprecated: drain the payload writer; emits a DeprecationWarning."""
        assert not self._eof_sent, "EOF has already been sent"
        assert self._payload_writer is not None, "Response has not been started"
        warnings.warn(
            "drain method is deprecated, use await resp.write()",
            DeprecationWarning,
            stacklevel=2,
        )
        await self._payload_writer.drain()

    async def write_eof(self, data: bytes = b"") -> None:
        """Finish the response, optionally sending a final *data* chunk.

        A second call is a no-op. Afterwards the request and writer
        references are dropped and ``body_length`` holds the writer's
        ``output_size``.
        """
        assert isinstance(
            data, (bytes, bytearray, memoryview)
        ), "data argument must be byte-ish (%r)" % type(data)

        if self._eof_sent:
            return

        assert self._payload_writer is not None, "Response has not been started"

        await self._payload_writer.write_eof(data)
        self._eof_sent = True
        self._req = None
        self._body_length = self._payload_writer.output_size
        self._payload_writer = None

    def __repr__(self) -> str:
        if self._eof_sent:
            info = "eof"
        elif self.prepared:
            assert self._req is not None
            info = f"{self._req.method} {self._req.path} "
        else:
            info = "not prepared"
        return f"<{self.__class__.__name__} {self.reason} {info}>"

    # Mapping protocol: all operations delegate to the ``_state`` dict.

    def __getitem__(self, key: str) -> Any:
        return self._state[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._state[key] = value

    def __delitem__(self, key: str) -> None:
        del self._state[key]

    def __len__(self) -> int:
        return len(self._state)

    def __iter__(self) -> Iterator[str]:
        return iter(self._state)

    # Identity semantics: hashing and equality ignore the mapping contents.

    def __hash__(self) -> int:
        return hash(id(self))

    def __eq__(self, other: object) -> bool:
        return self is other

    def __bool__(self) -> bool:
        # Always truthy, even though ``__len__`` may return 0.
        return True
class Response(StreamResponse):
    """Response whose complete body is known before it is sent.

    The body is ``bytes``/``bytearray``, a ``Payload`` looked up in
    ``payload.PAYLOAD_REGISTRY``, or None. Content-Length is managed
    automatically and cannot be assigned.
    """

    # Result of whole-body compression; reset whenever the body changes.
    _compressed_body: Optional[bytes] = None

    def __init__(
        self,
        *,
        body: Any = None,
        status: int = 200,
        reason: Optional[str] = None,
        text: Optional[str] = None,
        headers: Optional[LooseHeaders] = None,
        content_type: Optional[str] = None,
        charset: Optional[str] = None,
        zlib_executor_size: Optional[int] = None,
        zlib_executor: Optional[Executor] = None,
    ) -> None:
        """Create a response from either *body* or *text*.

        Raises ValueError when *body* and *text* are both given, when
        *content_type* contains "charset", or when a Content-Type header is
        combined with *content_type*/*charset*. Raises TypeError when *text*
        is not a str (checked on the path without a Content-Type header).
        """
        if body is not None and text is not None:
            raise ValueError("body and text are not allowed together")

        real_headers: CIMultiDict[str] = (
            CIMultiDict() if headers is None else CIMultiDict(headers)
        )

        if content_type is not None and "charset" in content_type:
            raise ValueError("charset must not be in content_type argument")

        header_has_ctype = hdrs.CONTENT_TYPE in real_headers

        if text is not None:
            if header_has_ctype:
                if content_type or charset:
                    raise ValueError(
                        "passing both Content-Type header and "
                        "content_type or charset params "
                        "is forbidden"
                    )
            else:
                # fast path for filling headers: encode here and hand the
                # bytes to the body setter instead of going through ``text``
                if not isinstance(text, str):
                    raise TypeError("text argument must be str (%r)" % type(text))
                content_type = "text/plain" if content_type is None else content_type
                charset = "utf-8" if charset is None else charset
                real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset
                body = text.encode(charset)
                text = None
        elif header_has_ctype:
            if content_type is not None or charset is not None:
                raise ValueError(
                    "passing both Content-Type header and "
                    "content_type or charset params "
                    "is forbidden"
                )
        elif content_type is not None:
            if charset is not None:
                content_type += "; charset=" + charset
            real_headers[hdrs.CONTENT_TYPE] = content_type

        super().__init__(status=status, reason=reason, _real_headers=real_headers)

        if text is None:
            self.body = body
        else:
            self.text = text

        self._zlib_executor_size = zlib_executor_size
        self._zlib_executor = zlib_executor

    @property
    def body(self) -> Optional[Union[bytes, bytearray, Payload]]:
        """The stored body object (bytes, bytearray, Payload or None)."""
        return self._body

    @body.setter
    def body(self, body: Any) -> None:
        """Store *body*, wrapping non-bytes objects in a registered Payload.

        For payloads, Content-Type and the payload's own headers are copied
        into the response headers unless already present. Raises ValueError
        when no payload is registered for the object's type.
        """
        if body is None or isinstance(body, (bytes, bytearray)):
            self._body = body
        else:
            try:
                wrapped = payload.PAYLOAD_REGISTRY.get(body)
            except payload.LookupError:
                raise ValueError("Unsupported body type %r" % type(body))
            self._body = wrapped

            response_headers = self._headers

            # set content-type
            if hdrs.CONTENT_TYPE not in response_headers:
                response_headers[hdrs.CONTENT_TYPE] = wrapped.content_type

            # copy payload headers
            if wrapped.headers:
                for name, header_value in wrapped.headers.items():
                    if name not in response_headers:
                        response_headers[name] = header_value

        # Any previously compressed bytes no longer match the body.
        self._compressed_body = None

    @property
    def text(self) -> Optional[str]:
        """The body decoded with the response charset (utf-8 if unset)."""
        current = self._body
        if current is None:
            return None
        return current.decode(self.charset or "utf-8")

    @text.setter
    def text(self, text: str) -> None:
        """Encode *text* into the body, defaulting type and charset if unset."""
        assert isinstance(text, str), "text argument must be str (%r)" % type(text)

        if self.content_type == "application/octet-stream":
            self.content_type = "text/plain"
        if self.charset is None:
            self.charset = "utf-8"

        encoded = text.encode(self.charset)
        self._body = encoded
        self._compressed_body = None

    @property
    def content_length(self) -> Optional[int]:
        """Body length in bytes, or None when it cannot be stated up front.

        Precedence: None when chunked; an explicit Content-Length header;
        the compressed body size; None for a Payload; the raw body size;
        0 when there is no body.
        """
        if self._chunked:
            return None

        if hdrs.CONTENT_LENGTH in self._headers:
            return int(self._headers[hdrs.CONTENT_LENGTH])

        compressed = self._compressed_body
        if compressed is not None:
            # Return length of the compressed body
            return len(compressed)

        raw = self._body
        if isinstance(raw, Payload):
            # A payload without content length, or a compressed payload
            return None
        if raw is None:
            return 0
        return len(raw)

    @content_length.setter
    def content_length(self, value: Optional[int]) -> None:
        """Always raises RuntimeError: the length is derived from the body."""
        raise RuntimeError("Content length is set automatically")

    async def write_eof(self, data: bytes = b"") -> None:
        """Send the stored body and finish the response.

        *data* must be empty. Nothing is sent for a missing body or when the
        response must have an empty body; Payload bodies write themselves to
        the payload writer; otherwise the (possibly compressed) bytes are
        sent as the final chunk.
        """
        if self._eof_sent:
            return
        body = self._body if self._compressed_body is None else self._compressed_body
        assert not data, f"data arg is not supported, got {data!r}"
        assert self._req is not None
        assert self._payload_writer is not None
        if body is not None and not self._must_be_empty_body:
            if isinstance(self._body, Payload):
                await self._body.write(self._payload_writer)
                await super().write_eof()
            else:
                await super().write_eof(cast(bytes, body))
        else:
            await super().write_eof()

    async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
        """Settle the Content-Length header, then start the response."""
        response_headers = self._headers
        if hdrs.CONTENT_LENGTH in response_headers:
            if should_remove_content_length(request.method, self.status):
                del response_headers[hdrs.CONTENT_LENGTH]
        elif not self._chunked:
            if isinstance(self._body, Payload):
                if self._body.size is not None:
                    response_headers[hdrs.CONTENT_LENGTH] = str(self._body.size)
            else:
                raw = self._body
                length_text = str(len(raw)) if raw else "0"
                # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7
                # A zero length is only advertised when the response is not a
                # 304 and the request method is not in METH_HEAD_ALL.
                if length_text != "0" or (
                    self.status != 304 and request.method not in hdrs.METH_HEAD_ALL
                ):
                    response_headers[hdrs.CONTENT_LENGTH] = length_text

        return await super()._start(request)

    async def _do_start_compression(self, coding: ContentCoding) -> None:
        """Compress the whole in-memory body up front.

        Chunked responses and Payload bodies defer to the streaming
        implementation in the base class. Identity coding is a no-op.
        """
        if self._chunked or isinstance(self._body, Payload):
            return await super()._do_start_compression(coding)
        if coding is ContentCoding.identity:
            return
        # Instead of using _payload_writer.enable_compression,
        # compress the whole body
        compressor = ZLibCompressor(
            encoding=coding.value,
            max_sync_chunk_size=self._zlib_executor_size,
            executor=self._zlib_executor,
        )
        assert self._body is not None
        raw = self._body
        raw_size = len(raw)
        if self._zlib_executor_size is None and raw_size > LARGE_BODY_SIZE:
            warnings.warn(
                "Synchronous compression of large response bodies "
                f"({raw_size} bytes) might block the async event loop. "
                "Consider providing a custom value to zlib_executor_size/"
                "zlib_executor response properties or disabling compression on it."
            )
        head = await compressor.compress(raw)
        tail = compressor.flush()
        self._compressed_body = head + tail
        self._headers[hdrs.CONTENT_ENCODING] = coding.value
        self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body))
def json_response(
    data: Any = sentinel,
    *,
    text: Optional[str] = None,
    body: Optional[bytes] = None,
    status: int = 200,
    reason: Optional[str] = None,
    headers: Optional[LooseHeaders] = None,
    content_type: str = "application/json",
    dumps: JSONEncoder = json.dumps,
) -> Response:
    """Build a ``Response`` carrying JSON.

    When *data* is supplied it is serialized with *dumps* and used as the
    response text; otherwise *text* and *body* are passed through unchanged.

    Raises ValueError if *data* is supplied together with a truthy *text*
    or *body*.
    """
    if data is sentinel:
        # Caller provided the payload directly (or nothing at all).
        json_text = text
    elif text or body:
        raise ValueError("only one of data, text, or body should be specified")
    else:
        json_text = dumps(data)

    return Response(
        text=json_text,
        body=body,
        status=status,
        reason=reason,
        headers=headers,
        content_type=content_type,
    )