1from __future__ import annotations
2
3import typing as t
4from datetime import datetime
5from datetime import timedelta
6from datetime import timezone
7from http import HTTPStatus
8
9from ..datastructures import CallbackDict
10from ..datastructures import ContentRange
11from ..datastructures import ContentSecurityPolicy
12from ..datastructures import Headers
13from ..datastructures import HeaderSet
14from ..datastructures import ResponseCacheControl
15from ..datastructures import WWWAuthenticate
16from ..http import COEP
17from ..http import COOP
18from ..http import dump_age
19from ..http import dump_cookie
20from ..http import dump_header
21from ..http import dump_options_header
22from ..http import http_date
23from ..http import HTTP_STATUS_CODES
24from ..http import parse_age
25from ..http import parse_cache_control_header
26from ..http import parse_content_range_header
27from ..http import parse_csp_header
28from ..http import parse_date
29from ..http import parse_options_header
30from ..http import parse_set_header
31from ..http import quote_etag
32from ..http import unquote_etag
33from ..utils import get_content_type
34from ..utils import header_property
35
36if t.TYPE_CHECKING:
37 from ..datastructures.cache_control import _CacheControl
38
39
40def _set_property(name: str, doc: str | None = None) -> property:
41 def fget(self: Response) -> HeaderSet:
42 def on_update(header_set: HeaderSet) -> None:
43 if not header_set and name in self.headers:
44 del self.headers[name]
45 elif header_set:
46 self.headers[name] = header_set.to_header()
47
48 return parse_set_header(self.headers.get(name), on_update)
49
50 def fset(
51 self: Response,
52 value: None | (str | dict[str, str | int] | t.Iterable[str]),
53 ) -> None:
54 if not value:
55 del self.headers[name]
56 elif isinstance(value, str):
57 self.headers[name] = value
58 else:
59 self.headers[name] = dump_header(value)
60
61 return property(fget, fset, doc=doc)
62
63
class Response:
    """Holds the parts of an HTTP response that involve no IO: the
    status and the headers. The body is not handled here.

    This class is not meant for general use. It should only be used when
    implementing WSGI, ASGI, or another HTTP application spec. Werkzeug
    provides a WSGI implementation at :class:`werkzeug.wrappers.Response`.

    :param status: The status code for the response. Either an int, in
        which case the default status message is added, or a string in
        the form ``{code} {message}``, like ``404 Not Found``. Defaults
        to 200.
    :param headers: A :class:`~werkzeug.datastructures.Headers` object,
        or a list of ``(key, value)`` tuples that will be converted to a
        ``Headers`` object.
    :param mimetype: The mime type (content type without charset or
        other parameters) of the response. If the value starts with
        ``text/`` (or matches some other special cases), the charset
        will be added to create the ``content_type``.
    :param content_type: The full content type of the response.
        Overrides building the value from ``mimetype``.

    .. versionchanged:: 3.0
        The ``charset`` attribute was removed.

    .. versionadded:: 2.0
    """

    #: Status used when the constructor is not given one.
    default_status = 200

    #: Mimetype used when the constructor is given neither a mimetype
    #: nor a content type and the headers carry no content type.
    default_mimetype: str | None = "text/plain"

    #: Warn if a cookie header exceeds this size. The default, 4093, should be
    #: safely `supported by most browsers <cookie_>`_. A cookie larger than
    #: this size will still be sent, but it may be ignored or handled
    #: incorrectly by some browsers. Set to 0 to disable this check.
    #:
    #: .. versionadded:: 0.13
    #:
    #: .. _`cookie`: http://browsercookielimits.squawky.net/
    max_cookie_size = 4093

    # The response headers, stored as a :class:`Headers` object.
    headers: Headers

    def __init__(
        self,
        status: int | str | HTTPStatus | None = None,
        headers: t.Mapping[str, str | t.Iterable[str]]
        | t.Iterable[tuple[str, str]]
        | None = None,
        mimetype: str | None = None,
        content_type: str | None = None,
    ) -> None:
        # An existing Headers instance is used as-is (not copied); any
        # other truthy value is converted, and a falsy one starts empty.
        if isinstance(headers, Headers):
            header_store = headers
        else:
            header_store = Headers(headers) if headers else Headers()
        self.headers = header_store

        # An explicit content type wins. Otherwise derive one from the
        # mimetype, falling back to the class default only when the
        # supplied headers do not already carry a content type.
        resolved_type = content_type
        if resolved_type is None:
            chosen_mimetype = mimetype
            if chosen_mimetype is None and "content-type" not in self.headers:
                chosen_mimetype = self.default_mimetype
            if chosen_mimetype is not None:
                resolved_type = get_content_type(chosen_mimetype, "utf-8")

        if resolved_type is not None:
            self.headers["Content-Type"] = resolved_type

        self.status = (  # type: ignore
            self.default_status if status is None else status
        )

    def __repr__(self) -> str:
        class_name = type(self).__name__
        return f"<{class_name} [{self.status}]>"

    @property
    def status_code(self) -> int:
        """The numeric HTTP status code."""
        return self._status_code

    @status_code.setter
    def status_code(self, code: int) -> None:
        # Route through ``status`` so both stored forms stay in sync.
        self.status = code  # type: ignore

    @property
    def status(self) -> str:
        """The HTTP status as a string, code followed by message."""
        return self._status

    @status.setter
    def status(self, value: str | int | HTTPStatus) -> None:
        status_text, status_number = self._clean_status(value)
        self._status = status_text
        self._status_code = status_number

    def _clean_status(self, value: str | int | HTTPStatus) -> tuple[str, int]:
        """Normalize ``value`` into a ``(status string, status code)`` pair.

        An int or :class:`~http.HTTPStatus` gets its message looked up in
        ``HTTP_STATUS_CODES``. A string is stripped first: a string with
        a code and a message is returned unchanged, a bare code gets its
        message looked up, and a string whose first word is not a number
        is reported with code ``0``.

        :raises ValueError: If a string value is empty after stripping.
        """
        if not isinstance(value, (int, HTTPStatus)):
            text = value.strip()

            if not text:
                raise ValueError("Empty status argument")

            leading, separator, _ = text.partition(" ")

            try:
                code = int(leading)
            except ValueError:
                # Message without a code.
                return f"0 {text}", 0

            if separator:
                # Both a code and a message were supplied.
                return text, code
        else:
            code = int(value)

        # Only a code is known at this point, look up its message.
        try:
            reason = HTTP_STATUS_CODES[code].upper()
        except KeyError:
            reason = "UNKNOWN"

        return f"{code} {reason}", code

    def set_cookie(
        self,
        key: str,
        value: str = "",
        max_age: timedelta | int | None = None,
        expires: str | datetime | int | float | None = None,
        path: str | None = "/",
        domain: str | None = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: str | None = None,
    ) -> None:
        """Add a ``Set-Cookie`` header for the given cookie.

        A warning is raised if the size of the cookie header exceeds
        :attr:`max_cookie_size`, but the header will still be set.

        :param key: the key (name) of the cookie to be set.
        :param value: the value of the cookie.
        :param max_age: should be a number of seconds, or `None` (default) if
                        the cookie should last only as long as the client's
                        browser session.
        :param expires: should be a `datetime` object or UNIX timestamp.
        :param path: limits the cookie to a given path, per default it will
                     span the whole domain.
        :param domain: if you want to set a cross-domain cookie. For example,
                       ``domain="example.com"`` will set a cookie that is
                       readable by the domain ``www.example.com``,
                       ``foo.example.com`` etc. Otherwise, a cookie will only
                       be readable by the domain that set it.
        :param secure: If ``True``, the cookie will only be available
            via HTTPS.
        :param httponly: Disallow JavaScript access to the cookie.
        :param samesite: Limit the scope of the cookie to only be
            attached to requests that are "same-site".
        """
        cookie_header = dump_cookie(
            key,
            value=value,
            max_age=max_age,
            expires=expires,
            path=path,
            domain=domain,
            secure=secure,
            httponly=httponly,
            max_size=self.max_cookie_size,
            samesite=samesite,
        )
        # ``add`` appends, so earlier Set-Cookie headers are kept.
        self.headers.add("Set-Cookie", cookie_header)

    def delete_cookie(
        self,
        key: str,
        path: str | None = "/",
        domain: str | None = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: str | None = None,
    ) -> None:
        """Delete a cookie. Fails silently if key doesn't exist.

        The cookie is re-sent through :meth:`set_cookie` with ``expires``
        and ``max_age`` both set to ``0``.

        :param key: the key (name) of the cookie to be deleted.
        :param path: if the cookie that should be deleted was limited to a
                     path, the path has to be defined here.
        :param domain: if the cookie that should be deleted was limited to a
                       domain, that domain has to be defined here.
        :param secure: If ``True``, the cookie will only be available
            via HTTPS.
        :param httponly: Disallow JavaScript access to the cookie.
        :param samesite: Limit the scope of the cookie to only be
            attached to requests that are "same-site".
        """
        self.set_cookie(
            key,
            max_age=0,
            expires=0,
            path=path,
            domain=domain,
            secure=secure,
            httponly=httponly,
            samesite=samesite,
        )

    @property
    def is_json(self) -> bool:
        """Whether the mimetype indicates JSON data, either
        :mimetype:`application/json` or :mimetype:`application/*+json`.
        """
        mimetype = self.mimetype

        if mimetype is None:
            return False

        if mimetype == "application/json":
            return True

        return mimetype.startswith("application/") and mimetype.endswith("+json")

    # Common Descriptors

    @property
    def mimetype(self) -> str | None:
        """The mimetype (content type without charset etc.)"""
        raw_content_type = self.headers.get("content-type")

        if not raw_content_type:
            return None

        # Keep only the part before the first parameter separator.
        return raw_content_type.partition(";")[0].strip()

    @mimetype.setter
    def mimetype(self, value: str) -> None:
        full_content_type = get_content_type(value, "utf-8")
        self.headers["Content-Type"] = full_content_type

    @property
    def mimetype_params(self) -> dict[str, str]:
        """The parameters of the content type as a dict. For example if
        the content type is ``text/html; charset=utf-8`` the params
        would be ``{'charset': 'utf-8'}``. Changes to the returned dict
        are written back to the ``Content-Type`` header.

        .. versionadded:: 0.5
        """

        def _write_back(params: CallbackDict[str, str]) -> None:
            self.headers["Content-Type"] = dump_options_header(self.mimetype, params)

        parsed = parse_options_header(self.headers.get("content-type", ""))
        return CallbackDict(parsed[1], _write_back)

    location = header_property[str](
        "Location",
        doc="""The Location response-header field is used to redirect
        the recipient to a location other than the Request-URI for
        completion of the request or identification of a new
        resource.""",
    )
    age = header_property(
        "Age",
        default=None,
        load_func=parse_age,
        dump_func=dump_age,  # type: ignore
        doc="""The Age response-header field conveys the sender's
        estimate of the amount of time since the response (or its
        revalidation) was generated at the origin server.

        Age values are non-negative decimal integers, representing time
        in seconds.""",
    )
    content_type = header_property[str](
        "Content-Type",
        doc="""The Content-Type entity-header field indicates the media
        type of the entity-body sent to the recipient or, in the case of
        the HEAD method, the media type that would have been sent had
        the request been a GET.""",
    )
    content_length = header_property(
        "Content-Length",
        default=None,
        load_func=int,
        dump_func=str,
        doc="""The Content-Length entity-header field indicates the size
        of the entity-body, in decimal number of OCTETs, sent to the
        recipient or, in the case of the HEAD method, the size of the
        entity-body that would have been sent had the request been a
        GET.""",
    )
    content_location = header_property[str](
        "Content-Location",
        doc="""The Content-Location entity-header field MAY be used to
        supply the resource location for the entity enclosed in the
        message when that entity is accessible from a location separate
        from the requested resource's URI.""",
    )
    content_encoding = header_property[str](
        "Content-Encoding",
        doc="""The Content-Encoding entity-header field is used as a
        modifier to the media-type. When present, its value indicates
        what additional content codings have been applied to the
        entity-body, and thus what decoding mechanisms must be applied
        in order to obtain the media-type referenced by the Content-Type
        header field.""",
    )
    content_md5 = header_property[str](
        "Content-MD5",
        doc="""The Content-MD5 entity-header field, as defined in
        RFC 1864, is an MD5 digest of the entity-body for the purpose of
        providing an end-to-end message integrity check (MIC) of the
        entity-body. (Note: a MIC is good for detecting accidental
        modification of the entity-body in transit, but is not proof
        against malicious attacks.)""",
    )
    date = header_property(
        "Date",
        default=None,
        load_func=parse_date,
        dump_func=http_date,
        doc="""The Date general-header field represents the date and
        time at which the message was originated, having the same
        semantics as orig-date in RFC 822.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """,
    )
    expires = header_property(
        "Expires",
        default=None,
        load_func=parse_date,
        dump_func=http_date,
        doc="""The Expires entity-header field gives the date/time after
        which the response is considered stale. A stale cache entry may
        not normally be returned by a cache.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """,
    )
    last_modified = header_property(
        "Last-Modified",
        default=None,
        load_func=parse_date,
        dump_func=http_date,
        doc="""The Last-Modified entity-header field indicates the date
        and time at which the origin server believes the variant was
        last modified.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """,
    )

    @property
    def retry_after(self) -> datetime | None:
        """The Retry-After response-header field can be used with a
        503 (Service Unavailable) response to indicate how long the
        service is expected to be unavailable to the requesting client.

        The header holds either a number of seconds or a date. A number
        of seconds is returned as the current UTC time plus that many
        seconds; anything else is handed to ``parse_date``.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """
        raw = self.headers.get("retry-after")

        if raw is None:
            return None

        try:
            delay = int(raw)
        except ValueError:
            # Not a plain number of seconds, treat it as a date.
            return parse_date(raw)
        else:
            return datetime.now(timezone.utc) + timedelta(seconds=delay)

    @retry_after.setter
    def retry_after(self, value: datetime | int | str | None) -> None:
        if value is None:
            # Unset the header, tolerating it already being absent.
            if "retry-after" in self.headers:
                del self.headers["retry-after"]
            return

        if isinstance(value, datetime):
            header_value = http_date(value)
        else:
            header_value = str(value)

        self.headers["Retry-After"] = header_value

    vary = _set_property(
        "Vary",
        doc="""The Vary field value indicates the set of request-header
        fields that fully determines, while the response is fresh,
        whether a cache is permitted to use the response to reply to a
        subsequent request without revalidation.""",
    )
    content_language = _set_property(
        "Content-Language",
        doc="""The Content-Language entity-header field describes the
        natural language(s) of the intended audience for the enclosed
        entity. Note that this might not be equivalent to all the
        languages used within the entity-body.""",
    )
    allow = _set_property(
        "Allow",
        doc="""The Allow entity-header field lists the set of methods
        supported by the resource identified by the Request-URI. The
        purpose of this field is strictly to inform the recipient of
        valid methods associated with the resource. An Allow header
        field MUST be present in a 405 (Method Not Allowed)
        response.""",
    )

    # ETag

    @property
    def cache_control(self) -> ResponseCacheControl:
        """The Cache-Control general-header field is used to specify
        directives that MUST be obeyed by all caching mechanisms along the
        request/response chain.

        Changes to the returned object are written back to the header;
        an object that becomes empty removes the header.
        """

        def _write_back(directives: _CacheControl) -> None:
            if directives:
                self.headers["Cache-Control"] = directives.to_header()
            elif "cache-control" in self.headers:
                del self.headers["cache-control"]

        raw = self.headers.get("cache-control")
        return parse_cache_control_header(raw, _write_back, ResponseCacheControl)

    def set_etag(self, etag: str, weak: bool = False) -> None:
        """Set the etag, and override the old one if there was one."""
        quoted = quote_etag(etag, weak)
        self.headers["ETag"] = quoted

    def get_etag(self) -> tuple[str, bool] | tuple[None, None]:
        """Return a tuple in the form ``(etag, is_weak)``. If there is no
        ETag the return value is ``(None, None)``.
        """
        raw = self.headers.get("ETag")
        return unquote_etag(raw)

    accept_ranges = header_property[str](
        "Accept-Ranges",
        doc="""The `Accept-Ranges` header. Even though the name would
        indicate that multiple values are supported, it must be one
        string token only.

        The values ``'bytes'`` and ``'none'`` are common.

        .. versionadded:: 0.7""",
    )

    @property
    def content_range(self) -> ContentRange:
        """The ``Content-Range`` header as a
        :class:`~werkzeug.datastructures.ContentRange` object. Available
        even if the header is not set.

        .. versionadded:: 0.7
        """

        def _write_back(current: ContentRange) -> None:
            if current:
                self.headers["Content-Range"] = current.to_header()
            else:
                del self.headers["content-range"]

        parsed = parse_content_range_header(
            self.headers.get("content-range"), _write_back
        )

        if parsed is not None:
            return parsed

        # Always hand back a content range object so the descriptor is
        # friendlier to use. It provides an unset() method that can be
        # used to remove the header quickly.
        return ContentRange(None, None, None, on_update=_write_back)

    @content_range.setter
    def content_range(self, value: ContentRange | str | None) -> None:
        if not value:
            del self.headers["content-range"]
            return

        header_value = value if isinstance(value, str) else value.to_header()
        self.headers["Content-Range"] = header_value

    # Authorization

    @property
    def www_authenticate(self) -> WWWAuthenticate:
        """The ``WWW-Authenticate`` header parsed into a :class:`.WWWAuthenticate`
        object. Modifying the object will modify the header value.

        This header is not set by default. To set this header, assign an instance of
        :class:`.WWWAuthenticate` to this attribute.

        .. code-block:: python

            response.www_authenticate = WWWAuthenticate(
                "basic", {"realm": "Authentication Required"}
            )

        Multiple values for this header can be sent to give the client multiple options.
        Assign a list to set multiple headers. However, modifying the items in the list
        will not automatically update the header values, and accessing this attribute
        will only ever return the first value.

        To unset this header, assign ``None`` or use ``del``.

        .. versionchanged:: 2.3
            This attribute can be assigned to to set the header. A list can be assigned
            to set multiple header values. Use ``del`` to unset the header.

        .. versionchanged:: 2.3
            :class:`WWWAuthenticate` is no longer a ``dict``. The ``token`` attribute
            was added for auth challenges that use a token instead of parameters.
        """
        challenge = WWWAuthenticate.from_header(self.headers.get("WWW-Authenticate"))

        if challenge is None:
            # No header present: start from an empty "basic" challenge.
            challenge = WWWAuthenticate("basic")

        def _write_back(updated: WWWAuthenticate) -> None:
            self.www_authenticate = updated

        challenge._on_update = _write_back
        return challenge

    @www_authenticate.setter
    def www_authenticate(
        self, value: WWWAuthenticate | list[WWWAuthenticate] | None
    ) -> None:
        if not value:  # None or empty list
            del self.www_authenticate
            return

        if isinstance(value, list):
            first, *others = value
            # Setting the first item clears any existing header.
            self.headers.set("WWW-Authenticate", first.to_header())

            for extra in others:
                # Every further item becomes an additional header line.
                self.headers.add("WWW-Authenticate", extra.to_header())

            return

        self.headers.set("WWW-Authenticate", value.to_header())

        def _write_back(updated: WWWAuthenticate) -> None:
            self.www_authenticate = updated

        # A single value can be updated directly after assignment.
        value._on_update = _write_back

    @www_authenticate.deleter
    def www_authenticate(self) -> None:
        if "WWW-Authenticate" in self.headers:
            del self.headers["WWW-Authenticate"]

    # CSP

    @property
    def content_security_policy(self) -> ContentSecurityPolicy:
        """The ``Content-Security-Policy`` header as a
        :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available
        even if the header is not set.

        The Content-Security-Policy header adds an additional layer of
        security to help detect and mitigate certain types of attacks.
        """

        def _write_back(policy: ContentSecurityPolicy) -> None:
            if policy:
                self.headers["Content-Security-Policy"] = policy.to_header()
            else:
                del self.headers["content-security-policy"]

        parsed = parse_csp_header(
            self.headers.get("content-security-policy"), _write_back
        )

        if parsed is not None:
            return parsed

        return ContentSecurityPolicy(None, on_update=_write_back)

    @content_security_policy.setter
    def content_security_policy(
        self, value: ContentSecurityPolicy | str | None
    ) -> None:
        if not value:
            del self.headers["content-security-policy"]
            return

        header_value = value if isinstance(value, str) else value.to_header()
        self.headers["Content-Security-Policy"] = header_value

    @property
    def content_security_policy_report_only(self) -> ContentSecurityPolicy:
        """The ``Content-Security-policy-report-only`` header as a
        :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available
        even if the header is not set.

        The Content-Security-Policy-Report-Only header adds a csp policy
        that is not enforced but is reported thereby helping detect
        certain types of attacks.
        """

        def _write_back(policy: ContentSecurityPolicy) -> None:
            if policy:
                self.headers["Content-Security-policy-report-only"] = (
                    policy.to_header()
                )
            else:
                del self.headers["content-security-policy-report-only"]

        parsed = parse_csp_header(
            self.headers.get("content-security-policy-report-only"), _write_back
        )

        if parsed is not None:
            return parsed

        return ContentSecurityPolicy(None, on_update=_write_back)

    @content_security_policy_report_only.setter
    def content_security_policy_report_only(
        self, value: ContentSecurityPolicy | str | None
    ) -> None:
        if not value:
            del self.headers["content-security-policy-report-only"]
            return

        header_value = value if isinstance(value, str) else value.to_header()
        self.headers["Content-Security-policy-report-only"] = header_value

    # CORS

    @property
    def access_control_allow_credentials(self) -> bool:
        """Whether credentials can be shared by the browser to
        JavaScript code. As part of the preflight request it indicates
        whether credentials can be used on the cross origin request.
        """
        # Only the presence of the header is checked, not its value.
        return "Access-Control-Allow-Credentials" in self.headers

    @access_control_allow_credentials.setter
    def access_control_allow_credentials(self, value: bool | None) -> None:
        if value is not True:
            # Anything but the literal ``True`` removes the header.
            self.headers.pop("Access-Control-Allow-Credentials", None)
            return

        self.headers["Access-Control-Allow-Credentials"] = "true"

    access_control_allow_headers = header_property(
        "Access-Control-Allow-Headers",
        load_func=parse_set_header,
        dump_func=dump_header,
        doc="Which headers can be sent with the cross origin request.",
    )

    access_control_allow_methods = header_property(
        "Access-Control-Allow-Methods",
        load_func=parse_set_header,
        dump_func=dump_header,
        doc="Which methods can be used for the cross origin request.",
    )

    access_control_allow_origin = header_property[str](
        "Access-Control-Allow-Origin",
        doc="The origin or '*' for any origin that may make cross origin requests.",
    )

    access_control_expose_headers = header_property(
        "Access-Control-Expose-Headers",
        load_func=parse_set_header,
        dump_func=dump_header,
        doc="Which headers can be shared by the browser to JavaScript code.",
    )

    access_control_max_age = header_property(
        "Access-Control-Max-Age",
        load_func=int,
        dump_func=str,
        doc="The maximum age in seconds the access control settings can be cached for.",
    )

    cross_origin_opener_policy = header_property[COOP](
        "Cross-Origin-Opener-Policy",
        load_func=lambda raw: COOP(raw),
        dump_func=lambda policy: policy.value,
        default=COOP.UNSAFE_NONE,
        doc="""Allows control over sharing of browsing context group with cross-origin
        documents. Values must be a member of the :class:`werkzeug.http.COOP` enum.""",
    )

    cross_origin_embedder_policy = header_property[COEP](
        "Cross-Origin-Embedder-Policy",
        load_func=lambda raw: COEP(raw),
        dump_func=lambda policy: policy.value,
        default=COEP.UNSAFE_NONE,
        doc="""Prevents a document from loading any cross-origin resources that do not
        explicitly grant the document permission. Values must be a member of the
        :class:`werkzeug.http.COEP` enum.""",
    )