Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/sansio/response.py: 57%
261 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 07:17 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-09 07:17 +0000
1from __future__ import annotations
3import typing as t
4from datetime import datetime
5from datetime import timedelta
6from datetime import timezone
7from http import HTTPStatus
9from ..datastructures import Headers
10from ..datastructures import HeaderSet
11from ..http import dump_cookie
12from ..http import HTTP_STATUS_CODES
13from ..utils import get_content_type
14from werkzeug.datastructures import CallbackDict
15from werkzeug.datastructures import ContentRange
16from werkzeug.datastructures import ContentSecurityPolicy
17from werkzeug.datastructures import ResponseCacheControl
18from werkzeug.datastructures import WWWAuthenticate
19from werkzeug.http import COEP
20from werkzeug.http import COOP
21from werkzeug.http import dump_age
22from werkzeug.http import dump_header
23from werkzeug.http import dump_options_header
24from werkzeug.http import http_date
25from werkzeug.http import parse_age
26from werkzeug.http import parse_cache_control_header
27from werkzeug.http import parse_content_range_header
28from werkzeug.http import parse_csp_header
29from werkzeug.http import parse_date
30from werkzeug.http import parse_options_header
31from werkzeug.http import parse_set_header
32from werkzeug.http import quote_etag
33from werkzeug.http import unquote_etag
34from werkzeug.utils import header_property
def _set_property(name: str, doc: str | None = None) -> property:
    """Build a property that exposes the header ``name`` as a header set.

    Reading the property parses the current header value with
    ``parse_set_header`` and returns the resulting
    :class:`~werkzeug.datastructures.HeaderSet`. The set is given a
    callback so that changes made to it are written back to
    ``self.headers``.

    Assigning to the property replaces the header: a falsy value removes
    it, a string is stored unchanged, and any other value is serialized
    with ``dump_header``.

    :param name: The header name the property reads and writes.
    :param doc: Docstring attached to the returned property.
    :return: A ``property`` with a getter and a setter, no deleter.
    """

    def fget(self: Response) -> HeaderSet:
        def on_update(header_set: HeaderSet) -> None:
            # Keep the header in sync with the set: drop the header once
            # the set becomes empty, otherwise re-serialize the set.
            if not header_set and name in self.headers:
                del self.headers[name]
            elif header_set:
                self.headers[name] = header_set.to_header()

        return parse_set_header(self.headers.get(name), on_update)

    def fset(
        self: Response,
        value: None | (str | dict[str, str | int] | t.Iterable[str]),
    ) -> None:
        if not value:
            # None, "", and empty containers all unset the header.
            del self.headers[name]
        elif isinstance(value, str):
            # Strings are taken as an already formatted header value.
            self.headers[name] = value
        else:
            self.headers[name] = dump_header(value)

    return property(fget, fset, doc=doc)
class Response:
    """Represents the non-IO parts of an HTTP response, specifically the
    status and headers but not the body.

    This class is not meant for general use. It should only be used when
    implementing WSGI, ASGI, or another HTTP application spec. Werkzeug
    provides a WSGI implementation at :class:`werkzeug.wrappers.Response`.

    :param status: The status code for the response. Either an int, in
        which case the default status message is added, or a string in
        the form ``{code} {message}``, like ``404 Not Found``. Defaults
        to 200.
    :param headers: A :class:`~werkzeug.datastructures.Headers` object,
        or a list of ``(key, value)`` tuples that will be converted to a
        ``Headers`` object.
    :param mimetype: The mime type (content type without charset or
        other parameters) of the response. If the value starts with
        ``text/`` (or matches some other special cases), the charset
        will be added to create the ``content_type``.
    :param content_type: The full content type of the response.
        Overrides building the value from ``mimetype``.

    .. versionchanged:: 3.0
        The ``charset`` attribute was removed.

    .. versionadded:: 2.0
    """

    #: the default status if none is provided.
    default_status = 200

    #: the default mimetype if none is provided.
    default_mimetype: str | None = "text/plain"

    #: Warn if a cookie header exceeds this size. The default, 4093, should be
    #: safely `supported by most browsers <cookie_>`_. A cookie larger than
    #: this size will still be sent, but it may be ignored or handled
    #: incorrectly by some browsers. Set to 0 to disable this check.
    #:
    #: .. versionadded:: 0.13
    #:
    #: .. _`cookie`: http://browsercookielimits.squawky.net/
    max_cookie_size = 4093

    # A :class:`Headers` object representing the response headers.
    headers: Headers

    def __init__(
        self,
        status: int | str | HTTPStatus | None = None,
        headers: t.Mapping[str, str | t.Iterable[str]]
        | t.Iterable[tuple[str, str]]
        | None = None,
        mimetype: str | None = None,
        content_type: str | None = None,
    ) -> None:
        """Set up the headers, the ``Content-Type`` header, and the status.

        See the class docstring for the meaning of each parameter.
        """
        if isinstance(headers, Headers):
            # An existing Headers object is used as-is, not copied.
            self.headers = headers
        elif not headers:
            self.headers = Headers()
        else:
            self.headers = Headers(headers)

        if content_type is None:
            # Only fall back to the default mimetype when the passed
            # headers do not already carry a content type.
            if mimetype is None and "content-type" not in self.headers:
                mimetype = self.default_mimetype
            if mimetype is not None:
                mimetype = get_content_type(mimetype, "utf-8")
            content_type = mimetype
        if content_type is not None:
            self.headers["Content-Type"] = content_type
        if status is None:
            status = self.default_status
        # Goes through the ``status`` setter, which also sets the code.
        self.status = status  # type: ignore

    def __repr__(self) -> str:
        """Return ``<ClassName [status]>`` using the full status string."""
        return f"<{type(self).__name__} [{self.status}]>"

    @property
    def status_code(self) -> int:
        """The HTTP status code as a number."""
        return self._status_code

    @status_code.setter
    def status_code(self, code: int) -> None:
        # Delegate to the ``status`` setter so that the status string and
        # the numeric code are always updated together.
        self.status = code  # type: ignore

    @property
    def status(self) -> str:
        """The HTTP status code as a string."""
        return self._status

    @status.setter
    def status(self, value: str | int | HTTPStatus) -> None:
        self._status, self._status_code = self._clean_status(value)

    def _clean_status(self, value: str | int | HTTPStatus) -> tuple[str, int]:
        """Normalize a status value into a ``(status, status_code)`` pair.

        - An ``int`` or :class:`~http.HTTPStatus` is used as the code and
          the message is looked up in ``HTTP_STATUS_CODES`` (upper-cased),
          falling back to ``UNKNOWN`` for codes that are not listed.
        - A string is stripped first. ``"{code} {message}"`` is returned
          unchanged together with the parsed code. A string holding only
          a code is treated like an int. A string whose first token is
          not an integer is treated as a bare message and yields
          ``("0 {value}", 0)``.

        :param value: The status to normalize.
        :return: The status line string and the numeric status code.
        :raises ValueError: If ``value`` is a string that is empty after
            stripping.
        """
        if isinstance(value, (int, HTTPStatus)):
            status_code = int(value)
        else:
            value = value.strip()

            if not value:
                raise ValueError("Empty status argument")

            code_str, sep, _ = value.partition(" ")

            try:
                status_code = int(code_str)
            except ValueError:
                # only message
                return f"0 {value}", 0

            if sep:
                # code and message
                return value, status_code

        # only code, look up message
        try:
            status = f"{status_code} {HTTP_STATUS_CODES[status_code].upper()}"
        except KeyError:
            status = f"{status_code} UNKNOWN"

        return status, status_code

    def set_cookie(
        self,
        key: str,
        value: str = "",
        max_age: timedelta | int | None = None,
        expires: str | datetime | int | float | None = None,
        path: str | None = "/",
        domain: str | None = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: str | None = None,
    ) -> None:
        """Sets a cookie.

        A warning is raised if the size of the cookie header exceeds
        :attr:`max_cookie_size`, but the header will still be set.

        :param key: the key (name) of the cookie to be set.
        :param value: the value of the cookie.
        :param max_age: should be a number of seconds, or `None` (default) if
                        the cookie should last only as long as the client's
                        browser session.
        :param expires: should be a `datetime` object or UNIX timestamp.
        :param path: limits the cookie to a given path, per default it will
                     span the whole domain.
        :param domain: if you want to set a cross-domain cookie. For example,
                       ``domain="example.com"`` will set a cookie that is
                       readable by the domain ``www.example.com``,
                       ``foo.example.com`` etc. Otherwise, a cookie will only
                       be readable by the domain that set it.
        :param secure: If ``True``, the cookie will only be available
            via HTTPS.
        :param httponly: Disallow JavaScript access to the cookie.
        :param samesite: Limit the scope of the cookie to only be
            attached to requests that are "same-site".
        """
        # ``add`` appends another ``Set-Cookie`` line rather than
        # replacing one, so several cookies can be set on one response.
        self.headers.add(
            "Set-Cookie",
            dump_cookie(
                key,
                value=value,
                max_age=max_age,
                expires=expires,
                path=path,
                domain=domain,
                secure=secure,
                httponly=httponly,
                max_size=self.max_cookie_size,
                samesite=samesite,
            ),
        )

    def delete_cookie(
        self,
        key: str,
        path: str | None = "/",
        domain: str | None = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: str | None = None,
    ) -> None:
        """Delete a cookie. Fails silently if key doesn't exist.

        :param key: the key (name) of the cookie to be deleted.
        :param path: if the cookie that should be deleted was limited to a
                     path, the path has to be defined here.
        :param domain: if the cookie that should be deleted was limited to a
                       domain, that domain has to be defined here.
        :param secure: If ``True``, the cookie will only be available
            via HTTPS.
        :param httponly: Disallow JavaScript access to the cookie.
        :param samesite: Limit the scope of the cookie to only be
            attached to requests that are "same-site".
        """
        # Deleting is done by setting the cookie again with an empty
        # value, ``expires=0`` and ``max_age=0``.
        self.set_cookie(
            key,
            expires=0,
            max_age=0,
            path=path,
            domain=domain,
            secure=secure,
            httponly=httponly,
            samesite=samesite,
        )

    @property
    def is_json(self) -> bool:
        """Check if the mimetype indicates JSON data, either
        :mimetype:`application/json` or :mimetype:`application/*+json`.
        """
        mt = self.mimetype
        return mt is not None and (
            mt == "application/json"
            or (mt.startswith("application/") and mt.endswith("+json"))
        )

    # Common Descriptors

    @property
    def mimetype(self) -> str | None:
        """The mimetype (content type without charset etc.)"""
        ct = self.headers.get("content-type")

        if ct:
            # Everything after the first ";" is a parameter, not the type.
            return ct.split(";")[0].strip()
        else:
            return None

    @mimetype.setter
    def mimetype(self, value: str) -> None:
        self.headers["Content-Type"] = get_content_type(value, "utf-8")

    @property
    def mimetype_params(self) -> dict[str, str]:
        """The mimetype parameters as dict. For example if the
        content type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        The returned dict writes changes back to the ``Content-Type``
        header.

        .. versionadded:: 0.5
        """

        def on_update(d: CallbackDict) -> None:
            # Rebuild the header from the current mimetype plus the
            # modified parameters.
            self.headers["Content-Type"] = dump_options_header(self.mimetype, d)

        d = parse_options_header(self.headers.get("content-type", ""))[1]
        return CallbackDict(d, on_update)

    location = header_property[str](
        "Location",
        doc="""The Location response-header field is used to redirect
        the recipient to a location other than the Request-URI for
        completion of the request or identification of a new
        resource.""",
    )
    age = header_property(
        "Age",
        None,
        parse_age,
        dump_age,  # type: ignore
        doc="""The Age response-header field conveys the sender's
        estimate of the amount of time since the response (or its
        revalidation) was generated at the origin server.

        Age values are non-negative decimal integers, representing time
        in seconds.""",
    )
    content_type = header_property[str](
        "Content-Type",
        doc="""The Content-Type entity-header field indicates the media
        type of the entity-body sent to the recipient or, in the case of
        the HEAD method, the media type that would have been sent had
        the request been a GET.""",
    )
    content_length = header_property(
        "Content-Length",
        None,
        int,
        str,
        doc="""The Content-Length entity-header field indicates the size
        of the entity-body, in decimal number of OCTETs, sent to the
        recipient or, in the case of the HEAD method, the size of the
        entity-body that would have been sent had the request been a
        GET.""",
    )
    content_location = header_property[str](
        "Content-Location",
        doc="""The Content-Location entity-header field MAY be used to
        supply the resource location for the entity enclosed in the
        message when that entity is accessible from a location separate
        from the requested resource's URI.""",
    )
    content_encoding = header_property[str](
        "Content-Encoding",
        doc="""The Content-Encoding entity-header field is used as a
        modifier to the media-type. When present, its value indicates
        what additional content codings have been applied to the
        entity-body, and thus what decoding mechanisms must be applied
        in order to obtain the media-type referenced by the Content-Type
        header field.""",
    )
    content_md5 = header_property[str](
        "Content-MD5",
        doc="""The Content-MD5 entity-header field, as defined in
        RFC 1864, is an MD5 digest of the entity-body for the purpose of
        providing an end-to-end message integrity check (MIC) of the
        entity-body. (Note: a MIC is good for detecting accidental
        modification of the entity-body in transit, but is not proof
        against malicious attacks.)""",
    )
    date = header_property(
        "Date",
        None,
        parse_date,
        http_date,
        doc="""The Date general-header field represents the date and
        time at which the message was originated, having the same
        semantics as orig-date in RFC 822.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """,
    )
    expires = header_property(
        "Expires",
        None,
        parse_date,
        http_date,
        doc="""The Expires entity-header field gives the date/time after
        which the response is considered stale. A stale cache entry may
        not normally be returned by a cache.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """,
    )
    last_modified = header_property(
        "Last-Modified",
        None,
        parse_date,
        http_date,
        doc="""The Last-Modified entity-header field indicates the date
        and time at which the origin server believes the variant was
        last modified.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """,
    )

    @property
    def retry_after(self) -> datetime | None:
        """The Retry-After response-header field can be used with a
        503 (Service Unavailable) response to indicate how long the
        service is expected to be unavailable to the requesting client.

        Time in seconds until expiration or date.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """
        value = self.headers.get("retry-after")
        if value is None:
            return None

        try:
            seconds = int(value)
        except ValueError:
            # Not a number of seconds, so treat the value as a date.
            return parse_date(value)

        # A delay in seconds is relative to now, so the result differs
        # between calls.
        return datetime.now(timezone.utc) + timedelta(seconds=seconds)

    @retry_after.setter
    def retry_after(self, value: datetime | int | str | None) -> None:
        if value is None:
            if "retry-after" in self.headers:
                del self.headers["retry-after"]
            return
        elif isinstance(value, datetime):
            value = http_date(value)
        else:
            value = str(value)
        self.headers["Retry-After"] = value

    vary = _set_property(
        "Vary",
        doc="""The Vary field value indicates the set of request-header
        fields that fully determines, while the response is fresh,
        whether a cache is permitted to use the response to reply to a
        subsequent request without revalidation.""",
    )
    content_language = _set_property(
        "Content-Language",
        doc="""The Content-Language entity-header field describes the
        natural language(s) of the intended audience for the enclosed
        entity. Note that this might not be equivalent to all the
        languages used within the entity-body.""",
    )
    allow = _set_property(
        "Allow",
        doc="""The Allow entity-header field lists the set of methods
        supported by the resource identified by the Request-URI. The
        purpose of this field is strictly to inform the recipient of
        valid methods associated with the resource. An Allow header
        field MUST be present in a 405 (Method Not Allowed)
        response.""",
    )

    # ETag

    @property
    def cache_control(self) -> ResponseCacheControl:
        """The Cache-Control general-header field is used to specify
        directives that MUST be obeyed by all caching mechanisms along the
        request/response chain.
        """

        def on_update(cache_control: ResponseCacheControl) -> None:
            # Mirror changes into the header; an emptied object removes
            # the header entirely.
            if not cache_control and "cache-control" in self.headers:
                del self.headers["cache-control"]
            elif cache_control:
                self.headers["Cache-Control"] = cache_control.to_header()

        return parse_cache_control_header(
            self.headers.get("cache-control"), on_update, ResponseCacheControl
        )

    def set_etag(self, etag: str, weak: bool = False) -> None:
        """Set the etag, and override the old one if there was one."""
        self.headers["ETag"] = quote_etag(etag, weak)

    def get_etag(self) -> tuple[str, bool] | tuple[None, None]:
        """Return a tuple in the form ``(etag, is_weak)``. If there is no
        ETag the return value is ``(None, None)``.
        """
        return unquote_etag(self.headers.get("ETag"))

    accept_ranges = header_property[str](
        "Accept-Ranges",
        doc="""The `Accept-Ranges` header. Even though the name would
        indicate that multiple values are supported, it must be one
        string token only.

        The values ``'bytes'`` and ``'none'`` are common.

        .. versionadded:: 0.7""",
    )

    @property
    def content_range(self) -> ContentRange:
        """The ``Content-Range`` header as a
        :class:`~werkzeug.datastructures.ContentRange` object. Available
        even if the header is not set.

        .. versionadded:: 0.7
        """

        def on_update(rng: ContentRange) -> None:
            if not rng:
                del self.headers["content-range"]
            else:
                self.headers["Content-Range"] = rng.to_header()

        rv = parse_content_range_header(self.headers.get("content-range"), on_update)
        # always provide a content range object to make the descriptor
        # more user friendly.  It provides an unset() method that can be
        # used to remove the header quickly.
        if rv is None:
            rv = ContentRange(None, None, None, on_update=on_update)
        return rv

    @content_range.setter
    def content_range(self, value: ContentRange | str | None) -> None:
        if not value:
            del self.headers["content-range"]
        elif isinstance(value, str):
            self.headers["Content-Range"] = value
        else:
            self.headers["Content-Range"] = value.to_header()

    # Authorization

    @property
    def www_authenticate(self) -> WWWAuthenticate:
        """The ``WWW-Authenticate`` header parsed into a :class:`.WWWAuthenticate`
        object. Modifying the object will modify the header value.

        This header is not set by default. To set this header, assign an instance of
        :class:`.WWWAuthenticate` to this attribute.

        .. code-block:: python

            response.www_authenticate = WWWAuthenticate(
                "basic", {"realm": "Authentication Required"}
            )

        Multiple values for this header can be sent to give the client multiple options.
        Assign a list to set multiple headers. However, modifying the items in the list
        will not automatically update the header values, and accessing this attribute
        will only ever return the first value.

        To unset this header, assign ``None`` or use ``del``.

        .. versionchanged:: 2.3
            This attribute can be assigned to to set the header. A list can be assigned
            to set multiple header values. Use ``del`` to unset the header.

        .. versionchanged:: 2.3
            :class:`WWWAuthenticate` is no longer a ``dict``. The ``token`` attribute
            was added for auth challenges that use a token instead of parameters.
        """
        value = WWWAuthenticate.from_header(self.headers.get("WWW-Authenticate"))

        if value is None:
            # Hand back a placeholder "basic" object; the header itself
            # is not written here.
            value = WWWAuthenticate("basic")

        def on_update(value: WWWAuthenticate) -> None:
            # Re-assigning through the setter writes the header.
            self.www_authenticate = value

        value._on_update = on_update
        return value

    @www_authenticate.setter
    def www_authenticate(
        self, value: WWWAuthenticate | list[WWWAuthenticate] | None
    ) -> None:
        if not value:  # None or empty list
            del self.www_authenticate
        elif isinstance(value, list):
            # Clear any existing header by setting the first item.
            self.headers.set("WWW-Authenticate", value[0].to_header())

            for item in value[1:]:
                # Add additional header lines for additional items.
                self.headers.add("WWW-Authenticate", item.to_header())
        else:
            self.headers.set("WWW-Authenticate", value.to_header())

            def on_update(value: WWWAuthenticate) -> None:
                self.www_authenticate = value

            # When setting a single value, allow updating it directly.
            value._on_update = on_update

    @www_authenticate.deleter
    def www_authenticate(self) -> None:
        if "WWW-Authenticate" in self.headers:
            del self.headers["WWW-Authenticate"]

    # CSP

    @property
    def content_security_policy(self) -> ContentSecurityPolicy:
        """The ``Content-Security-Policy`` header as a
        :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available
        even if the header is not set.

        The Content-Security-Policy header adds an additional layer of
        security to help detect and mitigate certain types of attacks.
        """

        def on_update(csp: ContentSecurityPolicy) -> None:
            if not csp:
                del self.headers["content-security-policy"]
            else:
                self.headers["Content-Security-Policy"] = csp.to_header()

        rv = parse_csp_header(self.headers.get("content-security-policy"), on_update)
        if rv is None:
            # Always return an object so callers can add directives even
            # when the header is not set yet.
            rv = ContentSecurityPolicy(None, on_update=on_update)
        return rv

    @content_security_policy.setter
    def content_security_policy(
        self, value: ContentSecurityPolicy | str | None
    ) -> None:
        if not value:
            del self.headers["content-security-policy"]
        elif isinstance(value, str):
            self.headers["Content-Security-Policy"] = value
        else:
            self.headers["Content-Security-Policy"] = value.to_header()

    # NOTE(review): the report-only header below is written with the
    # casing "Content-Security-policy-report-only" rather than the
    # canonical "Content-Security-Policy-Report-Only". Lookups in this
    # class use lower-case keys, so header access is presumably
    # case-insensitive; confirm before normalizing the literal.

    @property
    def content_security_policy_report_only(self) -> ContentSecurityPolicy:
        """The ``Content-Security-policy-report-only`` header as a
        :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available
        even if the header is not set.

        The Content-Security-Policy-Report-Only header adds a csp policy
        that is not enforced but is reported thereby helping detect
        certain types of attacks.
        """

        def on_update(csp: ContentSecurityPolicy) -> None:
            if not csp:
                del self.headers["content-security-policy-report-only"]
            else:
                self.headers["Content-Security-policy-report-only"] = csp.to_header()

        rv = parse_csp_header(
            self.headers.get("content-security-policy-report-only"), on_update
        )
        if rv is None:
            rv = ContentSecurityPolicy(None, on_update=on_update)
        return rv

    @content_security_policy_report_only.setter
    def content_security_policy_report_only(
        self, value: ContentSecurityPolicy | str | None
    ) -> None:
        if not value:
            del self.headers["content-security-policy-report-only"]
        elif isinstance(value, str):
            self.headers["Content-Security-policy-report-only"] = value
        else:
            self.headers["Content-Security-policy-report-only"] = value.to_header()

    # CORS

    @property
    def access_control_allow_credentials(self) -> bool:
        """Whether credentials can be shared by the browser to
        JavaScript code. As part of the preflight request it indicates
        whether credentials can be used on the cross origin request.

        This is ``True`` whenever the header is present; the header's
        value is not inspected.
        """
        return "Access-Control-Allow-Credentials" in self.headers

    @access_control_allow_credentials.setter
    def access_control_allow_credentials(self, value: bool | None) -> None:
        # Only the literal ``True`` sets the header; every other value
        # (including other truthy ones) removes it.
        if value is True:
            self.headers["Access-Control-Allow-Credentials"] = "true"
        else:
            self.headers.pop("Access-Control-Allow-Credentials", None)

    access_control_allow_headers = header_property(
        "Access-Control-Allow-Headers",
        load_func=parse_set_header,
        dump_func=dump_header,
        doc="Which headers can be sent with the cross origin request.",
    )

    access_control_allow_methods = header_property(
        "Access-Control-Allow-Methods",
        load_func=parse_set_header,
        dump_func=dump_header,
        doc="Which methods can be used for the cross origin request.",
    )

    access_control_allow_origin = header_property[str](
        "Access-Control-Allow-Origin",
        doc="The origin or '*' for any origin that may make cross origin requests.",
    )

    access_control_expose_headers = header_property(
        "Access-Control-Expose-Headers",
        load_func=parse_set_header,
        dump_func=dump_header,
        doc="Which headers can be shared by the browser to JavaScript code.",
    )

    access_control_max_age = header_property(
        "Access-Control-Max-Age",
        load_func=int,
        dump_func=str,
        doc="The maximum age in seconds the access control settings can be cached for.",
    )

    cross_origin_opener_policy = header_property[COOP](
        "Cross-Origin-Opener-Policy",
        load_func=lambda value: COOP(value),
        dump_func=lambda value: value.value,
        default=COOP.UNSAFE_NONE,
        doc="""Allows control over sharing of browsing context group with cross-origin
        documents. Values must be a member of the :class:`werkzeug.http.COOP` enum.""",
    )

    cross_origin_embedder_policy = header_property[COEP](
        "Cross-Origin-Embedder-Policy",
        load_func=lambda value: COEP(value),
        dump_func=lambda value: value.value,
        default=COEP.UNSAFE_NONE,
        doc="""Prevents a document from loading any cross-origin resources that do not
        explicitly grant the document permission. Values must be a member of the
        :class:`werkzeug.http.COEP` enum.""",
    )