# Coverage report header (coverage.py v7.3.2, created at 2023-12-09 07:17 +0000):
# /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/http.py: 21%
# 433 statements
1from __future__ import annotations
3import email.utils
4import re
5import typing as t
6import warnings
7from datetime import date
8from datetime import datetime
9from datetime import time
10from datetime import timedelta
11from datetime import timezone
12from enum import Enum
13from hashlib import sha1
14from time import mktime
15from time import struct_time
16from urllib.parse import quote
17from urllib.parse import unquote
18from urllib.request import parse_http_list as _parse_list_header
20from ._internal import _dt_as_utc
21from ._internal import _plain_int
23if t.TYPE_CHECKING:
24 from _typeshed.wsgi import WSGIEnvironment
# Characters allowed in an HTTP "token" (RFC 9110 tchar). A header value made
# only of these characters does not need to be quoted.
_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)
# Matches one etag in a comma separated header value: an optional weakness
# marker ("W/" or "w/"), then either a quoted tag (group 2) or a bare tag
# (group 3), followed by a comma separator or end of string.
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
# Lowercase names of headers that describe the entity body, as checked by
# is_entity_header() and stripped by remove_entity_headers().
_entity_headers = frozenset(
    [
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    ]
)
# Lowercase names of HTTP/1.1 hop-by-hop headers, as checked by
# is_hop_by_hop_header() and stripped by remove_hop_by_hop_headers().
_hop_by_hop_headers = frozenset(
    [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ]
)
#: Mapping of HTTP status codes to their reason phrases, used when building
#: response status lines. Some entries intentionally keep historical wording
#: (e.g. 413/414/416) rather than the current RFC 9110 names.
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    # Fixed: was "Network Authentication Failed"; RFC 6585 defines the 511
    # reason phrase as "Network Authentication Required".
    511: "Network Authentication Required",  # see RFC 6585
}
class COEP(Enum):
    """Cross Origin Embedder Policies"""

    # Cross-origin resources may be loaded without an explicit opt-in.
    UNSAFE_NONE = "unsafe-none"
    # Cross-origin resources must opt in via CORP or CORS.
    REQUIRE_CORP = "require-corp"
class COOP(Enum):
    """Cross Origin Opener Policies"""

    # The document may share its browsing context group with any opener.
    UNSAFE_NONE = "unsafe-none"
    # Isolated, but popups it opens without COOP stay in its group.
    SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
    # Only same-origin documents share the browsing context group.
    SAME_ORIGIN = "same-origin"
def quote_header_value(value: t.Any, allow_token: bool = True) -> str:
    """Add double quotes around a header value. A value consisting solely of
    ASCII token characters is returned unchanged. Embedded ``"`` and ``\\``
    characters are escaped with an extra ``\\``.

    This is the reverse of :func:`unquote_header_value`.

    :param value: The value to quote. Converted to a string first.
    :param allow_token: Disable to quote the value even if it only contains
        token characters.

    .. versionchanged:: 3.0
        Passing bytes is not supported.

    .. versionchanged:: 3.0
        The ``extra_chars`` parameter is removed.

    .. versionchanged:: 2.3
        The value is quoted if it is the empty string.

    .. versionadded:: 0.5
    """
    text = str(value)

    # An empty value still needs to be represented, as an empty quoted string.
    if not text:
        return '""'

    # A pure token requires no quoting, unless the caller forces it.
    if allow_token and _token_chars.issuperset(text):
        return text

    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'
def unquote_header_value(value: str) -> str:
    """Remove surrounding double quotes and decode slash-escaped ``"`` and
    ``\\`` characters in a header value.

    This is the reverse of :func:`quote_header_value`.

    :param value: The header value to unquote.

    .. versionchanged:: 3.0
        The ``is_filename`` parameter is removed.
    """
    is_quoted = len(value) >= 2 and value[0] == value[-1] == '"'

    if not is_quoted:
        # Not a quoted string; return as-is.
        return value

    inner = value[1:-1]
    return inner.replace("\\\\", "\\").replace('\\"', '"')
def dump_options_header(header: str | None, options: t.Mapping[str, t.Any]) -> str:
    """Produce a header value and ``key=value`` parameters separated by
    semicolons ``;``, as used by headers such as ``Content-Type``.

    .. code-block:: python

        dump_options_header("text/html", {"charset": "UTF-8"})
        'text/html; charset=UTF-8'

    This is the reverse of :func:`parse_options_header`. Values containing
    non-token characters are quoted; parameters whose value is ``None`` are
    skipped. A key ending in ``*`` is assumed to already carry an RFC 2231
    ``key*=UTF-8''value`` encoded value and is not quoted further.

    :param header: The primary header value.
    :param options: Parameters to encode as ``key=value`` pairs.

    .. versionchanged:: 2.3
        Keys with ``None`` values are skipped rather than treated as a bare key.

    .. versionchanged:: 2.2.3
        If a key ends with ``*``, its value will not be quoted.
    """
    parts: list[str] = []

    if header is not None:
        parts.append(header)

    for name, val in options.items():
        if val is None:
            # None means "omit this parameter entirely".
            continue

        if name[-1] == "*":
            # RFC 2231 extended value, already percent encoded; don't quote.
            parts.append(f"{name}={val}")
        else:
            parts.append(f"{name}={quote_header_value(val)}")

    return "; ".join(parts)
def dump_header(iterable: dict[str, t.Any] | t.Iterable[t.Any]) -> str:
    """Produce a comma separated header value from a list of items or a dict of
    ``key=value`` pairs.

    This is the reverse of :func:`parse_list_header`, :func:`parse_dict_header`,
    and :func:`parse_set_header`. Values containing non-token characters are
    quoted; a ``None`` value outputs the key alone. A key ending in ``*`` is
    assumed to already carry an RFC 2231 ``key*=UTF-8''value`` encoded value
    and is not quoted further.

    .. code-block:: python

        dump_header(["foo", "bar baz"])
        'foo, "bar baz"'

        dump_header({"foo": "bar baz"})
        'foo="bar baz"'

    :param iterable: The items to create a header from.

    .. versionchanged:: 3.0
        The ``allow_token`` parameter is removed.

    .. versionchanged:: 2.2.3
        If a key ends with ``*``, its value will not be quoted.
    """

    def _dict_item(name: str, val: t.Any) -> str:
        # Render one key/value pair according to the rules above.
        if val is None:
            return name

        if name[-1] == "*":
            return f"{name}={val}"

        return f"{name}={quote_header_value(val)}"

    if isinstance(iterable, dict):
        segments = [_dict_item(name, val) for name, val in iterable.items()]
    else:
        segments = [quote_header_value(item) for item in iterable]

    return ", ".join(segments)
def dump_csp_header(header: ds.ContentSecurityPolicy) -> str:
    """Dump a Content Security Policy header.

    Policies are rendered as ``directive value`` pairs joined by semicolons,
    such as ``default-src 'self'; script-src 'self'``.

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.

    """
    policies = [f"{directive} {value}" for directive, value in header.items()]
    return "; ".join(policies)
def parse_list_header(value: str) -> list[str]:
    """Parse a header value that consists of a list of comma separated items
    according to `RFC 9110 <https://httpwg.org/specs/rfc9110.html#abnf.extension>`__.

    This extends :func:`urllib.request.parse_http_list` by also stripping
    surrounding quotes from each item.

    .. code-block:: python

        parse_list_header('token, "quoted value"')
        ['token', 'quoted value']

    This is the reverse of :func:`dump_header`.

    :param value: The header value to parse.
    """
    items = []

    for part in _parse_list_header(value):
        # Strip one pair of surrounding quotes, if present.
        if len(part) >= 2 and part[0] == part[-1] == '"':
            part = part[1:-1]

        items.append(part)

    return items
def parse_dict_header(value: str) -> dict[str, str | None]:
    """Parse a list header with :func:`parse_list_header`, then interpret each
    item as a ``key=value`` pair.

    .. code-block:: python

        parse_dict_header('a=b, c="d, e", f')
        {"a": "b", "c": "d, e", "f": None}

    This is the reverse of :func:`dump_header`. Keys without a value map to
    ``None``. ``key*=charset''value`` items are decoded as described in
    `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__; only
    ASCII, UTF-8, and ISO-8859-1 charsets are accepted, otherwise the value
    remains quoted.

    :param value: The header value to parse.

    .. versionchanged:: 3.0
        Passing bytes is not supported.

    .. versionchanged:: 3.0
        The ``cls`` argument is removed.

    .. versionchanged:: 2.3
        Added support for ``key*=charset''value`` encoded items.

    .. versionchanged:: 0.9
        The ``cls`` argument was added.
    """
    parsed: dict[str, str | None] = {}

    for item in parse_list_header(value):
        key, sep, item_value = item.partition("=")
        key = key.strip()

        if not sep:
            # A bare key has no value at all.
            parsed[key] = None
            continue

        item_value = item_value.strip()

        if key[-1] == "*":
            # key*=charset''value becomes key=value, where value is percent
            # encoded; adapted from parse_options_header, without the
            # continuation handling.
            key = key[:-1]
            charset_match = _charset_value_re.match(item_value)
            encoding: str | None = None

            if charset_match is not None:
                # A charset marker is present in the value; split it off.
                encoding, item_value = charset_match.groups()
                encoding = encoding.lower()

            # A safe list of encodings. Modern clients should only send ASCII
            # or UTF-8. This list will not be extended further. An invalid
            # encoding will leave the value quoted.
            if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                # Invalid bytes are replaced during unquoting.
                item_value = unquote(item_value, encoding=encoding)

        # Strip one pair of surrounding quotes, if present.
        if len(item_value) >= 2 and item_value[0] == item_value[-1] == '"':
            item_value = item_value[1:-1]

        parsed[key] = item_value

    return parsed
# Matches one ";"-delimited parameter of a header such as Content-Type.
# https://httpwg.org/specs/rfc9110.html#parameter
_parameter_re = re.compile(
    r"""
    # don't match multiple empty parts, that causes backtracking
    \s*;\s* # find the part delimiter
    (?:
        ([\w!#$%&'*+\-.^`|~]+) # key, one or more token chars
        = # equals, with no space on either side
        ( # value, token or quoted string
            [\w!#$%&'*+\-.^`|~]+ # one or more token chars
        |
            "(?:\\\\|\\"|.)*?" # quoted string, consuming slash escapes
        )
    )? # optionally match key=value, to account for empty parts
    """,
    re.ASCII | re.VERBOSE,
)
# Matches the charset''value prefix of an RFC 2231 extended parameter value.
# https://www.rfc-editor.org/rfc/rfc2231#section-4
_charset_value_re = re.compile(
    r"""
    ([\w!#$%&*+\-.^`|~]*)' # charset part, could be empty
    [\w!#$%&*+\-.^`|~]*' # don't care about language part, usually empty
    ([\w!#$%&'*+\-.^`|~]+) # one or more token chars with percent encoding
    """,
    re.ASCII | re.VERBOSE,
)
# Matches the "*N" continuation suffix of an RFC 2231 parameter key.
# https://www.rfc-editor.org/rfc/rfc2231#section-3
_continuation_re = re.compile(r"\*(\d+)$", re.ASCII)
def parse_options_header(value: str | None) -> tuple[str, dict[str, str]]:
    """Parse a header that consists of a value with ``key=value`` parameters separated
    by semicolons ``;``. For example, the ``Content-Type`` header.

    .. code-block:: python

        parse_options_header("text/html; charset=UTF-8")
        ('text/html', {'charset': 'UTF-8'})

        parse_options_header("")
        ("", {})

    This is the reverse of :func:`dump_options_header`.

    This parses valid parameter parts as described in
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#parameter>`__. Invalid parts are
    skipped.

    This handles continuations and charsets as described in
    `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__, although not as
    strictly as the RFC. Only ASCII, UTF-8, and ISO-8859-1 charsets are accepted,
    otherwise the value remains quoted.

    Clients may not be consistent in how they handle a quote character within a quoted
    value. The `HTML Standard <https://html.spec.whatwg.org/#multipart-form-data>`__
    replaces it with ``%22`` in multipart form data.
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#quoted.strings>`__ uses backslash
    escapes in HTTP headers. Both are decoded to the ``"`` character.

    Clients may not be consistent in how they handle non-ASCII characters. HTML
    documents must declare ``<meta charset=UTF-8>``, otherwise browsers may replace with
    HTML character references, which can be decoded using :func:`html.unescape`.

    :param value: The header value to parse.
    :return: ``(value, options)``, where ``options`` is a dict

    .. versionchanged:: 2.3
        Invalid parts, such as keys with no value, quoted keys, and incorrectly quoted
        values, are discarded instead of treating as ``None``.

    .. versionchanged:: 2.3
        Only ASCII, UTF-8, and ISO-8859-1 are accepted for charset values.

    .. versionchanged:: 2.3
        Escaped quotes in quoted values, like ``%22`` and ``\\"``, are handled.

    .. versionchanged:: 2.2
        Option names are always converted to lowercase.

    .. versionchanged:: 2.2
        The ``multiple`` parameter was removed.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5
    """
    if value is None:
        return "", {}

    value, _, rest = value.partition(";")
    value = value.strip()
    rest = rest.strip()

    if not value or not rest:
        # empty (invalid) value, or value without options
        return value, {}

    # Re-prefix the delimiter so _parameter_re can anchor on ";" for every part.
    rest = f";{rest}"
    options: dict[str, str] = {}
    encoding: str | None = None
    continued_encoding: str | None = None

    for pk, pv in _parameter_re.findall(rest):
        if not pk:
            # empty or invalid part
            continue

        pk = pk.lower()

        if pk[-1] == "*":
            # key*=charset''value becomes key=value, where value is percent encoded
            pk = pk[:-1]
            match = _charset_value_re.match(pv)

            if match:
                # If there is a valid charset marker in the value, split it off.
                encoding, pv = match.groups()
                # This might be the empty string, handled next.
                encoding = encoding.lower()

            # No charset marker, or marker with empty charset value.
            if not encoding:
                encoding = continued_encoding

            # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
            # This list will not be extended further. An invalid encoding will leave the
            # value quoted.
            if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                # Continuation parts don't require their own charset marker. This is
                # looser than the RFC, it will persist across different keys and allows
                # changing the charset during a continuation. But this implementation is
                # much simpler than tracking the full state.
                continued_encoding = encoding
                # invalid bytes are replaced during unquoting
                pv = unquote(pv, encoding=encoding)

        # Remove quotes. At this point the value cannot be empty or a single quote.
        if pv[0] == pv[-1] == '"':
            # HTTP headers use slash, multipart form data uses percent
            pv = pv[1:-1].replace("\\\\", "\\").replace('\\"', '"').replace("%22", '"')

        match = _continuation_re.search(pk)

        if match:
            # key*0=a; key*1=b becomes key=ab
            pk = pk[: match.start()]
            options[pk] = options.get(pk, "") + pv
        else:
            options[pk] = pv

    return value, options
# Matches a candidate q value: optional sign, digits, optional fraction.
# Range validation (0 <= q <= 1) happens after parsing.
_q_value_re = re.compile(r"-?\d+(\.\d+)?", re.ASCII)
# Bound to ds.Accept so parse_accept_header returns the same subclass that was
# passed as ``cls``.
_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")


@t.overload
def parse_accept_header(value: str | None) -> ds.Accept:
    ...


@t.overload
def parse_accept_header(value: str | None, cls: type[_TAnyAccept]) -> _TAnyAccept:
    ...
def parse_accept_header(
    value: str | None, cls: type[_TAnyAccept] | None = None
) -> _TAnyAccept:
    """Parse an ``Accept`` header according to
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#field.accept>`__.

    Returns an :class:`.Accept` instance, which can sort and inspect items
    based on their quality parameter. When parsing ``Accept-Charset``,
    ``Accept-Encoding``, or ``Accept-Language``, pass the appropriate
    :class:`.Accept` subclass.

    :param value: The header value to parse.
    :param cls: The :class:`.Accept` class to wrap the result in.
    :return: An instance of ``cls``.

    .. versionchanged:: 2.3
        Parse according to RFC 9110. Items with invalid ``q`` values are skipped.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyAccept], ds.Accept)

    if not value:
        return cls(None)

    items = []

    for part in parse_list_header(value):
        name, options = parse_options_header(part)
        quality = 1

        if "q" in options:
            # Pop q; any remaining options are folded back into the name.
            q_str = options.pop("q").strip()

            if _q_value_re.fullmatch(q_str) is None:
                # Skip items with a malformed q.
                continue

            quality = float(q_str)

            if quality < 0 or quality > 1:
                # q must be within [0, 1].
                continue

        if options:
            # Reconstruct the media type with any remaining options.
            name = dump_options_header(name, options)

        items.append((name, quality))

    return cls(items)
# Bound to the shared cache control base so parse_cache_control_header can
# return the same subclass that was passed as ``cls``.
_TAnyCC = t.TypeVar("_TAnyCC", bound="ds.cache_control._CacheControl")
# Optional callback invoked whenever the returned CacheControl object changes.
_t_cc_update = t.Optional[t.Callable[[_TAnyCC], None]]


@t.overload
def parse_cache_control_header(
    value: str | None, on_update: _t_cc_update, cls: None = None
) -> ds.RequestCacheControl:
    ...


@t.overload
def parse_cache_control_header(
    value: str | None, on_update: _t_cc_update, cls: type[_TAnyCC]
) -> _TAnyCC:
    ...
def parse_cache_control_header(
    value: str | None,
    on_update: _t_cc_update = None,
    cls: type[_TAnyCC] | None = None,
) -> _TAnyCC:
    """Parse a cache control header. The RFC differs between response and
    request cache control; this method does not. It is the caller's
    responsibility to not use the wrong control statements.

    .. versionadded:: 0.5
        The `cls` was added. If not specified an immutable
        :class:`~werkzeug.datastructures.RequestCacheControl` is returned.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.CacheControl`
                      object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object.
    """
    target_cls: type[_TAnyCC] = (
        cls if cls is not None else t.cast(t.Type[_TAnyCC], ds.RequestCacheControl)
    )

    if not value:
        # Missing or empty header: an empty cache control object.
        return target_cls((), on_update)

    return target_cls(parse_dict_header(value), on_update)
# Bound to ds.ContentSecurityPolicy so parse_csp_header can return the same
# subclass that was passed as ``cls``.
_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")
# Optional callback invoked whenever the returned policy object changes.
_t_csp_update = t.Optional[t.Callable[[_TAnyCSP], None]]


@t.overload
def parse_csp_header(
    value: str | None, on_update: _t_csp_update, cls: None = None
) -> ds.ContentSecurityPolicy:
    ...


@t.overload
def parse_csp_header(
    value: str | None, on_update: _t_csp_update, cls: type[_TAnyCSP]
) -> _TAnyCSP:
    ...
def parse_csp_header(
    value: str | None,
    on_update: _t_csp_update = None,
    cls: type[_TAnyCSP] | None = None,
) -> _TAnyCSP:
    """Parse a Content Security Policy header.

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCSP], ds.ContentSecurityPolicy)

    if value is None:
        return cls((), on_update)

    directives = []

    for segment in value.split(";"):
        segment = segment.strip()

        # Segments without a space separator are malformed; skip them.
        if " " not in segment:
            continue

        name, _, rest = segment.partition(" ")
        directives.append((name.strip(), rest.strip()))

    return cls(directives, on_update)
def parse_set_header(
    value: str | None,
    on_update: t.Callable[[ds.HeaderSet], None] | None = None,
) -> ds.HeaderSet:
    """Parse a set-like header and return a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    The returned object treats items case-insensitively while preserving
    their order:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    Use :func:`dump_header` to turn a :class:`HeaderSet` back into a header
    value.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
                      value on the :class:`~werkzeug.datastructures.HeaderSet`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    # A missing or empty header yields an empty set.
    parsed = parse_list_header(value) if value else None
    return ds.HeaderSet(parsed, on_update)
def parse_if_range_header(value: str | None) -> ds.IfRange:
    """Parse an ``If-Range`` header, which may hold either an etag or a date.
    Returns a :class:`~werkzeug.datastructures.IfRange` object.

    .. versionchanged:: 2.0
        If the value represents a datetime, it is timezone-aware.

    .. versionadded:: 0.7
    """
    if not value:
        return ds.IfRange()

    timestamp = parse_date(value)

    if timestamp is not None:
        return ds.IfRange(date=timestamp)

    # Not a date, so treat it as an etag; the weakness flag is irrelevant here.
    etag, _ = unquote_etag(value)
    return ds.IfRange(etag)
def parse_range_header(
    value: str | None, make_inclusive: bool = True
) -> ds.Range | None:
    """Parses a range header into a :class:`~werkzeug.datastructures.Range`
    object. If the header is missing or malformed `None` is returned.
    `ranges` is a list of ``(start, stop)`` tuples where the ranges are
    non-inclusive.

    .. versionadded:: 0.7
    """
    # NOTE(review): ``make_inclusive`` is not referenced in this body;
    # presumably kept for backward compatibility — confirm against callers.
    if not value or "=" not in value:
        return None

    ranges = []
    # last_end tracks the previous range's exclusive end so overlapping or
    # out-of-order ranges are rejected; -1 marks "open-ended/suffix range
    # seen", after which no further ranges are allowed.
    last_end = 0
    units, rng = value.split("=", 1)
    units = units.strip().lower()

    for item in rng.split(","):
        item = item.strip()
        if "-" not in item:
            return None
        if item.startswith("-"):
            # Suffix range like "-500": begin is stored as a negative int.
            if last_end < 0:
                return None
            try:
                begin = _plain_int(item)
            except ValueError:
                return None
            end = None
            last_end = -1
        elif "-" in item:
            # Always true here ("-" was checked above); kept for clarity of
            # the begin-end branch.
            begin_str, end_str = item.split("-", 1)
            begin_str = begin_str.strip()
            end_str = end_str.strip()

            try:
                begin = _plain_int(begin_str)
            except ValueError:
                return None

            # Reject ranges that start before the previous one ended, or any
            # range following an open-ended/suffix range.
            if begin < last_end or last_end < 0:
                return None

            if end_str:
                try:
                    # Stop is stored exclusive, hence the +1.
                    end = _plain_int(end_str) + 1
                except ValueError:
                    return None

                if begin >= end:
                    return None
            else:
                # "begin-" means "from begin to the end of the resource".
                end = None

            last_end = end if end is not None else -1
        ranges.append((begin, end))

    return ds.Range(units, ranges)
def parse_content_range_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentRange], None] | None = None,
) -> ds.ContentRange | None:
    """Parses a ``Content-Range`` header into a
    :class:`~werkzeug.datastructures.ContentRange` object, or `None` if
    parsing is not possible.

    .. versionadded:: 0.7

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.ContentRange`
                      object is changed.
    """
    if value is None:
        return None

    try:
        units, rest = (value or "").strip().split(None, 1)
    except ValueError:
        return None

    if "/" not in rest:
        return None

    span, complete_str = rest.split("/", 1)

    if complete_str == "*":
        # The complete length of the resource is unknown.
        complete_length = None
    else:
        try:
            complete_length = _plain_int(complete_str)
        except ValueError:
            return None

    if span == "*":
        # An unsatisfied range: no start/stop, only a length.
        if not is_byte_range_valid(None, None, complete_length):
            return None

        return ds.ContentRange(
            units, None, None, complete_length, on_update=on_update
        )

    if "-" not in span:
        return None

    first_str, last_str = span.split("-", 1)

    try:
        start = _plain_int(first_str)
        # The header's last position is inclusive; store stop exclusive.
        stop = _plain_int(last_str) + 1
    except ValueError:
        return None

    if not is_byte_range_valid(start, stop, complete_length):
        return None

    return ds.ContentRange(units, start, stop, complete_length, on_update=on_update)
def quote_etag(etag: str, weak: bool = False) -> str:
    """Quote an etag.

    :param etag: the etag to quote.
    :param weak: set to `True` to tag it "weak".
    :raises ValueError: if the etag already contains a quote character.
    """
    if '"' in etag:
        raise ValueError("invalid etag")

    quoted = f'"{etag}"'
    return f"W/{quoted}" if weak else quoted
def unquote_etag(
    etag: str | None,
) -> tuple[str, bool] | tuple[None, None]:
    """Unquote a single etag:

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple.
    """
    if not etag:
        return None, None

    tag = etag.strip()
    weak = tag[:2] in ("W/", "w/")

    if weak:
        # Drop the weakness marker before unquoting.
        tag = tag[2:]

    if tag[:1] == tag[-1:] == '"':
        tag = tag[1:-1]

    return tag, weak
def parse_etags(value: str | None) -> ds.ETags:
    """Parse an etag header.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ds.ETags()

    strong = []
    weak = []
    pos = 0
    end = len(value)

    # Scan the header one etag at a time; stop on the first unmatched span.
    while pos < end:
        match = _etag_re.match(value, pos)

        if match is None:
            break

        weak_marker, quoted, raw = match.groups()

        if raw == "*":
            # A star matches everything; individual tags are irrelevant.
            return ds.ETags(star_tag=True)

        tag = quoted if quoted else raw
        (weak if weak_marker else strong).append(tag)
        pos = match.end()

    return ds.ETags(strong, weak)
def generate_etag(data: bytes) -> str:
    """Generate an etag for some data: the hex SHA-1 digest of the bytes.

    .. versionchanged:: 2.0
        Use SHA-1. MD5 may not be available in some environments.
    """
    digest = sha1(data)
    return digest.hexdigest()
def parse_date(value: str | None) -> datetime | None:
    """Parse an :rfc:`2822` date into a timezone-aware
    :class:`datetime.datetime` object, or ``None`` if parsing fails.

    This wraps :func:`email.utils.parsedate_to_datetime`, returning ``None``
    instead of raising on bad input, and always producing an aware datetime.
    A date string without timezone information is assumed to be UTC.

    :param value: A string with a supported date format.

    .. versionchanged:: 2.0
        Return a timezone-aware datetime object. Use
        ``email.utils.parsedate_to_datetime``.
    """
    if value is None:
        return None

    try:
        parsed = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Unparseable input; some Python versions raise TypeError here.
        return None

    if parsed.tzinfo is not None:
        return parsed

    # Treat a missing timezone as UTC.
    return parsed.replace(tzinfo=timezone.utc)
def http_date(
    timestamp: datetime | date | int | float | struct_time | None = None,
) -> str:
    """Format a datetime object or timestamp into an :rfc:`2822` date string.

    This wraps :func:`email.utils.format_datetime` /
    :func:`email.utils.formatdate`, assuming naive datetime objects are in UTC
    instead of raising an exception.

    :param timestamp: The datetime or timestamp to format. Defaults to the
        current time.

    .. versionchanged:: 2.0
        Use ``email.utils.format_datetime``. Accept ``date`` objects.
    """
    if isinstance(timestamp, date):
        if isinstance(timestamp, datetime):
            # Ensure datetime is timezone-aware; naive is assumed UTC.
            timestamp = _dt_as_utc(timestamp)
        else:
            # A plain date is taken as midnight UTC.
            timestamp = datetime.combine(timestamp, time(), tzinfo=timezone.utc)

        return email.utils.format_datetime(timestamp, usegmt=True)

    if isinstance(timestamp, struct_time):
        timestamp = mktime(timestamp)

    # None (current time) or a numeric POSIX timestamp.
    return email.utils.formatdate(timestamp, usegmt=True)
def parse_age(value: str | None = None) -> timedelta | None:
    """Parses a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None

    try:
        seconds = int(value)
    except ValueError:
        return None

    if seconds >= 0:
        try:
            return timedelta(seconds=seconds)
        except OverflowError:
            # Too large to represent as a timedelta.
            pass

    # Negative or unrepresentable age.
    return None
def dump_age(age: timedelta | int | None = None) -> str | None:
    """Formats the duration as a base-10 integer string.

    :param age: should be an integer number of seconds,
                a :class:`datetime.timedelta` object, or,
                if the age is unknown, `None` (default).
    :raises ValueError: if the resulting seconds value is negative.
    """
    if age is None:
        return None

    seconds = int(age.total_seconds()) if isinstance(age, timedelta) else int(age)

    if seconds < 0:
        raise ValueError("age cannot be negative")

    return str(seconds)
def is_resource_modified(
    environ: WSGIEnvironment,
    etag: str | None = None,
    data: bytes | None = None,
    last_modified: datetime | str | None = None,
    ignore_if_range: bool = True,
) -> bool:
    """Convenience method for conditional requests.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag for the response for comparison.
    :param data: or alternatively the data of the response to automatically
                 generate an etag using :func:`generate_etag`.
    :param last_modified: an optional date of the last modification.
    :param ignore_if_range: If `False`, `If-Range` header will be taken into
                            account.
    :return: `True` if the resource was modified, otherwise `False`.

    .. versionchanged:: 2.0
        SHA-1 is used to generate an etag value for the data. MD5 may
        not be available in some environments.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    # Delegate to the sansio implementation, feeding it the relevant
    # conditional-request headers from the WSGI environ.
    get = environ.get
    return _sansio_http.is_resource_modified(
        http_range=get("HTTP_RANGE"),
        http_if_range=get("HTTP_IF_RANGE"),
        http_if_modified_since=get("HTTP_IF_MODIFIED_SINCE"),
        http_if_none_match=get("HTTP_IF_NONE_MATCH"),
        http_if_match=get("HTTP_IF_MATCH"),
        etag=etag,
        data=data,
        last_modified=last_modified,
        ignore_if_range=ignore_if_range,
    )
def remove_entity_headers(
    headers: ds.Headers | list[tuple[str, str]],
    allowed: t.Iterable[str] = ("expires", "content-location"),
) -> None:
    """Remove all entity headers from a list or :class:`Headers` object. This
    operation works in-place. `Expires` and `Content-Location` headers are
    by default not removed. The reason for this is :rfc:`2616` section
    10.3.5 which specifies some entity headers that should be sent.

    .. versionchanged:: 0.5
        added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: a list of headers that should still be allowed even though
                    they are entity headers.
    """
    keep = {name.lower() for name in allowed}
    kept = []

    for name, header_value in headers:
        if not is_entity_header(name) or name.lower() in keep:
            kept.append((name, header_value))

    # Replace the contents in-place so the caller's object is updated.
    headers[:] = kept
def remove_hop_by_hop_headers(headers: ds.Headers | list[tuple[str, str]]) -> None:
    """Strip all HTTP/1.1 "Hop-by-Hop" headers from a list or
    :class:`Headers` object. The operation modifies *headers* in-place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    kept = [item for item in headers if not is_hop_by_hop_header(item[0])]
    # Slice-assign so the caller's object is mutated, not rebound.
    headers[:] = kept
def is_entity_header(header: str) -> bool:
    """Tell whether *header* names an HTTP entity header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    # The known names are stored lowercase; normalize before the lookup.
    name = header.lower()
    return name in _entity_headers
def is_hop_by_hop_header(header: str) -> bool:
    """Tell whether *header* names an HTTP/1.1 "Hop-by-Hop" header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    # The known names are stored lowercase; normalize before the lookup.
    name = header.lower()
    return name in _hop_by_hop_headers
def parse_cookie(
    header: WSGIEnvironment | str | None,
    cls: type[ds.MultiDict] | None = None,
) -> ds.MultiDict[str, str]:
    """Parse a cookie from a string or WSGI environ.

    The same key can be provided multiple times, the values are stored
    in-order. The default :class:`MultiDict` will have the first value
    first, and all values can be retrieved with
    :meth:`MultiDict.getlist`.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.

    .. versionchanged:: 3.0
        Passing bytes, and the ``charset`` and ``errors`` parameters, were removed.

    .. versionchanged:: 1.0
        Returns a :class:`MultiDict` instead of a ``TypeConversionDict``.

    .. versionchanged:: 0.5
        Returns a :class:`TypeConversionDict` instead of a regular dict. The ``cls``
        parameter was added.
    """
    cookie = header.get("HTTP_COOKIE") if isinstance(header, dict) else header

    if cookie:
        # PEP 3333 tunnels header bytes through latin1; undo that here so
        # the sans-IO parser sees real text.
        cookie = cookie.encode("latin1").decode()

    return _sansio_http.parse_cookie(cookie=cookie, cls=cls)
# Values matching this pattern in full may be sent without quoting: word
# characters plus the listed punctuation (ASCII-only via ``re.A``).
_cookie_no_quote_re = re.compile(r"[\w!#$%&'()*+\-./:<=>?@\[\]^`{|}~]*", re.A)
# Bytes inside a quoted cookie value that must be backslash-escaped.
# NOTE(review): this class stops at \x19, while _cookie_slash_map below
# covers all of range(0x20) — bytes \x1a-\x1f have map entries that can
# never be matched. Confirm whether \x19 was meant to be \x1f.
_cookie_slash_re = re.compile(rb"[\x00-\x19\",;\\\x7f-\xff]", re.A)
# Escape table for the bytes above: quote and backslash get a simple
# backslash escape; everything else becomes a three-digit octal escape,
# matching the behavior of the stdlib ``http.cookies`` module.
_cookie_slash_map = {b'"': b'\\"', b"\\": b"\\\\"}
_cookie_slash_map.update(
    (v.to_bytes(1, "big"), b"\\%03o" % v)
    for v in [*range(0x20), *b",;", *range(0x7F, 256)]
)
def dump_cookie(
    key: str,
    value: str = "",
    max_age: timedelta | int | None = None,
    expires: str | datetime | int | float | None = None,
    path: str | None = "/",
    domain: str | None = None,
    secure: bool = False,
    httponly: bool = False,
    sync_expires: bool = True,
    max_size: int = 4093,
    samesite: str | None = None,
) -> str:
    """Create a Set-Cookie header without the ``Set-Cookie`` prefix.

    The return value is usually restricted to ascii as the vast majority
    of values are properly escaped, but that is no guarantee. It's
    tunneled through latin1 as required by :pep:`3333`.

    The return value is not ASCII safe if the key contains unicode
    characters. This is technically against the specification but
    happens in the wild. It's strongly recommended to not use
    non-ASCII values for the keys.

    :param max_age: should be a number of seconds, or `None` (default) if
                    the cookie should last only as long as the client's
                    browser session. Additionally `timedelta` objects
                    are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
    :param path: limits the cookie to a given path, per default it will
                 span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
                   example, ``domain="example.com"`` will set a cookie
                   that is readable by the domain ``www.example.com``,
                   ``foo.example.com`` etc. Otherwise, a cookie will only
                   be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie. This is an
                     extension to the cookie standard and probably not
                     supported by all browsers.
    :param sync_expires: automatically set expires if max_age is defined
                         but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 3.0
        Passing bytes, and the ``charset`` parameter, were removed.

    .. versionchanged:: 2.3.3
        The ``path`` parameter is ``/`` by default.

    .. versionchanged:: 2.3.1
        The value allows more characters without quoting.

    .. versionchanged:: 2.3
        ``localhost`` and other names without a dot are allowed for the domain. A
        leading dot is ignored.

    .. versionchanged:: 2.3
        The ``path`` parameter is ``None`` by default.

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    if path is not None:
        # safe = https://url.spec.whatwg.org/#url-path-segment-string
        # as well as percent for things that are already quoted
        # excluding semicolon since it's part of the header syntax
        path = quote(path, safe="%!$&'()*+,/:=@")

    if domain:
        # Drop any port, ignore a leading dot, and IDNA-encode so
        # non-ASCII domains become ASCII-safe.
        domain = domain.partition(":")[0].lstrip(".").encode("idna").decode("ascii")

    if isinstance(max_age, timedelta):
        max_age = int(max_age.total_seconds())

    if expires is not None:
        if not isinstance(expires, str):
            expires = http_date(expires)
    elif max_age is not None and sync_expires:
        # No explicit expires: derive one from max_age for old clients
        # that only understand Expires.
        expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)

    if samesite is not None:
        # Normalize case so 'strict'/'LAX'/'none' are all accepted.
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    # Quote value if it contains characters not allowed by RFC 6265. Slash-escape with
    # three octal digits, which matches http.cookies, although the RFC suggests base64.
    if not _cookie_no_quote_re.fullmatch(value):
        # Work with bytes here, since a UTF-8 character could be multiple bytes.
        value = _cookie_slash_re.sub(
            lambda m: _cookie_slash_map[m.group()], value.encode()
        ).decode("ascii")
        value = f'"{value}"'

    # Send a non-ASCII key as mojibake. Everything else should already be ASCII.
    # TODO Remove encoding dance, it seems like clients accept UTF-8 keys
    buf = [f"{key.encode().decode('latin1')}={value}"]

    # Append each attribute: flags (True) as a bare name, skipped when
    # None/False, everything else as name=value.
    for k, v in (
        ("Domain", domain),
        ("Expires", expires),
        ("Max-Age", max_age),
        ("Secure", secure),
        ("HttpOnly", httponly),
        ("Path", path),
        ("SameSite", samesite),
    ):
        if v is None or v is False:
            continue

        if v is True:
            buf.append(k)
            continue

        buf.append(f"{k}={v}")

    rv = "; ".join(buf)

    # Warn if the final value of the cookie is larger than the limit. If the cookie is
    # too large, then it may be silently ignored by the browser, which can be quite hard
    # to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        warnings.warn(
            f"The '{key}' cookie is too large: the value was {value_size} bytes but the"
            f" header required {cookie_size - value_size} extra bytes. The final size"
            f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
            " silently ignore cookies larger than this.",
            stacklevel=2,
        )

    return rv
def is_byte_range_valid(
    start: int | None, stop: int | None, length: int | None
) -> bool:
    """Checks if a given byte content range is valid for the given length.

    .. versionadded:: 0.7
    """
    # A half-specified range (exactly one of start/stop given) is invalid.
    if (start is None) != (stop is None):
        return False

    # No range bounds: valid for any non-negative or unknown length.
    if start is None:
        return length is None or length >= 0

    # Both bounds given: start must be non-negative and strictly below stop.
    if not 0 <= start < stop:  # type: ignore[operator]
        return False

    # With a known length, start must also fall inside the content.
    return length is None or start < length
1370# circular dependencies
1371from . import datastructures as ds
1372from .sansio import http as _sansio_http