Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/http.py: 21%

from __future__ import annotations

import email.utils
import re
import typing as t
import warnings
from datetime import date
from datetime import datetime
from datetime import time
from datetime import timedelta
from datetime import timezone
from enum import Enum
from hashlib import sha1
from time import mktime
from time import struct_time
from urllib.parse import quote
from urllib.parse import unquote
from urllib.request import parse_http_list as _parse_list_header

from ._internal import _dt_as_utc
from ._internal import _plain_int

if t.TYPE_CHECKING:
    from _typeshed.wsgi import WSGIEnvironment

_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
_entity_headers = frozenset(
    [
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    ]
)
_hop_by_hop_headers = frozenset(
    [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ]
)
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    511: "Network Authentication Required",  # see RFC 6585
}


class COEP(Enum):
    """Cross Origin Embedder Policies"""

    UNSAFE_NONE = "unsafe-none"
    REQUIRE_CORP = "require-corp"


class COOP(Enum):
    """Cross Origin Opener Policies"""

    UNSAFE_NONE = "unsafe-none"
    SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
    SAME_ORIGIN = "same-origin"


def quote_header_value(value: t.Any, allow_token: bool = True) -> str:
    """Add double quotes around a header value. If the header contains only ASCII token
    characters, it will be returned unchanged. If the header contains ``"`` or ``\\``
    characters, they will be escaped with an additional ``\\`` character.

    This is the reverse of :func:`unquote_header_value`.

    :param value: The value to quote. Will be converted to a string.
    :param allow_token: Disable to quote the value even if it only has token characters.

    .. versionchanged:: 3.0
        Passing bytes is not supported.

    .. versionchanged:: 3.0
        The ``extra_chars`` parameter is removed.

    .. versionchanged:: 2.3
        The value is quoted if it is the empty string.

    .. versionadded:: 0.5
    """
    value_str = str(value)

    if not value_str:
        return '""'

    if allow_token:
        token_chars = _token_chars

        if token_chars.issuperset(value_str):
            return value_str

    value_str = value_str.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{value_str}"'


def unquote_header_value(value: str) -> str:
    """Remove double quotes and decode slash-escaped ``"`` and ``\\`` characters in a
    header value.

    This is the reverse of :func:`quote_header_value`.

    :param value: The header value to unquote.

    .. versionchanged:: 3.0
        The ``is_filename`` parameter is removed.
    """
    if len(value) >= 2 and value[0] == value[-1] == '"':
        value = value[1:-1]
        return value.replace("\\\\", "\\").replace('\\"', '"')

    return value


def dump_options_header(header: str | None, options: t.Mapping[str, t.Any]) -> str:
    """Produce a header value and ``key=value`` parameters separated by semicolons
    ``;``. For example, the ``Content-Type`` header.

    .. code-block:: python

        dump_options_header("text/html", {"charset": "UTF-8"})
        'text/html; charset=UTF-8'

    This is the reverse of :func:`parse_options_header`.

    If a value contains non-token characters, it will be quoted.

    If a value is ``None``, the parameter is skipped.

    In some keys for some headers, a UTF-8 value can be encoded using a special
    ``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will
    not produce that format automatically, but if a given key ends with an asterisk
    ``*``, the value is assumed to have that form and will not be quoted further.

    :param header: The primary header value.
    :param options: Parameters to encode as ``key=value`` pairs.

    .. versionchanged:: 2.3
        Keys with ``None`` values are skipped rather than treated as a bare key.

    .. versionchanged:: 2.2.3
        If a key ends with ``*``, its value will not be quoted.
    """
    segments = []

    if header is not None:
        segments.append(header)

    for key, value in options.items():
        if value is None:
            continue

        if key[-1] == "*":
            segments.append(f"{key}={value}")
        else:
            segments.append(f"{key}={quote_header_value(value)}")

    return "; ".join(segments)


def dump_header(iterable: dict[str, t.Any] | t.Iterable[t.Any]) -> str:
    """Produce a header value from a list of items or ``key=value`` pairs, separated by
    commas ``,``.

    This is the reverse of :func:`parse_list_header`, :func:`parse_dict_header`, and
    :func:`parse_set_header`.

    If a value contains non-token characters, it will be quoted.

    If a value is ``None``, the key is output alone.

    In some keys for some headers, a UTF-8 value can be encoded using a special
    ``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will
    not produce that format automatically, but if a given key ends with an asterisk
    ``*``, the value is assumed to have that form and will not be quoted further.

    .. code-block:: python

        dump_header(["foo", "bar baz"])
        'foo, "bar baz"'

        dump_header({"foo": "bar baz"})
        'foo="bar baz"'

    :param iterable: The items to create a header from.

    .. versionchanged:: 3.0
        The ``allow_token`` parameter is removed.

    .. versionchanged:: 2.2.3
        If a key ends with ``*``, its value will not be quoted.
    """
    if isinstance(iterable, dict):
        items = []

        for key, value in iterable.items():
            if value is None:
                items.append(key)
            elif key[-1] == "*":
                items.append(f"{key}={value}")
            else:
                items.append(f"{key}={quote_header_value(value)}")
    else:
        items = [quote_header_value(x) for x in iterable]

    return ", ".join(items)


def dump_csp_header(header: ds.ContentSecurityPolicy) -> str:
    """Dump a Content Security Policy header.

    These are structured into policies such as "default-src 'self';
    script-src 'self'".

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.

    """
    return "; ".join(f"{key} {value}" for key, value in header.items())


def parse_list_header(value: str) -> list[str]:
    """Parse a header value that consists of a list of comma separated items according
    to `RFC 9110 <https://httpwg.org/specs/rfc9110.html#abnf.extension>`__.

    This extends :func:`urllib.request.parse_http_list` to remove surrounding quotes
    from values.

    .. code-block:: python

        parse_list_header('token, "quoted value"')
        ['token', 'quoted value']

    This is the reverse of :func:`dump_header`.

    :param value: The header value to parse.
    """
    result = []

    for item in _parse_list_header(value):
        if len(item) >= 2 and item[0] == item[-1] == '"':
            item = item[1:-1]

        result.append(item)

    return result


def parse_dict_header(value: str) -> dict[str, str | None]:
    """Parse a list header using :func:`parse_list_header`, then parse each item as a
    ``key=value`` pair.

    .. code-block:: python

        parse_dict_header('a=b, c="d, e", f')
        {"a": "b", "c": "d, e", "f": None}

    This is the reverse of :func:`dump_header`.

    If a key does not have a value, it is ``None``.

    This handles charsets for values as described in
    `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__. Only ASCII, UTF-8,
    and ISO-8859-1 charsets are accepted, otherwise the value remains quoted.

    :param value: The header value to parse.

    .. versionchanged:: 3.0
        Passing bytes is not supported.

    .. versionchanged:: 3.0
        The ``cls`` argument is removed.

    .. versionchanged:: 2.3
        Added support for ``key*=charset''value`` encoded items.

    .. versionchanged:: 0.9
        The ``cls`` argument was added.
    """
    result: dict[str, str | None] = {}

    for item in parse_list_header(value):
        key, has_value, value = item.partition("=")
        key = key.strip()

        if not has_value:
            result[key] = None
            continue

        value = value.strip()
        encoding: str | None = None

        if key[-1] == "*":
            # key*=charset''value becomes key=value, where value is percent encoded
            # adapted from parse_options_header, without the continuation handling
            key = key[:-1]
            match = _charset_value_re.match(value)

            if match:
                # If there is a charset marker in the value, split it off.
                encoding, value = match.groups()
                encoding = encoding.lower()

        # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
        # This list will not be extended further. An invalid encoding will leave the
        # value quoted.
        if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
            # invalid bytes are replaced during unquoting
            value = unquote(value, encoding=encoding)

        if len(value) >= 2 and value[0] == value[-1] == '"':
            value = value[1:-1]

        result[key] = value

    return result
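
# Example (illustrative sketch, not in the upstream source): how parse_dict_header
# is expected to treat plain, quoted, and RFC 2231 charset-encoded items.
#
#     parse_dict_header('a=b, c="d, e", f')
#     # {"a": "b", "c": "d, e", "f": None}
#     parse_dict_header("name*=UTF-8''f%C3%B6%C3%B6")
#     # {"name": "föö"} -- percent decoded using the declared charset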


# https://httpwg.org/specs/rfc9110.html#parameter
_parameter_re = re.compile(
    r"""
    # don't match multiple empty parts, that causes backtracking
    \s*;\s*  # find the part delimiter
    (?:
        ([\w!#$%&'*+\-.^`|~]+)  # key, one or more token chars
        =  # equals, with no space on either side
        (  # value, token or quoted string
            [\w!#$%&'*+\-.^`|~]+  # one or more token chars
        |
            "(?:\\\\|\\"|.)*?"  # quoted string, consuming slash escapes
        )
    )?  # optionally match key=value, to account for empty parts
    """,
    re.ASCII | re.VERBOSE,
)
# https://www.rfc-editor.org/rfc/rfc2231#section-4
_charset_value_re = re.compile(
    r"""
    ([\w!#$%&*+\-.^`|~]*)'  # charset part, could be empty
    [\w!#$%&*+\-.^`|~]*'  # don't care about language part, usually empty
    ([\w!#$%&'*+\-.^`|~]+)  # one or more token chars with percent encoding
    """,
    re.ASCII | re.VERBOSE,
)
# https://www.rfc-editor.org/rfc/rfc2231#section-3
_continuation_re = re.compile(r"\*(\d+)$", re.ASCII)


def parse_options_header(value: str | None) -> tuple[str, dict[str, str]]:
    """Parse a header that consists of a value with ``key=value`` parameters separated
    by semicolons ``;``. For example, the ``Content-Type`` header.

    .. code-block:: python

        parse_options_header("text/html; charset=UTF-8")
        ('text/html', {'charset': 'UTF-8'})

        parse_options_header("")
        ("", {})

    This is the reverse of :func:`dump_options_header`.

    This parses valid parameter parts as described in
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#parameter>`__. Invalid parts are
    skipped.

    This handles continuations and charsets as described in
    `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__, although not as
    strictly as the RFC. Only ASCII, UTF-8, and ISO-8859-1 charsets are accepted,
    otherwise the value remains quoted.

    Clients may not be consistent in how they handle a quote character within a quoted
    value. The `HTML Standard <https://html.spec.whatwg.org/#multipart-form-data>`__
    replaces it with ``%22`` in multipart form data.
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#quoted.strings>`__ uses backslash
    escapes in HTTP headers. Both are decoded to the ``"`` character.

    Clients may not be consistent in how they handle non-ASCII characters. HTML
    documents must declare ``<meta charset=UTF-8>``, otherwise browsers may replace with
    HTML character references, which can be decoded using :func:`html.unescape`.

    :param value: The header value to parse.
    :return: ``(value, options)``, where ``options`` is a dict

    .. versionchanged:: 2.3
        Invalid parts, such as keys with no value, quoted keys, and incorrectly quoted
        values, are discarded instead of treating as ``None``.

    .. versionchanged:: 2.3
        Only ASCII, UTF-8, and ISO-8859-1 are accepted for charset values.

    .. versionchanged:: 2.3
        Escaped quotes in quoted values, like ``%22`` and ``\\"``, are handled.

    .. versionchanged:: 2.2
        Option names are always converted to lowercase.

    .. versionchanged:: 2.2
        The ``multiple`` parameter was removed.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5
    """
    if value is None:
        return "", {}

    value, _, rest = value.partition(";")
    value = value.strip()
    rest = rest.strip()

    if not value or not rest:
        # empty (invalid) value, or value without options
        return value, {}

    rest = f";{rest}"
    options: dict[str, str] = {}
    encoding: str | None = None
    continued_encoding: str | None = None

    for pk, pv in _parameter_re.findall(rest):
        if not pk:
            # empty or invalid part
            continue

        pk = pk.lower()

        if pk[-1] == "*":
            # key*=charset''value becomes key=value, where value is percent encoded
            pk = pk[:-1]
            match = _charset_value_re.match(pv)

            if match:
                # If there is a valid charset marker in the value, split it off.
                encoding, pv = match.groups()
                # This might be the empty string, handled next.
                encoding = encoding.lower()

            # No charset marker, or marker with empty charset value.
            if not encoding:
                encoding = continued_encoding

            # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
            # This list will not be extended further. An invalid encoding will leave the
            # value quoted.
            if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                # Continuation parts don't require their own charset marker. This is
                # looser than the RFC, it will persist across different keys and allows
                # changing the charset during a continuation. But this implementation is
                # much simpler than tracking the full state.
                continued_encoding = encoding
                # invalid bytes are replaced during unquoting
                pv = unquote(pv, encoding=encoding)

        # Remove quotes. At this point the value cannot be empty or a single quote.
        if pv[0] == pv[-1] == '"':
            # HTTP headers use slash, multipart form data uses percent
            pv = pv[1:-1].replace("\\\\", "\\").replace('\\"', '"').replace("%22", '"')

        match = _continuation_re.search(pk)

        if match:
            # key*0=a; key*1=b becomes key=ab
            pk = pk[: match.start()]
            options[pk] = options.get(pk, "") + pv
        else:
            options[pk] = pv

    return value, options
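
# Example (illustrative sketch, not in the upstream source): RFC 2231 charset
# markers are expected to be folded back into a single decoded option value.
#
#     parse_options_header("text/html; charset=UTF-8")
#     # ('text/html', {'charset': 'UTF-8'})
#     parse_options_header("attachment; filename*=UTF-8''f%C3%B6%C3%B6.txt")
#     # ('attachment', {'filename': 'föö.txt'})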


_q_value_re = re.compile(r"-?\d+(\.\d+)?", re.ASCII)
_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")


@t.overload
def parse_accept_header(value: str | None) -> ds.Accept: ...


@t.overload
def parse_accept_header(value: str | None, cls: type[_TAnyAccept]) -> _TAnyAccept: ...


def parse_accept_header(
    value: str | None, cls: type[_TAnyAccept] | None = None
) -> _TAnyAccept:
    """Parse an ``Accept`` header according to
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#field.accept>`__.

    Returns an :class:`.Accept` instance, which can sort and inspect items based on
    their quality parameter. When parsing ``Accept-Charset``, ``Accept-Encoding``, or
    ``Accept-Language``, pass the appropriate :class:`.Accept` subclass.

    :param value: The header value to parse.
    :param cls: The :class:`.Accept` class to wrap the result in.
    :return: An instance of ``cls``.

    .. versionchanged:: 2.3
        Parse according to RFC 9110. Items with invalid ``q`` values are skipped.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyAccept], ds.Accept)

    if not value:
        return cls(None)

    result = []

    for item in parse_list_header(value):
        item, options = parse_options_header(item)

        if "q" in options:
            # pop q, remaining options are reconstructed
            q_str = options.pop("q").strip()

            if _q_value_re.fullmatch(q_str) is None:
                # ignore an invalid q
                continue

            q = float(q_str)

            if q < 0 or q > 1:
                # ignore an invalid q
                continue
        else:
            q = 1

        if options:
            # reconstruct the media type with any options
            item = dump_options_header(item, options)

        result.append((item, q))

    return cls(result)
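
# Example (illustrative sketch, not in the upstream source): the returned Accept
# instance sorts items by their quality value, so the best match comes first.
#
#     accept = parse_accept_header("text/html, application/json;q=0.9, */*;q=0.1")
#     # accept.best is expected to be 'text/html'
#     # accept.quality('application/json') is expected to be 0.9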


_TAnyCC = t.TypeVar("_TAnyCC", bound="ds.cache_control._CacheControl")


@t.overload
def parse_cache_control_header(
    value: str | None,
    on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None,
) -> ds.RequestCacheControl: ...


@t.overload
def parse_cache_control_header(
    value: str | None,
    on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None,
    cls: type[_TAnyCC] = ...,
) -> _TAnyCC: ...


def parse_cache_control_header(
    value: str | None,
    on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None,
    cls: type[_TAnyCC] | None = None,
) -> _TAnyCC:
    """Parse a cache control header. The RFC differs between response and
    request cache control, this method does not. It's your responsibility
    to not use the wrong control statements.

    .. versionadded:: 0.5
        The `cls` was added. If not specified an immutable
        :class:`~werkzeug.datastructures.RequestCacheControl` is returned.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.CacheControl`
                      object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast("type[_TAnyCC]", ds.RequestCacheControl)

    if not value:
        return cls((), on_update)

    return cls(parse_dict_header(value), on_update)


_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")


@t.overload
def parse_csp_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None,
) -> ds.ContentSecurityPolicy: ...


@t.overload
def parse_csp_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None,
    cls: type[_TAnyCSP] = ...,
) -> _TAnyCSP: ...


def parse_csp_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None,
    cls: type[_TAnyCSP] | None = None,
) -> _TAnyCSP:
    """Parse a Content Security Policy header.

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast("type[_TAnyCSP]", ds.ContentSecurityPolicy)

    if value is None:
        return cls((), on_update)

    items = []

    for policy in value.split(";"):
        policy = policy.strip()

        # Ignore badly formatted policies (no space)
        if " " in policy:
            directive, value = policy.strip().split(" ", 1)
            items.append((directive.strip(), value.strip()))

    return cls(items, on_update)


def parse_set_header(
    value: str | None,
    on_update: t.Callable[[ds.HeaderSet], None] | None = None,
) -> ds.HeaderSet:
    """Parse a set-like header and return a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    The return value is an object that treats the items case-insensitively
    and keeps the order of the items:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    To create a header from the :class:`HeaderSet` again, use the
    :func:`dump_header` function.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
                      value on the :class:`~werkzeug.datastructures.HeaderSet`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    if not value:
        return ds.HeaderSet(None, on_update)
    return ds.HeaderSet(parse_list_header(value), on_update)


def parse_if_range_header(value: str | None) -> ds.IfRange:
    """Parses an if-range header which can be an etag or a date. Returns
    a :class:`~werkzeug.datastructures.IfRange` object.

    .. versionchanged:: 2.0
        If the value represents a datetime, it is timezone-aware.

    .. versionadded:: 0.7
    """
    if not value:
        return ds.IfRange()
    date = parse_date(value)
    if date is not None:
        return ds.IfRange(date=date)
    # drop weakness information
    return ds.IfRange(unquote_etag(value)[0])


def parse_range_header(
    value: str | None, make_inclusive: bool = True
) -> ds.Range | None:
    """Parses a range header into a :class:`~werkzeug.datastructures.Range`
    object. If the header is missing or malformed `None` is returned.
    `ranges` is a list of ``(start, stop)`` tuples where the ranges are
    non-inclusive.

    .. versionadded:: 0.7
    """
    if not value or "=" not in value:
        return None

    ranges = []
    last_end = 0
    units, rng = value.split("=", 1)
    units = units.strip().lower()

    for item in rng.split(","):
        item = item.strip()
        if "-" not in item:
            return None
        if item.startswith("-"):
            if last_end < 0:
                return None
            try:
                begin = _plain_int(item)
            except ValueError:
                return None
            end = None
            last_end = -1
        elif "-" in item:
            begin_str, end_str = item.split("-", 1)
            begin_str = begin_str.strip()
            end_str = end_str.strip()

            try:
                begin = _plain_int(begin_str)
            except ValueError:
                return None

            if begin < last_end or last_end < 0:
                return None
            if end_str:
                try:
                    end = _plain_int(end_str) + 1
                except ValueError:
                    return None

                if begin >= end:
                    return None
            else:
                end = None
            last_end = end if end is not None else -1
        ranges.append((begin, end))

    return ds.Range(units, ranges)
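
# Example (illustrative sketch, not in the upstream source): stop values are
# exclusive, and open-ended or suffix ranges use None.
#
#     parse_range_header("bytes=0-499").ranges   # [(0, 500)]
#     parse_range_header("bytes=500-").ranges    # [(500, None)]
#     parse_range_header("bytes=-300").ranges    # [(-300, None)] -- suffix range
#     parse_range_header("garbage")              # None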


def parse_content_range_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentRange], None] | None = None,
) -> ds.ContentRange | None:
    """Parses a range header into a
    :class:`~werkzeug.datastructures.ContentRange` object or `None` if
    parsing is not possible.

    .. versionadded:: 0.7

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.ContentRange`
                      object is changed.
    """
    if value is None:
        return None
    try:
        units, rangedef = (value or "").strip().split(None, 1)
    except ValueError:
        return None

    if "/" not in rangedef:
        return None
    rng, length_str = rangedef.split("/", 1)
    if length_str == "*":
        length = None
    else:
        try:
            length = _plain_int(length_str)
        except ValueError:
            return None

    if rng == "*":
        if not is_byte_range_valid(None, None, length):
            return None

        return ds.ContentRange(units, None, None, length, on_update=on_update)
    elif "-" not in rng:
        return None

    start_str, stop_str = rng.split("-", 1)
    try:
        start = _plain_int(start_str)
        stop = _plain_int(stop_str) + 1
    except ValueError:
        return None

    if is_byte_range_valid(start, stop, length):
        return ds.ContentRange(units, start, stop, length, on_update=on_update)

    return None
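
# Example (illustrative sketch, not in the upstream source): the stop value is
# one past the last byte, and "*" parts map to None.
#
#     cr = parse_content_range_header("bytes 0-499/1234")
#     # cr.start == 0, cr.stop == 500, cr.length == 1234
#     parse_content_range_header("bytes */1234")   # start and stop are None
#     parse_content_range_header("bytes 0-499/x")  # None (invalid length)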


def quote_etag(etag: str, weak: bool = False) -> str:
    """Quote an etag.

    :param etag: the etag to quote.
    :param weak: set to `True` to tag it "weak".
    """
    if '"' in etag:
        raise ValueError("invalid etag")
    etag = f'"{etag}"'
    if weak:
        etag = f"W/{etag}"
    return etag


def unquote_etag(
    etag: str | None,
) -> tuple[str, bool] | tuple[None, None]:
    """Unquote a single etag:

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple.
    """
    if not etag:
        return None, None
    etag = etag.strip()
    weak = False
    if etag.startswith(("W/", "w/")):
        weak = True
        etag = etag[2:]
    if etag[:1] == etag[-1:] == '"':
        etag = etag[1:-1]
    return etag, weak


def parse_etags(value: str | None) -> ds.ETags:
    """Parse an etag header.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ds.ETags()
    strong = []
    weak = []
    end = len(value)
    pos = 0
    while pos < end:
        match = _etag_re.match(value, pos)
        if match is None:
            break
        is_weak, quoted, raw = match.groups()
        if raw == "*":
            return ds.ETags(star_tag=True)
        elif quoted:
            raw = quoted
        if is_weak:
            weak.append(raw)
        else:
            strong.append(raw)
        pos = match.end()
    return ds.ETags(strong, weak)
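
# Example (illustrative sketch, not in the upstream source): quote_etag and
# unquote_etag round-trip a single tag, parse_etags handles the header form.
#
#     quote_etag("abc")              # '"abc"'
#     quote_etag("abc", weak=True)   # 'W/"abc"'
#     parse_etags('"abc", W/"def"')  # ETags with strong 'abc' and weak 'def'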


def generate_etag(data: bytes) -> str:
    """Generate an etag for some data.

    .. versionchanged:: 2.0
        Use SHA-1. MD5 may not be available in some environments.
    """
    return sha1(data).hexdigest()


def parse_date(value: str | None) -> datetime | None:
    """Parse an :rfc:`2822` date into a timezone-aware
    :class:`datetime.datetime` object, or ``None`` if parsing fails.

    This is a wrapper for :func:`email.utils.parsedate_to_datetime`. It
    returns ``None`` if parsing fails instead of raising an exception,
    and always returns a timezone-aware datetime object. If the string
    doesn't have timezone information, it is assumed to be UTC.

    :param value: A string with a supported date format.

    .. versionchanged:: 2.0
        Return a timezone-aware datetime object. Use
        ``email.utils.parsedate_to_datetime``.
    """
    if value is None:
        return None

    try:
        dt = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None

    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)

    return dt


def http_date(
    timestamp: datetime | date | int | float | struct_time | None = None,
) -> str:
    """Format a datetime object or timestamp into an :rfc:`2822` date
    string.

    This is a wrapper for :func:`email.utils.format_datetime`. It
    assumes naive datetime objects are in UTC instead of raising an
    exception.

    :param timestamp: The datetime or timestamp to format. Defaults to
        the current time.

    .. versionchanged:: 2.0
        Use ``email.utils.format_datetime``. Accept ``date`` objects.
    """
    if isinstance(timestamp, date):
        if not isinstance(timestamp, datetime):
            # Assume plain date is midnight UTC.
            timestamp = datetime.combine(timestamp, time(), tzinfo=timezone.utc)
        else:
            # Ensure datetime is timezone-aware.
            timestamp = _dt_as_utc(timestamp)

        return email.utils.format_datetime(timestamp, usegmt=True)

    if isinstance(timestamp, struct_time):
        timestamp = mktime(timestamp)

    return email.utils.formatdate(timestamp, usegmt=True)
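
# Example (illustrative sketch, not in the upstream source): http_date and
# parse_date round-trip through the RFC 2822 date format, always in UTC.
#
#     http_date(datetime(2024, 1, 15, 12, 30, tzinfo=timezone.utc))
#     # 'Mon, 15 Jan 2024 12:30:00 GMT'
#     parse_date("Mon, 15 Jan 2024 12:30:00 GMT")
#     # datetime(2024, 1, 15, 12, 30, tzinfo=timezone.utc)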


def parse_age(value: str | None = None) -> timedelta | None:
    """Parses a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None
    try:
        seconds = int(value)
    except ValueError:
        return None
    if seconds < 0:
        return None
    try:
        return timedelta(seconds=seconds)
    except OverflowError:
        return None


def dump_age(age: timedelta | int | None = None) -> str | None:
    """Formats the duration as a base-10 integer.

    :param age: should be an integer number of seconds,
                a :class:`datetime.timedelta` object, or,
                if the age is unknown, `None` (default).
    """
    if age is None:
        return None
    if isinstance(age, timedelta):
        age = int(age.total_seconds())
    else:
        age = int(age)

    if age < 0:
        raise ValueError("age cannot be negative")

    return str(age)
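
# Example (illustrative sketch, not in the upstream source): parse_age and
# dump_age convert between the Age header string and timedelta/int values.
#
#     parse_age("3600")               # timedelta(seconds=3600)
#     parse_age("-1")                 # None (negative ages are rejected)
#     dump_age(timedelta(minutes=5))  # '300'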


def is_resource_modified(
    environ: WSGIEnvironment,
    etag: str | None = None,
    data: bytes | None = None,
    last_modified: datetime | str | None = None,
    ignore_if_range: bool = True,
) -> bool:
    """Convenience method for conditional requests.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag for the response for comparison.
    :param data: or alternatively the data of the response to automatically
                 generate an etag using :func:`generate_etag`.
    :param last_modified: an optional date of the last modification.
    :param ignore_if_range: If `False`, `If-Range` header will be taken into
                            account.
    :return: `True` if the resource was modified, otherwise `False`.

    .. versionchanged:: 2.0
        SHA-1 is used to generate an etag value for the data. MD5 may
        not be available in some environments.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    return _sansio_http.is_resource_modified(
        http_range=environ.get("HTTP_RANGE"),
        http_if_range=environ.get("HTTP_IF_RANGE"),
        http_if_modified_since=environ.get("HTTP_IF_MODIFIED_SINCE"),
        http_if_none_match=environ.get("HTTP_IF_NONE_MATCH"),
        http_if_match=environ.get("HTTP_IF_MATCH"),
        etag=etag,
        data=data,
        last_modified=last_modified,
        ignore_if_range=ignore_if_range,
    )


def remove_entity_headers(
    headers: ds.Headers | list[tuple[str, str]],
    allowed: t.Iterable[str] = ("expires", "content-location"),
) -> None:
    """Remove all entity headers from a list or :class:`Headers` object. This
    operation works in-place. `Expires` and `Content-Location` headers are
    by default not removed. The reason for this is :rfc:`2616` section
    10.3.5 which specifies some entity headers that should be sent.

    .. versionchanged:: 0.5
        added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: a list of headers that should still be allowed even though
                    they are entity headers.
    """
    allowed = {x.lower() for x in allowed}
    headers[:] = [
        (key, value)
        for key, value in headers
        if not is_entity_header(key) or key.lower() in allowed
    ]


def remove_hop_by_hop_headers(headers: ds.Headers | list[tuple[str, str]]) -> None:
    """Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or
    :class:`Headers` object. This operation works in-place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    headers[:] = [
        (key, value) for key, value in headers if not is_hop_by_hop_header(key)
    ]


def is_entity_header(header: str) -> bool:
    """Check if a header is an entity header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    return header.lower() in _entity_headers


def is_hop_by_hop_header(header: str) -> bool:
    """Check if a header is an HTTP/1.1 "Hop-by-Hop" header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    return header.lower() in _hop_by_hop_headers


def parse_cookie(
    header: WSGIEnvironment | str | None,
    cls: type[ds.MultiDict[str, str]] | None = None,
) -> ds.MultiDict[str, str]:
    """Parse a cookie from a string or WSGI environ.

    The same key can be provided multiple times, the values are stored
    in-order. The default :class:`MultiDict` will have the first value
    first, and all values can be retrieved with
    :meth:`MultiDict.getlist`.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.

    .. versionchanged:: 3.0
        Passing bytes, and the ``charset`` and ``errors`` parameters, were removed.

    .. versionchanged:: 1.0
        Returns a :class:`MultiDict` instead of a ``TypeConversionDict``.

    .. versionchanged:: 0.5
        Returns a :class:`TypeConversionDict` instead of a regular dict. The ``cls``
        parameter was added.
    """
    if isinstance(header, dict):
        cookie = header.get("HTTP_COOKIE")
    else:
        cookie = header

    if cookie:
        cookie = cookie.encode("latin1").decode()

    return _sansio_http.parse_cookie(cookie=cookie, cls=cls)
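
# Example (illustrative sketch, not in the upstream source): repeated cookie
# names are kept in order and exposed via MultiDict.getlist.
#
#     cookies = parse_cookie("a=1; b=2; a=3")
#     # cookies["a"] is expected to be "1"
#     # cookies.getlist("a") is expected to be ["1", "3"]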


_cookie_no_quote_re = re.compile(r"[\w!#$%&'()*+\-./:<=>?@\[\]^`{|}~]*", re.A)
_cookie_slash_re = re.compile(rb"[\x00-\x19\",;\\\x7f-\xff]", re.A)
_cookie_slash_map = {b'"': b'\\"', b"\\": b"\\\\"}
_cookie_slash_map.update(
    (v.to_bytes(1, "big"), b"\\%03o" % v)
    for v in [*range(0x20), *b",;", *range(0x7F, 256)]
)


def dump_cookie(
    key: str,
    value: str = "",
    max_age: timedelta | int | None = None,
    expires: str | datetime | int | float | None = None,
    path: str | None = "/",
    domain: str | None = None,
    secure: bool = False,
    httponly: bool = False,
    sync_expires: bool = True,
    max_size: int = 4093,
    samesite: str | None = None,
) -> str:
    """Create a Set-Cookie header without the ``Set-Cookie`` prefix.

    The return value is usually restricted to ascii as the vast majority
    of values are properly escaped, but that is no guarantee. It's
    tunneled through latin1 as required by :pep:`3333`.

    The return value is not ASCII safe if the key contains unicode
    characters. This is technically against the specification but
    happens in the wild. It's strongly recommended to not use
    non-ASCII values for the keys.

    :param max_age: should be a number of seconds, or `None` (default) if
                    the cookie should last only as long as the client's
                    browser session. Additionally `timedelta` objects
                    are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
    :param path: limits the cookie to a given path, per default it will
                 span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
                   example, ``domain="example.com"`` will set a cookie
                   that is readable by the domain ``www.example.com``,
                   ``foo.example.com`` etc. Otherwise, a cookie will only
                   be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie. This is an
                     extension to the cookie standard and probably not
                     supported by all browsers.
    :param sync_expires: automatically set expires if max_age is defined
                         but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 3.0
        Passing bytes, and the ``charset`` parameter, were removed.

    .. versionchanged:: 2.3.3
        The ``path`` parameter is ``/`` by default.

    .. versionchanged:: 2.3.1
        The value allows more characters without quoting.

    .. versionchanged:: 2.3
        ``localhost`` and other names without a dot are allowed for the domain. A
        leading dot is ignored.

    .. versionchanged:: 2.3
        The ``path`` parameter is ``None`` by default.

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    if path is not None:
        # safe = https://url.spec.whatwg.org/#url-path-segment-string
        # as well as percent for things that are already quoted
        # excluding semicolon since it's part of the header syntax
        path = quote(path, safe="%!$&'()*+,/:=@")

    if domain:
        domain = domain.partition(":")[0].lstrip(".").encode("idna").decode("ascii")

    if isinstance(max_age, timedelta):
        max_age = int(max_age.total_seconds())

    if expires is not None:
        if not isinstance(expires, str):
            expires = http_date(expires)
    elif max_age is not None and sync_expires:
        expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)

    if samesite is not None:
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    # Quote value if it contains characters not allowed by RFC 6265. Slash-escape with
    # three octal digits, which matches http.cookies, although the RFC suggests base64.
    if not _cookie_no_quote_re.fullmatch(value):
        # Work with bytes here, since a UTF-8 character could be multiple bytes.
        value = _cookie_slash_re.sub(
            lambda m: _cookie_slash_map[m.group()], value.encode()
        ).decode("ascii")
        value = f'"{value}"'

    # Send a non-ASCII key as mojibake. Everything else should already be ASCII.
    # TODO Remove encoding dance, it seems like clients accept UTF-8 keys
    buf = [f"{key.encode().decode('latin1')}={value}"]

    for k, v in (
        ("Domain", domain),
        ("Expires", expires),
        ("Max-Age", max_age),
        ("Secure", secure),
        ("HttpOnly", httponly),
        ("Path", path),
        ("SameSite", samesite),
    ):
        if v is None or v is False:
            continue

        if v is True:
            buf.append(k)
            continue

        buf.append(f"{k}={v}")

    rv = "; ".join(buf)

    # Warn if the final value of the cookie is larger than the limit. If the cookie is
    # too large, then it may be silently ignored by the browser, which can be quite hard
    # to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        warnings.warn(
            f"The '{key}' cookie is too large: the value was {value_size} bytes but the"
            f" header required {cookie_size - value_size} extra bytes. The final size"
            f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
            " silently ignore cookies larger than this.",
            stacklevel=2,
        )

    return rv
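
# Example (illustrative sketch, not in the upstream source): the returned string
# is the header value only, ready to be set as a ``Set-Cookie`` header.
#
#     dump_cookie("session", "abc 123", max_age=3600, httponly=True)
#     # expected: 'session="abc 123"; Expires=...; Max-Age=3600; HttpOnly; Path=/'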


def is_byte_range_valid(
    start: int | None, stop: int | None, length: int | None
) -> bool:
    """Checks if a given byte content range is valid for the given length.

    .. versionadded:: 0.7
    """
    if (start is None) != (stop is None):
        return False
    elif start is None:
        return length is None or length >= 0
    elif length is None:
        return 0 <= start < stop  # type: ignore
    elif start >= stop:  # type: ignore
        return False
    return 0 <= start < length


# circular dependencies
from . import datastructures as ds
from .sansio import http as _sansio_http