Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/http.py: 22%
1from __future__ import annotations
3import email.utils
4import re
5import typing as t
6import warnings
7from datetime import date
8from datetime import datetime
9from datetime import time
10from datetime import timedelta
11from datetime import timezone
12from enum import Enum
13from hashlib import sha1
14from time import mktime
15from time import struct_time
16from urllib.parse import quote
17from urllib.parse import unquote
18from urllib.request import parse_http_list as _parse_list_header
20from ._internal import _dt_as_utc
21from ._internal import _plain_int
23if t.TYPE_CHECKING:
24 from _typeshed.wsgi import WSGIEnvironment
26_token_chars = frozenset(
27 "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
28)
29_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
30_entity_headers = frozenset(
31 [
32 "allow",
33 "content-encoding",
34 "content-language",
35 "content-length",
36 "content-location",
37 "content-md5",
38 "content-range",
39 "content-type",
40 "expires",
41 "last-modified",
42 ]
43)
44_hop_by_hop_headers = frozenset(
45 [
46 "connection",
47 "keep-alive",
48 "proxy-authenticate",
49 "proxy-authorization",
50 "te",
51 "trailer",
52 "transfer-encoding",
53 "upgrade",
54 ]
55)
56HTTP_STATUS_CODES = {
57 100: "Continue",
58 101: "Switching Protocols",
59 102: "Processing",
60 103: "Early Hints", # see RFC 8297
61 200: "OK",
62 201: "Created",
63 202: "Accepted",
64 203: "Non Authoritative Information",
65 204: "No Content",
66 205: "Reset Content",
67 206: "Partial Content",
68 207: "Multi Status",
69 208: "Already Reported", # see RFC 5842
70 226: "IM Used", # see RFC 3229
71 300: "Multiple Choices",
72 301: "Moved Permanently",
73 302: "Found",
74 303: "See Other",
75 304: "Not Modified",
76 305: "Use Proxy",
77 306: "Switch Proxy", # unused
78 307: "Temporary Redirect",
79 308: "Permanent Redirect",
80 400: "Bad Request",
81 401: "Unauthorized",
82 402: "Payment Required", # unused
83 403: "Forbidden",
84 404: "Not Found",
85 405: "Method Not Allowed",
86 406: "Not Acceptable",
87 407: "Proxy Authentication Required",
88 408: "Request Timeout",
89 409: "Conflict",
90 410: "Gone",
91 411: "Length Required",
92 412: "Precondition Failed",
93 413: "Request Entity Too Large",
94 414: "Request URI Too Long",
95 415: "Unsupported Media Type",
96 416: "Requested Range Not Satisfiable",
97 417: "Expectation Failed",
98 418: "I'm a teapot", # see RFC 2324
99 421: "Misdirected Request", # see RFC 7540
100 422: "Unprocessable Entity",
101 423: "Locked",
102 424: "Failed Dependency",
103 425: "Too Early", # see RFC 8470
104 426: "Upgrade Required",
105 428: "Precondition Required", # see RFC 6585
106 429: "Too Many Requests",
107 431: "Request Header Fields Too Large",
108 449: "Retry With", # proprietary MS extension
109 451: "Unavailable For Legal Reasons",
110 500: "Internal Server Error",
111 501: "Not Implemented",
112 502: "Bad Gateway",
113 503: "Service Unavailable",
114 504: "Gateway Timeout",
115 505: "HTTP Version Not Supported",
116 506: "Variant Also Negotiates", # see RFC 2295
117 507: "Insufficient Storage",
118 508: "Loop Detected", # see RFC 5842
119 510: "Not Extended",
120 511: "Network Authentication Required",
121}
124class COEP(Enum):
125 """Cross Origin Embedder Policies"""
127 UNSAFE_NONE = "unsafe-none"
128 REQUIRE_CORP = "require-corp"
131class COOP(Enum):
132 """Cross Origin Opener Policies"""
134 UNSAFE_NONE = "unsafe-none"
135 SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
136 SAME_ORIGIN = "same-origin"
139def quote_header_value(value: t.Any, allow_token: bool = True) -> str:
140 """Add double quotes around a header value. If the header contains only ASCII token
141 characters, it will be returned unchanged. If the header contains ``"`` or ``\\``
142 characters, they will be escaped with an additional ``\\`` character.
144 This is the reverse of :func:`unquote_header_value`.
146 :param value: The value to quote. Will be converted to a string.
147 :param allow_token: Set to ``False`` to quote the value even if it only has token characters.
149 .. versionchanged:: 3.0
150 Passing bytes is not supported.
152 .. versionchanged:: 3.0
153 The ``extra_chars`` parameter is removed.
155 .. versionchanged:: 2.3
156 The value is quoted if it is the empty string.
158 .. versionadded:: 0.5
159 """
160 value_str = str(value)
162 if not value_str:
163 return '""'
165 if allow_token:
166 token_chars = _token_chars
168 if token_chars.issuperset(value_str):
169 return value_str
171 value_str = value_str.replace("\\", "\\\\").replace('"', '\\"')
172 return f'"{value_str}"'
175def unquote_header_value(value: str) -> str:
176 """Remove double quotes and decode slash-escaped ``"`` and ``\\`` characters in a
177 header value.
179 This is the reverse of :func:`quote_header_value`.
181 :param value: The header value to unquote.
183 .. versionchanged:: 3.0
184 The ``is_filename`` parameter is removed.
185 """
186 if len(value) >= 2 and value[0] == value[-1] == '"':
187 value = value[1:-1]
188 return value.replace("\\\\", "\\").replace('\\"', '"')
190 return value
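# Editor's illustrative sketch (not part of the original module): a minimal
# demonstration of the quote/unquote round trip above. The helper name
# `_demo_quote_round_trip` is hypothetical.
def _demo_quote_round_trip() -> None:
    # Token-only values pass through unchanged; anything else is quoted.
    assert quote_header_value("UTF-8") == "UTF-8"
    assert quote_header_value("a b") == '"a b"'
    # Embedded quotes are escaped, and the empty string is quoted.
    assert quote_header_value('say "hi"') == '"say \\"hi\\""'
    assert quote_header_value("") == '""'
    # unquote_header_value reverses the quoting.
    assert unquote_header_value(quote_header_value('say "hi"')) == 'say "hi"'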
193def dump_options_header(header: str | None, options: t.Mapping[str, t.Any]) -> str:
194 """Produce a header value and ``key=value`` parameters separated by semicolons
195 ``;``. For example, the ``Content-Type`` header.
197 .. code-block:: python
199 dump_options_header("text/html", {"charset": "UTF-8"})
200 'text/html; charset=UTF-8'
202 This is the reverse of :func:`parse_options_header`.
204 If a value contains non-token characters, it will be quoted.
206 If a value is ``None``, the parameter is skipped.
208 In some keys for some headers, a UTF-8 value can be encoded using a special
209 ``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will
210 not produce that format automatically, but if a given key ends with an asterisk
211 ``*``, the value is assumed to have that form and will not be quoted further.
213 :param header: The primary header value.
214 :param options: Parameters to encode as ``key=value`` pairs.
216 .. versionchanged:: 2.3
217 Keys with ``None`` values are skipped rather than treated as a bare key.
219 .. versionchanged:: 2.2.3
220 If a key ends with ``*``, its value will not be quoted.
221 """
222 segments = []
224 if header is not None:
225 segments.append(header)
227 for key, value in options.items():
228 if value is None:
229 continue
231 if key[-1] == "*":
232 segments.append(f"{key}={value}")
233 else:
234 segments.append(f"{key}={quote_header_value(value)}")
236 return "; ".join(segments)
239def dump_header(iterable: dict[str, t.Any] | t.Iterable[t.Any]) -> str:
240 """Produce a header value from a list of items or ``key=value`` pairs, separated by
241 commas ``,``.
243 This is the reverse of :func:`parse_list_header`, :func:`parse_dict_header`, and
244 :func:`parse_set_header`.
246 If a value contains non-token characters, it will be quoted.
248 If a value is ``None``, the key is output alone.
250 In some keys for some headers, a UTF-8 value can be encoded using a special
251 ``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will
252 not produce that format automatically, but if a given key ends with an asterisk
253 ``*``, the value is assumed to have that form and will not be quoted further.
255 .. code-block:: python
257 dump_header(["foo", "bar baz"])
258 'foo, "bar baz"'
260 dump_header({"foo": "bar baz"})
261 'foo="bar baz"'
263 :param iterable: The items to create a header from.
265 .. versionchanged:: 3.0
266 The ``allow_token`` parameter is removed.
268 .. versionchanged:: 2.2.3
269 If a key ends with ``*``, its value will not be quoted.
270 """
271 if isinstance(iterable, dict):
272 items = []
274 for key, value in iterable.items():
275 if value is None:
276 items.append(key)
277 elif key[-1] == "*":
278 items.append(f"{key}={value}")
279 else:
280 items.append(f"{key}={quote_header_value(value)}")
281 else:
282 items = [quote_header_value(x) for x in iterable]
284 return ", ".join(items)
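# Editor's illustrative sketch (not part of the original module): how
# dump_header treats lists, dicts, and ``None`` values. The helper name
# `_demo_dump_header` is hypothetical.
def _demo_dump_header() -> None:
    assert dump_header(["foo", "bar baz"]) == 'foo, "bar baz"'
    assert dump_header({"foo": "bar baz"}) == 'foo="bar baz"'
    # A ``None`` value emits the bare key, useful for flag-like directives.
    assert dump_header({"no-cache": None, "max-age": 0}) == "no-cache, max-age=0"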
287def dump_csp_header(header: ds.ContentSecurityPolicy) -> str:
288 """Dump a Content Security Policy header.
290 These are structured into policies such as "default-src 'self';
291 script-src 'self'".
293 .. versionadded:: 1.0.0
294 Support for Content Security Policy headers was added.
296 """
297 return "; ".join(f"{key} {value}" for key, value in header.items())
300def parse_list_header(value: str) -> list[str]:
301 """Parse a header value that consists of a list of comma separated items according
302 to `RFC 9110 <https://httpwg.org/specs/rfc9110.html#abnf.extension>`__.
304 This extends :func:`urllib.request.parse_http_list` to remove surrounding quotes
305 from values.
307 .. code-block:: python
309 parse_list_header('token, "quoted value"')
310 ['token', 'quoted value']
312 This is the reverse of :func:`dump_header`.
314 :param value: The header value to parse.
315 """
316 result = []
318 for item in _parse_list_header(value):
319 if len(item) >= 2 and item[0] == item[-1] == '"':
320 item = item[1:-1]
322 result.append(item)
324 return result
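# Editor's illustrative sketch (not part of the original module): parse_list_header
# splits on commas while respecting quoted strings. The helper name is hypothetical.
def _demo_parse_list_header() -> None:
    assert parse_list_header("gzip, deflate, br") == ["gzip", "deflate", "br"]
    # Quoted items may contain commas; the surrounding quotes are removed.
    assert parse_list_header('token, "quoted, value"') == ["token", "quoted, value"]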
327def parse_dict_header(value: str) -> dict[str, str | None]:
328 """Parse a list header using :func:`parse_list_header`, then parse each item as a
329 ``key=value`` pair.
331 .. code-block:: python
333 parse_dict_header('a=b, c="d, e", f')
334 {"a": "b", "c": "d, e", "f": None}
336 This is the reverse of :func:`dump_header`.
338 If a key does not have a value, it is ``None``.
340 This handles charsets for values as described in
341 `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__. Only ASCII, UTF-8,
342 and ISO-8859-1 charsets are accepted, otherwise the value remains quoted.
344 :param value: The header value to parse.
346 .. versionchanged:: 3.0
347 Passing bytes is not supported.
349 .. versionchanged:: 3.0
350 The ``cls`` argument is removed.
352 .. versionchanged:: 2.3
353 Added support for ``key*=charset''value`` encoded items.
355 .. versionchanged:: 0.9
356 The ``cls`` argument was added.
357 """
358 result: dict[str, str | None] = {}
360 for item in parse_list_header(value):
361 key, has_value, value = item.partition("=")
362 key = key.strip()
364 if not key:
365 # =value is not valid
366 continue
368 if not has_value:
369 result[key] = None
370 continue
372 value = value.strip()
373 encoding: str | None = None
375 if key[-1] == "*":
376 # key*=charset''value becomes key=value, where value is percent encoded
377 # adapted from parse_options_header, without the continuation handling
378 key = key[:-1]
379 match = _charset_value_re.match(value)
381 if match:
382 # If there is a charset marker in the value, split it off.
383 encoding, value = match.groups()
384 encoding = encoding.lower()
386 # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
387 # This list will not be extended further. An invalid encoding will leave the
388 # value quoted.
389 if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
390 # invalid bytes are replaced during unquoting
391 value = unquote(value, encoding=encoding)
393 if len(value) >= 2 and value[0] == value[-1] == '"':
394 value = value[1:-1]
396 result[key] = value
398 return result
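# Editor's illustrative sketch (not part of the original module): parse_dict_header
# builds on parse_list_header and also decodes the RFC 2231 ``key*=charset''value``
# form. The helper name is hypothetical.
def _demo_parse_dict_header() -> None:
    assert parse_dict_header('a=b, c="d, e", f') == {"a": "b", "c": "d, e", "f": None}
    # A key ending in ``*`` carries a charset marker and percent encoding.
    assert parse_dict_header("title*=UTF-8''foo%20bar") == {"title": "foo bar"}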
401# https://httpwg.org/specs/rfc9110.html#parameter
402_parameter_key_re = re.compile(r"([\w!#$%&'*+\-.^`|~]+)=", flags=re.ASCII)
403_parameter_token_value_re = re.compile(r"[\w!#$%&'*+\-.^`|~]+", flags=re.ASCII)
404# https://www.rfc-editor.org/rfc/rfc2231#section-4
405_charset_value_re = re.compile(
406 r"""
407 ([\w!#$%&*+\-.^`|~]*)' # charset part, could be empty
408 [\w!#$%&*+\-.^`|~]*' # don't care about language part, usually empty
409 ([\w!#$%&'*+\-.^`|~]+) # one or more token chars with percent encoding
410 """,
411 re.ASCII | re.VERBOSE,
412)
413# https://www.rfc-editor.org/rfc/rfc2231#section-3
414_continuation_re = re.compile(r"\*(\d+)$", re.ASCII)
417def parse_options_header(value: str | None) -> tuple[str, dict[str, str]]:
418 """Parse a header that consists of a value with ``key=value`` parameters separated
419 by semicolons ``;``. For example, the ``Content-Type`` header.
421 .. code-block:: python
423 parse_options_header("text/html; charset=UTF-8")
424 ('text/html', {'charset': 'UTF-8'})
426 parse_options_header("")
427 ("", {})
429 This is the reverse of :func:`dump_options_header`.
431 This parses valid parameter parts as described in
432 `RFC 9110 <https://httpwg.org/specs/rfc9110.html#parameter>`__. Invalid parts are
433 skipped.
435 This handles continuations and charsets as described in
436 `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__, although not as
437 strictly as the RFC. Only ASCII, UTF-8, and ISO-8859-1 charsets are accepted,
438 otherwise the value remains quoted.
440 Clients may not be consistent in how they handle a quote character within a quoted
441 value. The `HTML Standard <https://html.spec.whatwg.org/#multipart-form-data>`__
442 replaces it with ``%22`` in multipart form data.
443 `RFC 9110 <https://httpwg.org/specs/rfc9110.html#quoted.strings>`__ uses backslash
444 escapes in HTTP headers. Both are decoded to the ``"`` character.
446 Clients may not be consistent in how they handle non-ASCII characters. HTML
447 documents must declare ``<meta charset=UTF-8>``, otherwise browsers may replace non-ASCII
448 characters with HTML character references, which can be decoded using :func:`html.unescape`.
450 :param value: The header value to parse.
451 :return: ``(value, options)``, where ``options`` is a dict
453 .. versionchanged:: 2.3
454 Invalid parts, such as keys with no value, quoted keys, and incorrectly quoted
455 values, are discarded instead of treating as ``None``.
457 .. versionchanged:: 2.3
458 Only ASCII, UTF-8, and ISO-8859-1 are accepted for charset values.
460 .. versionchanged:: 2.3
461 Escaped quotes in quoted values, like ``%22`` and ``\\"``, are handled.
463 .. versionchanged:: 2.2
464 Option names are always converted to lowercase.
466 .. versionchanged:: 2.2
467 The ``multiple`` parameter was removed.
469 .. versionchanged:: 0.15
470 :rfc:`2231` parameter continuations are handled.
472 .. versionadded:: 0.5
473 """
474 if value is None:
475 return "", {}
477 value, _, rest = value.partition(";")
478 value = value.strip()
479 rest = rest.strip()
481 if not value or not rest:
482 # empty (invalid) value, or value without options
483 return value, {}
485 # Collect all valid key=value parts without processing the value.
486 parts: list[tuple[str, str]] = []
488 while True:
489 if (m := _parameter_key_re.match(rest)) is not None:
490 pk = m.group(1).lower()
491 rest = rest[m.end() :]
493 # Value may be a token.
494 if (m := _parameter_token_value_re.match(rest)) is not None:
495 parts.append((pk, m.group()))
497 # Value may be a quoted string, find the closing quote.
498 elif rest[:1] == '"':
499 pos = 1
500 length = len(rest)
502 while pos < length:
503 if rest[pos : pos + 2] in {"\\\\", '\\"'}:
504 # Consume escaped slashes and quotes.
505 pos += 2
506 elif rest[pos] == '"':
507 # Stop at an unescaped quote.
508 parts.append((pk, rest[: pos + 1]))
509 rest = rest[pos + 1 :]
510 break
511 else:
512 # Consume any other character.
513 pos += 1
515 # Find the next section delimited by `;`, if any.
516 if (end := rest.find(";")) == -1:
517 break
519 rest = rest[end + 1 :].lstrip()
521 options: dict[str, str] = {}
522 encoding: str | None = None
523 continued_encoding: str | None = None
525 # For each collected part, process optional charset and continuation,
526 # unquote quoted values.
527 for pk, pv in parts:
528 if pk[-1] == "*":
529 # key*=charset''value becomes key=value, where value is percent encoded
530 pk = pk[:-1]
531 match = _charset_value_re.match(pv)
533 if match:
534 # If there is a valid charset marker in the value, split it off.
535 encoding, pv = match.groups()
536 # This might be the empty string, handled next.
537 encoding = encoding.lower()
539 # No charset marker, or marker with empty charset value.
540 if not encoding:
541 encoding = continued_encoding
543 # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
544 # This list will not be extended further. An invalid encoding will leave the
545 # value quoted.
546 if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
547 # Continuation parts don't require their own charset marker. This is
548 # looser than the RFC, it will persist across different keys and allows
549 # changing the charset during a continuation. But this implementation is
550 # much simpler than tracking the full state.
551 continued_encoding = encoding
552 # invalid bytes are replaced during unquoting
553 pv = unquote(pv, encoding=encoding)
555 # Remove quotes. At this point the value cannot be empty or a single quote.
556 if pv[0] == pv[-1] == '"':
557 # HTTP headers use slash, multipart form data uses percent
558 pv = pv[1:-1].replace("\\\\", "\\").replace('\\"', '"').replace("%22", '"')
560 match = _continuation_re.search(pk)
562 if match:
563 # key*0=a; key*1=b becomes key=ab
564 pk = pk[: match.start()]
565 options[pk] = options.get(pk, "") + pv
566 else:
567 options[pk] = pv
569 return value, options
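# Editor's illustrative sketch (not part of the original module): parse_options_header
# handles plain parameters, RFC 2231 continuations, and charset-marked values.
# The helper name is hypothetical.
def _demo_parse_options_header() -> None:
    assert parse_options_header("text/html; charset=UTF-8") == (
        "text/html",
        {"charset": "UTF-8"},
    )
    # Continuation parts key*0, key*1, ... are concatenated under one key.
    assert parse_options_header("attachment; filename*0=fo; filename*1=o.txt") == (
        "attachment",
        {"filename": "foo.txt"},
    )
    # A charset marker is decoded and stripped from the value.
    assert parse_options_header("attachment; filename*=UTF-8''foo%20bar.txt") == (
        "attachment",
        {"filename": "foo bar.txt"},
    )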
572_q_value_re = re.compile(r"-?\d+(\.\d+)?", re.ASCII)
573_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")
576@t.overload
577def parse_accept_header(value: str | None) -> ds.Accept: ...
580@t.overload
581def parse_accept_header(value: str | None, cls: type[_TAnyAccept]) -> _TAnyAccept: ...
584def parse_accept_header(
585 value: str | None, cls: type[_TAnyAccept] | None = None
586) -> _TAnyAccept:
587 """Parse an ``Accept`` header according to
588 `RFC 9110 <https://httpwg.org/specs/rfc9110.html#field.accept>`__.
590 Returns an :class:`.Accept` instance, which can sort and inspect items based on
591 their quality parameter. When parsing ``Accept-Charset``, ``Accept-Encoding``, or
592 ``Accept-Language``, pass the appropriate :class:`.Accept` subclass.
594 :param value: The header value to parse.
595 :param cls: The :class:`.Accept` class to wrap the result in.
596 :return: An instance of ``cls``.
598 .. versionchanged:: 2.3
599 Parse according to RFC 9110. Items with invalid ``q`` values are skipped.
600 """
601 if cls is None:
602 cls = t.cast(type[_TAnyAccept], ds.Accept)
604 if not value:
605 return cls(None)
607 result = []
609 for item in parse_list_header(value):
610 item, options = parse_options_header(item)
612 if "q" in options:
613 # pop q, remaining options are reconstructed
614 q_str = options.pop("q").strip()
616 if _q_value_re.fullmatch(q_str) is None:
617 # ignore an invalid q
618 continue
620 q = float(q_str)
622 if q < 0 or q > 1:
623 # ignore an invalid q
624 continue
625 else:
626 q = 1
628 if options:
629 # reconstruct the media type with any options
630 item = dump_options_header(item, options)
632 result.append((item, q))
634 return cls(result)
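# Editor's illustrative sketch (not part of the original module): items with an
# invalid ``q`` are dropped, and the resulting ds.Accept (defined elsewhere) can
# rank offers. The helper name is hypothetical; quality values are noted in
# comments rather than asserted, to avoid depending on Accept internals.
def _demo_parse_accept_header() -> None:
    accept = parse_accept_header("text/html, application/json;q=0.5, text/plain;q=2")
    # "text/plain;q=2" is skipped because q must be between 0 and 1; the
    # remaining entries carry qualities 1 and 0.5 respectively.
    assert "text/html" in accept
    assert "text/plain" not in accept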
637_TAnyCC = t.TypeVar("_TAnyCC", bound="ds.cache_control._CacheControl")
640@t.overload
641def parse_cache_control_header(
642 value: str | None,
643 on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None,
644) -> ds.RequestCacheControl: ...
647@t.overload
648def parse_cache_control_header(
649 value: str | None,
650 on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None,
651 cls: type[_TAnyCC] = ...,
652) -> _TAnyCC: ...
655def parse_cache_control_header(
656 value: str | None,
657 on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None,
658 cls: type[_TAnyCC] | None = None,
659) -> _TAnyCC:
660 """Parse a cache control header. The RFC differs between response and
661 request cache control, this method does not. It's your responsibility
662 to not use the wrong control statements.
664 .. versionadded:: 0.5
665 The `cls` was added. If not specified an immutable
666 :class:`~werkzeug.datastructures.RequestCacheControl` is returned.
668 :param value: a cache control header to be parsed.
669 :param on_update: an optional callable that is called every time a value
670 on the :class:`~werkzeug.datastructures.CacheControl`
671 object is changed.
672 :param cls: the class for the returned object. By default
673 :class:`~werkzeug.datastructures.RequestCacheControl` is used.
674 :return: a `cls` object.
675 """
676 if cls is None:
677 cls = t.cast("type[_TAnyCC]", ds.RequestCacheControl)
679 if not value:
680 return cls((), on_update)
682 return cls(parse_dict_header(value), on_update)
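# Editor's illustrative sketch (not part of the original module): the header is
# parsed with parse_dict_header and wrapped in a (by default immutable)
# RequestCacheControl, which is dict-like. The helper name is hypothetical.
def _demo_parse_cache_control_header() -> None:
    cc = parse_cache_control_header("no-cache, max-age=300")
    assert cc.get("max-age") == "300"
    assert "no-cache" in cc
    # A missing header still yields an (empty) control object.
    assert not parse_cache_control_header(None)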
685_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")
688@t.overload
689def parse_csp_header(
690 value: str | None,
691 on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None,
692) -> ds.ContentSecurityPolicy: ...
695@t.overload
696def parse_csp_header(
697 value: str | None,
698 on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None,
699 cls: type[_TAnyCSP] = ...,
700) -> _TAnyCSP: ...
703def parse_csp_header(
704 value: str | None,
705 on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None,
706 cls: type[_TAnyCSP] | None = None,
707) -> _TAnyCSP:
708 """Parse a Content Security Policy header.
710 .. versionadded:: 1.0.0
711 Support for Content Security Policy headers was added.
713 :param value: a csp header to be parsed.
714 :param on_update: an optional callable that is called every time a value
715 on the object is changed.
716 :param cls: the class for the returned object. By default
717 :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
718 :return: a `cls` object.
719 """
720 if cls is None:
721 cls = t.cast("type[_TAnyCSP]", ds.ContentSecurityPolicy)
723 if value is None:
724 return cls((), on_update)
726 items = []
728 for policy in value.split(";"):
729 policy = policy.strip()
731 # Ignore badly formatted policies (no space)
732 if " " in policy:
733 directive, value = policy.strip().split(" ", 1)
734 items.append((directive.strip(), value.strip()))
736 return cls(items, on_update)
739def parse_set_header(
740 value: str | None,
741 on_update: t.Callable[[ds.HeaderSet], None] | None = None,
742) -> ds.HeaderSet:
743 """Parse a set-like header and return a
744 :class:`~werkzeug.datastructures.HeaderSet` object:
746 >>> hs = parse_set_header('token, "quoted value"')
748 The return value is an object that treats the items case-insensitively
749 and keeps the order of the items:
751 >>> 'TOKEN' in hs
752 True
753 >>> hs.index('quoted value')
754 1
755 >>> hs
756 HeaderSet(['token', 'quoted value'])
758 To create a header from the :class:`HeaderSet` again, use the
759 :func:`dump_header` function.
761 :param value: a set header to be parsed.
762 :param on_update: an optional callable that is called every time a
763 value on the :class:`~werkzeug.datastructures.HeaderSet`
764 object is changed.
765 :return: a :class:`~werkzeug.datastructures.HeaderSet`
766 """
767 if not value:
768 return ds.HeaderSet(None, on_update)
769 return ds.HeaderSet(parse_list_header(value), on_update)
772def parse_if_range_header(value: str | None) -> ds.IfRange:
773 """Parses an if-range header which can be an etag or a date. Returns
774 a :class:`~werkzeug.datastructures.IfRange` object.
776 .. versionchanged:: 2.0
777 If the value represents a datetime, it is timezone-aware.
779 .. versionadded:: 0.7
780 """
781 if not value:
782 return ds.IfRange()
783 date = parse_date(value)
784 if date is not None:
785 return ds.IfRange(date=date)
786 # drop weakness information
787 return ds.IfRange(unquote_etag(value)[0])
790def parse_range_header(
791 value: str | None, make_inclusive: bool = True
792) -> ds.Range | None:
793 """Parses a range header into a :class:`~werkzeug.datastructures.Range`
794 object. If the header is missing or malformed `None` is returned.
795 `ranges` is a list of ``(start, stop)`` tuples where ``stop`` is
796 exclusive (one past the last byte).
798 .. versionadded:: 0.7
799 """
800 if not value or "=" not in value:
801 return None
803 ranges = []
804 last_end = 0
805 units, rng = value.split("=", 1)
806 units = units.strip().lower()
808 for item in rng.split(","):
809 item = item.strip()
810 if "-" not in item:
811 return None
812 if item.startswith("-"):
813 if last_end < 0:
814 return None
815 try:
816 begin = _plain_int(item)
817 except ValueError:
818 return None
819 end = None
820 last_end = -1
821 elif "-" in item:
822 begin_str, end_str = item.split("-", 1)
823 begin_str = begin_str.strip()
824 end_str = end_str.strip()
826 try:
827 begin = _plain_int(begin_str)
828 except ValueError:
829 return None
831 if begin < last_end or last_end < 0:
832 return None
833 if end_str:
834 try:
835 end = _plain_int(end_str) + 1
836 except ValueError:
837 return None
839 if begin >= end:
840 return None
841 else:
842 end = None
843 last_end = end if end is not None else -1
844 ranges.append((begin, end))
846 return ds.Range(units, ranges)
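# Editor's illustrative sketch (not part of the original module): inclusive HTTP
# byte ranges become half-open (start, stop) tuples; suffix ranges keep ``None``
# as the stop value. The helper name is hypothetical.
def _demo_parse_range_header() -> None:
    rv = parse_range_header("bytes=0-499")
    assert rv is not None and rv.units == "bytes" and rv.ranges == [(0, 500)]
    # A suffix range like "-500" (the last 500 bytes) is stored as (-500, None).
    rv = parse_range_header("bytes=-500")
    assert rv is not None and rv.ranges == [(-500, None)]
    # Malformed headers return None instead of raising.
    assert parse_range_header("bytes=abc") is None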
849def parse_content_range_header(
850 value: str | None,
851 on_update: t.Callable[[ds.ContentRange], None] | None = None,
852) -> ds.ContentRange | None:
853 """Parses a range header into a
854 :class:`~werkzeug.datastructures.ContentRange` object or `None` if
855 parsing is not possible.
857 .. versionadded:: 0.7
859 :param value: a content range header to be parsed.
860 :param on_update: an optional callable that is called every time a value
861 on the :class:`~werkzeug.datastructures.ContentRange`
862 object is changed.
863 """
864 if value is None:
865 return None
866 try:
867 units, rangedef = (value or "").strip().split(None, 1)
868 except ValueError:
869 return None
871 if "/" not in rangedef:
872 return None
873 rng, length_str = rangedef.split("/", 1)
874 if length_str == "*":
875 length = None
876 else:
877 try:
878 length = _plain_int(length_str)
879 except ValueError:
880 return None
882 if rng == "*":
883 if not is_byte_range_valid(None, None, length):
884 return None
886 return ds.ContentRange(units, None, None, length, on_update=on_update)
887 elif "-" not in rng:
888 return None
890 start_str, stop_str = rng.split("-", 1)
891 try:
892 start = _plain_int(start_str)
893 stop = _plain_int(stop_str) + 1
894 except ValueError:
895 return None
897 if is_byte_range_valid(start, stop, length):
898 return ds.ContentRange(units, start, stop, length, on_update=on_update)
900 return None
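# Editor's illustrative sketch (not part of the original module): the returned
# ContentRange uses an exclusive stop, and ``*`` parts map to ``None``.
# The helper name is hypothetical.
def _demo_parse_content_range_header() -> None:
    cr = parse_content_range_header("bytes 0-499/1234")
    assert cr is not None
    assert (cr.units, cr.start, cr.stop, cr.length) == ("bytes", 0, 500, 1234)
    # "*" marks an unsatisfied range or an unknown length.
    cr = parse_content_range_header("bytes */1234")
    assert cr is not None and cr.start is None and cr.length == 1234
    assert parse_content_range_header("garbage") is None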
903def quote_etag(etag: str, weak: bool = False) -> str:
904 """Quote an etag.
906 :param etag: the etag to quote.
907 :param weak: set to `True` to tag it "weak".
908 """
909 if '"' in etag:
910 raise ValueError("invalid etag")
911 etag = f'"{etag}"'
912 if weak:
913 etag = f"W/{etag}"
914 return etag
917@t.overload
918def unquote_etag(etag: str) -> tuple[str, bool]: ...
919@t.overload
920def unquote_etag(etag: None) -> tuple[None, None]: ...
921def unquote_etag(
922 etag: str | None,
923) -> tuple[str, bool] | tuple[None, None]:
924 """Unquote a single etag:
926 >>> unquote_etag('W/"bar"')
927 ('bar', True)
928 >>> unquote_etag('"bar"')
929 ('bar', False)
931 :param etag: the etag identifier to unquote.
932 :return: a ``(etag, weak)`` tuple.
933 """
934 if not etag:
935 return None, None
936 etag = etag.strip()
937 weak = False
938 if etag.startswith(("W/", "w/")):
939 weak = True
940 etag = etag[2:]
941 if etag[:1] == etag[-1:] == '"':
942 etag = etag[1:-1]
943 return etag, weak
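# Editor's illustrative sketch (not part of the original module): quote_etag and
# unquote_etag are inverses; the weak marker ``W/`` round-trips as a boolean.
# The helper name is hypothetical.
def _demo_etag_round_trip() -> None:
    assert quote_etag("abc") == '"abc"'
    assert quote_etag("abc", weak=True) == 'W/"abc"'
    assert unquote_etag('W/"abc"') == ("abc", True)
    assert unquote_etag(quote_etag("abc")) == ("abc", False)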
946def parse_etags(value: str | None) -> ds.ETags:
947 """Parse an etag header.
949 :param value: the tag header to parse
950 :return: an :class:`~werkzeug.datastructures.ETags` object.
951 """
952 if not value:
953 return ds.ETags()
954 strong = []
955 weak = []
956 end = len(value)
957 pos = 0
958 while pos < end:
959 match = _etag_re.match(value, pos)
960 if match is None:
961 break
962 is_weak, quoted, raw = match.groups()
963 if raw == "*":
964 return ds.ETags(star_tag=True)
965 elif quoted:
966 raw = quoted
967 if is_weak:
968 weak.append(raw)
969 else:
970 strong.append(raw)
971 pos = match.end()
972 return ds.ETags(strong, weak)
975def generate_etag(data: bytes) -> str:
976 """Generate an etag for some data.
978 .. versionchanged:: 2.0
979 Use SHA-1. MD5 may not be available in some environments.
980 """
981 return sha1(data).hexdigest()
984def parse_date(value: str | None) -> datetime | None:
985 """Parse an :rfc:`2822` date into a timezone-aware
986 :class:`datetime.datetime` object, or ``None`` if parsing fails.
988 This is a wrapper for :func:`email.utils.parsedate_to_datetime`. It
989 returns ``None`` if parsing fails instead of raising an exception,
990 and always returns a timezone-aware datetime object. If the string
991 doesn't have timezone information, it is assumed to be UTC.
993 :param value: A string with a supported date format.
995 .. versionchanged:: 2.0
996 Return a timezone-aware datetime object. Use
997 ``email.utils.parsedate_to_datetime``.
998 """
999 if value is None:
1000 return None
1002 try:
1003 dt = email.utils.parsedate_to_datetime(value)
1004 except (TypeError, ValueError):
1005 return None
1007 if dt.tzinfo is None:
1008 return dt.replace(tzinfo=timezone.utc)
1010 return dt
1013def http_date(
1014 timestamp: datetime | date | int | float | struct_time | None = None,
1015) -> str:
1016 """Format a datetime object or timestamp into an :rfc:`2822` date
1017 string.
1019 This is a wrapper for :func:`email.utils.format_datetime`. It
1020 assumes naive datetime objects are in UTC instead of raising an
1021 exception.
1023 :param timestamp: The datetime or timestamp to format. Defaults to
1024 the current time.
1026 .. versionchanged:: 2.0
1027 Use ``email.utils.format_datetime``. Accept ``date`` objects.
1028 """
1029 if isinstance(timestamp, date):
1030 if not isinstance(timestamp, datetime):
1031 # Assume plain date is midnight UTC.
1032 timestamp = datetime.combine(timestamp, time(), tzinfo=timezone.utc)
1033 else:
1034 # Ensure datetime is timezone-aware.
1035 timestamp = _dt_as_utc(timestamp)
1037 return email.utils.format_datetime(timestamp, usegmt=True)
1039 if isinstance(timestamp, struct_time):
1040 timestamp = mktime(timestamp)
1042 return email.utils.formatdate(timestamp, usegmt=True)
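# Editor's illustrative sketch (not part of the original module): parse_date and
# http_date round-trip RFC 2822 dates, always timezone-aware in UTC. The helper
# name is hypothetical; the example date is the one used in RFC 9110.
def _demo_http_date_round_trip() -> None:
    dt = parse_date("Sun, 06 Nov 1994 08:49:37 GMT")
    assert dt == datetime(1994, 11, 6, 8, 49, 37, tzinfo=timezone.utc)
    assert http_date(dt) == "Sun, 06 Nov 1994 08:49:37 GMT"
    # A plain date is treated as midnight UTC.
    assert http_date(date(1994, 11, 6)) == "Sun, 06 Nov 1994 00:00:00 GMT"
    # Unparseable input returns None instead of raising.
    assert parse_date("not a date") is None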
1045def parse_age(value: str | None = None) -> timedelta | None:
1046 """Parses a base-10 integer count of seconds into a timedelta.
1048 If parsing fails, the return value is `None`.
1050 :param value: a string consisting of an integer represented in base-10
1051 :return: a :class:`datetime.timedelta` object or `None`.
1052 """
1053 if not value:
1054 return None
1055 try:
1056 seconds = int(value)
1057 except ValueError:
1058 return None
1059 if seconds < 0:
1060 return None
1061 try:
1062 return timedelta(seconds=seconds)
1063 except OverflowError:
1064 return None
1067def dump_age(age: timedelta | int | None = None) -> str | None:
1068 """Formats the duration as a base-10 integer.
1070 :param age: should be an integer number of seconds,
1071 a :class:`datetime.timedelta` object, or,
1072 if the age is unknown, `None` (default).
1073 """
1074 if age is None:
1075 return None
1076 if isinstance(age, timedelta):
1077 age = int(age.total_seconds())
1078 else:
1079 age = int(age)
1081 if age < 0:
1082 raise ValueError("age cannot be negative")
1084 return str(age)
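# Editor's illustrative sketch (not part of the original module): parse_age and
# dump_age convert between the Age header's integer seconds and timedelta.
# The helper name is hypothetical.
def _demo_age_round_trip() -> None:
    assert parse_age("3600") == timedelta(hours=1)
    assert parse_age("-1") is None and parse_age("abc") is None
    assert dump_age(timedelta(hours=1)) == "3600"
    assert dump_age(None) is None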
1087def is_resource_modified(
1088 environ: WSGIEnvironment,
1089 etag: str | None = None,
1090 data: bytes | None = None,
1091 last_modified: datetime | str | None = None,
1092 ignore_if_range: bool = True,
1093) -> bool:
1094 """Convenience method for conditional requests.
1096 :param environ: the WSGI environment of the request to be checked.
1097 :param etag: the etag for the response for comparison.
1098 :param data: or alternatively the data of the response to automatically
1099 generate an etag using :func:`generate_etag`.
1100 :param last_modified: an optional date of the last modification.
1101 :param ignore_if_range: If `False`, `If-Range` header will be taken into
1102 account.
1103 :return: `True` if the resource was modified, otherwise `False`.
1105 .. versionchanged:: 2.0
1106 SHA-1 is used to generate an etag value for the data. MD5 may
1107 not be available in some environments.
1109 .. versionchanged:: 1.0.0
1110 The check is run for methods other than ``GET`` and ``HEAD``.
1111 """
1112 return _sansio_http.is_resource_modified(
1113 http_range=environ.get("HTTP_RANGE"),
1114 http_if_range=environ.get("HTTP_IF_RANGE"),
1115 http_if_modified_since=environ.get("HTTP_IF_MODIFIED_SINCE"),
1116 http_if_none_match=environ.get("HTTP_IF_NONE_MATCH"),
1117 http_if_match=environ.get("HTTP_IF_MATCH"),
1118 etag=etag,
1119 data=data,
1120 last_modified=last_modified,
1121 ignore_if_range=ignore_if_range,
1122 )
1125def remove_entity_headers(
1126 headers: ds.Headers | list[tuple[str, str]],
1127 allowed: t.Iterable[str] = ("expires", "content-location"),
1128) -> None:
1129 """Remove all entity headers from a list or :class:`Headers` object. This
1130 operation works in-place. `Expires` and `Content-Location` headers are
1131 by default not removed. The reason for this is :rfc:`2616` section
1132 10.3.5, which specifies some entity headers that should be sent.
1134 .. versionchanged:: 0.5
1135 added `allowed` parameter.
1137 :param headers: a list or :class:`Headers` object.
1138 :param allowed: a list of headers that should still be allowed even though
1139 they are entity headers.
1140 """
1141 allowed = {x.lower() for x in allowed}
1142 headers[:] = [
1143 (key, value)
1144 for key, value in headers
1145 if not is_entity_header(key) or key.lower() in allowed
1146 ]
1149def remove_hop_by_hop_headers(headers: ds.Headers | list[tuple[str, str]]) -> None:
1150 """Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or
1151 :class:`Headers` object. This operation works in-place.
1153 .. versionadded:: 0.5
1155 :param headers: a list or :class:`Headers` object.
1156 """
1157 headers[:] = [
1158 (key, value) for key, value in headers if not is_hop_by_hop_header(key)
1159 ]
1162def is_entity_header(header: str) -> bool:
1163 """Check if a header is an entity header.
1165 .. versionadded:: 0.5
1167 :param header: the header to test.
1168 :return: `True` if it's an entity header, `False` otherwise.
1169 """
1170 return header.lower() in _entity_headers
1173def is_hop_by_hop_header(header: str) -> bool:
1174 """Check if a header is an HTTP/1.1 "Hop-by-Hop" header.
1176 .. versionadded:: 0.5
1178 :param header: the header to test.
1179 :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
1180 """
1181 return header.lower() in _hop_by_hop_headers
1184def parse_cookie(
1185 header: WSGIEnvironment | str | None,
1186 cls: type[ds.MultiDict[str, str]] | None = None,
1187) -> ds.MultiDict[str, str]:
1188 """Parse a cookie from a string or WSGI environ.
1190 The same key can be provided multiple times, the values are stored
1191 in-order. The default :class:`MultiDict` will have the first value
1192 first, and all values can be retrieved with
1193 :meth:`MultiDict.getlist`.
1195 :param header: The cookie header as a string, or a WSGI environ dict
1196 with a ``HTTP_COOKIE`` key.
1197 :param cls: A dict-like class to store the parsed cookies in.
1198 Defaults to :class:`MultiDict`.
1200 .. versionchanged:: 3.0
1201 Passing bytes, and the ``charset`` and ``errors`` parameters, were removed.
1203 .. versionchanged:: 1.0
1204 Returns a :class:`MultiDict` instead of a ``TypeConversionDict``.
1206 .. versionchanged:: 0.5
1207 Returns a :class:`TypeConversionDict` instead of a regular dict. The ``cls``
1208 parameter was added.
1209 """
1210 if isinstance(header, dict):
1211 cookie = header.get("HTTP_COOKIE")
1212 else:
1213 cookie = header
1215 if cookie:
1216 cookie = cookie.encode("latin1").decode()
1218 return _sansio_http.parse_cookie(cookie=cookie, cls=cls)
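# Editor's illustrative sketch (not part of the original module): the actual
# parsing happens in werkzeug.sansio.http.parse_cookie (not shown here); per the
# docstring, repeated keys keep all values in order. The helper name is hypothetical.
def _demo_parse_cookie() -> None:
    cookies = parse_cookie("a=b; a=c; theme=dark")
    assert cookies["a"] == "b"
    assert cookies.getlist("a") == ["b", "c"]
    assert cookies["theme"] == "dark"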
1221_cookie_no_quote_re = re.compile(r"[\w!#$%&'()*+\-./:<=>?@\[\]^`{|}~]*", re.A)
1222_cookie_slash_re = re.compile(rb"[\x00-\x19\",;\\\x7f-\xff]", re.A)
1223_cookie_slash_map = {b'"': b'\\"', b"\\": b"\\\\"}
1224_cookie_slash_map.update(
1225 (v.to_bytes(1, "big"), b"\\%03o" % v)
1226 for v in [*range(0x20), *b",;", *range(0x7F, 256)]
1227)
1230def dump_cookie(
1231 key: str,
1232 value: str = "",
1233 max_age: timedelta | int | None = None,
1234 expires: str | datetime | int | float | None = None,
1235 path: str | None = "/",
1236 domain: str | None = None,
1237 secure: bool = False,
1238 httponly: bool = False,
1239 sync_expires: bool = True,
1240 max_size: int = 4093,
1241 samesite: str | None = None,
1242 partitioned: bool = False,
1243) -> str:
1244 """Create a Set-Cookie header without the ``Set-Cookie`` prefix.
1246 The return value is usually restricted to ascii as the vast majority
1247 of values are properly escaped, but that is no guarantee. It's
1248 tunneled through latin1 as required by :pep:`3333`.
1250 The return value is not ASCII safe if the key contains unicode
1251 characters. This is technically against the specification but
1252 happens in the wild. It's strongly recommended to not use
1253 non-ASCII values for the keys.
1255 :param max_age: should be a number of seconds, or `None` (default) if
1256 the cookie should last only as long as the client's
1257 browser session. Additionally `timedelta` objects
1258 are accepted, too.
1259 :param expires: should be a `datetime` object or unix timestamp.
1260 :param path: limits the cookie to a given path, per default it will
1261 span the whole domain.
1262 :param domain: Use this if you want to set a cross-domain cookie. For
1263 example, ``domain="example.com"`` will set a cookie
1264 that is readable by the domain ``www.example.com``,
1265 ``foo.example.com`` etc. Otherwise, a cookie will only
1266 be readable by the domain that set it.
1267 :param secure: The cookie will only be available via HTTPS
1268 :param httponly: disallow JavaScript to access the cookie. This is an
1269 extension to the cookie standard and probably not
1270 supported by all browsers.
1272 :param sync_expires: automatically set expires if max_age is defined
1273 but expires not.
1274 :param max_size: Warn if the final header value exceeds this size. The
1275 default, 4093, should be safely `supported by most browsers
1276 <cookie_>`_. Set to 0 to disable this check.
1277 :param samesite: Limits the scope of the cookie such that it will
1278 only be attached to requests if those requests are same-site.
1279 :param partitioned: Opts the cookie into partitioned storage. This
1280 will also set ``secure`` to ``True``.
1282 .. _`cookie`: http://browsercookielimits.squawky.net/
1284 .. versionchanged:: 3.1
1285 The ``partitioned`` parameter was added.
1287 .. versionchanged:: 3.0
1288 Passing bytes, and the ``charset`` parameter, were removed.
1290 .. versionchanged:: 2.3.3
1291 The ``path`` parameter is ``/`` by default.
1293 .. versionchanged:: 2.3.1
1294 The value allows more characters without quoting.
1296 .. versionchanged:: 2.3
1297 ``localhost`` and other names without a dot are allowed for the domain. A
1298 leading dot is ignored.
1300 .. versionchanged:: 2.3
1301 The ``path`` parameter is ``None`` by default.
1303 .. versionchanged:: 1.0.0
1304 The string ``'None'`` is accepted for ``samesite``.
1305 """
1306 if path is not None:
1307 # safe = https://url.spec.whatwg.org/#url-path-segment-string
1308 # as well as percent for things that are already quoted
1309 # excluding semicolon since it's part of the header syntax
1310 path = quote(path, safe="%!$&'()*+,/:=@")
1312 if domain:
1313 domain = domain.partition(":")[0].lstrip(".").encode("idna").decode("ascii")
1315 if isinstance(max_age, timedelta):
1316 max_age = int(max_age.total_seconds())
1318 if expires is not None:
1319 if not isinstance(expires, str):
1320 expires = http_date(expires)
1321 elif max_age is not None and sync_expires:
1322 expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)
1324 if samesite is not None:
1325 samesite = samesite.title()
1327 if samesite not in {"Strict", "Lax", "None"}:
1328 raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")
1330 if partitioned:
1331 secure = True
1333 # Quote value if it contains characters not allowed by RFC 6265. Slash-escape with
1334 # three octal digits, which matches http.cookies, although the RFC suggests base64.
1335 if not _cookie_no_quote_re.fullmatch(value):
1336 # Work with bytes here, since a UTF-8 character could be multiple bytes.
1337 value = _cookie_slash_re.sub(
1338 lambda m: _cookie_slash_map[m.group()], value.encode()
1339 ).decode("ascii")
1340 value = f'"{value}"'
1342 # Send a non-ASCII key as mojibake. Everything else should already be ASCII.
1343 # TODO Remove encoding dance, it seems like clients accept UTF-8 keys
1344 buf = [f"{key.encode().decode('latin1')}={value}"]
1346 for k, v in (
1347 ("Domain", domain),
1348 ("Expires", expires),
1349 ("Max-Age", max_age),
1350 ("Secure", secure),
1351 ("HttpOnly", httponly),
1352 ("Path", path),
1353 ("SameSite", samesite),
1354 ("Partitioned", partitioned),
1355 ):
1356 if v is None or v is False:
1357 continue
1359 if v is True:
1360 buf.append(k)
1361 continue
1363 buf.append(f"{k}={v}")
1365 rv = "; ".join(buf)
1367 # Warn if the final value of the cookie is larger than the limit. If the cookie is
1368 # too large, then it may be silently ignored by the browser, which can be quite hard
1369 # to debug.
1370 cookie_size = len(rv)
1372 if max_size and cookie_size > max_size:
1373 value_size = len(value)
1374 warnings.warn(
1375 f"The '{key}' cookie is too large: the value was {value_size} bytes but the"
1376 f" header required {cookie_size - value_size} extra bytes. The final size"
1377 f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
1378 " silently ignore cookies larger than this.",
1379 stacklevel=2,
1380 )
1382 return rv
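# Editor's illustrative sketch (not part of the original module): dump_cookie
# appends attributes in a fixed order and only quotes the value when it contains
# characters outside the RFC 6265 set. The helper name is hypothetical; the output
# is deterministic because neither max_age nor expires is set.
def _demo_dump_cookie() -> None:
    assert dump_cookie(
        "session", "abc123", secure=True, httponly=True, samesite="Lax"
    ) == "session=abc123; Secure; HttpOnly; Path=/; SameSite=Lax"
    # A space in the value forces quoting.
    assert dump_cookie("k", "a b") == 'k="a b"; Path=/'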
1385def is_byte_range_valid(
1386 start: int | None, stop: int | None, length: int | None
1387) -> bool:
1388 """Checks if a given byte content range is valid for the given length.
1390 .. versionadded:: 0.7
1391 """
1392 if (start is None) != (stop is None):
1393 return False
1394 elif start is None:
1395 return length is None or length >= 0
1396 elif length is None:
1397 return 0 <= start < stop # type: ignore
1398 elif start >= stop: # type: ignore
1399 return False
1400 return 0 <= start < length
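# Editor's illustrative sketch (not part of the original module): the helper
# expects the half-open (start, stop) convention used by the parsers above.
# The helper name is hypothetical.
def _demo_is_byte_range_valid() -> None:
    assert is_byte_range_valid(0, 500, 1234)  # normal satisfied range
    assert is_byte_range_valid(None, None, 1234)  # "*" range with a known length
    assert not is_byte_range_valid(None, 500, 1234)  # start/stop must agree on None
    assert not is_byte_range_valid(500, 400, None)  # start must be below stop
    assert not is_byte_range_valid(2000, 2100, 1234)  # start must be below length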
1403# circular dependencies
1404from . import datastructures as ds
1405from .sansio import http as _sansio_http