Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/http.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import email.utils
4import re
5import typing as t
6import warnings
7from datetime import date
8from datetime import datetime
9from datetime import time
10from datetime import timedelta
11from datetime import timezone
12from enum import Enum
13from hashlib import sha1
14from time import mktime
15from time import struct_time
16from urllib.parse import quote
17from urllib.parse import unquote
19from ._internal import _dt_as_utc
20from ._internal import _plain_int
22if t.TYPE_CHECKING:
23 from _typeshed.wsgi import WSGIEnvironment
# Characters allowed in an HTTP "token" (RFC 9110 section 5.6.2). A value
# made up entirely of these characters can be sent without quoting.
_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)
# Matches one item of an ``ETag`` header list: an optional weakness marker
# ``W/`` (group 1) followed by either a quoted tag (group 2) or a bare tag
# (group 3), up to the next comma or end of string.
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
# Headers that describe the entity (body) rather than the message itself,
# stored lowercase for case-insensitive comparison.
_entity_headers = frozenset(
    [
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    ]
)
# Hop-by-hop headers, meaningful only for a single transport-level
# connection (not forwarded by proxies), stored lowercase.
_hop_by_hop_headers = frozenset(
    [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ]
)
# Mapping of HTTP status code to the canonical reason phrase used in the
# response status line.
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    # Fixed: RFC 6585 names this "Network Authentication Required",
    # not "Network Authentication Failed".
    511: "Network Authentication Required",  # see RFC 6585
}
class COEP(Enum):
    """Cross Origin Embedder Policies.

    Enumerates the valid values for the ``Cross-Origin-Embedder-Policy``
    header.
    """

    UNSAFE_NONE = "unsafe-none"
    REQUIRE_CORP = "require-corp"
class COOP(Enum):
    """Cross Origin Opener Policies.

    Enumerates the valid values for the ``Cross-Origin-Opener-Policy``
    header.
    """

    UNSAFE_NONE = "unsafe-none"
    SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
    SAME_ORIGIN = "same-origin"
def quote_header_value(value: t.Any, allow_token: bool = True) -> str:
    """Wrap a header value in double quotes if necessary.

    A value made up entirely of ASCII token characters is returned
    unchanged. Otherwise it is wrapped in double quotes, with any ``"`` or
    ``\\`` characters escaped by an additional ``\\``.

    This is the reverse of :func:`unquote_header_value`.

    :param value: The value to quote. Will be converted to a string.
    :param allow_token: Disable to quote the value even if it only has
        token characters.

    .. versionchanged:: 3.0
        Passing bytes is not supported.

    .. versionchanged:: 3.0
        The ``extra_chars`` parameter is removed.

    .. versionchanged:: 2.3
        The value is quoted if it is the empty string.

    .. versionadded:: 0.5
    """
    text = str(value)

    if not text:
        return '""'

    if allow_token:
        # ``str.strip`` removes every leading/trailing character found in
        # the set; an empty result means the value consists solely of
        # token characters and can be sent without quoting.
        token_chars = (
            "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
            "^_`abcdefghijklmnopqrstuvwxyz|~"
        )

        if not text.strip(token_chars):
            return text

    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'
# Matches a backslash escape and captures the escaped character.
_unslash_re = re.compile(r"\\(.)", re.A)
def unquote_header_value(value: str) -> str:
    """Remove surrounding double quotes and backslash escapes from a
    header value.

    This is the reverse of :func:`quote_header_value`.

    :param value: The header value to unquote.

    .. versionchanged:: 3.2
        Removes escape preceding any character.

    .. versionchanged:: 3.0
        The ``is_filename`` parameter is removed.
    """
    # Only fully quoted values are unquoted; anything else passes through.
    if len(value) < 2 or not (value[0] == value[-1] == '"'):
        return value

    # Drop the quotes, then resolve each backslash escape to the character
    # that follows it.
    return re.sub(r"\\(.)", r"\1", value[1:-1], flags=re.A)
def dump_options_header(header: str | None, options: t.Mapping[str, t.Any]) -> str:
    """Produce a header value with ``key=value`` parameters separated by
    semicolons ``;``, as used by e.g. the ``Content-Type`` header.

    .. code-block:: python

        dump_options_header("text/html", {"charset": "UTF-8"})
        'text/html; charset=UTF-8'

    This is the reverse of :func:`parse_options_header`.

    If a value contains non-token characters, it will be quoted. If a
    value is ``None``, the parameter is skipped.

    In some keys for some headers, a UTF-8 value can be encoded using a
    special ``key*=UTF-8''value`` form, where ``value`` is percent
    encoded. This function will not produce that format automatically,
    but if a given key ends with an asterisk ``*``, the value is assumed
    to have that form and will not be quoted further.

    :param header: The primary header value.
    :param options: Parameters to encode as ``key=value`` pairs.

    .. versionchanged:: 2.3
        Keys with ``None`` values are skipped rather than treated as a
        bare key.

    .. versionchanged:: 2.2.3
        If a key ends with ``*``, its value will not be quoted.
    """
    parts = [] if header is None else [header]

    for name, val in options.items():
        # None means "omit this parameter entirely".
        if val is None:
            continue

        if name[-1] == "*":
            # RFC 2231 extended form; the value is already encoded.
            parts.append(f"{name}={val}")
        else:
            parts.append(f"{name}={quote_header_value(val)}")

    return "; ".join(parts)
def dump_header(iterable: dict[str, t.Any] | t.Iterable[t.Any]) -> str:
    """Produce a header value from a list of items or ``key=value``
    pairs, separated by commas ``,``.

    This is the reverse of :func:`parse_list_header`,
    :func:`parse_dict_header`, and :func:`parse_set_header`.

    If a value contains non-token characters, it will be quoted. If a
    value is ``None``, the key is output alone.

    In some keys for some headers, a UTF-8 value can be encoded using a
    special ``key*=UTF-8''value`` form, where ``value`` is percent
    encoded. This function will not produce that format automatically,
    but if a given key ends with an asterisk ``*``, the value is assumed
    to have that form and will not be quoted further.

    .. code-block:: python

        dump_header(["foo", "bar baz"])
        'foo, "bar baz"'

        dump_header({"foo": "bar baz"})
        'foo="bar baz"'

    :param iterable: The items to create a header from.

    .. versionchanged:: 3.0
        The ``allow_token`` parameter is removed.

    .. versionchanged:: 2.2.3
        If a key ends with ``*``, its value will not be quoted.
    """
    if not isinstance(iterable, dict):
        # A plain sequence: quote each item as needed.
        return ", ".join(quote_header_value(item) for item in iterable)

    segments = []

    for key, val in iterable.items():
        if val is None:
            # Bare key with no value.
            segments.append(key)
        elif key[-1] == "*":
            # RFC 2231 extended form; the value is already encoded.
            segments.append(f"{key}={val}")
        else:
            segments.append(f"{key}={quote_header_value(val)}")

    return ", ".join(segments)
def dump_csp_header(header: ds.ContentSecurityPolicy) -> str:
    """Dump a Content Security Policy header.

    These are structured into policies such as "default-src 'self';
    script-src 'self'".

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.
    """
    policies = [f"{directive} {sources}" for directive, sources in header.items()]
    return "; ".join(policies)
def parse_list_header(value: str) -> list[str]:
    """Parse a header value that consists of a list of comma separated items according
    to `RFC 9110 <https://httpwg.org/specs/rfc9110.html#abnf.extension>`__.

    Surrounding quotes are removed from items, but internal quotes are left for
    future parsing. Empty values are discarded.

    .. code-block:: python

        parse_list_header('token, "quoted value"')
        ['token', 'quoted value']

    This is the reverse of :func:`dump_header`.

    :param value: The header value to parse.

    .. versionchanged:: 3.2
        Quotes and escapes are kept if only part of an item is quoted. Empty
        values are omitted. An empty list is returned if the value contains an
        unclosed quoted string.
    """
    # Character-by-character scan tracking quoted-string and escape state,
    # so commas inside quoted strings do not split items.
    items = []
    item = ""
    escape = False
    quote = False

    for char in value:
        if escape:
            # Previous character was a backslash inside quotes; keep this
            # character verbatim, whatever it is.
            escape = False
            item += char
            continue

        if quote:
            # Inside a quoted string: track escapes and the closing quote,
            # never split on commas.
            if char == "\\":
                escape = True
            elif char == '"':
                quote = False

            item += char
            continue

        if char == ",":
            # An unquoted comma ends the current item.
            items.append(item)
            item = ""
            continue

        if char == '"':
            quote = True

        item += char

    if quote:
        # invalid, unclosed quoted string
        return []

    items.append(item)
    # Strip whitespace, drop empty items, and unquote fully-quoted items.
    return [
        unquote_header_value(item) for item in (item.strip() for item in items) if item
    ]
def parse_dict_header(value: str) -> dict[str, str | None]:
    """Parse a list header using :func:`parse_list_header`, then parse
    each item as a ``key=value`` pair.

    .. code-block:: python

        parse_dict_header('a=b, c="d, e", f')
        {"a": "b", "c": "d, e", "f": None}

    This is the reverse of :func:`dump_header`.

    If a key does not have a value, it is ``None``.

    This handles charsets for values as described in
    `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__. Only
    ASCII, UTF-8, and ISO-8859-1 charsets are accepted, otherwise the
    value remains quoted.

    :param value: The header value to parse.

    .. versionchanged:: 3.2
        An empty dict is returned if the value contains an unclosed quoted
        string.

    .. versionchanged:: 3.0
        Passing bytes is not supported.

    .. versionchanged:: 3.0
        The ``cls`` argument is removed.

    .. versionchanged:: 2.3
        Added support for ``key*=charset''value`` encoded items.

    .. versionchanged:: 0.9
        The ``cls`` argument was added.
    """
    result: dict[str, str | None] = {}

    for segment in parse_list_header(value):
        name, sep, raw = segment.partition("=")
        name = name.strip()

        if not name:
            # "=value" with no key is not valid.
            continue

        if not sep:
            # Bare key with no "=" at all.
            result[name] = None
            continue

        raw = raw.strip()

        if name[-1] == "*":
            # RFC 2231 "key*=charset''value" form; the value is percent
            # encoded. Adapted from parse_options_header, without the
            # continuation handling.
            name = name[:-1]
            charset: str | None = None
            charset_match = _charset_value_re.match(raw)

            if charset_match:
                # A charset marker is present; split it off the value.
                charset, raw = charset_match.groups()
                charset = charset.lower()

            # A safe list of encodings. Modern clients should only send
            # ASCII or UTF-8. This list will not be extended further. An
            # invalid encoding leaves the value quoted.
            if charset in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                # Invalid bytes are replaced during unquoting.
                raw = unquote(raw, encoding=charset)

        result[name] = unquote_header_value(raw)

    return result
# https://httpwg.org/specs/rfc9110.html#parameter
# Matches "key=" where key is an RFC 9110 token.
_parameter_key_re = re.compile(r"([\w!#$%&'*+\-.^`|~]+)=", flags=re.ASCII)
# Matches a bare (unquoted) token parameter value.
_parameter_token_value_re = re.compile(r"[\w!#$%&'*+\-.^`|~]+", flags=re.ASCII)
# https://www.rfc-editor.org/rfc/rfc2231#section-4
# Matches the "charset'language'value" extended-value form.
_charset_value_re = re.compile(
    r"""
    ([\w!#$%&*+\-.^`|~]*)' # charset part, could be empty
    [\w!#$%&*+\-.^`|~]*' # don't care about language part, usually empty
    ([\w!#$%&'*+\-.^`|~]+) # one or more token chars with percent encoding
    """,
    re.ASCII | re.VERBOSE,
)
# https://www.rfc-editor.org/rfc/rfc2231#section-3
# Matches the "*N" numeric suffix of a continuation parameter key.
_continuation_re = re.compile(r"\*(\d+)$", re.ASCII)
def parse_options_header(value: str | None) -> tuple[str, dict[str, str]]:
    """Parse a header that consists of a value with ``key=value`` parameters separated
    by semicolons ``;``. For example, the ``Content-Type`` header.

    .. code-block:: python

        parse_options_header("text/html; charset=UTF-8")
        ('text/html', {'charset': 'UTF-8'})

        parse_options_header("")
        ("", {})

    This is the reverse of :func:`dump_options_header`.

    This parses valid parameter parts as described in
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#parameter>`__. Invalid parts are
    skipped.

    This handles continuations and charsets as described in
    `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__, although not as
    strictly as the RFC. Only ASCII, UTF-8, and ISO-8859-1 charsets are accepted,
    otherwise the value remains quoted.

    Clients may not be consistent in how they handle a quote character within a quoted
    value. The `HTML Standard <https://html.spec.whatwg.org/#multipart-form-data>`__
    replaces it with ``%22`` in multipart form data.
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#quoted.strings>`__ uses backslash
    escapes in HTTP headers. Both are decoded to the ``"`` character.

    Clients may not be consistent in how they handle non-ASCII characters. HTML
    documents must declare ``<meta charset=UTF-8>``, otherwise browsers may replace with
    HTML character references, which can be decoded using :func:`html.unescape`.

    :param value: The header value to parse.
    :return: ``(value, options)``, where ``options`` is a dict

    .. versionchanged:: 2.3
        Invalid parts, such as keys with no value, quoted keys, and incorrectly quoted
        values, are discarded instead of treating as ``None``.

    .. versionchanged:: 2.3
        Only ASCII, UTF-8, and ISO-8859-1 are accepted for charset values.

    .. versionchanged:: 2.3
        Escaped quotes in quoted values, like ``%22`` and ``\\"``, are handled.

    .. versionchanged:: 2.2
        Option names are always converted to lowercase.

    .. versionchanged:: 2.2
        The ``multiple`` parameter was removed.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5
    """
    if value is None:
        return "", {}

    # The primary value is everything before the first semicolon.
    value, _, rest = value.partition(";")
    value = value.strip()
    rest = rest.strip()

    if not value or not rest:
        # empty (invalid) value, or value without options
        return value, {}

    # Phase 1: collect all valid key=value parts without processing the value.
    parts: list[tuple[str, str]] = []

    while True:
        if (m := _parameter_key_re.match(rest)) is not None:
            pk = m.group(1).lower()
            rest = rest[m.end() :]

            # Value may be a token.
            if (m := _parameter_token_value_re.match(rest)) is not None:
                parts.append((pk, m.group()))

            # Value may be a quoted string, find the closing quote.
            elif rest[:1] == '"':
                pos = 1
                length = len(rest)

                while pos < length:
                    if rest[pos : pos + 2] in {"\\\\", '\\"'}:
                        # Consume escaped slashes and quotes.
                        pos += 2
                    elif rest[pos] == '"':
                        # Stop at an unescaped quote.
                        parts.append((pk, rest[: pos + 1]))
                        rest = rest[pos + 1 :]
                        break
                    else:
                        # Consume any other character.
                        pos += 1

        # Find the next section delimited by `;`, if any.
        if (end := rest.find(";")) == -1:
            break

        rest = rest[end + 1 :].lstrip()

    options: dict[str, str] = {}
    encoding: str | None = None
    continued_encoding: str | None = None

    # Phase 2: for each collected part, process optional charset and
    # continuation, unquote quoted values.
    for pk, pv in parts:
        if pk[-1] == "*":
            # key*=charset''value becomes key=value, where value is percent encoded
            pk = pk[:-1]
            match = _charset_value_re.match(pv)

            if match:
                # If there is a valid charset marker in the value, split it off.
                encoding, pv = match.groups()
                # This might be the empty string, handled next.
                encoding = encoding.lower()

            # No charset marker, or marker with empty charset value.
            if not encoding:
                encoding = continued_encoding

            # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
            # This list will not be extended further. An invalid encoding will leave the
            # value quoted.
            if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                # Continuation parts don't require their own charset marker. This is
                # looser than the RFC, it will persist across different keys and allows
                # changing the charset during a continuation. But this implementation is
                # much simpler than tracking the full state.
                continued_encoding = encoding
                # invalid bytes are replaced during unquoting
                pv = unquote(pv, encoding=encoding)

        # Remove quotes. At this point the value cannot be empty or a single quote.
        if pv[0] == pv[-1] == '"':
            # HTTP headers use slash, multipart form data uses percent
            pv = pv[1:-1].replace("\\\\", "\\").replace('\\"', '"').replace("%22", '"')

        match = _continuation_re.search(pk)

        if match:
            # key*0=a; key*1=b becomes key=ab
            pk = pk[: match.start()]
            options[pk] = options.get(pk, "") + pv
        else:
            options[pk] = pv

    return value, options
# Matches a ``q`` quality value: an optionally signed integer with an
# optional fractional part. Range (0 <= q <= 1) is validated by the caller.
_q_value_re = re.compile(r"-?\d+(\.\d+)?", re.ASCII)
# Accept subclass used to type parse_accept_header results.
_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")
@t.overload
def parse_accept_header(value: str | None) -> ds.Accept: ...
@t.overload
def parse_accept_header(value: str | None, cls: type[_TAnyAccept]) -> _TAnyAccept: ...
def parse_accept_header(
    value: str | None, cls: type[_TAnyAccept] | None = None
) -> _TAnyAccept:
    """Parse an ``Accept`` header according to
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#field.accept>`__.

    Returns an :class:`.Accept` instance, which can sort and inspect items
    based on their quality parameter. When parsing ``Accept-Charset``,
    ``Accept-Encoding``, or ``Accept-Language``, pass the appropriate
    :class:`.Accept` subclass.

    :param value: The header value to parse.
    :param cls: The :class:`.Accept` class to wrap the result in.
    :return: An instance of ``cls``.

    .. versionchanged:: 2.3
        Parse according to RFC 9110. Items with invalid ``q`` values are
        skipped.
    """
    if cls is None:
        cls = t.cast(type[_TAnyAccept], ds.Accept)

    if not value:
        return cls(None)

    parsed = []

    for part in parse_list_header(value):
        item, options = parse_options_header(part)
        q: float = 1

        if "q" in options:
            # Pop q; any remaining options are reconstructed below.
            q_str = options.pop("q").strip()

            if _q_value_re.fullmatch(q_str) is None:
                # Skip items with a malformed q value.
                continue

            q = float(q_str)

            if not 0 <= q <= 1:
                # Skip items with an out-of-range q value.
                continue

        if options:
            # Reconstruct the media type with its remaining options.
            item = dump_options_header(item, options)

        parsed.append((item, q))

    return cls(parsed)
# Cache control subclass used to type parse_cache_control_header results.
_TAnyCC = t.TypeVar("_TAnyCC", bound="ds.cache_control._CacheControl")


@t.overload
def parse_cache_control_header(
    value: str | None,
    on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None,
) -> ds.RequestCacheControl: ...
@t.overload
def parse_cache_control_header(
    value: str | None,
    on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None,
    cls: type[_TAnyCC] = ...,
) -> _TAnyCC: ...
def parse_cache_control_header(
    value: str | None,
    on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None,
    cls: type[_TAnyCC] | None = None,
) -> _TAnyCC:
    """Parse a cache control header. The RFC differs between response and
    request cache control, this method does not. It's your responsibility
    to not use the wrong control statements.

    .. versionadded:: 0.5
       The `cls` was added. If not specified an immutable
       :class:`~werkzeug.datastructures.RequestCacheControl` is returned.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.CacheControl`
                      object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object.
    """
    if cls is None:
        # Default to the immutable request variant.
        cls = t.cast("type[_TAnyCC]", ds.RequestCacheControl)

    if not value:
        # Missing or empty header: an empty (but typed) object.
        return cls((), on_update)

    # Cache-Control is a comma separated list of ``directive[=value]``.
    return cls(parse_dict_header(value), on_update)
_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")


@t.overload
def parse_csp_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None,
) -> ds.ContentSecurityPolicy: ...
@t.overload
def parse_csp_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None,
    cls: type[_TAnyCSP] = ...,
) -> _TAnyCSP: ...
def parse_csp_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None,
    cls: type[_TAnyCSP] | None = None,
) -> _TAnyCSP:
    """Parse a Content Security Policy header.

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast("type[_TAnyCSP]", ds.ContentSecurityPolicy)

    if value is None:
        return cls((), on_update)

    policies: list[tuple[str, str]] = []

    for raw_policy in value.split(";"):
        raw_policy = raw_policy.strip()

        # A usable policy is "directive source-list"; skip anything
        # without a space separator.
        if " " not in raw_policy:
            continue

        directive, sources = raw_policy.strip().split(" ", 1)
        policies.append((directive.strip(), sources.strip()))

    return cls(policies, on_update)
def parse_set_header(
    value: str | None,
    on_update: t.Callable[[ds.HeaderSet], None] | None = None,
) -> ds.HeaderSet:
    """Parse a set-like header and return a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    The return value is an object that treats the items case-insensitively
    and keeps the order of the items:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    To create a header from the :class:`HeaderSet` again, use the
    :func:`dump_header` function.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
                      value on the :class:`~werkzeug.datastructures.HeaderSet`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    # A missing/empty header still yields a usable (empty) HeaderSet.
    if not value:
        return ds.HeaderSet(None, on_update)
    return ds.HeaderSet(parse_list_header(value), on_update)
def parse_if_range_header(value: str | None) -> ds.IfRange:
    """Parse an ``If-Range`` header, which may hold either an etag or a
    date, into a :class:`~werkzeug.datastructures.IfRange` object.

    .. versionchanged:: 2.0
        If the value represents a datetime, it is timezone-aware.

    .. versionadded:: 0.7
    """
    if not value:
        return ds.IfRange()

    timestamp = parse_date(value)

    if timestamp is not None:
        return ds.IfRange(date=timestamp)

    # Not a date; treat it as an etag and drop any weakness marker.
    return ds.IfRange(unquote_etag(value)[0])
def parse_range_header(
    value: str | None, make_inclusive: bool = True
) -> ds.Range | None:
    """Parses a range header into a :class:`~werkzeug.datastructures.Range`
    object. If the header is missing or malformed `None` is returned.
    `ranges` is a list of ``(start, stop)`` tuples where the ranges are
    non-inclusive.

    .. versionadded:: 0.7
    """
    # NOTE(review): ``make_inclusive`` is accepted but never used in this
    # implementation; presumably kept for backwards compatibility.
    if not value or "=" not in value:
        return None

    ranges = []
    # Exclusive end of the previous range, used to reject out-of-order or
    # overlapping ranges. -1 marks an open-ended previous range.
    last_end = 0
    units, rng = value.split("=", 1)
    units = units.strip().lower()

    for item in rng.split(","):
        item = item.strip()
        if "-" not in item:
            return None
        if item.startswith("-"):
            # Suffix range like "-500" (the last 500 units); only valid if
            # the previous range was not open ended.
            if last_end < 0:
                return None
            try:
                begin = _plain_int(item)
            except ValueError:
                return None
            end = None
            last_end = -1
        elif "-" in item:
            begin_str, end_str = item.split("-", 1)
            begin_str = begin_str.strip()
            end_str = end_str.strip()

            try:
                begin = _plain_int(begin_str)
            except ValueError:
                return None

            # Ranges must be ascending and must not follow an open-ended one.
            if begin < last_end or last_end < 0:
                return None
            if end_str:
                try:
                    # Header ranges are inclusive; the stored stop is exclusive.
                    end = _plain_int(end_str) + 1
                except ValueError:
                    return None

                if begin >= end:
                    return None
            else:
                end = None
            last_end = end if end is not None else -1
        ranges.append((begin, end))

    return ds.Range(units, ranges)
def parse_content_range_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentRange], None] | None = None,
) -> ds.ContentRange | None:
    """Parses a range header into a
    :class:`~werkzeug.datastructures.ContentRange` object or `None` if
    parsing is not possible.

    .. versionadded:: 0.7

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.ContentRange`
                      object is changed.
    """
    if value is None:
        return None
    try:
        # e.g. "bytes 0-99/200" -> units "bytes", rangedef "0-99/200"
        units, rangedef = (value or "").strip().split(None, 1)
    except ValueError:
        return None

    if "/" not in rangedef:
        return None
    rng, length_str = rangedef.split("/", 1)
    if length_str == "*":
        # Total length is unknown.
        length = None
    else:
        try:
            length = _plain_int(length_str)
        except ValueError:
            return None

    if rng == "*":
        # Unsatisfied range: no start/stop, only a length.
        if not is_byte_range_valid(None, None, length):
            return None

        return ds.ContentRange(units, None, None, length, on_update=on_update)
    elif "-" not in rng:
        return None

    start_str, stop_str = rng.split("-", 1)
    try:
        start = _plain_int(start_str)
        # The header range is inclusive; the stored stop is exclusive.
        stop = _plain_int(stop_str) + 1
    except ValueError:
        return None

    if is_byte_range_valid(start, stop, length):
        return ds.ContentRange(units, start, stop, length, on_update=on_update)

    return None
def quote_etag(etag: str, weak: bool = False) -> str:
    """Quote an etag by wrapping it in double quotes.

    :param etag: The etag to quote. Must not contain a ``"`` character.
    :param weak: Set to ``True`` to mark the etag as weak (``W/`` prefix).
    :raises ValueError: If the etag contains a ``"`` character.
    """
    if '"' in etag:
        raise ValueError("invalid etag")

    quoted = f'"{etag}"'
    return f"W/{quoted}" if weak else quoted
@t.overload
def unquote_etag(etag: str) -> tuple[str, bool]: ...
@t.overload
def unquote_etag(etag: None) -> tuple[None, None]: ...
def unquote_etag(
    etag: str | None,
) -> tuple[str, bool] | tuple[None, None]:
    """Unquote a single etag.

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple.
    """
    if not etag:
        return None, None

    tag = etag.strip()
    # A "W/" (or "w/") prefix marks a weak etag.
    weak = tag[:2] in ("W/", "w/")

    if weak:
        tag = tag[2:]

    # Strip surrounding quotes if present.
    if tag[:1] == tag[-1:] == '"':
        tag = tag[1:-1]

    return tag, weak
def parse_etags(value: str | None) -> ds.ETags:
    """Parse an etag header.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ds.ETags()
    strong = []
    weak = []
    end = len(value)
    pos = 0
    while pos < end:
        match = _etag_re.match(value, pos)
        if match is None:
            # Stop at the first part that doesn't look like an etag.
            break
        is_weak, quoted, raw = match.groups()
        if raw == "*":
            # "*" matches anything; no need to parse further.
            return ds.ETags(star_tag=True)
        elif quoted:
            # Prefer the quoted form over the bare form when both matched.
            raw = quoted
        if is_weak:
            weak.append(raw)
        else:
            strong.append(raw)
        pos = match.end()
    return ds.ETags(strong, weak)
def generate_etag(data: bytes) -> str:
    """Generate an etag for some data by hashing it.

    .. versionchanged:: 2.0
        Use SHA-1. MD5 may not be available in some environments.
    """
    digest = sha1(data)
    return digest.hexdigest()
def parse_date(value: str | None) -> datetime | None:
    """Parse an :rfc:`2822` date into a timezone-aware
    :class:`datetime.datetime` object, or ``None`` if parsing fails.

    This wraps :func:`email.utils.parsedate_to_datetime`, returning
    ``None`` instead of raising on failure, and always producing a
    timezone-aware result. A string without timezone information is
    assumed to be UTC.

    :param value: A string with a supported date format.

    .. versionchanged:: 2.0
        Return a timezone-aware datetime object. Use
        ``email.utils.parsedate_to_datetime``.
    """
    if value is None:
        return None

    try:
        parsed = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None

    if parsed.tzinfo is not None:
        return parsed

    # Assume UTC when no timezone information is present.
    return parsed.replace(tzinfo=timezone.utc)
def http_date(
    timestamp: datetime | date | int | float | struct_time | None = None,
) -> str:
    """Format a datetime object or timestamp into an :rfc:`2822` date
    string.

    This wraps :func:`email.utils.format_datetime`, assuming naive
    datetime objects are in UTC instead of raising an exception.

    :param timestamp: The datetime or timestamp to format. Defaults to
        the current time.

    .. versionchanged:: 2.0
        Use ``email.utils.format_datetime``. Accept ``date`` objects.
    """
    if isinstance(timestamp, date):
        if isinstance(timestamp, datetime):
            # Ensure the datetime is timezone-aware (naive assumed UTC).
            return email.utils.format_datetime(_dt_as_utc(timestamp), usegmt=True)

        # A plain date is taken to be midnight UTC.
        midnight = datetime.combine(timestamp, time(), tzinfo=timezone.utc)
        return email.utils.format_datetime(midnight, usegmt=True)

    if isinstance(timestamp, struct_time):
        timestamp = mktime(timestamp)

    # int/float Unix timestamp, or None for the current time.
    return email.utils.formatdate(timestamp, usegmt=True)
def parse_age(value: str | None = None) -> timedelta | None:
    """Parse a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None

    try:
        seconds = int(value)

        if seconds < 0:
            return None

        return timedelta(seconds=seconds)
    except ValueError:
        # Not an integer at all.
        return None
    except OverflowError:
        # A valid integer, but too large for a timedelta.
        return None
def dump_age(age: timedelta | int | None = None) -> str | None:
    """Format the duration as a base-10 integer string.

    :param age: should be an integer number of seconds,
                a :class:`datetime.timedelta` object, or,
                if the age is unknown, `None` (default).
    :raises ValueError: If the age is negative.
    """
    if age is None:
        return None

    seconds = int(age.total_seconds()) if isinstance(age, timedelta) else int(age)

    if seconds < 0:
        raise ValueError("age cannot be negative")

    return str(seconds)
def is_resource_modified(
    environ: WSGIEnvironment,
    etag: str | None = None,
    data: bytes | None = None,
    last_modified: datetime | str | None = None,
    ignore_if_range: bool = True,
) -> bool:
    """Convenience method for conditional requests.

    Collects the relevant conditional headers from the WSGI environ and
    delegates the comparison to the sans-IO implementation.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag for the response for comparison.
    :param data: or alternatively the data of the response to automatically
        generate an etag using :func:`generate_etag`.
    :param last_modified: an optional date of the last modification.
    :param ignore_if_range: If `False`, `If-Range` header will be taken into
        account.
    :return: `True` if the resource was modified, otherwise `False`.

    .. versionchanged:: 2.0
        SHA-1 is used to generate an etag value for the data. MD5 may
        not be available in some environments.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    # Hoist the bound method; each header is simply absent -> None.
    get_header = environ.get
    return _sansio_http.is_resource_modified(
        http_range=get_header("HTTP_RANGE"),
        http_if_range=get_header("HTTP_IF_RANGE"),
        http_if_modified_since=get_header("HTTP_IF_MODIFIED_SINCE"),
        http_if_none_match=get_header("HTTP_IF_NONE_MATCH"),
        http_if_match=get_header("HTTP_IF_MATCH"),
        etag=etag,
        data=data,
        last_modified=last_modified,
        ignore_if_range=ignore_if_range,
    )
def remove_entity_headers(
    headers: ds.Headers | list[tuple[str, str]],
    allowed: t.Iterable[str] = ("expires", "content-location"),
) -> None:
    """Remove all entity headers from a list or :class:`Headers` object.
    This operation works in-place. `Expires` and `Content-Location`
    headers are by default not removed. The reason for this is
    :rfc:`2616` section 10.3.5 which specifies some entity headers that
    should be sent.

    .. versionchanged:: 0.5
        added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: a list of headers that should still be allowed even though
        they are entity headers.
    """
    # Case-insensitive allow-list.
    keep = frozenset(name.lower() for name in allowed)
    remaining = []

    for name, value in headers:
        if name.lower() in keep or not is_entity_header(name):
            remaining.append((name, value))

    # Slice-assign so the mutation happens in-place.
    headers[:] = remaining
def remove_hop_by_hop_headers(headers: ds.Headers | list[tuple[str, str]]) -> None:
    """Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or
    :class:`Headers` object. This operation works in-place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    remaining = []

    for name, value in headers:
        if not is_hop_by_hop_header(name):
            remaining.append((name, value))

    # Slice-assign so the mutation happens in-place.
    headers[:] = remaining
def is_entity_header(header: str) -> bool:
    """Check if a header is an entity header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    # Header names are case-insensitive; the lookup set is lowercase.
    lowered = header.lower()
    return lowered in _entity_headers
def is_hop_by_hop_header(header: str) -> bool:
    """Check if a header is an HTTP/1.1 "Hop-by-Hop" header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    # Header names are case-insensitive; the lookup set is lowercase.
    lowered = header.lower()
    return lowered in _hop_by_hop_headers
def parse_cookie(
    header: WSGIEnvironment | str | None,
    cls: type[ds.MultiDict[str, str]] | None = None,
) -> ds.MultiDict[str, str]:
    """Parse a cookie from a string or WSGI environ.

    The same key can be provided multiple times, the values are stored
    in-order. The default :class:`MultiDict` will have the first value
    first, and all values can be retrieved with
    :meth:`MultiDict.getlist`.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.

    .. versionchanged:: 3.0
        Passing bytes, and the ``charset`` and ``errors`` parameters, were removed.

    .. versionchanged:: 1.0
        Returns a :class:`MultiDict` instead of a ``TypeConversionDict``.

    .. versionchanged:: 0.5
        Returns a :class:`TypeConversionDict` instead of a regular dict. The ``cls``
        parameter was added.
    """
    # Accept either a raw header value or a WSGI environ mapping.
    cookie = header.get("HTTP_COOKIE") if isinstance(header, dict) else header

    if cookie:
        # WSGI tunnels header bytes through latin1 (PEP 3333); re-decode
        # them (UTF-8 by default) so non-ASCII cookie data round-trips.
        cookie = cookie.encode("latin1").decode()

    return _sansio_http.parse_cookie(cookie=cookie, cls=cls)
# Cookie values consisting solely of these characters are sent without
# surrounding quotes (see dump_cookie).
_cookie_no_quote_re = re.compile(r"[\w!#$%&'()*+\-./:<=>?@\[\]^`{|}~]*", re.A)
# Bytes that must be backslash-escaped inside a quoted cookie value.
_cookie_slash_re = re.compile(rb"[\x00-\x19\",;\\\x7f-\xff]", re.A)
# Replacement table for escaped bytes: '"' and '\' get a simple backslash
# escape; the update below maps control bytes, ',', ';', and high bytes to
# three-digit octal escapes (the scheme http.cookies uses).
_cookie_slash_map = {b'"': b'\\"', b"\\": b"\\\\"}
_cookie_slash_map.update(
    (v.to_bytes(1, "big"), b"\\%03o" % v)
    for v in [*range(0x20), *b",;", *range(0x7F, 256)]
)
def dump_cookie(
    key: str,
    value: str = "",
    max_age: timedelta | int | None = None,
    expires: str | datetime | int | float | None = None,
    path: str | None = "/",
    domain: str | None = None,
    secure: bool = False,
    httponly: bool = False,
    sync_expires: bool = True,
    max_size: int = 4093,
    samesite: str | None = None,
    partitioned: bool = False,
) -> str:
    """Create a Set-Cookie header without the ``Set-Cookie`` prefix.

    The return value is usually restricted to ascii as the vast majority
    of values are properly escaped, but that is no guarantee. It's
    tunneled through latin1 as required by :pep:`3333`.

    The return value is not ASCII safe if the key contains unicode
    characters. This is technically against the specification but
    happens in the wild. It's strongly recommended to not use
    non-ASCII values for the keys.

    :param key: the name of the cookie.
    :param value: the value of the cookie; quoted automatically if it
                  contains characters not allowed by :rfc:`6265`.
    :param max_age: should be a number of seconds, or `None` (default) if
                    the cookie should last only as long as the client's
                    browser session. Additionally `timedelta` objects
                    are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
    :param path: limits the cookie to a given path, per default it will
                 span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
                   example, ``domain="example.com"`` will set a cookie
                   that is readable by the domain ``www.example.com``,
                   ``foo.example.com`` etc. Otherwise, a cookie will only
                   be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie. This is an
                     extension to the cookie standard and probably not
                     supported by all browsers.
    :param sync_expires: automatically set expires if max_age is defined
                         but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.
    :param partitioned: Opts the cookie into partitioned storage. This
        will also set secure to True

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 3.1
        The ``partitioned`` parameter was added.

    .. versionchanged:: 3.0
        Passing bytes, and the ``charset`` parameter, were removed.

    .. versionchanged:: 2.3.3
        The ``path`` parameter is ``/`` by default.

    .. versionchanged:: 2.3.1
        The value allows more characters without quoting.

    .. versionchanged:: 2.3
        ``localhost`` and other names without a dot are allowed for the domain. A
        leading dot is ignored.

    .. versionchanged:: 2.3
        The ``path`` parameter is ``None`` by default.

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    if path is not None:
        # safe = https://url.spec.whatwg.org/#url-path-segment-string
        # as well as percent for things that are already quoted
        # excluding semicolon since it's part of the header syntax
        path = quote(path, safe="%!$&'()*+,/:=@")

    if domain:
        # Strip any port, ignore a leading dot, and IDNA-encode so a
        # non-ASCII domain becomes ASCII-safe.
        domain = domain.partition(":")[0].lstrip(".").encode("idna").decode("ascii")

    if isinstance(max_age, timedelta):
        max_age = int(max_age.total_seconds())

    if expires is not None:
        if not isinstance(expires, str):
            expires = http_date(expires)
    elif max_age is not None and sync_expires:
        # Derive Expires from Max-Age when only the latter was given.
        expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)

    if samesite is not None:
        # Normalize casing (e.g. "lax" -> "Lax") before validating.
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    if partitioned:
        # Partitioned cookies must also be marked Secure.
        secure = True

    # Quote value if it contains characters not allowed by RFC 6265. Slash-escape with
    # three octal digits, which matches http.cookies, although the RFC suggests base64.
    if not _cookie_no_quote_re.fullmatch(value):
        # Work with bytes here, since a UTF-8 character could be multiple bytes.
        value = _cookie_slash_re.sub(
            lambda m: _cookie_slash_map[m.group()], value.encode()
        ).decode("ascii")
        value = f'"{value}"'

    # Send a non-ASCII key as mojibake. Everything else should already be ASCII.
    # TODO Remove encoding dance, it seems like clients accept UTF-8 keys
    buf = [f"{key.encode().decode('latin1')}={value}"]

    for k, v in (
        ("Domain", domain),
        ("Expires", expires),
        ("Max-Age", max_age),
        ("Secure", secure),
        ("HttpOnly", httponly),
        ("Path", path),
        ("SameSite", samesite),
        ("Partitioned", partitioned),
    ):
        # Skip unset attributes entirely.
        if v is None or v is False:
            continue

        # Boolean attributes are emitted as bare flags without a value.
        if v is True:
            buf.append(k)
            continue

        buf.append(f"{k}={v}")

    rv = "; ".join(buf)

    # Warn if the final value of the cookie is larger than the limit. If the cookie is
    # too large, then it may be silently ignored by the browser, which can be quite hard
    # to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        warnings.warn(
            f"The '{key}' cookie is too large: the value was {value_size} bytes but the"
            f" header required {cookie_size - value_size} extra bytes. The final size"
            f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
            " silently ignore cookies larger than this.",
            stacklevel=2,
        )

    return rv
def is_byte_range_valid(
    start: int | None, stop: int | None, length: int | None
) -> bool:
    """Checks if a given byte content range is valid for the given length.

    .. versionadded:: 0.7
    """
    has_start = start is not None
    has_stop = stop is not None

    # start and stop must be given together or not at all.
    if has_start != has_stop:
        return False

    if not has_start:
        # No explicit range: only the length needs to be sane.
        return length is None or length >= 0

    # An explicit range must be non-empty and non-negative.
    if start < 0 or start >= stop:
        return False

    # With a known length, the range must begin inside the content.
    return length is None or start < length
1441# circular dependencies
1442from . import datastructures as ds # noqa: E402
1443from .sansio import http as _sansio_http # noqa: E402