Coverage report for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/http.py — 20% of 456 statements covered.
Generated by coverage.py v7.0.1 on 2022-12-25 06:11 +0000.
1import base64
2import email.utils
3import re
4import typing
5import typing as t
6import warnings
7from datetime import date
8from datetime import datetime
9from datetime import time
10from datetime import timedelta
11from datetime import timezone
12from enum import Enum
13from hashlib import sha1
14from time import mktime
15from time import struct_time
16from urllib.parse import unquote_to_bytes as _unquote
17from urllib.request import parse_http_list as _parse_list_header
19from ._internal import _cookie_quote
20from ._internal import _dt_as_utc
21from ._internal import _make_cookie_domain
22from ._internal import _to_bytes
23from ._internal import _to_str
24from ._internal import _wsgi_decoding_dance
26if t.TYPE_CHECKING:
27 from _typeshed.wsgi import WSGIEnvironment
# Media-range grammar per RFC 7231 Sections 5.3.1 and 5.3.2; captures the
# media-range itself and an optional "q" weight parameter.
_accept_re = re.compile(
    r"""
    (                       # media-range capturing-parenthesis
      [^\s;,]+              # type/subtype
      (?:[ \t]*;[ \t]*      # ";"
        (?:                 # parameter non-capturing-parenthesis
          [^\s;,q][^\s;,]*  # token that doesn't start with "q"
        |                   # or
          q[^\s;,=][^\s;,]* # token that is more than just "q"
        )
      )*                    # zero or more parameters
    )                       # end of media-range
    (?:[ \t]*;[ \t]*q=      # weight is a "q" parameter
      (\d*(?:\.\d+)?)       # qvalue capturing-parentheses
      [^,]*                 # "extension" accept params: who cares?
    )?                      # accept params are optional
    """,
    re.VERBOSE,
)

# Characters that may appear in an HTTP token.
_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)

# Optional "W/" weakness marker, then either a quoted or a bare etag,
# followed by a comma separator or end of string.
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')

# One ";key=value" piece of an options header, including RFC 2231
# continuation (*N) and extended (key*=encoding'lang'value) notation.
_option_header_piece_re = re.compile(
    r"""
    ;\s*,?\s*  # newlines were replaced with commas
    (?P<key>
        "[^"\\]*(?:\\.[^"\\]*)*"  # quoted string
    |
        [^\s;,=*]+  # token
    )
    (?:\*(?P<count>\d+))?  # *1, optional continuation index
    \s*
    (?:  # optionally followed by =value
        (?:  # equals sign, possibly with encoding
            \*\s*=\s*  # * indicates extended notation
            (?:  # optional encoding
                (?P<encoding>[^\s]+?)
                '(?P<language>[^\s]*?)'
            )?
        |
            =\s*  # basic notation
        )
        (?P<value>
            "[^"\\]*(?:\\.[^"\\]*)*"  # quoted string
        |
            [^;,]+  # token
        )?
    )?
    \s*
    """,
    flags=re.VERBOSE,
)

# Leading mimetype of an options header (input is prefixed with a comma).
_option_header_start_mime_type = re.compile(r",\s*([^;,\s]+)([;,]\s*.+)?")

# Headers describing the entity body (RFC 2616 section 7.1), lowercase.
_entity_headers = frozenset(
    {
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    }
)

# HTTP/1.1 hop-by-hop headers (RFC 2616 section 13.5.1), lowercase.
_hop_by_hop_headers = frozenset(
    {
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    }
)
#: Mapping of HTTP status codes to their standard reason phrases.
#: Fix: 511 is "Network Authentication Required" per RFC 6585 (was
#: incorrectly "Network Authentication Failed").
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    511: "Network Authentication Required",  # see RFC 6585
}
class COEP(Enum):
    """Values for the ``Cross-Origin-Embedder-Policy`` header."""

    UNSAFE_NONE = "unsafe-none"
    REQUIRE_CORP = "require-corp"
class COOP(Enum):
    """Values for the ``Cross-Origin-Opener-Policy`` header."""

    UNSAFE_NONE = "unsafe-none"
    SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
    SAME_ORIGIN = "same-origin"
def quote_header_value(
    value: t.Union[str, int], extra_chars: str = "", allow_token: bool = True
) -> str:
    """Add double quotes around a header value unless it is a plain token.

    .. versionadded:: 0.5

    :param value: the value to quote.
    :param extra_chars: a list of extra characters to skip quoting.
    :param allow_token: if this is enabled token values are returned
        unchanged.
    """
    if isinstance(value, bytes):
        # Header bytes are interpreted as latin-1.
        value = value.decode("latin1")

    value = str(value)

    if allow_token and set(value) <= _token_chars | set(extra_chars):
        # Every character is a valid token character; no quoting needed.
        return value

    # Escape backslashes and double quotes, then wrap the result.
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'
def unquote_header_value(value: str, is_filename: bool = False) -> str:
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    .. versionadded:: 0.5

    :param value: the header value to unquote.
    :param is_filename: The value represents a filename or path.
    """
    if not (value and value[0] == value[-1] == '"'):
        return value

    # Strip the surrounding quotes. This is deliberately not RFC-exact:
    # matching the RFC would break Internet Explorer, which uploads files
    # with "C:\foo\bar.txt" as the filename.
    inner = value[1:-1]

    # A filename that looks like a UNC path ("\\server\share") must keep
    # its leading double backslash; the replace below would collapse it.
    # See issue #458.
    if is_filename and inner[:2] == "\\\\":
        return inner

    return inner.replace("\\\\", "\\").replace('\\"', '"')
def dump_options_header(
    header: t.Optional[str], options: t.Mapping[str, t.Optional[t.Union[str, int]]]
) -> str:
    """The reverse function to :func:`parse_options_header`.

    :param header: the header to dump
    :param options: a dict of options to append.
    """
    segments = [] if header is None else [header]

    for key, value in options.items():
        # A ``None`` value means the option is a bare key without "=value".
        segments.append(key if value is None else f"{key}={quote_header_value(value)}")

    return "; ".join(segments)
def dump_header(
    iterable: t.Union[t.Dict[str, t.Union[str, int]], t.Iterable[str]],
    allow_token: bool = True,
) -> str:
    """Dump an HTTP header again. This is the reversal of
    :func:`parse_list_header`, :func:`parse_set_header` and
    :func:`parse_dict_header`. This also quotes strings that include an
    equals sign unless you pass it as dict of key, value pairs.

    >>> dump_header({'foo': 'bar baz'})
    'foo="bar baz"'
    >>> dump_header(('foo', 'bar baz'))
    'foo, "bar baz"'

    :param iterable: the iterable or dict of values to quote.
    :param allow_token: if set to `False` tokens as values are disallowed.
        See :func:`quote_header_value` for more details.
    """
    if isinstance(iterable, dict):
        # Mapping form: "key=value" pairs, bare key when value is None.
        items = [
            key
            if value is None
            else f"{key}={quote_header_value(value, allow_token=allow_token)}"
            for key, value in iterable.items()
        ]
    else:
        items = [quote_header_value(item, allow_token=allow_token) for item in iterable]

    return ", ".join(items)
def dump_csp_header(header: "ds.ContentSecurityPolicy") -> str:
    """Dump a Content Security Policy header.

    These are structured into policies such as "default-src 'self';
    script-src 'self'".

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.
    """
    policies = [f"{directive} {value}" for directive, value in header.items()]
    return "; ".join(policies)
def parse_list_header(value: str) -> t.List[str]:
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    """
    # Quoted items keep their quotes after the stdlib split; strip and
    # unescape them here.
    return [
        unquote_header_value(item[1:-1]) if item[:1] == item[-1:] == '"' else item
        for item in _parse_list_header(value)
    ]
def parse_dict_header(value: str, cls: t.Type[dict] = dict) -> t.Dict[str, str]:
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict (or any other mapping object created from
    the type with a dict like interface provided by the `cls` argument):

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    .. versionchanged:: 0.9
       Added support for `cls` argument.

    :param value: a string with a dict header.
    :param cls: callable to use for storage of parsed results.
    :return: an instance of `cls`
    """
    result = cls()

    if isinstance(value, bytes):
        # Header bytes are interpreted as latin-1.
        value = value.decode("latin1")

    for item in _parse_list_header(value):
        name, sep, item_value = item.partition("=")

        if not sep:
            # Bare key without a value.
            result[name] = None
            continue

        if item_value[:1] == item_value[-1:] == '"':
            item_value = unquote_header_value(item_value[1:-1])

        result[name] = item_value

    return result
def parse_options_header(value: t.Optional[str]) -> t.Tuple[str, t.Dict[str, str]]:
    """Parse a ``Content-Type``-like header into a tuple with the
    value and any options:

    >>> parse_options_header('text/html; charset=utf8')
    ('text/html', {'charset': 'utf8'})

    This should is not for ``Cache-Control``-like headers, which use a
    different format. For those, use :func:`parse_dict_header`.

    :param value: The header value to parse.

    .. versionchanged:: 2.2
        Option names are always converted to lowercase.

    .. versionchanged:: 2.1
        The ``multiple`` parameter is deprecated and will be removed in
        Werkzeug 2.2.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5
    """
    if not value:
        return "", {}

    # Folded lines become commas; the leading comma anchors the regex.
    value = "," + value.replace("\n", ",")
    mime_match = _option_header_start_mime_type.match(value)

    if not mime_match:
        return "", {}

    mimetype = mime_match.group(1)
    options: t.Dict[str, str] = {}
    rest = mime_match.group(2)
    # Continuations don't have to repeat the encoding after the first
    # line; remember it while a continuation is in progress.
    continued_encoding: t.Optional[str] = None

    while rest:
        piece = _option_header_piece_re.match(rest)

        if not piece:
            break

        option, count, encoding, language, option_value = piece.groups()

        if count:
            # RFC 2231 continuation: inherit the running encoding if this
            # part doesn't declare one.
            if not encoding:
                encoding = continued_encoding

            continued_encoding = encoding
        else:
            continued_encoding = None

        option = unquote_header_value(option).lower()

        if option_value is not None:
            option_value = unquote_header_value(option_value, option == "filename")

            if encoding is not None:
                option_value = _unquote(option_value).decode(encoding)

        if count:
            # Continuations append to the existing value. For simplicity,
            # out-of-order indices are not handled.
            if option_value is not None:
                options[option] = options.get(option, "") + option_value
        else:
            options[option] = option_value  # type: ignore[assignment]

        rest = rest[piece.end() :]

    return mimetype, options
_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")


@typing.overload
def parse_accept_header(value: t.Optional[str]) -> "ds.Accept":
    ...


@typing.overload
def parse_accept_header(
    value: t.Optional[str], cls: t.Type[_TAnyAccept]
) -> _TAnyAccept:
    ...


def parse_accept_header(
    value: t.Optional[str], cls: t.Optional[t.Type[_TAnyAccept]] = None
) -> _TAnyAccept:
    """Parses an HTTP Accept-* header. This does not implement a complete
    valid algorithm but one that supports at least value and quality
    extraction.

    Returns a new :class:`Accept` object (basically a list of ``(value, quality)``
    tuples sorted by the quality with some additional accessor methods).

    The second parameter can be a subclass of :class:`Accept` that is created
    with the parsed values and returned.

    :param value: the accept header string to be parsed.
    :param cls: the wrapper class for the return value (can be
                         :class:`Accept` or a subclass thereof)
    :return: an instance of `cls`.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyAccept], ds.Accept)

    if not value:
        return cls(None)

    parsed = []

    for match in _accept_re.finditer(value):
        q = match.group(2)

        if not q:
            # No "q" parameter means full quality.
            quality: float = 1
        else:
            # Clamp the declared quality into [0, 1].
            quality = max(min(float(q), 1), 0)

        parsed.append((match.group(1), quality))

    return cls(parsed)
_TAnyCC = t.TypeVar("_TAnyCC", bound="ds._CacheControl")
_t_cc_update = t.Optional[t.Callable[[_TAnyCC], None]]


@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: None = None
) -> "ds.RequestCacheControl":
    ...


@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: t.Type[_TAnyCC]
) -> _TAnyCC:
    ...


def parse_cache_control_header(
    value: t.Optional[str],
    on_update: _t_cc_update = None,
    cls: t.Optional[t.Type[_TAnyCC]] = None,
) -> _TAnyCC:
    """Parse a cache control header. The RFC differs between response and
    request cache control, this method does not. It's your responsibility
    to not use the wrong control statements.

    .. versionadded:: 0.5
       The `cls` was added. If not specified an immutable
       :class:`~werkzeug.datastructures.RequestCacheControl` is returned.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.CacheControl`
                      object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCC], ds.RequestCacheControl)

    # A missing or empty header yields an empty control object.
    parsed = parse_dict_header(value) if value else ()
    return cls(parsed, on_update)
_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")
_t_csp_update = t.Optional[t.Callable[[_TAnyCSP], None]]


@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: None = None
) -> "ds.ContentSecurityPolicy":
    ...


@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: t.Type[_TAnyCSP]
) -> _TAnyCSP:
    ...


def parse_csp_header(
    value: t.Optional[str],
    on_update: _t_csp_update = None,
    cls: t.Optional[t.Type[_TAnyCSP]] = None,
) -> _TAnyCSP:
    """Parse a Content Security Policy header.

    .. versionadded:: 1.0.0
       Support for Content Security Policy headers was added.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCSP], ds.ContentSecurityPolicy)

    if value is None:
        return cls((), on_update)

    items = []

    for policy in value.split(";"):
        policy = policy.strip()

        # A policy with no space has no value and is malformed; skip it.
        if " " in policy:
            directive, _, directive_value = policy.partition(" ")
            items.append((directive.strip(), directive_value.strip()))

    return cls(items, on_update)
def parse_set_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[["ds.HeaderSet"], None]] = None,
) -> "ds.HeaderSet":
    """Parse a set-like header and return a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    The return value is an object that treats the items case-insensitively
    and keeps the order of the items:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    To create a header from the :class:`HeaderSet` again, use the
    :func:`dump_header` function.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
                      value on the :class:`~werkzeug.datastructures.HeaderSet`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    # An empty header produces an empty set.
    items = parse_list_header(value) if value else None
    return ds.HeaderSet(items, on_update)
def parse_authorization_header(
    value: t.Optional[str],
) -> t.Optional["ds.Authorization"]:
    """Parse an HTTP basic/digest authorization header transmitted by the web
    browser. The return value is either `None` if the header was invalid or
    not given, otherwise an :class:`~werkzeug.datastructures.Authorization`
    object.

    :param value: the authorization header to parse.
    :return: a :class:`~werkzeug.datastructures.Authorization` object or `None`.
    """
    if not value:
        return None

    value = _wsgi_decoding_dance(value)

    try:
        auth_type, auth_info = value.split(None, 1)
    except ValueError:
        # No credentials after the scheme name.
        return None

    auth_type = auth_type.lower()

    if auth_type == "basic":
        try:
            username, password = base64.b64decode(auth_info).split(b":", 1)
        except Exception:
            # Invalid base64 or missing ":" separator.
            return None

        try:
            return ds.Authorization(
                "basic",
                {
                    "username": _to_str(username, "utf-8"),
                    "password": _to_str(password, "utf-8"),
                },
            )
        except UnicodeDecodeError:
            return None

    if auth_type == "digest":
        auth_map = parse_dict_header(auth_info)

        # These parameters are mandatory for a digest response.
        if any(
            key not in auth_map
            for key in ("username", "realm", "nonce", "uri", "response")
        ):
            return None

        # When "qop" is sent, "nc" and "cnonce" must accompany it.
        if "qop" in auth_map and not (
            auth_map.get("nc") and auth_map.get("cnonce")
        ):
            return None

        return ds.Authorization("digest", auth_map)

    return None
def parse_www_authenticate_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[["ds.WWWAuthenticate"], None]] = None,
) -> "ds.WWWAuthenticate":
    """Parse an HTTP WWW-Authenticate header into a
    :class:`~werkzeug.datastructures.WWWAuthenticate` object.

    :param value: a WWW-Authenticate header to parse.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.WWWAuthenticate`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.WWWAuthenticate` object.
    """
    if not value:
        return ds.WWWAuthenticate(on_update=on_update)

    try:
        auth_type, auth_info = value.split(None, 1)
    except (ValueError, AttributeError):
        # Scheme only, no parameters.
        return ds.WWWAuthenticate(value.strip().lower(), on_update=on_update)

    return ds.WWWAuthenticate(
        auth_type.lower(), parse_dict_header(auth_info), on_update
    )
def parse_if_range_header(value: t.Optional[str]) -> "ds.IfRange":
    """Parses an if-range header which can be an etag or a date. Returns
    a :class:`~werkzeug.datastructures.IfRange` object.

    .. versionchanged:: 2.0
        If the value represents a datetime, it is timezone-aware.

    .. versionadded:: 0.7
    """
    if not value:
        return ds.IfRange()

    timestamp = parse_date(value)

    if timestamp is not None:
        return ds.IfRange(date=timestamp)

    # Not a date, so treat it as an etag; drop any weakness marker.
    return ds.IfRange(unquote_etag(value)[0])
def parse_range_header(
    value: t.Optional[str], make_inclusive: bool = True
) -> t.Optional["ds.Range"]:
    """Parses a range header into a :class:`~werkzeug.datastructures.Range`
    object. If the header is missing or malformed `None` is returned.
    `ranges` is a list of ``(start, stop)`` tuples where the ranges are
    non-inclusive.

    .. versionadded:: 0.7
    """
    if not value or "=" not in value:
        return None

    units, _, rng = value.partition("=")
    units = units.strip().lower()
    ranges = []
    # Tracks the exclusive end of the previous range; -1 marks an
    # open-ended or suffix range, after which nothing more is allowed.
    last_end = 0

    for item in rng.split(","):
        item = item.strip()

        if "-" not in item:
            return None

        if item.startswith("-"):
            # Suffix range like "-500".
            if last_end < 0:
                return None

            try:
                begin = int(item)
            except ValueError:
                return None

            end = None
            last_end = -1
        else:
            begin_str, _, end_str = item.partition("-")

            try:
                begin = int(begin_str.strip())
            except ValueError:
                return None

            # Ranges must not overlap or follow an open-ended range.
            if begin < last_end or last_end < 0:
                return None

            end_str = end_str.strip()

            if end_str:
                try:
                    # Convert to a non-inclusive stop value.
                    end = int(end_str) + 1
                except ValueError:
                    return None

                if begin >= end:
                    return None
            else:
                end = None

            last_end = end if end is not None else -1

        ranges.append((begin, end))

    return ds.Range(units, ranges)
def parse_content_range_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[["ds.ContentRange"], None]] = None,
) -> t.Optional["ds.ContentRange"]:
    """Parses a range header into a
    :class:`~werkzeug.datastructures.ContentRange` object or `None` if
    parsing is not possible.

    .. versionadded:: 0.7

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.ContentRange`
                      object is changed.
    """
    if value is None:
        return None

    try:
        units, rangedef = (value or "").strip().split(None, 1)
    except ValueError:
        return None

    if "/" not in rangedef:
        return None

    rng, _, length_str = rangedef.partition("/")

    if length_str == "*":
        # Total length unknown.
        length = None
    else:
        try:
            length = int(length_str)
        except ValueError:
            return None

    if rng == "*":
        # Unsatisfied range: only the total length is known.
        return ds.ContentRange(units, None, None, length, on_update=on_update)

    if "-" not in rng:
        return None

    start_str, _, stop_str = rng.partition("-")

    try:
        start = int(start_str)
        # Convert to a non-inclusive stop value.
        stop = int(stop_str) + 1
    except ValueError:
        return None

    if is_byte_range_valid(start, stop, length):
        return ds.ContentRange(units, start, stop, length, on_update=on_update)

    return None
def quote_etag(etag: str, weak: bool = False) -> str:
    """Quote an etag.

    :param etag: the etag to quote.
    :param weak: set to `True` to tag it "weak".
    """
    if '"' in etag:
        raise ValueError("invalid etag")

    quoted = f'"{etag}"'
    return f"W/{quoted}" if weak else quoted
def unquote_etag(
    etag: t.Optional[str],
) -> t.Union[t.Tuple[str, bool], t.Tuple[None, None]]:
    """Unquote a single etag:

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple.
    """
    if not etag:
        return None, None

    etag = etag.strip()
    weak = etag.startswith(("W/", "w/"))

    if weak:
        # Drop the weakness marker.
        etag = etag[2:]

    if etag[:1] == etag[-1:] == '"':
        etag = etag[1:-1]

    return etag, weak
def parse_etags(value: t.Optional[str]) -> "ds.ETags":
    """Parse an etag header.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ds.ETags()

    strong = []
    weak = []
    pos = 0
    end = len(value)

    while pos < end:
        match = _etag_re.match(value, pos)

        if match is None:
            break

        is_weak, quoted, raw = match.groups()

        if raw == "*":
            # A bare "*" matches anything.
            return ds.ETags(star_tag=True)

        if quoted:
            raw = quoted

        (weak if is_weak else strong).append(raw)
        pos = match.end()

    return ds.ETags(strong, weak)
def generate_etag(data: bytes) -> str:
    """Generate an etag for some data.

    .. versionchanged:: 2.0
        Use SHA-1. MD5 may not be available in some environments.
    """
    digest = sha1(data)
    return digest.hexdigest()
def parse_date(value: t.Optional[str]) -> t.Optional[datetime]:
    """Parse an :rfc:`2822` date into a timezone-aware
    :class:`datetime.datetime` object, or ``None`` if parsing fails.

    This is a wrapper for :func:`email.utils.parsedate_to_datetime`. It
    returns ``None`` if parsing fails instead of raising an exception,
    and always returns a timezone-aware datetime object. If the string
    doesn't have timezone information, it is assumed to be UTC.

    :param value: A string with a supported date format.

    .. versionchanged:: 2.0
        Return a timezone-aware datetime object. Use
        ``email.utils.parsedate_to_datetime``.
    """
    if value is None:
        return None

    try:
        parsed = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # TypeError on Python < 3.10, ValueError on 3.10+ for bad input.
        return None

    if parsed.tzinfo is not None:
        return parsed

    # Naive datetimes are assumed to be UTC.
    return parsed.replace(tzinfo=timezone.utc)
def http_date(
    timestamp: t.Optional[t.Union[datetime, date, int, float, struct_time]] = None
) -> str:
    """Format a datetime object or timestamp into an :rfc:`2822` date
    string.

    This is a wrapper for :func:`email.utils.format_datetime`. It
    assumes naive datetime objects are in UTC instead of raising an
    exception.

    :param timestamp: The datetime or timestamp to format. Defaults to
        the current time.

    .. versionchanged:: 2.0
        Use ``email.utils.format_datetime``. Accept ``date`` objects.
    """
    # datetime is a subclass of date, so check it first.
    if isinstance(timestamp, datetime):
        # Ensure the datetime is timezone-aware before formatting.
        return email.utils.format_datetime(_dt_as_utc(timestamp), usegmt=True)

    if isinstance(timestamp, date):
        # A plain date is interpreted as midnight UTC.
        midnight = datetime.combine(timestamp, time(), tzinfo=timezone.utc)
        return email.utils.format_datetime(midnight, usegmt=True)

    if isinstance(timestamp, struct_time):
        timestamp = mktime(timestamp)

    return email.utils.formatdate(timestamp, usegmt=True)
def parse_age(value: t.Optional[str] = None) -> t.Optional[timedelta]:
    """Parses a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None

    try:
        seconds = int(value)
    except ValueError:
        return None

    if seconds >= 0:
        try:
            return timedelta(seconds=seconds)
        except OverflowError:
            # Too many seconds to represent.
            pass

    return None
def dump_age(age: t.Optional[t.Union[timedelta, int]] = None) -> t.Optional[str]:
    """Formats the duration as a base-10 integer.

    :param age: should be an integer number of seconds,
                a :class:`datetime.timedelta` object, or,
                if the age is unknown, `None` (default).
    """
    if age is None:
        return None

    seconds = int(age.total_seconds()) if isinstance(age, timedelta) else int(age)

    if seconds < 0:
        raise ValueError("age cannot be negative")

    return str(seconds)
def is_resource_modified(
    environ: "WSGIEnvironment",
    etag: t.Optional[str] = None,
    data: t.Optional[bytes] = None,
    last_modified: t.Optional[t.Union[datetime, str]] = None,
    ignore_if_range: bool = True,
) -> bool:
    """Convenience method for conditional requests.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag for the response for comparison.
    :param data: or alternatively the data of the response to automatically
                 generate an etag using :func:`generate_etag`.
    :param last_modified: an optional date of the last modification.
    :param ignore_if_range: If `False`, `If-Range` header will be taken into
                            account.
    :return: `True` if the resource was modified, otherwise `False`.

    .. versionchanged:: 2.0
        SHA-1 is used to generate an etag value for the data. MD5 may
        not be available in some environments.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    # Pull the conditional headers out of the WSGI environ and delegate
    # to the sans-IO implementation.
    get = environ.get
    return _sansio_http.is_resource_modified(
        http_range=get("HTTP_RANGE"),
        http_if_range=get("HTTP_IF_RANGE"),
        http_if_modified_since=get("HTTP_IF_MODIFIED_SINCE"),
        http_if_none_match=get("HTTP_IF_NONE_MATCH"),
        http_if_match=get("HTTP_IF_MATCH"),
        etag=etag,
        data=data,
        last_modified=last_modified,
        ignore_if_range=ignore_if_range,
    )
def remove_entity_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]],
    allowed: t.Iterable[str] = ("expires", "content-location"),
) -> None:
    """Remove all entity headers from a list or :class:`Headers` object. This
    operation works in-place. `Expires` and `Content-Location` headers are
    by default not removed. The reason for this is :rfc:`2616` section
    10.3.5 which specifies some entity headers that should be sent.

    .. versionchanged:: 0.5
       added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: a list of headers that should still be allowed even though
                    they are entity headers.
    """
    allowed = {name.lower() for name in allowed}
    kept = []

    for key, value in headers:
        # Keep non-entity headers and explicitly allowed entity headers.
        if not is_entity_header(key) or key.lower() in allowed:
            kept.append((key, value))

    headers[:] = kept
def remove_hop_by_hop_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]]
) -> None:
    """Strip all HTTP/1.1 "Hop-by-Hop" headers from a list or
    :class:`Headers` object. The object is modified in place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    retained = []
    for name, value in headers:
        if not is_hop_by_hop_header(name):
            retained.append((name, value))
    headers[:] = retained
def is_entity_header(header: str) -> bool:
    """Tell whether the given header name is an entity header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    # Membership test is case-insensitive; the set holds lowercase names.
    name = header.lower()
    return name in _entity_headers
def is_hop_by_hop_header(header: str) -> bool:
    """Tell whether the given header name is an HTTP/1.1 "Hop-by-Hop"
    header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    # Membership test is case-insensitive; the set holds lowercase names.
    name = header.lower()
    return name in _hop_by_hop_headers
def parse_cookie(
    header: t.Union["WSGIEnvironment", str, bytes, None],
    charset: str = "utf-8",
    errors: str = "replace",
    cls: t.Optional[t.Type["ds.MultiDict"]] = None,
) -> "ds.MultiDict[str, str]":
    """Parse a cookie from a string or WSGI environ.

    The same key can be provided multiple times, the values are stored
    in-order. The default :class:`MultiDict` will have the first value
    first, and all values can be retrieved with
    :meth:`MultiDict.getlist`.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key.
    :param charset: The charset for the cookie values.
    :param errors: The error behavior for the charset decoding.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.

    .. versionchanged:: 1.0.0
        Returns a :class:`MultiDict` instead of a
        ``TypeConversionDict``.

    .. versionchanged:: 0.5
        Returns a :class:`TypeConversionDict` instead of a regular dict.
        The ``cls`` parameter was added.
    """
    # Normalize the three accepted input shapes to a raw cookie string,
    # then delegate the actual parsing to the sans-IO implementation.
    if header is None:
        cookie: t.Union[str, bytes] = ""
    elif isinstance(header, dict):
        cookie = header.get("HTTP_COOKIE", "")
    else:
        cookie = header

    return _sansio_http.parse_cookie(
        cookie=cookie, charset=charset, errors=errors, cls=cls
    )
def dump_cookie(
    key: str,
    value: t.Union[bytes, str] = "",
    max_age: t.Optional[t.Union[timedelta, int]] = None,
    expires: t.Optional[t.Union[str, datetime, int, float]] = None,
    path: t.Optional[str] = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    charset: str = "utf-8",
    sync_expires: bool = True,
    max_size: int = 4093,
    samesite: t.Optional[str] = None,
) -> str:
    """Create a Set-Cookie header without the ``Set-Cookie`` prefix.

    The return value is usually restricted to ascii as the vast majority
    of values are properly escaped, but that is no guarantee. It's
    tunneled through latin1 as required by :pep:`3333`.

    The return value is not ASCII safe if the key contains unicode
    characters. This is technically against the specification but
    happens in the wild. It's strongly recommended to not use
    non-ASCII values for the keys.

    :param max_age: should be a number of seconds, or `None` (default) if
                    the cookie should last only as long as the client's
                    browser session. Additionally `timedelta` objects
                    are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
    :param path: limits the cookie to a given path, per default it will
                 span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
                   example, ``domain=".example.com"`` will set a cookie
                   that is readable by the domain ``www.example.com``,
                   ``foo.example.com`` etc. Otherwise, a cookie will only
                   be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie. This is an
                     extension to the cookie standard and probably not
                     supported by all browsers.
    :param charset: the encoding for string values.
    :param sync_expires: automatically set expires if max_age is defined
                         but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    # Assemble the header in bytes; it is decoded through latin1 at the
    # end to match how the headers object tunnels values (PEP 3333).
    key = _to_bytes(key, charset)
    value = _to_bytes(value, charset)

    if path is not None:
        from .urls import iri_to_uri

        # Percent-encode non-ASCII characters so the Path attribute is ASCII.
        path = iri_to_uri(path, charset)

    domain = _make_cookie_domain(domain)

    if isinstance(max_age, timedelta):
        max_age = int(max_age.total_seconds())

    if expires is not None:
        if not isinstance(expires, str):
            expires = http_date(expires)
    elif max_age is not None and sync_expires:
        # Derive Expires from Max-Age for clients that only honor Expires.
        expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)

    if samesite is not None:
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    buf = [key + b"=" + _cookie_quote(value)]

    # XXX: In theory all of these parameters that are not marked with `None`
    # should be quoted. Because stdlib did not quote it before I did not
    # want to introduce quoting there now.
    # Each tuple is (attribute name, value, quote flag). A quote flag of
    # None marks a boolean attribute emitted without a value (e.g. Secure);
    # True means the value goes through _cookie_quote like the cookie value.
    for k, v, q in (
        (b"Domain", domain, True),
        (b"Expires", expires, False),
        (b"Max-Age", max_age, False),
        (b"Secure", secure, None),
        (b"HttpOnly", httponly, None),
        (b"Path", path, False),
        (b"SameSite", samesite, False),
    ):
        if q is None:
            # Boolean attribute: emit the bare name only when truthy.
            if v:
                buf.append(k)
            continue

        if v is None:
            continue

        tmp = bytearray(k)
        if not isinstance(v, (bytes, bytearray)):
            v = _to_bytes(str(v), charset)
        if q:
            v = _cookie_quote(v)
        tmp += b"=" + v
        buf.append(bytes(tmp))

    # The return value will be an incorrectly encoded latin1 header for
    # consistency with the headers object.
    rv = b"; ".join(buf)
    rv = rv.decode("latin1")

    # Warn if the final value of the cookie is larger than the limit. If the
    # cookie is too large, then it may be silently ignored by the browser,
    # which can be quite hard to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        warnings.warn(
            f"The {key.decode(charset)!r} cookie is too large: the value was"
            f" {value_size} bytes but the"
            f" header required {cookie_size - value_size} extra bytes. The final size"
            f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
            f" silently ignore cookies larger than this.",
            stacklevel=2,
        )

    return rv
def is_byte_range_valid(
    start: t.Optional[int], stop: t.Optional[int], length: t.Optional[int]
) -> bool:
    """Check whether a byte content range is valid for the given length.

    ``start`` and ``stop`` must either both be ``None`` (no range) or
    both be integers forming a non-empty, non-negative half-open range;
    if ``length`` is known, ``start`` must also fall inside it.

    .. versionadded:: 0.7
    """
    has_start = start is not None
    has_stop = stop is not None

    # Either both endpoints are given or neither is.
    if has_start != has_stop:
        return False

    if not has_start:
        # No range at all: only an unknown or non-negative length is valid.
        return length is None or length >= 0

    # The half-open range must be non-empty.
    if start >= stop:  # type: ignore
        return False

    if length is None:
        return start >= 0  # type: ignore

    return 0 <= start < length  # type: ignore
1309# circular dependencies
1310from . import datastructures as ds
1311from .sansio import http as _sansio_http