Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/sansio/response.py: 45%
248 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
1import typing as t
2from datetime import datetime
3from datetime import timedelta
4from datetime import timezone
5from http import HTTPStatus
7from .._internal import _to_str
8from ..datastructures import Headers
9from ..datastructures import HeaderSet
10from ..http import dump_cookie
11from ..http import HTTP_STATUS_CODES
12from ..utils import get_content_type
13from werkzeug.datastructures import CallbackDict
14from werkzeug.datastructures import ContentRange
15from werkzeug.datastructures import ContentSecurityPolicy
16from werkzeug.datastructures import ResponseCacheControl
17from werkzeug.datastructures import WWWAuthenticate
18from werkzeug.http import COEP
19from werkzeug.http import COOP
20from werkzeug.http import dump_age
21from werkzeug.http import dump_header
22from werkzeug.http import dump_options_header
23from werkzeug.http import http_date
24from werkzeug.http import parse_age
25from werkzeug.http import parse_cache_control_header
26from werkzeug.http import parse_content_range_header
27from werkzeug.http import parse_csp_header
28from werkzeug.http import parse_date
29from werkzeug.http import parse_options_header
30from werkzeug.http import parse_set_header
31from werkzeug.http import parse_www_authenticate_header
32from werkzeug.http import quote_etag
33from werkzeug.http import unquote_etag
34from werkzeug.utils import header_property
def _set_property(name: str, doc: t.Optional[str] = None) -> property:
    """Create a property that exposes the ``name`` header as a
    :class:`HeaderSet`.

    Reading the property parses the current header value with
    ``parse_set_header`` and attaches a callback, so that changes made to
    the returned object are written back to ``self.headers``.

    Assigning to the property replaces the header: a falsy value deletes
    it, a string is stored unchanged, and any other value is serialized
    with ``dump_header``.

    :param name: the name of the header the property manages.
    :param doc: the docstring to attach to the property.
    """

    def fget(self: "Response") -> HeaderSet:
        def write_back(header_set: HeaderSet) -> None:
            # A non-empty set is serialized into the header; an empty
            # one removes the header, but only if it is currently set.
            if header_set:
                self.headers[name] = header_set.to_header()
            elif name in self.headers:
                del self.headers[name]

        current = self.headers.get(name)
        return parse_set_header(current, write_back)

    def fset(
        self: "Response",
        value: t.Optional[
            t.Union[str, t.Dict[str, t.Union[str, int]], t.Iterable[str]]
        ],
    ) -> None:
        if not value:
            del self.headers[name]
            return

        # Strings are taken as an already formatted header value.
        self.headers[name] = value if isinstance(value, str) else dump_header(value)

    return property(fget, fset, doc=doc)
class Response:
    """Represents the non-IO parts of an HTTP response, specifically the
    status and headers but not the body.

    This class is not meant for general use. It should only be used when
    implementing WSGI, ASGI, or another HTTP application spec. Werkzeug
    provides a WSGI implementation at :class:`werkzeug.wrappers.Response`.

    :param status: The status code for the response. Either an int, in
        which case the default status message is added, or a string in
        the form ``{code} {message}``, like ``404 Not Found``. Defaults
        to 200.
    :param headers: A :class:`~werkzeug.datastructures.Headers` object,
        or a list of ``(key, value)`` tuples that will be converted to a
        ``Headers`` object.
    :param mimetype: The mime type (content type without charset or
        other parameters) of the response. If the value starts with
        ``text/`` (or matches some other special cases), the charset
        will be added to create the ``content_type``.
    :param content_type: The full content type of the response.
        Overrides building the value from ``mimetype``.

    .. versionadded:: 2.0
    """

    #: the charset of the response.
    charset = "utf-8"

    #: the default status if none is provided.
    default_status = 200

    #: the default mimetype if none is provided.
    default_mimetype = "text/plain"

    #: Warn if a cookie header exceeds this size. The default, 4093, should be
    #: safely `supported by most browsers <cookie_>`_. A cookie larger than
    #: this size will still be sent, but it may be ignored or handled
    #: incorrectly by some browsers. Set to 0 to disable this check.
    #:
    #: .. versionadded:: 0.13
    #:
    #: .. _`cookie`: http://browsercookielimits.squawky.net/
    max_cookie_size = 4093

    # A :class:`Headers` object representing the response headers.
    headers: Headers

    def __init__(
        self,
        status: t.Optional[t.Union[int, str, HTTPStatus]] = None,
        headers: t.Optional[
            t.Union[
                t.Mapping[str, t.Union[str, int, t.Iterable[t.Union[str, int]]]],
                t.Iterable[t.Tuple[str, t.Union[str, int]]],
            ]
        ] = None,
        mimetype: t.Optional[str] = None,
        content_type: t.Optional[str] = None,
    ) -> None:
        # An existing Headers object is used as-is (not copied); anything
        # else is converted, and a falsy value yields empty headers.
        if isinstance(headers, Headers):
            self.headers = headers
        elif not headers:
            self.headers = Headers()
        else:
            self.headers = Headers(headers)

        if content_type is None:
            # Only fall back to the default mimetype when the passed
            # headers do not already carry a Content-Type.
            if mimetype is None and "content-type" not in self.headers:
                mimetype = self.default_mimetype
            if mimetype is not None:
                mimetype = get_content_type(mimetype, self.charset)
            content_type = mimetype
        if content_type is not None:
            self.headers["Content-Type"] = content_type
        if status is None:
            status = self.default_status
        # Goes through the ``status`` setter, which also sets the code.
        self.status = status  # type: ignore

    def __repr__(self) -> str:
        return f"<{type(self).__name__} [{self.status}]>"

    @property
    def status_code(self) -> int:
        """The HTTP status code as a number."""
        return self._status_code

    @status_code.setter
    def status_code(self, code: int) -> None:
        # Delegate to the ``status`` setter so that the status string and
        # the numeric code are always updated together.
        self.status = code  # type: ignore

    @property
    def status(self) -> str:
        """The HTTP status code as a string."""
        return self._status

    @status.setter
    def status(self, value: t.Union[str, int, HTTPStatus]) -> None:
        if not isinstance(value, (str, bytes, int, HTTPStatus)):
            raise TypeError("Invalid status argument")

        self._status, self._status_code = self._clean_status(value)

    def _clean_status(self, value: t.Union[str, int, HTTPStatus]) -> t.Tuple[str, int]:
        """Normalize ``value`` into a ``(status, status_code)`` pair.

        * ``{code} {message}`` is returned unchanged together with the
          code as an int.
        * A bare code gets the upper-cased message from
          ``HTTP_STATUS_CODES`` appended, or ``UNKNOWN`` if the code is
          not listed there.
        * Text that does not start with a numeric code is treated as a
          message and prefixed with code ``0``.

        Only ASCII decimal digits are recognized as a code.

        :raises ValueError: if ``value`` is empty or only whitespace.
        """
        if isinstance(value, HTTPStatus):
            value = int(value)
        status = _to_str(value, self.charset)
        split_status = status.split(None, 1)

        if not split_status:
            raise ValueError("Empty status argument")

        code_text = split_status[0]
        # ``str.isdigit`` alone also accepts characters such as
        # superscript digits that ``int`` rejects, so require ASCII too.
        has_code = code_text.isascii() and code_text.isdigit()

        if len(split_status) > 1:
            if has_code:
                # code and message
                return status, int(code_text)

            # multi-word message
            return f"0 {status}", 0

        if has_code:
            # code only
            status_code = int(code_text)

            try:
                status = f"{status_code} {HTTP_STATUS_CODES[status_code].upper()}"
            except KeyError:
                status = f"{status_code} UNKNOWN"

            return status, status_code

        # one-word message
        return f"0 {status}", 0

    def set_cookie(
        self,
        key: str,
        value: str = "",
        max_age: t.Optional[t.Union[timedelta, int]] = None,
        expires: t.Optional[t.Union[str, datetime, int, float]] = None,
        path: t.Optional[str] = "/",
        domain: t.Optional[str] = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: t.Optional[str] = None,
    ) -> None:
        """Sets a cookie.

        A warning is raised if the size of the cookie header exceeds
        :attr:`max_cookie_size`, but the header will still be set.

        :param key: the key (name) of the cookie to be set.
        :param value: the value of the cookie.
        :param max_age: should be a number of seconds, or `None` (default) if
                        the cookie should last only as long as the client's
                        browser session.
        :param expires: should be a `datetime` object or UNIX timestamp.
        :param path: limits the cookie to a given path, per default it will
                     span the whole domain.
        :param domain: if you want to set a cross-domain cookie. For example,
                       ``domain=".example.com"`` will set a cookie that is
                       readable by the domain ``www.example.com``,
                       ``foo.example.com`` etc. Otherwise, a cookie will only
                       be readable by the domain that set it.
        :param secure: If ``True``, the cookie will only be available
            via HTTPS.
        :param httponly: Disallow JavaScript access to the cookie.
        :param samesite: Limit the scope of the cookie to only be
            attached to requests that are "same-site".
        """
        # ``add`` appends a header instead of replacing, so several
        # Set-Cookie headers can be present on one response.
        self.headers.add(
            "Set-Cookie",
            dump_cookie(
                key,
                value=value,
                max_age=max_age,
                expires=expires,
                path=path,
                domain=domain,
                secure=secure,
                httponly=httponly,
                charset=self.charset,
                max_size=self.max_cookie_size,
                samesite=samesite,
            ),
        )

    def delete_cookie(
        self,
        key: str,
        path: str = "/",
        domain: t.Optional[str] = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: t.Optional[str] = None,
    ) -> None:
        """Delete a cookie. Fails silently if key doesn't exist.

        This is done by calling :meth:`set_cookie` with an empty value
        and both ``expires`` and ``max_age`` set to ``0``.

        :param key: the key (name) of the cookie to be deleted.
        :param path: if the cookie that should be deleted was limited to a
                     path, the path has to be defined here.
        :param domain: if the cookie that should be deleted was limited to a
                       domain, that domain has to be defined here.
        :param secure: If ``True``, the cookie will only be available
            via HTTPS.
        :param httponly: Disallow JavaScript access to the cookie.
        :param samesite: Limit the scope of the cookie to only be
            attached to requests that are "same-site".
        """
        self.set_cookie(
            key,
            expires=0,
            max_age=0,
            path=path,
            domain=domain,
            secure=secure,
            httponly=httponly,
            samesite=samesite,
        )

    @property
    def is_json(self) -> bool:
        """Check if the mimetype indicates JSON data, either
        :mimetype:`application/json` or :mimetype:`application/*+json`.
        """
        mt = self.mimetype
        return mt is not None and (
            mt == "application/json"
            or mt.startswith("application/")
            and mt.endswith("+json")
        )

    # Common Descriptors

    @property
    def mimetype(self) -> t.Optional[str]:
        """The mimetype (content type without charset etc.)"""
        ct = self.headers.get("content-type")

        if ct:
            # Drop any parameters that follow the first ";".
            return ct.split(";")[0].strip()
        else:
            return None

    @mimetype.setter
    def mimetype(self, value: str) -> None:
        self.headers["Content-Type"] = get_content_type(value, self.charset)

    @property
    def mimetype_params(self) -> t.Dict[str, str]:
        """The mimetype parameters as dict. For example if the
        content type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        .. versionadded:: 0.5
        """

        def on_update(d: CallbackDict) -> None:
            # Rebuild the header from the current mimetype and the
            # updated parameters.
            self.headers["Content-Type"] = dump_options_header(self.mimetype, d)

        d = parse_options_header(self.headers.get("content-type", ""))[1]
        return CallbackDict(d, on_update)

    location = header_property[str](
        "Location",
        doc="""The Location response-header field is used to redirect
        the recipient to a location other than the Request-URI for
        completion of the request or identification of a new
        resource.""",
    )
    age = header_property(
        "Age",
        None,
        parse_age,
        dump_age,  # type: ignore
        doc="""The Age response-header field conveys the sender's
        estimate of the amount of time since the response (or its
        revalidation) was generated at the origin server.

        Age values are non-negative decimal integers, representing time
        in seconds.""",
    )
    content_type = header_property[str](
        "Content-Type",
        doc="""The Content-Type entity-header field indicates the media
        type of the entity-body sent to the recipient or, in the case of
        the HEAD method, the media type that would have been sent had
        the request been a GET.""",
    )
    content_length = header_property(
        "Content-Length",
        None,
        int,
        str,
        doc="""The Content-Length entity-header field indicates the size
        of the entity-body, in decimal number of OCTETs, sent to the
        recipient or, in the case of the HEAD method, the size of the
        entity-body that would have been sent had the request been a
        GET.""",
    )
    content_location = header_property[str](
        "Content-Location",
        doc="""The Content-Location entity-header field MAY be used to
        supply the resource location for the entity enclosed in the
        message when that entity is accessible from a location separate
        from the requested resource's URI.""",
    )
    content_encoding = header_property[str](
        "Content-Encoding",
        doc="""The Content-Encoding entity-header field is used as a
        modifier to the media-type. When present, its value indicates
        what additional content codings have been applied to the
        entity-body, and thus what decoding mechanisms must be applied
        in order to obtain the media-type referenced by the Content-Type
        header field.""",
    )
    content_md5 = header_property[str](
        "Content-MD5",
        doc="""The Content-MD5 entity-header field, as defined in
        RFC 1864, is an MD5 digest of the entity-body for the purpose of
        providing an end-to-end message integrity check (MIC) of the
        entity-body. (Note: a MIC is good for detecting accidental
        modification of the entity-body in transit, but is not proof
        against malicious attacks.)""",
    )
    date = header_property(
        "Date",
        None,
        parse_date,
        http_date,
        doc="""The Date general-header field represents the date and
        time at which the message was originated, having the same
        semantics as orig-date in RFC 822.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """,
    )
    expires = header_property(
        "Expires",
        None,
        parse_date,
        http_date,
        doc="""The Expires entity-header field gives the date/time after
        which the response is considered stale. A stale cache entry may
        not normally be returned by a cache.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """,
    )
    last_modified = header_property(
        "Last-Modified",
        None,
        parse_date,
        http_date,
        doc="""The Last-Modified entity-header field indicates the date
        and time at which the origin server believes the variant was
        last modified.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """,
    )

    @property
    def retry_after(self) -> t.Optional[datetime]:
        """The Retry-After response-header field can be used with a
        503 (Service Unavailable) response to indicate how long the
        service is expected to be unavailable to the requesting client.

        Time in seconds until expiration or date.

        A value made up of ASCII digits is read as a number of seconds
        and returned as the current UTC time plus that delay. Any other
        value is passed to ``parse_date``. ``None`` is returned if the
        header is not set.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """
        value = self.headers.get("retry-after")
        if value is None:
            return None
        # ``str.isdigit`` alone also accepts characters such as
        # superscript digits that ``int`` rejects; requiring ASCII keeps
        # a malformed header from raising ValueError on attribute access.
        elif value.isascii() and value.isdigit():
            return datetime.now(timezone.utc) + timedelta(seconds=int(value))
        return parse_date(value)

    @retry_after.setter
    def retry_after(self, value: t.Optional[t.Union[datetime, int, str]]) -> None:
        if value is None:
            if "retry-after" in self.headers:
                del self.headers["retry-after"]
            return
        elif isinstance(value, datetime):
            value = http_date(value)
        else:
            value = str(value)
        self.headers["Retry-After"] = value

    vary = _set_property(
        "Vary",
        doc="""The Vary field value indicates the set of request-header
        fields that fully determines, while the response is fresh,
        whether a cache is permitted to use the response to reply to a
        subsequent request without revalidation.""",
    )
    content_language = _set_property(
        "Content-Language",
        doc="""The Content-Language entity-header field describes the
        natural language(s) of the intended audience for the enclosed
        entity. Note that this might not be equivalent to all the
        languages used within the entity-body.""",
    )
    allow = _set_property(
        "Allow",
        doc="""The Allow entity-header field lists the set of methods
        supported by the resource identified by the Request-URI. The
        purpose of this field is strictly to inform the recipient of
        valid methods associated with the resource. An Allow header
        field MUST be present in a 405 (Method Not Allowed)
        response.""",
    )

    # Cache-Control and ETag

    @property
    def cache_control(self) -> ResponseCacheControl:
        """The Cache-Control general-header field is used to specify
        directives that MUST be obeyed by all caching mechanisms along the
        request/response chain.
        """

        def on_update(cache_control: ResponseCacheControl) -> None:
            # Write changes back; an emptied object removes the header.
            if not cache_control and "cache-control" in self.headers:
                del self.headers["cache-control"]
            elif cache_control:
                self.headers["Cache-Control"] = cache_control.to_header()

        return parse_cache_control_header(
            self.headers.get("cache-control"), on_update, ResponseCacheControl
        )

    def set_etag(self, etag: str, weak: bool = False) -> None:
        """Set the etag, and override the old one if there was one."""
        self.headers["ETag"] = quote_etag(etag, weak)

    def get_etag(self) -> t.Union[t.Tuple[str, bool], t.Tuple[None, None]]:
        """Return a tuple in the form ``(etag, is_weak)``. If there is no
        ETag the return value is ``(None, None)``.
        """
        return unquote_etag(self.headers.get("ETag"))

    accept_ranges = header_property[str](
        "Accept-Ranges",
        doc="""The `Accept-Ranges` header. Even though the name would
        indicate that multiple values are supported, it must be one
        string token only.

        The values ``'bytes'`` and ``'none'`` are common.

        .. versionadded:: 0.7""",
    )

    @property
    def content_range(self) -> ContentRange:
        """The ``Content-Range`` header as a
        :class:`~werkzeug.datastructures.ContentRange` object. Available
        even if the header is not set.

        .. versionadded:: 0.7
        """

        def on_update(rng: ContentRange) -> None:
            if not rng:
                del self.headers["content-range"]
            else:
                self.headers["Content-Range"] = rng.to_header()

        rv = parse_content_range_header(self.headers.get("content-range"), on_update)
        # always provide a content range object to make the descriptor
        # more user friendly.  It provides an unset() method that can be
        # used to remove the header quickly.
        if rv is None:
            rv = ContentRange(None, None, None, on_update=on_update)
        return rv

    @content_range.setter
    def content_range(self, value: t.Optional[t.Union[ContentRange, str]]) -> None:
        if not value:
            del self.headers["content-range"]
        elif isinstance(value, str):
            self.headers["Content-Range"] = value
        else:
            self.headers["Content-Range"] = value.to_header()

    # Authorization

    @property
    def www_authenticate(self) -> WWWAuthenticate:
        """The ``WWW-Authenticate`` header in a parsed form."""

        def on_update(www_auth: WWWAuthenticate) -> None:
            # Write changes back; an emptied object removes the header.
            if not www_auth and "www-authenticate" in self.headers:
                del self.headers["www-authenticate"]
            elif www_auth:
                self.headers["WWW-Authenticate"] = www_auth.to_header()

        header = self.headers.get("www-authenticate")
        return parse_www_authenticate_header(header, on_update)

    # CSP

    @property
    def content_security_policy(self) -> ContentSecurityPolicy:
        """The ``Content-Security-Policy`` header as a
        :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available
        even if the header is not set.

        The Content-Security-Policy header adds an additional layer of
        security to help detect and mitigate certain types of attacks.
        """

        def on_update(csp: ContentSecurityPolicy) -> None:
            if not csp:
                del self.headers["content-security-policy"]
            else:
                self.headers["Content-Security-Policy"] = csp.to_header()

        rv = parse_csp_header(self.headers.get("content-security-policy"), on_update)
        # Always hand back an object, even when the header is not set.
        if rv is None:
            rv = ContentSecurityPolicy(None, on_update=on_update)
        return rv

    @content_security_policy.setter
    def content_security_policy(
        self, value: t.Optional[t.Union[ContentSecurityPolicy, str]]
    ) -> None:
        if not value:
            del self.headers["content-security-policy"]
        elif isinstance(value, str):
            self.headers["Content-Security-Policy"] = value
        else:
            self.headers["Content-Security-Policy"] = value.to_header()

    @property
    def content_security_policy_report_only(self) -> ContentSecurityPolicy:
        """The ``Content-Security-policy-report-only`` header as a
        :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available
        even if the header is not set.

        The Content-Security-Policy-Report-Only header adds a csp policy
        that is not enforced but is reported thereby helping detect
        certain types of attacks.
        """

        def on_update(csp: ContentSecurityPolicy) -> None:
            if not csp:
                del self.headers["content-security-policy-report-only"]
            else:
                self.headers["Content-Security-policy-report-only"] = csp.to_header()

        rv = parse_csp_header(
            self.headers.get("content-security-policy-report-only"), on_update
        )
        # Always hand back an object, even when the header is not set.
        if rv is None:
            rv = ContentSecurityPolicy(None, on_update=on_update)
        return rv

    @content_security_policy_report_only.setter
    def content_security_policy_report_only(
        self, value: t.Optional[t.Union[ContentSecurityPolicy, str]]
    ) -> None:
        if not value:
            del self.headers["content-security-policy-report-only"]
        elif isinstance(value, str):
            self.headers["Content-Security-policy-report-only"] = value
        else:
            self.headers["Content-Security-policy-report-only"] = value.to_header()

    # CORS

    @property
    def access_control_allow_credentials(self) -> bool:
        """Whether credentials can be shared by the browser to
        JavaScript code. As part of the preflight request it indicates
        whether credentials can be used on the cross origin request.
        """
        return "Access-Control-Allow-Credentials" in self.headers

    @access_control_allow_credentials.setter
    def access_control_allow_credentials(self, value: t.Optional[bool]) -> None:
        # Only the exact value ``True`` sets the header; anything else
        # removes it.
        if value is True:
            self.headers["Access-Control-Allow-Credentials"] = "true"
        else:
            self.headers.pop("Access-Control-Allow-Credentials", None)

    access_control_allow_headers = header_property(
        "Access-Control-Allow-Headers",
        load_func=parse_set_header,
        dump_func=dump_header,
        doc="Which headers can be sent with the cross origin request.",
    )

    access_control_allow_methods = header_property(
        "Access-Control-Allow-Methods",
        load_func=parse_set_header,
        dump_func=dump_header,
        doc="Which methods can be used for the cross origin request.",
    )

    access_control_allow_origin = header_property[str](
        "Access-Control-Allow-Origin",
        doc="The origin or '*' for any origin that may make cross origin requests.",
    )

    access_control_expose_headers = header_property(
        "Access-Control-Expose-Headers",
        load_func=parse_set_header,
        dump_func=dump_header,
        doc="Which headers can be shared by the browser to JavaScript code.",
    )

    access_control_max_age = header_property(
        "Access-Control-Max-Age",
        load_func=int,
        dump_func=str,
        doc="The maximum age in seconds the access control settings can be cached for.",
    )

    cross_origin_opener_policy = header_property[COOP](
        "Cross-Origin-Opener-Policy",
        load_func=lambda value: COOP(value),
        dump_func=lambda value: value.value,
        default=COOP.UNSAFE_NONE,
        doc="""Allows control over sharing of browsing context group with cross-origin
        documents. Values must be a member of the :class:`werkzeug.http.COOP` enum.""",
    )

    cross_origin_embedder_policy = header_property[COEP](
        "Cross-Origin-Embedder-Policy",
        load_func=lambda value: COEP(value),
        dump_func=lambda value: value.value,
        default=COEP.UNSAFE_NONE,
        doc="""Prevents a document from loading any cross-origin resources that do not
        explicitly grant the document permission. Values must be a member of the
        :class:`werkzeug.http.COEP` enum.""",
    )