Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/http.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import email.utils
4import re
5import typing as t
6import warnings
7from datetime import date
8from datetime import datetime
9from datetime import time
10from datetime import timedelta
11from datetime import timezone
12from enum import Enum
13from hashlib import sha1
14from time import mktime
15from time import struct_time
16from urllib.parse import quote
17from urllib.parse import unquote
18from urllib.request import parse_http_list as _parse_list_header
20from ._internal import _dt_as_utc
21from ._internal import _plain_int
23if t.TYPE_CHECKING:
24 from _typeshed.wsgi import WSGIEnvironment
# Characters allowed in an HTTP "token" (RFC 9110 tchar). A header value made
# only of these characters can be sent without quoting.
_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)
# Matches one etag in a comma separated list: group 1 is an optional weakness
# marker ("W/" or "w/"), group 2 a quoted tag, group 3 a bare (unquoted) tag.
# Consumed repeatedly by parse_etags().
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
# Lowercased entity header names (RFC 2616 section 7.1).
_entity_headers = frozenset(
    [
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    ]
)
# Lowercased HTTP/1.1 hop-by-hop header names (RFC 2616 section 13.5.1);
# these are meaningful for a single transport connection only.
_hop_by_hop_headers = frozenset(
    [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ]
)
# Mapping of HTTP status code to its canonical reason phrase.
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    # NOTE(review): RFC 6585 names 511 "Network Authentication Required";
    # kept as-is to avoid changing emitted reason phrases.
    511: "Network Authentication Failed",
}
class COEP(Enum):
    """Cross Origin Embedder Policies.

    Enumerates the values used for the ``Cross-Origin-Embedder-Policy``
    response header; the member value is the serialized header token.
    """

    UNSAFE_NONE = "unsafe-none"
    REQUIRE_CORP = "require-corp"
class COOP(Enum):
    """Cross Origin Opener Policies.

    Enumerates the values used for the ``Cross-Origin-Opener-Policy``
    response header; the member value is the serialized header token.
    """

    UNSAFE_NONE = "unsafe-none"
    SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
    SAME_ORIGIN = "same-origin"
def quote_header_value(value: t.Any, allow_token: bool = True) -> str:
    """Add double quotes around a header value. A value made only of ASCII
    token characters is returned unchanged. Any ``"`` or ``\\`` characters
    inside the value are escaped with an additional ``\\``.

    This is the reverse of :func:`unquote_header_value`.

    :param value: The value to quote. Will be converted to a string.
    :param allow_token: Disable to quote the value even if it only has token characters.

    .. versionchanged:: 3.0
        Passing bytes is not supported.

    .. versionchanged:: 3.0
        The ``extra_chars`` parameter is removed.

    .. versionchanged:: 2.3
        The value is quoted if it is the empty string.

    .. versionadded:: 0.5
    """
    text = str(value)

    # An empty value must still produce a (quoted) empty string.
    if not text:
        return '""'

    # Token-only values may pass through unquoted unless quoting is forced.
    if allow_token and _token_chars.issuperset(text):
        return text

    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'
def unquote_header_value(value: str) -> str:
    """Remove surrounding double quotes and decode slash-escaped ``"`` and
    ``\\`` characters in a header value.

    This is the reverse of :func:`quote_header_value`.

    :param value: The header value to unquote.

    .. versionchanged:: 3.0
        The ``is_filename`` parameter is removed.
    """
    quoted = len(value) >= 2 and value.startswith('"') and value.endswith('"')

    # Unquoted values are returned untouched; escapes are only meaningful
    # inside a quoted string.
    if not quoted:
        return value

    inner = value[1:-1]
    return inner.replace("\\\\", "\\").replace('\\"', '"')
def dump_options_header(header: str | None, options: t.Mapping[str, t.Any]) -> str:
    """Produce a header value with ``key=value`` parameters separated by
    semicolons ``;``. For example, the ``Content-Type`` header.

    .. code-block:: python

        dump_options_header("text/html", {"charset": "UTF-8"})
        'text/html; charset=UTF-8'

    This is the reverse of :func:`parse_options_header`.

    Values containing non-token characters are quoted; parameters whose value
    is ``None`` are skipped entirely. A key ending with an asterisk ``*`` is
    assumed to already be in the RFC 2231 ``key*=UTF-8''value`` form and its
    value is emitted without further quoting.

    :param header: The primary header value.
    :param options: Parameters to encode as ``key=value`` pairs.

    .. versionchanged:: 2.3
        Keys with ``None`` values are skipped rather than treated as a bare key.

    .. versionchanged:: 2.2.3
        If a key ends with ``*``, its value will not be quoted.
    """
    segments: list[str] = [] if header is None else [header]

    for key, option_value in options.items():
        if option_value is None:
            continue

        # ``key*`` values are pre-encoded per RFC 2231; emit verbatim.
        rendered = option_value if key[-1] == "*" else quote_header_value(option_value)
        segments.append(f"{key}={rendered}")

    return "; ".join(segments)
def dump_header(iterable: dict[str, t.Any] | t.Iterable[t.Any]) -> str:
    """Produce a header value from a list of items or ``key=value`` pairs,
    separated by commas ``,``.

    This is the reverse of :func:`parse_list_header`, :func:`parse_dict_header`,
    and :func:`parse_set_header`.

    Values containing non-token characters are quoted. A ``None`` value emits
    the key alone. A key ending with an asterisk ``*`` is assumed to already be
    in the RFC 2231 ``key*=UTF-8''value`` form and is not quoted further.

    .. code-block:: python

        dump_header(["foo", "bar baz"])
        'foo, "bar baz"'

        dump_header({"foo": "bar baz"})
        'foo="bar baz"'

    :param iterable: The items to create a header from.

    .. versionchanged:: 3.0
        The ``allow_token`` parameter is removed.

    .. versionchanged:: 2.2.3
        If a key ends with ``*``, its value will not be quoted.
    """
    if not isinstance(iterable, dict):
        # Plain iterable: quote each item as needed.
        return ", ".join(quote_header_value(item) for item in iterable)

    items: list[str] = []

    for key, value in iterable.items():
        if value is None:
            # Bare key, no value.
            items.append(key)
            continue

        # ``key*`` values are pre-encoded per RFC 2231; emit verbatim.
        rendered = value if key[-1] == "*" else quote_header_value(value)
        items.append(f"{key}={rendered}")

    return ", ".join(items)
def dump_csp_header(header: ds.ContentSecurityPolicy) -> str:
    """Dump a Content Security Policy header.

    Policies are rendered as ``directive value`` pairs joined by semicolons,
    such as ``default-src 'self'; script-src 'self'``.

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.
    """
    rendered = [f"{directive} {sources}" for directive, sources in header.items()]
    return "; ".join(rendered)
def parse_list_header(value: str) -> list[str]:
    """Parse a header value that consists of a list of comma separated items
    according to `RFC 9110 <https://httpwg.org/specs/rfc9110.html#abnf.extension>`__.

    This extends :func:`urllib.request.parse_http_list` by also removing
    surrounding quotes from each item.

    .. code-block:: python

        parse_list_header('token, "quoted value"')
        ['token', 'quoted value']

    This is the reverse of :func:`dump_header`.

    :param value: The header value to parse.
    """

    def _strip_quotes(item: str) -> str:
        # Drop exactly one pair of surrounding double quotes, if present.
        if len(item) >= 2 and item.startswith('"') and item.endswith('"'):
            return item[1:-1]

        return item

    return [_strip_quotes(item) for item in _parse_list_header(value)]
def parse_dict_header(value: str) -> dict[str, str | None]:
    """Parse a list header using :func:`parse_list_header`, then parse each
    item as a ``key=value`` pair.

    .. code-block:: python

        parse_dict_header('a=b, c="d, e", f')
        {"a": "b", "c": "d, e", "f": None}

    This is the reverse of :func:`dump_header`.

    If a key does not have a value, it is ``None``.

    This handles charsets for values as described in
    `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__. Only ASCII,
    UTF-8, and ISO-8859-1 charsets are accepted, otherwise the value remains
    quoted.

    :param value: The header value to parse.

    .. versionchanged:: 3.0
        Passing bytes is not supported.

    .. versionchanged:: 3.0
        The ``cls`` argument is removed.

    .. versionchanged:: 2.3
        Added support for ``key*=charset''value`` encoded items.

    .. versionchanged:: 0.9
        The ``cls`` argument was added.
    """
    parsed: dict[str, str | None] = {}

    for item in parse_list_header(value):
        key, sep, val = item.partition("=")
        key = key.strip()

        # A bare key with no ``=`` maps to None.
        if not sep:
            parsed[key] = None
            continue

        val = val.strip()

        if key[-1] == "*":
            # key*=charset''value becomes key=value, where value is percent
            # encoded. Adapted from parse_options_header, without the
            # continuation handling.
            key = key[:-1]
            charset: str | None = None
            charset_match = _charset_value_re.match(val)

            if charset_match:
                # Split off the charset marker, if any.
                charset, val = charset_match.groups()
                charset = charset.lower()

            # A safe list of encodings. Modern clients should only send ASCII
            # or UTF-8. This list will not be extended further. An invalid
            # encoding leaves the value quoted.
            if charset in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                # Invalid bytes are replaced during unquoting.
                val = unquote(val, encoding=charset)

        if len(val) >= 2 and val.startswith('"') and val.endswith('"'):
            val = val[1:-1]

        parsed[key] = val

    return parsed
# https://httpwg.org/specs/rfc9110.html#parameter
# Matches "key=" at the start of a parameter section; the key consists of
# RFC 9110 token characters only.
_parameter_key_re = re.compile(r"([\w!#$%&'*+\-.^`|~]+)=", flags=re.ASCII)
# Matches an unquoted parameter value made entirely of token characters.
_parameter_token_value_re = re.compile(r"[\w!#$%&'*+\-.^`|~]+", flags=re.ASCII)
# https://www.rfc-editor.org/rfc/rfc2231#section-4
# Matches the extended-value form charset'language'value; group 1 is the
# charset (possibly empty), group 2 the percent-encoded value. The language
# part is matched but discarded.
_charset_value_re = re.compile(
    r"""
    ([\w!#$%&*+\-.^`|~]*)' # charset part, could be empty
    [\w!#$%&*+\-.^`|~]*' # don't care about language part, usually empty
    ([\w!#$%&'*+\-.^`|~]+) # one or more token chars with percent encoding
    """,
    re.ASCII | re.VERBOSE,
)
# https://www.rfc-editor.org/rfc/rfc2231#section-3
# Matches a trailing continuation index like "*0" or "*12" on a parameter key.
_continuation_re = re.compile(r"\*(\d+)$", re.ASCII)
def parse_options_header(value: str | None) -> tuple[str, dict[str, str]]:
    """Parse a header that consists of a value with ``key=value`` parameters separated
    by semicolons ``;``. For example, the ``Content-Type`` header.

    .. code-block:: python

        parse_options_header("text/html; charset=UTF-8")
        ('text/html', {'charset': 'UTF-8'})

        parse_options_header("")
        ("", {})

    This is the reverse of :func:`dump_options_header`.

    This parses valid parameter parts as described in
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#parameter>`__. Invalid parts are
    skipped.

    This handles continuations and charsets as described in
    `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__, although not as
    strictly as the RFC. Only ASCII, UTF-8, and ISO-8859-1 charsets are accepted,
    otherwise the value remains quoted.

    Clients may not be consistent in how they handle a quote character within a quoted
    value. The `HTML Standard <https://html.spec.whatwg.org/#multipart-form-data>`__
    replaces it with ``%22`` in multipart form data.
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#quoted.strings>`__ uses backslash
    escapes in HTTP headers. Both are decoded to the ``"`` character.

    Clients may not be consistent in how they handle non-ASCII characters. HTML
    documents must declare ``<meta charset=UTF-8>``, otherwise browsers may replace with
    HTML character references, which can be decoded using :func:`html.unescape`.

    :param value: The header value to parse.
    :return: ``(value, options)``, where ``options`` is a dict

    .. versionchanged:: 2.3
        Invalid parts, such as keys with no value, quoted keys, and incorrectly quoted
        values, are discarded instead of treating as ``None``.

    .. versionchanged:: 2.3
        Only ASCII, UTF-8, and ISO-8859-1 are accepted for charset values.

    .. versionchanged:: 2.3
        Escaped quotes in quoted values, like ``%22`` and ``\\"``, are handled.

    .. versionchanged:: 2.2
        Option names are always converted to lowercase.

    .. versionchanged:: 2.2
        The ``multiple`` parameter was removed.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5
    """
    if value is None:
        return "", {}

    # The primary value is everything before the first ";".
    value, _, rest = value.partition(";")
    value = value.strip()
    rest = rest.strip()

    if not value or not rest:
        # empty (invalid) value, or value without options
        return value, {}

    # Collect all valid key=value parts without processing the value.
    parts: list[tuple[str, str]] = []

    while True:
        if (m := _parameter_key_re.match(rest)) is not None:
            # Option names are normalized to lowercase.
            pk = m.group(1).lower()
            rest = rest[m.end() :]

            # Value may be a token.
            if (m := _parameter_token_value_re.match(rest)) is not None:
                parts.append((pk, m.group()))

            # Value may be a quoted string, find the closing quote.
            elif rest[:1] == '"':
                pos = 1
                length = len(rest)

                while pos < length:
                    if rest[pos : pos + 2] in {"\\\\", '\\"'}:
                        # Consume escaped slashes and quotes.
                        pos += 2
                    elif rest[pos] == '"':
                        # Stop at an unescaped quote.
                        parts.append((pk, rest[: pos + 1]))
                        rest = rest[pos + 1 :]
                        break
                    else:
                        # Consume any other character.
                        pos += 1
                # NOTE: if no closing quote is found, the part is silently
                # dropped (incorrectly quoted values are discarded).

        # Find the next section delimited by `;`, if any.
        if (end := rest.find(";")) == -1:
            break

        rest = rest[end + 1 :].lstrip()

    options: dict[str, str] = {}
    encoding: str | None = None
    continued_encoding: str | None = None

    # For each collected part, process optional charset and continuation,
    # unquote quoted values.
    for pk, pv in parts:
        if pk[-1] == "*":
            # key*=charset''value becomes key=value, where value is percent encoded
            # adapted from parse_dict_header, with continuation handling
            pk = pk[:-1]
            match = _charset_value_re.match(pv)

            if match:
                # If there is a valid charset marker in the value, split it off.
                encoding, pv = match.groups()
                # This might be the empty string, handled next.
                encoding = encoding.lower()

            # No charset marker, or marker with empty charset value.
            if not encoding:
                encoding = continued_encoding

            # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
            # This list will not be extended further. An invalid encoding will leave the
            # value quoted.
            if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                # Continuation parts don't require their own charset marker. This is
                # looser than the RFC, it will persist across different keys and allows
                # changing the charset during a continuation. But this implementation is
                # much simpler than tracking the full state.
                continued_encoding = encoding
                # invalid bytes are replaced during unquoting
                pv = unquote(pv, encoding=encoding)

        # Remove quotes. At this point the value cannot be empty or a single quote.
        if pv[0] == pv[-1] == '"':
            # HTTP headers use slash, multipart form data uses percent
            pv = pv[1:-1].replace("\\\\", "\\").replace('\\"', '"').replace("%22", '"')

        match = _continuation_re.search(pk)

        if match:
            # key*0=a; key*1=b becomes key=ab
            pk = pk[: match.start()]
            options[pk] = options.get(pk, "") + pv
        else:
            options[pk] = pv

    return value, options
# Matches a numeric q value like "1", "0.5". A leading minus is matched so the
# value can be parsed and then rejected by the range check in
# parse_accept_header rather than silently mis-parsed.
_q_value_re = re.compile(r"-?\d+(\.\d+)?", re.ASCII)
# Type variable bound to the Accept hierarchy, used by parse_accept_header.
_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")
@t.overload
def parse_accept_header(value: str | None) -> ds.Accept: ...


@t.overload
def parse_accept_header(value: str | None, cls: type[_TAnyAccept]) -> _TAnyAccept: ...


def parse_accept_header(
    value: str | None, cls: type[_TAnyAccept] | None = None
) -> _TAnyAccept:
    """Parse an ``Accept`` header according to
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#field.accept>`__.

    Returns an :class:`.Accept` instance, which can sort and inspect items based
    on their quality parameter. When parsing ``Accept-Charset``,
    ``Accept-Encoding``, or ``Accept-Language``, pass the appropriate
    :class:`.Accept` subclass.

    :param value: The header value to parse.
    :param cls: The :class:`.Accept` class to wrap the result in.
    :return: An instance of ``cls``.

    .. versionchanged:: 2.3
        Parse according to RFC 9110. Items with invalid ``q`` values are skipped.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyAccept], ds.Accept)

    if not value:
        return cls(None)

    items = []

    for part in parse_list_header(value):
        name, options = parse_options_header(part)
        quality: float = 1

        if "q" in options:
            # Pop q; any remaining options are reconstructed below.
            raw_q = options.pop("q").strip()

            # Skip items whose q is not a plain decimal number.
            if _q_value_re.fullmatch(raw_q) is None:
                continue

            quality = float(raw_q)

            # Skip items whose q is outside the valid [0, 1] range.
            if not 0 <= quality <= 1:
                continue

        if options:
            # Reconstruct the media type with any remaining options.
            name = dump_options_header(name, options)

        items.append((name, quality))

    return cls(items)
# Type variable bound to the cache control hierarchy, used below.
_TAnyCC = t.TypeVar("_TAnyCC", bound="ds.cache_control._CacheControl")


@t.overload
def parse_cache_control_header(
    value: str | None,
    on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None,
) -> ds.RequestCacheControl: ...


@t.overload
def parse_cache_control_header(
    value: str | None,
    on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None,
    cls: type[_TAnyCC] = ...,
) -> _TAnyCC: ...


def parse_cache_control_header(
    value: str | None,
    on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None,
    cls: type[_TAnyCC] | None = None,
) -> _TAnyCC:
    """Parse a cache control header. The RFC differs between response and
    request cache control, this method does not. It's your responsibility
    to not use the wrong control statements.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.CacheControl`
                      object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object.

    .. versionadded:: 0.5
        The `cls` was added. If not specified an immutable
        :class:`~werkzeug.datastructures.RequestCacheControl` is returned.
    """
    if cls is None:
        cls = t.cast("type[_TAnyCC]", ds.RequestCacheControl)

    # A missing or empty header yields an empty object.
    parsed = parse_dict_header(value) if value else ()
    return cls(parsed, on_update)
# Type variable bound to the Content Security Policy class, used below.
_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")


@t.overload
def parse_csp_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None,
) -> ds.ContentSecurityPolicy: ...


@t.overload
def parse_csp_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None,
    cls: type[_TAnyCSP] = ...,
) -> _TAnyCSP: ...


def parse_csp_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None,
    cls: type[_TAnyCSP] | None = None,
) -> _TAnyCSP:
    """Parse a Content Security Policy header.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
    :return: a `cls` object.

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.
    """
    if cls is None:
        cls = t.cast("type[_TAnyCSP]", ds.ContentSecurityPolicy)

    if value is None:
        return cls((), on_update)

    items = []

    for raw_policy in value.split(";"):
        policy = raw_policy.strip()

        # Ignore badly formatted policies (no space between directive and
        # its sources).
        if " " not in policy:
            continue

        directive, sources = policy.split(" ", 1)
        items.append((directive.strip(), sources.strip()))

    return cls(items, on_update)
def parse_set_header(
    value: str | None,
    on_update: t.Callable[[ds.HeaderSet], None] | None = None,
) -> ds.HeaderSet:
    """Parse a set-like header and return a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    The return value is an object that treats the items case-insensitively
    and keeps the order of the items:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    To create a header from the :class:`HeaderSet` again, use the
    :func:`dump_header` function.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
                      value on the :class:`~werkzeug.datastructures.HeaderSet`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    # A missing/empty header becomes an empty set.
    items = parse_list_header(value) if value else None
    return ds.HeaderSet(items, on_update)
def parse_if_range_header(value: str | None) -> ds.IfRange:
    """Parse an ``If-Range`` header, which can hold either an etag or a date.
    Returns a :class:`~werkzeug.datastructures.IfRange` object.

    .. versionchanged:: 2.0
        If the value represents a datetime, it is timezone-aware.

    .. versionadded:: 0.7
    """
    if not value:
        return ds.IfRange()

    # Try the date interpretation first; fall back to treating it as an etag.
    timestamp = parse_date(value)

    if timestamp is not None:
        return ds.IfRange(date=timestamp)

    # Drop weakness information, keep only the tag itself.
    etag, _ = unquote_etag(value)
    return ds.IfRange(etag)
def parse_range_header(
    value: str | None, make_inclusive: bool = True
) -> ds.Range | None:
    """Parses a range header into a :class:`~werkzeug.datastructures.Range`
    object. If the header is missing or malformed `None` is returned.
    `ranges` is a list of ``(start, stop)`` tuples where the ranges are
    non-inclusive.

    .. versionadded:: 0.7
    """
    # NOTE(review): ``make_inclusive`` is not referenced in this body; the
    # parsed stops are always exclusive (end + 1 below). Presumably kept for
    # backward compatibility — confirm against callers.
    if not value or "=" not in value:
        return None

    ranges = []
    # last_end tracks the previous range's exclusive end so that overlapping
    # or out-of-order ranges are rejected. -1 is a sentinel meaning "open
    # ended or suffix range seen"; any further range is then invalid.
    last_end = 0
    units, rng = value.split("=", 1)
    units = units.strip().lower()

    for item in rng.split(","):
        item = item.strip()
        if "-" not in item:
            return None
        if item.startswith("-"):
            # Suffix range like "-500" (last 500 bytes). Not allowed after an
            # open-ended or suffix range.
            if last_end < 0:
                return None
            try:
                # The whole item parses as a negative int, e.g. -500.
                begin = _plain_int(item)
            except ValueError:
                return None
            end = None
            last_end = -1
        elif "-" in item:
            # Normal "begin-end" or open-ended "begin-" range.
            begin_str, end_str = item.split("-", 1)
            begin_str = begin_str.strip()
            end_str = end_str.strip()

            try:
                begin = _plain_int(begin_str)
            except ValueError:
                return None

            # Ranges must be ordered and must not follow an open-ended one.
            if begin < last_end or last_end < 0:
                return None
            if end_str:
                try:
                    # Stored stop is exclusive, hence + 1.
                    end = _plain_int(end_str) + 1
                except ValueError:
                    return None

                if begin >= end:
                    return None
            else:
                end = None

            last_end = end if end is not None else -1
        ranges.append((begin, end))

    return ds.Range(units, ranges)
def parse_content_range_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentRange], None] | None = None,
) -> ds.ContentRange | None:
    """Parse a ``Content-Range`` header into a
    :class:`~werkzeug.datastructures.ContentRange` object, or `None` if
    parsing is not possible.

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.ContentRange`
                      object is changed.

    .. versionadded:: 0.7
    """
    if value is None:
        return None

    # Expect "<units> <rangedef>".
    fields = value.strip().split(None, 1)

    if len(fields) != 2:
        return None

    units, rangedef = fields

    if "/" not in rangedef:
        return None

    rng, length_str = rangedef.split("/", 1)

    # "*" means the total length is unknown.
    if length_str == "*":
        length = None
    else:
        try:
            length = _plain_int(length_str)
        except ValueError:
            return None

    # "*" for the range means unsatisfied/unknown range.
    if rng == "*":
        if not is_byte_range_valid(None, None, length):
            return None

        return ds.ContentRange(units, None, None, length, on_update=on_update)

    if "-" not in rng:
        return None

    start_str, stop_str = rng.split("-", 1)

    try:
        start = _plain_int(start_str)
        # Stored stop is exclusive, hence + 1.
        stop = _plain_int(stop_str) + 1
    except ValueError:
        return None

    if not is_byte_range_valid(start, stop, length):
        return None

    return ds.ContentRange(units, start, stop, length, on_update=on_update)
def quote_etag(etag: str, weak: bool = False) -> str:
    """Quote an etag.

    :param etag: the etag to quote.
    :param weak: set to `True` to tag it "weak".
    :raises ValueError: if the etag contains a double quote character.
    """
    # A quote inside the tag cannot be represented; reject it.
    if '"' in etag:
        raise ValueError("invalid etag")

    quoted = f'"{etag}"'
    return f"W/{quoted}" if weak else quoted
def unquote_etag(
    etag: str | None,
) -> tuple[str, bool] | tuple[None, None]:
    """Unquote a single etag:

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple.
    """
    if not etag:
        return None, None

    tag = etag.strip()
    # A "W/" or "w/" prefix marks a weak etag.
    weak = tag[:2] in ("W/", "w/")

    if weak:
        tag = tag[2:]

    # Strip one pair of surrounding quotes, if present.
    if tag[:1] == tag[-1:] == '"':
        tag = tag[1:-1]

    return tag, weak
def parse_etags(value: str | None) -> ds.ETags:
    """Parse an etag header.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ds.ETags()

    strong: list[str] = []
    weak: list[str] = []
    pos = 0
    end = len(value)

    while pos < end:
        match = _etag_re.match(value, pos)

        # Stop at the first part that doesn't look like an etag.
        if match is None:
            break

        is_weak, quoted, raw = match.groups()

        # A bare (unquoted) * matches anything.
        if raw == "*":
            return ds.ETags(star_tag=True)

        tag = quoted if quoted else raw
        (weak if is_weak else strong).append(tag)
        pos = match.end()

    return ds.ETags(strong, weak)
def generate_etag(data: bytes) -> str:
    """Generate an etag for some data.

    .. versionchanged:: 2.0
        Use SHA-1. MD5 may not be available in some environments.
    """
    digest = sha1(data)
    return digest.hexdigest()
def parse_date(value: str | None) -> datetime | None:
    """Parse an :rfc:`2822` date into a timezone-aware
    :class:`datetime.datetime` object, or ``None`` if parsing fails.

    This wraps :func:`email.utils.parsedate_to_datetime`, returning ``None``
    on failure instead of raising, and always producing a timezone-aware
    datetime. A string without timezone information is assumed to be UTC.

    :param value: A string with a supported date format.

    .. versionchanged:: 2.0
        Return a timezone-aware datetime object. Use
        ``email.utils.parsedate_to_datetime``.
    """
    if value is None:
        return None

    try:
        parsed = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None

    if parsed.tzinfo is not None:
        return parsed

    # No zone information: assume UTC.
    return parsed.replace(tzinfo=timezone.utc)
def http_date(
    timestamp: datetime | date | int | float | struct_time | None = None,
) -> str:
    """Format a datetime object or timestamp into an :rfc:`2822` date
    string.

    This wraps :func:`email.utils.format_datetime` /
    :func:`email.utils.formatdate`, treating naive datetime objects as UTC
    instead of raising an exception.

    :param timestamp: The datetime or timestamp to format. Defaults to
        the current time.

    .. versionchanged:: 2.0
        Use ``email.utils.format_datetime``. Accept ``date`` objects.
    """
    if isinstance(timestamp, date):
        if isinstance(timestamp, datetime):
            # Make sure the datetime is timezone-aware (naive means UTC).
            timestamp = _dt_as_utc(timestamp)
        else:
            # A plain date is treated as midnight UTC.
            timestamp = datetime.combine(timestamp, time(), tzinfo=timezone.utc)

        return email.utils.format_datetime(timestamp, usegmt=True)

    if isinstance(timestamp, struct_time):
        timestamp = mktime(timestamp)

    # int/float seconds since the epoch, or None for "now".
    return email.utils.formatdate(timestamp, usegmt=True)
def parse_age(value: str | None = None) -> timedelta | None:
    """Parses a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None

    try:
        seconds = int(value)

        # Negative ages are invalid.
        if seconds < 0:
            return None

        return timedelta(seconds=seconds)
    except (ValueError, OverflowError):
        # Not an integer, or too large for a timedelta.
        return None
def dump_age(age: timedelta | int | None = None) -> str | None:
    """Formats the duration as a base-10 integer.

    :param age: should be an integer number of seconds,
                a :class:`datetime.timedelta` object, or,
                if the age is unknown, `None` (default).
    :raises ValueError: if the resulting number of seconds is negative.
    """
    if age is None:
        return None

    seconds = int(age.total_seconds()) if isinstance(age, timedelta) else int(age)

    if seconds < 0:
        raise ValueError("age cannot be negative")

    return str(seconds)
def is_resource_modified(
    environ: WSGIEnvironment,
    etag: str | None = None,
    data: bytes | None = None,
    last_modified: datetime | str | None = None,
    ignore_if_range: bool = True,
) -> bool:
    """Convenience method for conditional requests.

    Extracts the relevant conditional headers from the WSGI environ and
    delegates the actual comparison to the sansio implementation.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag for the response for comparison.
    :param data: or alternatively the data of the response to automatically
                 generate an etag using :func:`generate_etag`.
    :param last_modified: an optional date of the last modification.
    :param ignore_if_range: If `False`, `If-Range` header will be taken into
                            account.
    :return: `True` if the resource was modified, otherwise `False`.

    .. versionchanged:: 2.0
        SHA-1 is used to generate an etag value for the data. MD5 may
        not be available in some environments.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    # All header lookups use .get() so missing headers become None.
    return _sansio_http.is_resource_modified(
        http_range=environ.get("HTTP_RANGE"),
        http_if_range=environ.get("HTTP_IF_RANGE"),
        http_if_modified_since=environ.get("HTTP_IF_MODIFIED_SINCE"),
        http_if_none_match=environ.get("HTTP_IF_NONE_MATCH"),
        http_if_match=environ.get("HTTP_IF_MATCH"),
        etag=etag,
        data=data,
        last_modified=last_modified,
        ignore_if_range=ignore_if_range,
    )
def remove_entity_headers(
    headers: ds.Headers | list[tuple[str, str]],
    allowed: t.Iterable[str] = ("expires", "content-location"),
) -> None:
    """Remove all entity headers from a list or :class:`Headers` object. This
    operation works in-place. `Expires` and `Content-Location` headers are
    by default not removed. The reason for this is :rfc:`2616` section
    10.3.5 which specifies some entity headers that should be sent.

    .. versionchanged:: 0.5
        added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: a list of headers that should still be allowed even though
        they are entity headers.
    """
    # Header names are compared case-insensitively.
    keep = {name.lower() for name in allowed}
    retained = []

    for key, value in headers:
        if key.lower() in keep or not is_entity_header(key):
            retained.append((key, value))

    # Slice assignment mutates the caller's object in place.
    headers[:] = retained
def remove_hop_by_hop_headers(headers: ds.Headers | list[tuple[str, str]]) -> None:
    """Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or
    :class:`Headers` object. This operation works in-place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    # Slice assignment mutates the caller's object in place.
    headers[:] = [item for item in headers if not is_hop_by_hop_header(item[0])]
def is_entity_header(header: str) -> bool:
    """Check if a header is an entity header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    # Membership test against the module-level frozenset of lowercase names.
    name = header.lower()
    return name in _entity_headers
def is_hop_by_hop_header(header: str) -> bool:
    """Check if a header is an HTTP/1.1 "Hop-by-Hop" header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    # Membership test against the module-level frozenset of lowercase names.
    name = header.lower()
    return name in _hop_by_hop_headers
def parse_cookie(
    header: WSGIEnvironment | str | None,
    cls: type[ds.MultiDict[str, str]] | None = None,
) -> ds.MultiDict[str, str]:
    """Parse a cookie from a string or WSGI environ.

    The same key can be provided multiple times, the values are stored
    in-order. The default :class:`MultiDict` will have the first value
    first, and all values can be retrieved with
    :meth:`MultiDict.getlist`.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.

    .. versionchanged:: 3.0
        Passing bytes, and the ``charset`` and ``errors`` parameters, were removed.

    .. versionchanged:: 1.0
        Returns a :class:`MultiDict` instead of a ``TypeConversionDict``.

    .. versionchanged:: 0.5
        Returns a :class:`TypeConversionDict` instead of a regular dict. The ``cls``
        parameter was added.
    """
    # A WSGI environ stores the header value under the HTTP_COOKIE key.
    cookie = header.get("HTTP_COOKIE") if isinstance(header, dict) else header

    if cookie:
        # WSGI tunnels header values as latin-1; re-decode as UTF-8.
        cookie = cookie.encode("latin1").decode()

    return _sansio_http.parse_cookie(cookie=cookie, cls=cls)
# Characters a cookie value may contain without quoting; used by dump_cookie
# to decide whether a value must be wrapped in double quotes. ``re.A`` keeps
# ``\w`` ASCII-only.
_cookie_no_quote_re = re.compile(r"[\w!#$%&'()*+\-./:<=>?@\[\]^`{|}~]*", re.A)
# Bytes that must be backslash-escaped inside a quoted cookie value.
# NOTE(review): the class spans \x00-\x19, while the escape map below covers
# range(0x20) — bytes \x1a-\x1f appear only in the map; confirm intended.
_cookie_slash_re = re.compile(rb"[\x00-\x19\",;\\\x7f-\xff]", re.A)
# Escape table: '"' and '\' get a plain backslash escape...
_cookie_slash_map = {b'"': b'\\"', b"\\": b"\\\\"}
# ...and every other escaped byte is written as backslash plus three octal
# digits (the convention used by the stdlib ``http.cookies`` module).
_cookie_slash_map.update(
    (v.to_bytes(1, "big"), b"\\%03o" % v)
    for v in [*range(0x20), *b",;", *range(0x7F, 256)]
)
def dump_cookie(
    key: str,
    value: str = "",
    max_age: timedelta | int | None = None,
    expires: str | datetime | int | float | None = None,
    path: str | None = "/",
    domain: str | None = None,
    secure: bool = False,
    httponly: bool = False,
    sync_expires: bool = True,
    max_size: int = 4093,
    samesite: str | None = None,
) -> str:
    """Create a Set-Cookie header without the ``Set-Cookie`` prefix.

    The return value is usually restricted to ascii as the vast majority
    of values are properly escaped, but that is no guarantee. It's
    tunneled through latin1 as required by :pep:`3333`.

    The return value is not ASCII safe if the key contains unicode
    characters. This is technically against the specification but
    happens in the wild. It's strongly recommended to not use
    non-ASCII values for the keys.

    :param max_age: should be a number of seconds, or `None` (default) if
        the cookie should last only as long as the client's
        browser session. Additionally `timedelta` objects
        are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
    :param path: limits the cookie to a given path, per default it will
        span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
        example, ``domain="example.com"`` will set a cookie
        that is readable by the domain ``www.example.com``,
        ``foo.example.com`` etc. Otherwise, a cookie will only
        be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie. This is an
        extension to the cookie standard and probably not
        supported by all browsers.
    :param sync_expires: automatically set expires if max_age is defined
        but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 3.0
        Passing bytes, and the ``charset`` parameter, were removed.

    .. versionchanged:: 2.3.3
        The ``path`` parameter is ``/`` by default.

    .. versionchanged:: 2.3.1
        The value allows more characters without quoting.

    .. versionchanged:: 2.3
        ``localhost`` and other names without a dot are allowed for the domain. A
        leading dot is ignored.

    .. versionchanged:: 2.3
        The ``path`` parameter is ``None`` by default.

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    if path is not None:
        # safe = https://url.spec.whatwg.org/#url-path-segment-string
        # as well as percent for things that are already quoted
        # excluding semicolon since it's part of the header syntax
        path = quote(path, safe="%!$&'()*+,/:=@")

    if domain:
        # Drop any port, ignore a leading dot, and IDNA-encode so non-ASCII
        # domains become their ASCII (punycode) form.
        domain = domain.partition(":")[0].lstrip(".").encode("idna").decode("ascii")

    if isinstance(max_age, timedelta):
        max_age = int(max_age.total_seconds())

    if expires is not None:
        if not isinstance(expires, str):
            expires = http_date(expires)
    elif max_age is not None and sync_expires:
        # Derive Expires from Max-Age for clients that only honor Expires.
        expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)

    if samesite is not None:
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    # Quote value if it contains characters not allowed by RFC 6265. Slash-escape with
    # three octal digits, which matches http.cookies, although the RFC suggests base64.
    if not _cookie_no_quote_re.fullmatch(value):
        # Work with bytes here, since a UTF-8 character could be multiple bytes.
        value = _cookie_slash_re.sub(
            lambda m: _cookie_slash_map[m.group()], value.encode()
        ).decode("ascii")
        value = f'"{value}"'

    # Send a non-ASCII key as mojibake. Everything else should already be ASCII.
    # TODO Remove encoding dance, it seems like clients accept UTF-8 keys
    buf = [f"{key.encode().decode('latin1')}={value}"]

    for k, v in (
        ("Domain", domain),
        ("Expires", expires),
        ("Max-Age", max_age),
        ("Secure", secure),
        ("HttpOnly", httponly),
        ("Path", path),
        ("SameSite", samesite),
    ):
        # Skip unset attributes; boolean flags are emitted without a value.
        if v is None or v is False:
            continue

        if v is True:
            buf.append(k)
            continue

        buf.append(f"{k}={v}")

    rv = "; ".join(buf)

    # Warn if the final value of the cookie is larger than the limit. If the cookie is
    # too large, then it may be silently ignored by the browser, which can be quite hard
    # to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        warnings.warn(
            f"The '{key}' cookie is too large: the value was {value_size} bytes but the"
            f" header required {cookie_size - value_size} extra bytes. The final size"
            f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
            " silently ignore cookies larger than this.",
            stacklevel=2,
        )

    return rv
def is_byte_range_valid(
    start: int | None, stop: int | None, length: int | None
) -> bool:
    """Check whether a given byte content range is valid for the given length.

    .. versionadded:: 0.7
    """
    # start and stop must be supplied together or not at all.
    if (start is None) != (stop is None):
        return False

    if start is None:
        # No explicit range: valid for any unknown or non-negative length.
        return length is None or length >= 0

    # The range itself must be well-formed and non-empty.
    if not 0 <= start < stop:
        return False

    # With a known entity length, the range must also begin inside it.
    return length is None or start < length
1385# circular dependencies
1386from . import datastructures as ds
1387from .sansio import http as _sansio_http