1from __future__ import annotations
2
3import typing as t
4from datetime import datetime
5from datetime import timedelta
6from datetime import timezone
7from http import HTTPStatus
8
9from ..datastructures import CallbackDict
10from ..datastructures import ContentRange
11from ..datastructures import ContentSecurityPolicy
12from ..datastructures import Headers
13from ..datastructures import HeaderSet
14from ..datastructures import ResponseCacheControl
15from ..datastructures import WWWAuthenticate
16from ..http import COEP
17from ..http import COOP
18from ..http import dump_age
19from ..http import dump_cookie
20from ..http import dump_header
21from ..http import dump_options_header
22from ..http import http_date
23from ..http import HTTP_STATUS_CODES
24from ..http import parse_age
25from ..http import parse_cache_control_header
26from ..http import parse_content_range_header
27from ..http import parse_csp_header
28from ..http import parse_date
29from ..http import parse_options_header
30from ..http import parse_set_header
31from ..http import quote_etag
32from ..http import unquote_etag
33from ..utils import get_content_type
34from ..utils import header_property
35
36if t.TYPE_CHECKING:
37 from ..datastructures.cache_control import _CacheControl
38
39
40def _set_property(name: str, doc: str | None = None) -> property:
41 def fget(self: Response) -> HeaderSet:
42 def on_update(header_set: HeaderSet) -> None:
43 if not header_set and name in self.headers:
44 del self.headers[name]
45 elif header_set:
46 self.headers[name] = header_set.to_header()
47
48 return parse_set_header(self.headers.get(name), on_update)
49
50 def fset(
51 self: Response,
52 value: None | (str | dict[str, str | int] | t.Iterable[str]),
53 ) -> None:
54 if not value:
55 del self.headers[name]
56 elif isinstance(value, str):
57 self.headers[name] = value
58 else:
59 self.headers[name] = dump_header(value)
60
61 return property(fget, fset, doc=doc)
62
63
class Response:
    """Represents the non-IO parts of an HTTP response, specifically the
    status and headers but not the body.

    This class is not meant for general use. It should only be used when
    implementing WSGI, ASGI, or another HTTP application spec. Werkzeug
    provides a WSGI implementation at :class:`werkzeug.wrappers.Response`.

    :param status: The status code for the response. Either an int, in
        which case the default status message is added, or a string in
        the form ``{code} {message}``, like ``404 Not Found``. Defaults
        to 200.
    :param headers: A :class:`~werkzeug.datastructures.Headers` object,
        or a list of ``(key, value)`` tuples that will be converted to a
        ``Headers`` object.
    :param mimetype: The mime type (content type without charset or
        other parameters) of the response. If the value starts with
        ``text/`` (or matches some other special cases), the charset
        will be added to create the ``content_type``.
    :param content_type: The full content type of the response.
        Overrides building the value from ``mimetype``.

    .. versionchanged:: 3.0
        The ``charset`` attribute was removed.

    .. versionadded:: 2.0
    """

    #: the default status if none is provided.
    default_status = 200

    #: the default mimetype if none is provided.
    default_mimetype: str | None = "text/plain"

    #: Warn if a cookie header exceeds this size. The default, 4093, should be
    #: safely `supported by most browsers <cookie_>`_. A cookie larger than
    #: this size will still be sent, but it may be ignored or handled
    #: incorrectly by some browsers. Set to 0 to disable this check.
    #:
    #: .. versionadded:: 0.13
    #:
    #: .. _`cookie`: http://browsercookielimits.squawky.net/
    max_cookie_size = 4093

    # A :class:`Headers` object representing the response headers.
    headers: Headers

    def __init__(
        self,
        status: int | str | HTTPStatus | None = None,
        headers: t.Mapping[str, str | t.Iterable[str]]
        | t.Iterable[tuple[str, str]]
        | None = None,
        mimetype: str | None = None,
        content_type: str | None = None,
    ) -> None:
        """Set up the headers, the ``Content-Type`` header and the status.

        See the class docstring for the meaning of each parameter.
        """
        # An existing ``Headers`` instance is used as-is (not copied), so
        # later changes made through this response are visible to whoever
        # else holds that object. Anything else is wrapped in a new one.
        if isinstance(headers, Headers):
            self.headers = headers
        elif not headers:
            self.headers = Headers()
        else:
            self.headers = Headers(headers)

        # An explicit ``content_type`` wins. Otherwise it is built from
        # ``mimetype``, falling back to ``default_mimetype`` only when the
        # given headers do not already carry a Content-Type.
        if content_type is None:
            if mimetype is None and "content-type" not in self.headers:
                mimetype = self.default_mimetype
            if mimetype is not None:
                mimetype = get_content_type(mimetype, "utf-8")
            content_type = mimetype
        if content_type is not None:
            self.headers["Content-Type"] = content_type
        if status is None:
            status = self.default_status
        # Goes through the ``status`` setter, which normalizes the value
        # and fills in both ``_status`` and ``_status_code``.
        self.status = status  # type: ignore

    def __repr__(self) -> str:
        """Return ``<ClassName [status]>`` using the full status string."""
        return f"<{type(self).__name__} [{self.status}]>"

    @property
    def status_code(self) -> int:
        """The HTTP status code as a number."""
        return self._status_code

    @status_code.setter
    def status_code(self, code: int) -> None:
        # Delegate to the ``status`` setter so that the status string and
        # the numeric code are always updated together.
        self.status = code  # type: ignore

    @property
    def status(self) -> str:
        """The HTTP status code as a string."""
        return self._status

    @status.setter
    def status(self, value: str | int | HTTPStatus) -> None:
        self._status, self._status_code = self._clean_status(value)

    def _clean_status(self, value: str | int | HTTPStatus) -> tuple[str, int]:
        """Normalize a status value to a ``(status string, code)`` pair.

        - An int or :class:`~http.HTTPStatus` is converted to an int and
          the message is looked up in ``HTTP_STATUS_CODES``.
        - A string is stripped first. If its first space-separated token
          is not an integer, the whole string is treated as a message and
          ``("0 {value}", 0)`` is returned. If it has a code followed by
          more text, the stripped string is returned unchanged along with
          the code. If it is only a code, the message is looked up.

        Looked up messages are upper-cased; a code that is not in
        ``HTTP_STATUS_CODES`` gets the message ``UNKNOWN``.

        :raises ValueError: If a string value is empty after stripping.
        """
        if isinstance(value, (int, HTTPStatus)):
            status_code = int(value)
        else:
            value = value.strip()

            if not value:
                raise ValueError("Empty status argument")

            # ``sep`` is empty when the string contains no space, which
            # means there is nothing after the first token.
            code_str, sep, _ = value.partition(" ")

            try:
                status_code = int(code_str)
            except ValueError:
                # only message
                return f"0 {value}", 0

            if sep:
                # code and message
                return value, status_code

        # only code, look up message
        try:
            status = f"{status_code} {HTTP_STATUS_CODES[status_code].upper()}"
        except KeyError:
            status = f"{status_code} UNKNOWN"

        return status, status_code

    def set_cookie(
        self,
        key: str,
        value: str = "",
        max_age: timedelta | int | None = None,
        expires: str | datetime | int | float | None = None,
        path: str | None = "/",
        domain: str | None = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: str | None = None,
        partitioned: bool = False,
    ) -> None:
        """Sets a cookie.

        A warning is raised if the size of the cookie header exceeds
        :attr:`max_cookie_size`, but the header will still be set.

        :param key: the key (name) of the cookie to be set.
        :param value: the value of the cookie.
        :param max_age: should be a number of seconds, or `None` (default) if
                        the cookie should last only as long as the client's
                        browser session.
        :param expires: should be a `datetime` object or UNIX timestamp.
        :param path: limits the cookie to a given path, per default it will
                     span the whole domain.
        :param domain: if you want to set a cross-domain cookie. For example,
                       ``domain="example.com"`` will set a cookie that is
                       readable by the domain ``www.example.com``,
                       ``foo.example.com`` etc. Otherwise, a cookie will only
                       be readable by the domain that set it.
        :param secure: If ``True``, the cookie will only be available
            via HTTPS.
        :param httponly: Disallow JavaScript access to the cookie.
        :param samesite: Limit the scope of the cookie to only be
            attached to requests that are "same-site".
        :param partitioned: If ``True``, the cookie will be partitioned.

        .. versionchanged:: 3.1
            The ``partitioned`` parameter was added.
        """
        # ``add`` rather than item assignment: each call appends another
        # ``Set-Cookie`` header instead of replacing an existing one.
        self.headers.add(
            "Set-Cookie",
            dump_cookie(
                key,
                value=value,
                max_age=max_age,
                expires=expires,
                path=path,
                domain=domain,
                secure=secure,
                httponly=httponly,
                max_size=self.max_cookie_size,
                samesite=samesite,
                partitioned=partitioned,
            ),
        )

    def delete_cookie(
        self,
        key: str,
        path: str | None = "/",
        domain: str | None = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: str | None = None,
        partitioned: bool = False,
    ) -> None:
        """Delete a cookie. Fails silently if key doesn't exist.

        :param key: the key (name) of the cookie to be deleted.
        :param path: if the cookie that should be deleted was limited to a
                     path, the path has to be defined here.
        :param domain: if the cookie that should be deleted was limited to a
                       domain, that domain has to be defined here.
        :param secure: If ``True``, the cookie will only be available
            via HTTPS.
        :param httponly: Disallow JavaScript access to the cookie.
        :param samesite: Limit the scope of the cookie to only be
            attached to requests that are "same-site".
        :param partitioned: If ``True``, the cookie will be partitioned.
        """
        # Deletion is expressed as setting the cookie again with the
        # default empty value and both ``expires`` and ``max_age`` at 0.
        self.set_cookie(
            key,
            expires=0,
            max_age=0,
            path=path,
            domain=domain,
            secure=secure,
            httponly=httponly,
            samesite=samesite,
            partitioned=partitioned,
        )

    @property
    def is_json(self) -> bool:
        """Check if the mimetype indicates JSON data, either
        :mimetype:`application/json` or :mimetype:`application/*+json`.
        """
        mt = self.mimetype
        # ``and`` binds tighter than ``or``: this is an exact match, or
        # both the ``application/`` prefix and the ``+json`` suffix.
        return mt is not None and (
            mt == "application/json"
            or mt.startswith("application/")
            and mt.endswith("+json")
        )

    # Common Descriptors

    @property
    def mimetype(self) -> str | None:
        """The mimetype (content type without charset etc.)"""
        ct = self.headers.get("content-type")

        if ct:
            # Everything from the first ";" on is parameters; drop it.
            return ct.split(";")[0].strip()
        else:
            return None

    @mimetype.setter
    def mimetype(self, value: str) -> None:
        # Replaces the whole Content-Type header with the value built by
        # ``get_content_type`` for this mimetype and the utf-8 charset.
        self.headers["Content-Type"] = get_content_type(value, "utf-8")

    @property
    def mimetype_params(self) -> dict[str, str]:
        """The mimetype parameters as dict. For example if the
        content type is ``text/html; charset=utf-8`` the params would be
        ``{'charset': 'utf-8'}``.

        .. versionadded:: 0.5
        """

        def on_update(d: CallbackDict[str, str]) -> None:
            """Rebuild the Content-Type header from the current mimetype
            and the updated parameters.
            """
            self.headers["Content-Type"] = dump_options_header(self.mimetype, d)

        # Index 1 of the parse result holds the parameters.
        d = parse_options_header(self.headers.get("content-type", ""))[1]
        return CallbackDict(d, on_update)

    location = header_property[str](
        "Location",
        doc="""The Location response-header field is used to redirect
        the recipient to a location other than the Request-URI for
        completion of the request or identification of a new
        resource.""",
    )
    age = header_property(
        "Age",
        None,
        parse_age,
        dump_age,  # type: ignore
        doc="""The Age response-header field conveys the sender's
        estimate of the amount of time since the response (or its
        revalidation) was generated at the origin server.

        Age values are non-negative decimal integers, representing time
        in seconds.""",
    )
    content_type = header_property[str](
        "Content-Type",
        doc="""The Content-Type entity-header field indicates the media
        type of the entity-body sent to the recipient or, in the case of
        the HEAD method, the media type that would have been sent had
        the request been a GET.""",
    )
    content_length = header_property(
        "Content-Length",
        None,
        int,
        str,
        doc="""The Content-Length entity-header field indicates the size
        of the entity-body, in decimal number of OCTETs, sent to the
        recipient or, in the case of the HEAD method, the size of the
        entity-body that would have been sent had the request been a
        GET.""",
    )
    content_location = header_property[str](
        "Content-Location",
        doc="""The Content-Location entity-header field MAY be used to
        supply the resource location for the entity enclosed in the
        message when that entity is accessible from a location separate
        from the requested resource's URI.""",
    )
    content_encoding = header_property[str](
        "Content-Encoding",
        doc="""The Content-Encoding entity-header field is used as a
        modifier to the media-type. When present, its value indicates
        what additional content codings have been applied to the
        entity-body, and thus what decoding mechanisms must be applied
        in order to obtain the media-type referenced by the Content-Type
        header field.""",
    )
    content_md5 = header_property[str](
        "Content-MD5",
        doc="""The Content-MD5 entity-header field, as defined in
        RFC 1864, is an MD5 digest of the entity-body for the purpose of
        providing an end-to-end message integrity check (MIC) of the
        entity-body. (Note: a MIC is good for detecting accidental
        modification of the entity-body in transit, but is not proof
        against malicious attacks.)""",
    )
    date = header_property(
        "Date",
        None,
        parse_date,
        http_date,
        doc="""The Date general-header field represents the date and
        time at which the message was originated, having the same
        semantics as orig-date in RFC 822.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """,
    )
    expires = header_property(
        "Expires",
        None,
        parse_date,
        http_date,
        doc="""The Expires entity-header field gives the date/time after
        which the response is considered stale. A stale cache entry may
        not normally be returned by a cache.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """,
    )
    last_modified = header_property(
        "Last-Modified",
        None,
        parse_date,
        http_date,
        doc="""The Last-Modified entity-header field indicates the date
        and time at which the origin server believes the variant was
        last modified.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """,
    )

    @property
    def retry_after(self) -> datetime | None:
        """The Retry-After response-header field can be used with a
        503 (Service Unavailable) response to indicate how long the
        service is expected to be unavailable to the requesting client.

        Time in seconds until expiration or date.

        A header value that parses as an integer is treated as a number
        of seconds and returned as the current UTC time plus that delay,
        so the result changes from one access to the next. Any other
        value is handed to ``parse_date``. ``None`` is returned when the
        header is not set.

        .. versionchanged:: 2.0
            The datetime object is timezone-aware.
        """
        value = self.headers.get("retry-after")
        if value is None:
            return None

        try:
            seconds = int(value)
        except ValueError:
            # Not a delay in seconds, so try it as an HTTP date.
            return parse_date(value)

        return datetime.now(timezone.utc) + timedelta(seconds=seconds)

    @retry_after.setter
    def retry_after(self, value: datetime | int | str | None) -> None:
        # ``None`` removes the header, a datetime is formatted with
        # ``http_date``, anything else is stored as ``str(value)``.
        if value is None:
            if "retry-after" in self.headers:
                del self.headers["retry-after"]
            return
        elif isinstance(value, datetime):
            value = http_date(value)
        else:
            value = str(value)
        self.headers["Retry-After"] = value

    vary = _set_property(
        "Vary",
        doc="""The Vary field value indicates the set of request-header
        fields that fully determines, while the response is fresh,
        whether a cache is permitted to use the response to reply to a
        subsequent request without revalidation.""",
    )
    content_language = _set_property(
        "Content-Language",
        doc="""The Content-Language entity-header field describes the
        natural language(s) of the intended audience for the enclosed
        entity. Note that this might not be equivalent to all the
        languages used within the entity-body.""",
    )
    allow = _set_property(
        "Allow",
        doc="""The Allow entity-header field lists the set of methods
        supported by the resource identified by the Request-URI. The
        purpose of this field is strictly to inform the recipient of
        valid methods associated with the resource. An Allow header
        field MUST be present in a 405 (Method Not Allowed)
        response.""",
    )

    # Cache-Control and ETag

    @property
    def cache_control(self) -> ResponseCacheControl:
        """The Cache-Control general-header field is used to specify
        directives that MUST be obeyed by all caching mechanisms along the
        request/response chain.

        A new object is parsed from the header on every access. It is
        given a callback that writes the header back, or removes it once
        the object is empty.
        """

        def on_update(cache_control: _CacheControl) -> None:
            # An empty object removes the header (if it is present);
            # otherwise the header is rewritten from the object.
            if not cache_control and "cache-control" in self.headers:
                del self.headers["cache-control"]
            elif cache_control:
                self.headers["Cache-Control"] = cache_control.to_header()

        return parse_cache_control_header(
            self.headers.get("cache-control"), on_update, ResponseCacheControl
        )

    def set_etag(self, etag: str, weak: bool = False) -> None:
        """Set the etag, and override the old one if there was one."""
        self.headers["ETag"] = quote_etag(etag, weak)

    def get_etag(self) -> tuple[str, bool] | tuple[None, None]:
        """Return a tuple in the form ``(etag, is_weak)``. If there is no
        ETag the return value is ``(None, None)``.
        """
        return unquote_etag(self.headers.get("ETag"))

    accept_ranges = header_property[str](
        "Accept-Ranges",
        doc="""The `Accept-Ranges` header. Even though the name would
        indicate that multiple values are supported, it must be one
        string token only.

        The values ``'bytes'`` and ``'none'`` are common.

        .. versionadded:: 0.7""",
    )

    @property
    def content_range(self) -> ContentRange:
        """The ``Content-Range`` header as a
        :class:`~werkzeug.datastructures.ContentRange` object. Available
        even if the header is not set.

        .. versionadded:: 0.7
        """

        def on_update(rng: ContentRange) -> None:
            # A falsy range removes the header; otherwise rewrite it.
            if not rng:
                del self.headers["content-range"]
            else:
                self.headers["Content-Range"] = rng.to_header()

        rv = parse_content_range_header(self.headers.get("content-range"), on_update)
        # always provide a content range object to make the descriptor
        # more user friendly. It provides an unset() method that can be
        # used to remove the header quickly.
        if rv is None:
            rv = ContentRange(None, None, None, on_update=on_update)
        return rv

    @content_range.setter
    def content_range(self, value: ContentRange | str | None) -> None:
        # Falsy removes the header, a string is stored verbatim, and any
        # other value is serialized through its ``to_header()``.
        if not value:
            del self.headers["content-range"]
        elif isinstance(value, str):
            self.headers["Content-Range"] = value
        else:
            self.headers["Content-Range"] = value.to_header()

    # Authorization

    @property
    def www_authenticate(self) -> WWWAuthenticate:
        """The ``WWW-Authenticate`` header parsed into a :class:`.WWWAuthenticate`
        object. Modifying the object will modify the header value.

        This header is not set by default. To set this header, assign an instance of
        :class:`.WWWAuthenticate` to this attribute.

        .. code-block:: python

            response.www_authenticate = WWWAuthenticate(
                "basic", {"realm": "Authentication Required"}
            )

        Multiple values for this header can be sent to give the client multiple options.
        Assign a list to set multiple headers. However, modifying the items in the list
        will not automatically update the header values, and accessing this attribute
        will only ever return the first value.

        To unset this header, assign ``None`` or use ``del``.

        .. versionchanged:: 2.3
            This attribute can be assigned to to set the header. A list can be assigned
            to set multiple header values. Use ``del`` to unset the header.

        .. versionchanged:: 2.3
            :class:`WWWAuthenticate` is no longer a ``dict``. The ``token`` attribute
            was added for auth challenges that use a token instead of parameters.
        """
        value = WWWAuthenticate.from_header(self.headers.get("WWW-Authenticate"))

        # When nothing could be parsed, hand out a "basic" placeholder.
        # Creating it does not touch the headers; the header is only
        # written once ``on_update`` below is invoked.
        if value is None:
            value = WWWAuthenticate("basic")

        def on_update(value: WWWAuthenticate) -> None:
            # Route changes through the setter to rewrite the header.
            self.www_authenticate = value

        value._on_update = on_update
        return value

    @www_authenticate.setter
    def www_authenticate(
        self, value: WWWAuthenticate | list[WWWAuthenticate] | None
    ) -> None:
        if not value:  # None or empty list
            del self.www_authenticate
        elif isinstance(value, list):
            # Clear any existing header by setting the first item.
            self.headers.set("WWW-Authenticate", value[0].to_header())

            for item in value[1:]:
                # Add additional header lines for additional items.
                self.headers.add("WWW-Authenticate", item.to_header())
        else:
            self.headers.set("WWW-Authenticate", value.to_header())

            def on_update(value: WWWAuthenticate) -> None:
                self.www_authenticate = value

            # When setting a single value, allow updating it directly.
            value._on_update = on_update

    @www_authenticate.deleter
    def www_authenticate(self) -> None:
        # Only delete when present, so unsetting an unset header is a no-op.
        if "WWW-Authenticate" in self.headers:
            del self.headers["WWW-Authenticate"]

    # CSP

    @property
    def content_security_policy(self) -> ContentSecurityPolicy:
        """The ``Content-Security-Policy`` header as a
        :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available
        even if the header is not set.

        The Content-Security-Policy header adds an additional layer of
        security to help detect and mitigate certain types of attacks.
        """

        def on_update(csp: ContentSecurityPolicy) -> None:
            # A falsy policy removes the header; otherwise rewrite it.
            if not csp:
                del self.headers["content-security-policy"]
            else:
                self.headers["Content-Security-Policy"] = csp.to_header()

        rv = parse_csp_header(self.headers.get("content-security-policy"), on_update)
        # Fall back to an empty policy object wired to the same callback,
        # so callers always get an object they can modify.
        if rv is None:
            rv = ContentSecurityPolicy(None, on_update=on_update)
        return rv

    @content_security_policy.setter
    def content_security_policy(
        self, value: ContentSecurityPolicy | str | None
    ) -> None:
        # Falsy removes the header, a string is stored verbatim, and any
        # other value is serialized through its ``to_header()``.
        if not value:
            del self.headers["content-security-policy"]
        elif isinstance(value, str):
            self.headers["Content-Security-Policy"] = value
        else:
            self.headers["Content-Security-Policy"] = value.to_header()

    @property
    def content_security_policy_report_only(self) -> ContentSecurityPolicy:
        """The ``Content-Security-policy-report-only`` header as a
        :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available
        even if the header is not set.

        The Content-Security-Policy-Report-Only header adds a csp policy
        that is not enforced but is reported thereby helping detect
        certain types of attacks.
        """

        def on_update(csp: ContentSecurityPolicy) -> None:
            # A falsy policy removes the header; otherwise rewrite it.
            if not csp:
                del self.headers["content-security-policy-report-only"]
            else:
                self.headers["Content-Security-policy-report-only"] = csp.to_header()

        rv = parse_csp_header(
            self.headers.get("content-security-policy-report-only"), on_update
        )
        # Fall back to an empty policy object wired to the same callback,
        # so callers always get an object they can modify.
        if rv is None:
            rv = ContentSecurityPolicy(None, on_update=on_update)
        return rv

    @content_security_policy_report_only.setter
    def content_security_policy_report_only(
        self, value: ContentSecurityPolicy | str | None
    ) -> None:
        # Falsy removes the header, a string is stored verbatim, and any
        # other value is serialized through its ``to_header()``.
        if not value:
            del self.headers["content-security-policy-report-only"]
        elif isinstance(value, str):
            self.headers["Content-Security-policy-report-only"] = value
        else:
            self.headers["Content-Security-policy-report-only"] = value.to_header()

    # CORS

    @property
    def access_control_allow_credentials(self) -> bool:
        """Whether credentials can be shared by the browser to
        JavaScript code. As part of the preflight request it indicates
        whether credentials can be used on the cross origin request.
        """
        # Presence of the header is what counts; its value is not read.
        return "Access-Control-Allow-Credentials" in self.headers

    @access_control_allow_credentials.setter
    def access_control_allow_credentials(self, value: bool | None) -> None:
        # Only the literal ``True`` sets the header (identity check, so
        # other truthy values do not); every other value removes it.
        if value is True:
            self.headers["Access-Control-Allow-Credentials"] = "true"
        else:
            self.headers.pop("Access-Control-Allow-Credentials", None)

    access_control_allow_headers = header_property(
        "Access-Control-Allow-Headers",
        load_func=parse_set_header,
        dump_func=dump_header,
        doc="Which headers can be sent with the cross origin request.",
    )

    access_control_allow_methods = header_property(
        "Access-Control-Allow-Methods",
        load_func=parse_set_header,
        dump_func=dump_header,
        doc="Which methods can be used for the cross origin request.",
    )

    access_control_allow_origin = header_property[str](
        "Access-Control-Allow-Origin",
        doc="The origin or '*' for any origin that may make cross origin requests.",
    )

    access_control_expose_headers = header_property(
        "Access-Control-Expose-Headers",
        load_func=parse_set_header,
        dump_func=dump_header,
        doc="Which headers can be shared by the browser to JavaScript code.",
    )

    access_control_max_age = header_property(
        "Access-Control-Max-Age",
        load_func=int,
        dump_func=str,
        doc="The maximum age in seconds the access control settings can be cached for.",
    )

    cross_origin_opener_policy = header_property[COOP](
        "Cross-Origin-Opener-Policy",
        load_func=lambda value: COOP(value),
        dump_func=lambda value: value.value,
        default=COOP.UNSAFE_NONE,
        doc="""Allows control over sharing of browsing context group with cross-origin
        documents. Values must be a member of the :class:`werkzeug.http.COOP` enum.""",
    )

    cross_origin_embedder_policy = header_property[COEP](
        "Cross-Origin-Embedder-Policy",
        load_func=lambda value: COEP(value),
        dump_func=lambda value: value.value,
        default=COEP.UNSAFE_NONE,
        doc="""Prevents a document from loading any cross-origin resources that do not
        explicitly grant the document permission. Values must be a member of the
        :class:`werkzeug.http.COEP` enum.""",
    )