Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/http.py: 20%
464 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import base64
2import email.utils
3import re
4import typing
5import typing as t
6import warnings
7from datetime import date
8from datetime import datetime
9from datetime import time
10from datetime import timedelta
11from datetime import timezone
12from enum import Enum
13from hashlib import sha1
14from time import mktime
15from time import struct_time
16from urllib.parse import unquote_to_bytes as _unquote
17from urllib.request import parse_http_list as _parse_list_header
19from ._internal import _cookie_quote
20from ._internal import _dt_as_utc
21from ._internal import _make_cookie_domain
22from ._internal import _to_bytes
23from ._internal import _to_str
24from ._internal import _wsgi_decoding_dance
26if t.TYPE_CHECKING:
27 from _typeshed.wsgi import WSGIEnvironment
# for explanation of "media-range", etc. see Sections 5.3.{1,2} of RFC 7231
_accept_re = re.compile(
    r"""
    (                       # media-range capturing-parenthesis
      [^\s;,]+              # type/subtype
      (?:[ \t]*;[ \t]*      # ";"
        (?:                 # parameter non-capturing-parenthesis
          [^\s;,q][^\s;,]*  # token that doesn't start with "q"
        |                   # or
          q[^\s;,=][^\s;,]* # token that is more than just "q"
        )
      )*                    # zero or more parameters
    )                       # end of media-range
    (?:[ \t]*;[ \t]*q=      # weight is a "q" parameter
       (\d*(?:\.\d+)?)      # qvalue capturing-parentheses
       [^,]*                # "extension" accept params: who cares?
    )?                      # accept params are optional
    """,
    re.VERBOSE,
)
# Characters allowed in an HTTP "token" (RFC 7230 section 3.2.6); used by
# quote_header_value to decide whether a value needs quoting at all.
_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)
# Matches one (optionally weak, "W/"-prefixed) entity tag plus its trailing
# comma separator; used by parse_etags to walk an ETag-list header.
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
# Matches one ";key=value" option of a Content-Type-like header, including
# RFC 2231 continuations (``key*1``) and extended ``key*=enc'lang'value``
# notation; used by parse_options_header.
_option_header_piece_re = re.compile(
    r"""
    ;\s*,?\s*  # newlines were replaced with commas
    (?P<key>
        "[^"\\]*(?:\\.[^"\\]*)*"  # quoted string
    |
        [^\s;,=*]+  # token
    )
    (?:\*(?P<count>\d+))?  # *1, optional continuation index
    \s*
    (?:  # optionally followed by =value
        (?:  # equals sign, possibly with encoding
            \*\s*=\s*  # * indicates extended notation
            (?:  # optional encoding
                (?P<encoding>[^\s]+?)
                '(?P<language>[^\s]*?)'
            )?
        |
            =\s*  # basic notation
        )
        (?P<value>
            "[^"\\]*(?:\\.[^"\\]*)*"  # quoted string
        |
            [^;,]+  # token
        )?
    )?
    \s*
    """,
    flags=re.VERBOSE,
)
# Matches the leading mimetype of an options header; parse_options_header
# prefixes the input with "," so this anchors each mimetype segment.
_option_header_start_mime_type = re.compile(r",\s*([^;,\s]+)([;,]\s*.+)?")
# Lowercased RFC 2616 entity header names; see is_entity_header.
_entity_headers = frozenset(
    [
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    ]
)
# Lowercased HTTP/1.1 hop-by-hop header names; see is_hop_by_hop_header.
_hop_by_hop_headers = frozenset(
    [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ]
)
#: Mapping of HTTP status codes to their canonical reason phrases, used
#: when building response status lines.
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    # RFC 6585 names this "Network Authentication Required"; the previous
    # "... Failed" wording here was incorrect.
    511: "Network Authentication Required",
}
class COEP(Enum):
    """Cross Origin Embedder Policies"""

    # Serialized header value "unsafe-none".
    UNSAFE_NONE = "unsafe-none"
    # Serialized header value "require-corp".
    REQUIRE_CORP = "require-corp"
class COOP(Enum):
    """Cross Origin Opener Policies"""

    # Serialized header value "unsafe-none".
    UNSAFE_NONE = "unsafe-none"
    # Serialized header value "same-origin-allow-popups".
    SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
    # Serialized header value "same-origin".
    SAME_ORIGIN = "same-origin"
193def _is_extended_parameter(key: str) -> bool:
194 """Per RFC 5987/8187, "extended" values may *not* be quoted.
195 This is in keeping with browser implementations. So we test
196 using this function to see if the key indicates this parameter
197 follows the `ext-parameter` syntax (using a trailing '*').
198 """
199 return key.strip().endswith("*")
def quote_header_value(
    value: t.Union[str, int], extra_chars: str = "", allow_token: bool = True
) -> str:
    """Quote a header value if necessary.

    .. versionadded:: 0.5

    :param value: the value to quote.
    :param extra_chars: a list of extra characters to skip quoting.
    :param allow_token: if this is enabled token values are returned
        unchanged.
    """
    if isinstance(value, bytes):
        value = value.decode("latin1")

    text = str(value)

    if allow_token:
        # RFC 7230 token characters plus any caller-supplied extras may
        # be emitted without quoting.
        allowed = frozenset(
            "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
            "^_`abcdefghijklmnopqrstuvwxyz|~"
        ).union(extra_chars)

        if all(ch in allowed for ch in text):
            return text

    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'
def unquote_header_value(value: str, is_filename: bool = False) -> str:
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).

    This does not implement the strict RFC unquoting; it mirrors what
    browsers actually send (IE, for example, uploads files with the full
    ``"C:\foo\bar.txt"`` path as the filename).

    .. versionadded:: 0.5

    :param value: the header value to unquote.
    :param is_filename: The value represents a filename or path.
    """
    if value[:1] == '"' and value[-1:] == '"':
        inner = value[1:-1]

        # A UNC path's leading double backslash would be collapsed by the
        # escape-replacement below, so filenames that look like UNC paths
        # are returned with their escapes intact. See #458.
        if is_filename and inner[:2] == "\\\\":
            return inner

        return inner.replace("\\\\", "\\").replace('\\"', '"')

    return value
def dump_options_header(
    header: t.Optional[str], options: t.Mapping[str, t.Optional[t.Union[str, int]]]
) -> str:
    """The reverse function to :func:`parse_options_header`.

    :param header: the header to dump
    :param options: a dict of options to append.
    """
    parts = [] if header is None else [header]

    for key, value in options.items():
        if value is None:
            parts.append(key)
        elif _is_extended_parameter(key):
            # RFC 5987/8187 extended values must not be quoted.
            parts.append(f"{key}={value}")
        else:
            parts.append(f"{key}={quote_header_value(value)}")

    return "; ".join(parts)
def dump_header(
    iterable: t.Union[t.Dict[str, t.Union[str, int]], t.Iterable[str]],
    allow_token: bool = True,
) -> str:
    """Dump an HTTP header again. This is the reversal of
    :func:`parse_list_header`, :func:`parse_set_header` and
    :func:`parse_dict_header`. This also quotes strings that include an
    equals sign unless you pass it as dict of key, value pairs.

    >>> dump_header({'foo': 'bar baz'})
    'foo="bar baz"'
    >>> dump_header(('foo', 'bar baz'))
    'foo, "bar baz"'

    :param iterable: the iterable or dict of values to quote.
    :param allow_token: if set to `False` tokens as values are disallowed.
        See :func:`quote_header_value` for more details.
    """
    if not isinstance(iterable, dict):
        return ", ".join(
            quote_header_value(item, allow_token=allow_token) for item in iterable
        )

    parts = []

    for key, value in iterable.items():
        if value is None:
            parts.append(key)
        elif _is_extended_parameter(key):
            # RFC 5987/8187 extended values must not be quoted.
            parts.append(f"{key}={value}")
        else:
            quoted = quote_header_value(value, allow_token=allow_token)
            parts.append(f"{key}={quoted}")

    return ", ".join(parts)
def dump_csp_header(header: "ds.ContentSecurityPolicy") -> str:
    """Dump a Content Security Policy header.

    These are structured into policies such as "default-src 'self';
    script-src 'self'".

    .. versionadded:: 1.0.0
       Support for Content Security Policy headers was added.
    """
    directives = [f"{name} {body}" for name, body in header.items()]
    return "; ".join(directives)
def parse_list_header(value: str) -> t.List[str]:
    """Parse lists as described by RFC 2068 Section 2.

    Comma-separated lists are parsed where elements may be quoted-strings
    (which may themselves contain commas); surrounding quotes are removed
    after parsing.

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    Like :func:`parse_set_header`, except items may repeat and case is
    preserved. Use :func:`dump_header` to build a header from the list.

    :param value: a string with a list header.
    :return: :class:`list`
    """
    items = []

    for raw in _parse_list_header(value):
        if raw[:1] == raw[-1:] == '"':
            raw = unquote_header_value(raw[1:-1])

        items.append(raw)

    return items
def parse_dict_header(value: str, cls: t.Type[dict] = dict) -> t.Dict[str, str]:
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict (or any other mapping object created from
    the type with a dict like interface provided by the `cls` argument):

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    .. versionchanged:: 0.9
       Added support for `cls` argument.

    :param value: a string with a dict header.
    :param cls: callable to use for storage of parsed results.
    :return: an instance of `cls`
    """
    result = cls()

    if isinstance(value, bytes):
        value = value.decode("latin1")

    for item in _parse_list_header(value):
        name, sep, val = item.partition("=")

        # A bare token with no "=" maps to None.
        if not sep:
            result[item] = None
            continue

        if val[:1] == val[-1:] == '"':
            val = unquote_header_value(val[1:-1])

        result[name] = val

    return result
def parse_options_header(value: t.Optional[str]) -> t.Tuple[str, t.Dict[str, str]]:
    """Parse a ``Content-Type``-like header into a tuple with the
    value and any options:

    >>> parse_options_header('text/html; charset=utf8')
    ('text/html', {'charset': 'utf8'})

    This is not for ``Cache-Control``-like headers, which use a
    different format. For those, use :func:`parse_dict_header`.

    :param value: The header value to parse.

    .. versionchanged:: 2.2
        Option names are always converted to lowercase.

    .. versionchanged:: 2.1
        The ``multiple`` parameter is deprecated and will be removed in
        Werkzeug 2.2.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5
    """
    if not value:
        return "", {}

    result: t.List[t.Any] = []

    # Prefix with "," so _option_header_start_mime_type anchors the first
    # segment; folded header lines become commas as well.
    value = "," + value.replace("\n", ",")
    # NOTE: this loop body always returns on its first iteration (leftover
    # structure from the removed ``multiple`` support); the trailing return
    # below only runs when the first segment fails to match.
    while value:
        match = _option_header_start_mime_type.match(value)
        if not match:
            break
        result.append(match.group(1))  # mimetype
        options: t.Dict[str, str] = {}
        # Parse options
        rest = match.group(2)
        encoding: t.Optional[str]
        continued_encoding: t.Optional[str] = None
        while rest:
            optmatch = _option_header_piece_re.match(rest)
            if not optmatch:
                break
            option, count, encoding, language, option_value = optmatch.groups()
            # Continuations don't have to supply the encoding after the
            # first line. If we're in a continuation, track the current
            # encoding to use for subsequent lines. Reset it when the
            # continuation ends.
            if not count:
                continued_encoding = None
            else:
                if not encoding:
                    encoding = continued_encoding
                continued_encoding = encoding
            option = unquote_header_value(option).lower()

            if option_value is not None:
                option_value = unquote_header_value(option_value, option == "filename")

                if encoding is not None:
                    # RFC 5987 extended value: percent-decode then decode
                    # with the declared charset.
                    option_value = _unquote(option_value).decode(encoding)

            if count:
                # Continuations append to the existing value. For
                # simplicity, this ignores the possibility of
                # out-of-order indices, which shouldn't happen anyway.
                if option_value is not None:
                    options[option] = options.get(option, "") + option_value
            else:
                options[option] = option_value  # type: ignore[assignment]

            rest = rest[optmatch.end() :]
        result.append(options)
        return tuple(result)  # type: ignore[return-value]

    return tuple(result) if result else ("", {})  # type: ignore[return-value]
_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")


@typing.overload
def parse_accept_header(value: t.Optional[str]) -> "ds.Accept":
    ...


@typing.overload
def parse_accept_header(
    value: t.Optional[str], cls: t.Type[_TAnyAccept]
) -> _TAnyAccept:
    ...


def parse_accept_header(
    value: t.Optional[str], cls: t.Optional[t.Type[_TAnyAccept]] = None
) -> _TAnyAccept:
    """Parse an HTTP ``Accept``-style header.

    Only value and quality extraction are implemented, not the complete
    negotiation algorithm.

    :param value: the accept header string to be parsed.
    :param cls: the wrapper class for the return value (can be
        :class:`Accept` or a subclass thereof); defaults to
        :class:`~werkzeug.datastructures.Accept`.
    :return: an instance of `cls` holding ``(value, quality)`` tuples
        sorted by quality.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyAccept], ds.Accept)

    if not value:
        return cls(None)

    items = []

    for match in _accept_re.finditer(value):
        q = match.group(2)
        # Missing q parameter means full quality; otherwise clamp to [0, 1].
        quality = 1 if not q else max(min(float(q), 1), 0)
        items.append((match.group(1), quality))

    return cls(items)
_TAnyCC = t.TypeVar("_TAnyCC", bound="ds._CacheControl")
_t_cc_update = t.Optional[t.Callable[[_TAnyCC], None]]


@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: None = None
) -> "ds.RequestCacheControl":
    ...


@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: t.Type[_TAnyCC]
) -> _TAnyCC:
    ...


def parse_cache_control_header(
    value: t.Optional[str],
    on_update: _t_cc_update = None,
    cls: t.Optional[t.Type[_TAnyCC]] = None,
) -> _TAnyCC:
    """Parse a cache control header. The RFC differs between response and
    request cache control, this method does not. It's your responsibility
    to not use the wrong control statements.

    .. versionadded:: 0.5
       The `cls` was added. If not specified an immutable
       :class:`~werkzeug.datastructures.RequestCacheControl` is returned.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a value
        on the :class:`~werkzeug.datastructures.CacheControl`
        object is changed.
    :param cls: the class for the returned object. By default
        :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCC], ds.RequestCacheControl)

    # A missing or empty header produces an empty cache control object.
    parsed = parse_dict_header(value) if value else ()
    return cls(parsed, on_update)
_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")
_t_csp_update = t.Optional[t.Callable[[_TAnyCSP], None]]


@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: None = None
) -> "ds.ContentSecurityPolicy":
    ...


@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: t.Type[_TAnyCSP]
) -> _TAnyCSP:
    ...


def parse_csp_header(
    value: t.Optional[str],
    on_update: _t_csp_update = None,
    cls: t.Optional[t.Type[_TAnyCSP]] = None,
) -> _TAnyCSP:
    """Parse a Content Security Policy header.

    .. versionadded:: 1.0.0
       Support for Content Security Policy headers was added.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a value
        on the object is changed.
    :param cls: the class for the returned object. By default
        :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCSP], ds.ContentSecurityPolicy)

    if value is None:
        return cls((), on_update)

    directives = []

    for raw_policy in value.split(";"):
        stripped = raw_policy.strip()

        # A policy without a space has no value part and is ignored as
        # badly formatted.
        if " " not in stripped:
            continue

        name, _, body = stripped.partition(" ")
        directives.append((name.strip(), body.strip()))

    return cls(directives, on_update)
def parse_set_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[[["ds.HeaderSet"][0]], None]] = None,
) -> "ds.HeaderSet":
    """Parse a set-like header and return a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    The return value is an object that treats the items case-insensitively
    and keeps the order of the items:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    To create a header from the :class:`HeaderSet` again, use the
    :func:`dump_header` function.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
        value on the :class:`~werkzeug.datastructures.HeaderSet`
        object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    items = parse_list_header(value) if value else None
    return ds.HeaderSet(items, on_update)
def parse_authorization_header(
    value: t.Optional[str],
) -> t.Optional["ds.Authorization"]:
    """Parse an HTTP basic/digest authorization header transmitted by the web
    browser. The return value is either `None` if the header was invalid or
    not given, otherwise an :class:`~werkzeug.datastructures.Authorization`
    object.

    :param value: the authorization header to parse.
    :return: a :class:`~werkzeug.datastructures.Authorization` object or `None`.
    """
    if not value:
        return None

    value = _wsgi_decoding_dance(value)

    try:
        auth_type, auth_info = value.split(None, 1)
    except ValueError:
        return None

    auth_type = auth_type.lower()

    if auth_type == "basic":
        # Basic credentials are base64("username:password").
        try:
            username, password = base64.b64decode(auth_info).split(b":", 1)
        except Exception:
            return None

        try:
            credentials = {
                "username": _to_str(username, "utf-8"),
                "password": _to_str(password, "utf-8"),
            }
        except UnicodeDecodeError:
            return None

        return ds.Authorization("basic", credentials)

    if auth_type == "digest":
        auth_map = parse_dict_header(auth_info)

        # These fields are mandatory for a digest response.
        required = ("username", "realm", "nonce", "uri", "response")
        if any(key not in auth_map for key in required):
            return None

        # When qop is present, nc and cnonce must be present too.
        if "qop" in auth_map and not (
            auth_map.get("nc") and auth_map.get("cnonce")
        ):
            return None

        return ds.Authorization("digest", auth_map)

    return None
def parse_www_authenticate_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[[["ds.WWWAuthenticate"][0]], None]] = None,
) -> "ds.WWWAuthenticate":
    """Parse an HTTP WWW-Authenticate header into a
    :class:`~werkzeug.datastructures.WWWAuthenticate` object.

    :param value: a WWW-Authenticate header to parse.
    :param on_update: an optional callable that is called every time a value
        on the :class:`~werkzeug.datastructures.WWWAuthenticate`
        object is changed.
    :return: a :class:`~werkzeug.datastructures.WWWAuthenticate` object.
    """
    if not value:
        return ds.WWWAuthenticate(on_update=on_update)

    try:
        auth_type, auth_info = value.split(None, 1)
    except (ValueError, AttributeError):
        # No parameters follow the scheme (or value isn't a string).
        return ds.WWWAuthenticate(value.strip().lower(), on_update=on_update)

    return ds.WWWAuthenticate(
        auth_type.lower(), parse_dict_header(auth_info), on_update
    )
def parse_if_range_header(value: t.Optional[str]) -> "ds.IfRange":
    """Parses an if-range header which can be an etag or a date. Returns
    a :class:`~werkzeug.datastructures.IfRange` object.

    .. versionchanged:: 2.0
        If the value represents a datetime, it is timezone-aware.

    .. versionadded:: 0.7
    """
    if not value:
        return ds.IfRange()

    timestamp = parse_date(value)

    if timestamp is not None:
        return ds.IfRange(date=timestamp)

    # Not a date: treat as an etag, dropping weakness information.
    return ds.IfRange(unquote_etag(value)[0])
def parse_range_header(
    value: t.Optional[str], make_inclusive: bool = True
) -> t.Optional["ds.Range"]:
    """Parses a range header into a :class:`~werkzeug.datastructures.Range`
    object. If the header is missing or malformed `None` is returned.
    `ranges` is a list of ``(start, stop)`` tuples where the ranges are
    non-inclusive.

    :param value: the ``Range`` header value, e.g. ``bytes=0-499,1000-``.
    :param make_inclusive: unused; presumably kept for backward
        compatibility — TODO confirm against callers.

    .. versionadded:: 0.7
    """
    if not value or "=" not in value:
        return None

    ranges = []
    # Tracks the exclusive end of the previous range so overlapping or
    # out-of-order ranges are rejected. -1 means "open-ended or suffix
    # range seen; no further ranges allowed after it".
    last_end = 0
    units, rng = value.split("=", 1)
    units = units.strip().lower()

    for item in rng.split(","):
        item = item.strip()
        if "-" not in item:
            return None
        if item.startswith("-"):
            # Suffix range "-N": the last N bytes. begin is negative.
            if last_end < 0:
                return None
            try:
                begin = int(item)
            except ValueError:
                return None
            end = None
            last_end = -1
        elif "-" in item:
            # "start-stop" or open-ended "start-".
            begin_str, end_str = item.split("-", 1)
            begin_str = begin_str.strip()
            end_str = end_str.strip()

            try:
                begin = int(begin_str)
            except ValueError:
                return None

            if begin < last_end or last_end < 0:
                return None
            if end_str:
                try:
                    # Stored stop is exclusive, header stop is inclusive.
                    end = int(end_str) + 1
                except ValueError:
                    return None

                if begin >= end:
                    return None
            else:
                end = None
            last_end = end if end is not None else -1
        ranges.append((begin, end))

    return ds.Range(units, ranges)
def parse_content_range_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[[["ds.ContentRange"][0]], None]] = None,
) -> t.Optional["ds.ContentRange"]:
    """Parses a range header into a
    :class:`~werkzeug.datastructures.ContentRange` object or `None` if
    parsing is not possible.

    .. versionadded:: 0.7

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a value
        on the :class:`~werkzeug.datastructures.ContentRange`
        object is changed.
    """
    if value is None:
        return None

    try:
        units, rangedef = value.strip().split(None, 1)
    except ValueError:
        return None

    rng, sep, length_str = rangedef.partition("/")
    if not sep:
        return None

    # "*" means the total length is unknown.
    if length_str == "*":
        length: t.Optional[int] = None
    else:
        try:
            length = int(length_str)
        except ValueError:
            return None

    # "*" for the range means an unsatisfied range request.
    if rng == "*":
        if is_byte_range_valid(None, None, length):
            return ds.ContentRange(units, None, None, length, on_update=on_update)

        return None

    if "-" not in rng:
        return None

    start_str, stop_str = rng.split("-", 1)

    try:
        start = int(start_str)
        # Header stop is inclusive; stored stop is exclusive.
        stop = int(stop_str) + 1
    except ValueError:
        return None

    if not is_byte_range_valid(start, stop, length):
        return None

    return ds.ContentRange(units, start, stop, length, on_update=on_update)
def quote_etag(etag: str, weak: bool = False) -> str:
    """Quote an etag.

    :param etag: the etag to quote.
    :param weak: set to `True` to tag it "weak".
    :raises ValueError: if the etag already contains a double quote.
    """
    if '"' in etag:
        raise ValueError("invalid etag")

    quoted = f'"{etag}"'
    return f"W/{quoted}" if weak else quoted
def unquote_etag(
    etag: t.Optional[str],
) -> t.Union[t.Tuple[str, bool], t.Tuple[None, None]]:
    """Unquote a single etag:

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple, or ``(None, None)`` for a missing
        etag.
    """
    if not etag:
        return None, None

    tag = etag.strip()
    weak = tag[:2] in ("W/", "w/")

    if weak:
        tag = tag[2:]

    if tag[:1] == tag[-1:] == '"':
        tag = tag[1:-1]

    return tag, weak
def parse_etags(value: t.Optional[str]) -> "ds.ETags":
    """Parse an etag header.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ds.ETags()

    strong = []
    weak = []
    pos = 0
    end = len(value)

    while pos < end:
        match = _etag_re.match(value, pos)

        if match is None:
            break

        is_weak, quoted, raw = match.groups()

        # "*" matches everything; no individual tags needed.
        if raw == "*":
            return ds.ETags(star_tag=True)

        tag = quoted if quoted else raw
        (weak if is_weak else strong).append(tag)
        pos = match.end()

    return ds.ETags(strong, weak)
def generate_etag(data: bytes) -> str:
    """Generate an etag for some data.

    .. versionchanged:: 2.0
        Use SHA-1. MD5 may not be available in some environments.
    """
    digest = sha1(data)
    return digest.hexdigest()
def parse_date(value: t.Optional[str]) -> t.Optional[datetime]:
    """Parse an :rfc:`2822` date into a timezone-aware
    :class:`datetime.datetime` object, or ``None`` if parsing fails.

    This is a wrapper for :func:`email.utils.parsedate_to_datetime`. It
    returns ``None`` if parsing fails instead of raising an exception,
    and always returns a timezone-aware datetime object. If the string
    doesn't have timezone information, it is assumed to be UTC.

    :param value: A string with a supported date format.

    .. versionchanged:: 2.0
        Return a timezone-aware datetime object. Use
        ``email.utils.parsedate_to_datetime``.
    """
    if value is None:
        return None

    try:
        parsed = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None

    if parsed.tzinfo is not None:
        return parsed

    # Naive datetimes are assumed to be UTC.
    return parsed.replace(tzinfo=timezone.utc)
def http_date(
    timestamp: t.Optional[t.Union[datetime, date, int, float, struct_time]] = None
) -> str:
    """Format a datetime object or timestamp into an :rfc:`2822` date
    string.

    This is a wrapper for :func:`email.utils.format_datetime`. It
    assumes naive datetime objects are in UTC instead of raising an
    exception.

    :param timestamp: The datetime or timestamp to format. Defaults to
        the current time.

    .. versionchanged:: 2.0
        Use ``email.utils.format_datetime``. Accept ``date`` objects.
    """
    if isinstance(timestamp, date):
        if isinstance(timestamp, datetime):
            # Ensure datetime is timezone-aware.
            timestamp = _dt_as_utc(timestamp)
        else:
            # Assume plain date is midnight UTC.
            timestamp = datetime.combine(timestamp, time(), tzinfo=timezone.utc)

        return email.utils.format_datetime(timestamp, usegmt=True)

    if isinstance(timestamp, struct_time):
        timestamp = mktime(timestamp)

    return email.utils.formatdate(timestamp, usegmt=True)
def parse_age(value: t.Optional[str] = None) -> t.Optional[timedelta]:
    """Parses a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None

    try:
        seconds = int(value)

        # Negative ages are invalid.
        if seconds < 0:
            return None

        return timedelta(seconds=seconds)
    except (ValueError, OverflowError):
        # Non-numeric input or a value too large for timedelta.
        return None
def dump_age(age: t.Optional[t.Union[timedelta, int]] = None) -> t.Optional[str]:
    """Formats the duration as a base-10 integer.

    :param age: should be an integer number of seconds,
        a :class:`datetime.timedelta` object, or,
        if the age is unknown, `None` (default).
    :raises ValueError: if the age is negative.
    """
    if age is None:
        return None

    seconds = int(age.total_seconds()) if isinstance(age, timedelta) else int(age)

    if seconds < 0:
        raise ValueError("age cannot be negative")

    return str(seconds)
def is_resource_modified(
    environ: "WSGIEnvironment",
    etag: t.Optional[str] = None,
    data: t.Optional[bytes] = None,
    last_modified: t.Optional[t.Union[datetime, str]] = None,
    ignore_if_range: bool = True,
) -> bool:
    """Convenience method for conditional requests.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag for the response for comparison.
    :param data: or alternatively the data of the response to automatically
        generate an etag using :func:`generate_etag`.
    :param last_modified: an optional date of the last modification.
    :param ignore_if_range: If `False`, `If-Range` header will be taken into
        account.
    :return: `True` if the resource was modified, otherwise `False`.

    .. versionchanged:: 2.0
        SHA-1 is used to generate an etag value for the data. MD5 may
        not be available in some environments.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    # Delegate to the sansio implementation, pulling the relevant
    # conditional headers out of the WSGI environ.
    env = environ.get
    return _sansio_http.is_resource_modified(
        http_range=env("HTTP_RANGE"),
        http_if_range=env("HTTP_IF_RANGE"),
        http_if_modified_since=env("HTTP_IF_MODIFIED_SINCE"),
        http_if_none_match=env("HTTP_IF_NONE_MATCH"),
        http_if_match=env("HTTP_IF_MATCH"),
        etag=etag,
        data=data,
        last_modified=last_modified,
        ignore_if_range=ignore_if_range,
    )
def remove_entity_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]],
    allowed: t.Iterable[str] = ("expires", "content-location"),
) -> None:
    """Remove all entity headers from a list or :class:`Headers` object. This
    operation works in-place. `Expires` and `Content-Location` headers are
    by default not removed. The reason for this is :rfc:`2616` section
    10.3.5 which specifies some entity headers that should be sent.

    .. versionchanged:: 0.5
        added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: a list of headers that should still be allowed even though
        they are entity headers.
    """
    keep = {name.lower() for name in allowed}
    retained = []

    for key, value in headers:
        if key.lower() in keep or not is_entity_header(key):
            retained.append((key, value))

    headers[:] = retained
def remove_hop_by_hop_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]]
) -> None:
    """Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or
    :class:`Headers` object. This operation works in-place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    retained = []
    for name, value in headers:
        if not is_hop_by_hop_header(name):
            retained.append((name, value))
    # Slice assignment mutates the caller's object in place.
    headers[:] = retained
def is_entity_header(header: str) -> bool:
    """Check if a header is an entity header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    # Comparison is case-insensitive; _entity_headers holds lowercase names.
    lowered = header.lower()
    return lowered in _entity_headers
def is_hop_by_hop_header(header: str) -> bool:
    """Check if a header is an HTTP/1.1 "Hop-by-Hop" header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    # Comparison is case-insensitive; _hop_by_hop_headers holds lowercase names.
    lowered = header.lower()
    return lowered in _hop_by_hop_headers
def parse_cookie(
    header: t.Union["WSGIEnvironment", str, bytes, None],
    charset: str = "utf-8",
    errors: str = "replace",
    cls: t.Optional[t.Type["ds.MultiDict"]] = None,
) -> "ds.MultiDict[str, str]":
    """Parse a cookie from a string or WSGI environ.

    The same key can be provided multiple times, the values are stored
    in-order. The default :class:`MultiDict` will have the first value
    first, and all values can be retrieved with
    :meth:`MultiDict.getlist`.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key.
    :param charset: The charset for the cookie values.
    :param errors: The error behavior for the charset decoding.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.

    .. versionchanged:: 1.0.0
        Returns a :class:`MultiDict` instead of a
        ``TypeConversionDict``.

    .. versionchanged:: 0.5
        Returns a :class:`TypeConversionDict` instead of a regular dict.
        The ``cls`` parameter was added.
    """
    # Accept three input shapes: missing, a WSGI environ, or the raw
    # header value itself. (None is never a dict, so the order of these
    # checks does not matter.)
    if header is None:
        cookie: t.Union[str, bytes] = ""
    elif isinstance(header, dict):
        cookie = header.get("HTTP_COOKIE", "")
    else:
        cookie = header

    # The actual parsing lives in the sans-IO implementation.
    return _sansio_http.parse_cookie(
        cookie=cookie, charset=charset, errors=errors, cls=cls
    )
def dump_cookie(
    key: str,
    value: t.Union[bytes, str] = "",
    max_age: t.Optional[t.Union[timedelta, int]] = None,
    expires: t.Optional[t.Union[str, datetime, int, float]] = None,
    path: t.Optional[str] = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    charset: str = "utf-8",
    sync_expires: bool = True,
    max_size: int = 4093,
    samesite: t.Optional[str] = None,
) -> str:
    """Create a Set-Cookie header without the ``Set-Cookie`` prefix.

    The return value is usually restricted to ascii as the vast majority
    of values are properly escaped, but that is no guarantee. It's
    tunneled through latin1 as required by :pep:`3333`.

    The return value is not ASCII safe if the key contains unicode
    characters.  This is technically against the specification but
    happens in the wild.  It's strongly recommended to not use
    non-ASCII values for the keys.

    :param max_age: should be a number of seconds, or `None` (default) if
                    the cookie should last only as long as the client's
                    browser session.  Additionally `timedelta` objects
                    are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
    :param path: limits the cookie to a given path, per default it will
                 span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
                   example, ``domain=".example.com"`` will set a cookie
                   that is readable by the domain ``www.example.com``,
                   ``foo.example.com`` etc. Otherwise, a cookie will only
                   be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie.  This is an
                     extension to the cookie standard and probably not
                     supported by all browsers.
    :param charset: the encoding for string values.
    :param sync_expires: automatically set expires if max_age is defined
                         but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    # Work in bytes internally; the result is decoded as latin1 at the end.
    key = _to_bytes(key, charset)
    value = _to_bytes(value, charset)

    if path is not None:
        # Imported here to avoid a circular import at module load time.
        from .urls import iri_to_uri

        path = iri_to_uri(path, charset)

    domain = _make_cookie_domain(domain)

    # Normalize a timedelta max_age to whole seconds.
    if isinstance(max_age, timedelta):
        max_age = int(max_age.total_seconds())

    if expires is not None:
        # Non-string expires values (datetime / timestamp) are formatted
        # as an HTTP date; strings are passed through as-is.
        if not isinstance(expires, str):
            expires = http_date(expires)
    elif max_age is not None and sync_expires:
        # No explicit expires: derive one from max_age for old clients
        # that only understand Expires, not Max-Age.
        expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)

    if samesite is not None:
        # Accept any casing (e.g. "strict", "LAX") and title-case it.
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    # First segment is always the key=value pair; attributes follow.
    buf = [key + b"=" + _cookie_quote(value)]

    # XXX: In theory all of these parameters that are not marked with `None`
    # should be quoted. Because stdlib did not quote it before I did not
    # want to introduce quoting there now.
    # The third tuple element selects the formatting:
    #   None  -> boolean flag attribute, emitted bare when truthy
    #   True  -> value attribute, quoted via _cookie_quote
    #   False -> value attribute, emitted unquoted
    for k, v, q in (
        (b"Domain", domain, True),
        (b"Expires", expires, False),
        (b"Max-Age", max_age, False),
        (b"Secure", secure, None),
        (b"HttpOnly", httponly, None),
        (b"Path", path, False),
        (b"SameSite", samesite, False),
    ):
        if q is None:
            # Flag attribute: name only, no "=value" part.
            if v:
                buf.append(k)
            continue

        if v is None:
            # Unset value attribute: omit entirely.
            continue

        tmp = bytearray(k)
        if not isinstance(v, (bytes, bytearray)):
            v = _to_bytes(str(v), charset)
        if q:
            v = _cookie_quote(v)
        tmp += b"=" + v
        buf.append(bytes(tmp))

    # The return value will be an incorrectly encoded latin1 header for
    # consistency with the headers object.
    rv = b"; ".join(buf)
    rv = rv.decode("latin1")

    # Warn if the final value of the cookie is larger than the limit. If the
    # cookie is too large, then it may be silently ignored by the browser,
    # which can be quite hard to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        warnings.warn(
            f"The {key.decode(charset)!r} cookie is too large: the value was"
            f" {value_size} bytes but the"
            f" header required {cookie_size - value_size} extra bytes. The final size"
            f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
            f" silently ignore cookies larger than this.",
            stacklevel=2,
        )

    return rv
def is_byte_range_valid(
    start: t.Optional[int], stop: t.Optional[int], length: t.Optional[int]
) -> bool:
    """Checks if a given byte content range is valid for the given length.

    .. versionadded:: 0.7
    """
    # start and stop must either both be given or both be omitted.
    if (start is None) != (stop is None):
        return False

    # Neither bound given: any non-negative (or unknown) length is fine.
    if start is None:
        return length is None or length >= 0

    # Both bounds given: the range must be non-empty and start at or
    # after byte zero.
    if stop <= start or start < 0:  # type: ignore
        return False

    # With a known length, the range must begin inside the content.
    return length is None or start < length
1325# circular dependencies
1326from . import datastructures as ds
1327from .sansio import http as _sansio_http