Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/http.py: 19%
497 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:03 +0000
1import base64
2import email.utils
3import re
4import typing
5import typing as t
6import warnings
7from datetime import date
8from datetime import datetime
9from datetime import time
10from datetime import timedelta
11from datetime import timezone
12from enum import Enum
13from hashlib import sha1
14from time import mktime
15from time import struct_time
16from urllib.parse import unquote_to_bytes as _unquote
17from urllib.request import parse_http_list as _parse_list_header
19from ._internal import _cookie_parse_impl
20from ._internal import _cookie_quote
21from ._internal import _make_cookie_domain
22from ._internal import _to_bytes
23from ._internal import _to_str
24from ._internal import _wsgi_decoding_dance
25from werkzeug._internal import _dt_as_utc
27if t.TYPE_CHECKING:
28 import typing_extensions as te
29 from _typeshed.wsgi import WSGIEnvironment
# for explanation of "media-range", etc. see Sections 5.3.{1,2} of RFC 7231
_accept_re = re.compile(
    r"""
    (                       # media-range capturing-parenthesis
      [^\s;,]+              # type/subtype
      (?:[ \t]*;[ \t]*      # ";"
        (?:                 # parameter non-capturing-parenthesis
          [^\s;,q][^\s;,]*  # token that doesn't start with "q"
        |                   # or
          q[^\s;,=][^\s;,]* # token that is more than just "q"
        )
      )*                    # zero or more parameters
    )                       # end of media-range
    (?:[ \t]*;[ \t]*q=      # weight is a "q" parameter
       (\d*(?:\.\d+)?)      # qvalue capturing-parentheses
       [^,]*                # "extension" accept params: who cares?
    )?                      # accept params are optional
    """,
    re.VERBOSE,
)
# Characters allowed in an HTTP "token" (RFC 7230 section 3.2.6); values
# made only of these may appear in headers without quoting.
_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)
# Matches a single entity-tag in an ETag-list header: optional "W/" weakness
# prefix, then either a quoted or a bare tag, then a comma separator or end.
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
# Matches one ";key=value" option of a Content-Type-like header, including
# RFC 2231 continuations ("key*1") and extended notation ("key*=enc'lang'v").
_option_header_piece_re = re.compile(
    r"""
    ;\s*,?\s*  # newlines were replaced with commas
    (?P<key>
        "[^"\\]*(?:\\.[^"\\]*)*"  # quoted string
    |
        [^\s;,=*]+  # token
    )
    (?:\*(?P<count>\d+))?  # *1, optional continuation index
    \s*
    (?:  # optionally followed by =value
        (?:  # equals sign, possibly with encoding
            \*\s*=\s*  # * indicates extended notation
            (?:  # optional encoding
                (?P<encoding>[^\s]+?)
                '(?P<language>[^\s]*?)'
            )?
        |
            =\s*  # basic notation
        )
        (?P<value>
            "[^"\\]*(?:\\.[^"\\]*)*"  # quoted string
        |
            [^;,]+  # token
        )?
    )?
    \s*
    """,
    flags=re.VERBOSE,
)
# Matches the leading mimetype of an option header after newlines have been
# replaced with commas; group 2 captures the remaining ";..." options.
_option_header_start_mime_type = re.compile(r",\s*([^;,\s]+)([;,]\s*.+)?")
# Entity headers (RFC 2616 section 7.1): headers that describe the payload
# itself rather than the message exchange. Used by remove_entity_headers().
_entity_headers = frozenset(
    [
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    ]
)
# Hop-by-hop headers (RFC 2616 section 13.5.1): meaningful only for a single
# transport-level connection and not to be forwarded by proxies.
_hop_by_hop_headers = frozenset(
    [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ]
)
# Mapping of HTTP status code -> canonical reason phrase.
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    # RFC 6585 defines the 511 reason phrase as "Network Authentication
    # Required" (was incorrectly "Network Authentication Failed").
    511: "Network Authentication Required",
}
class COEP(Enum):
    """Cross Origin Embedder Policies.

    Enumerates the valid values of the
    ``Cross-Origin-Embedder-Policy`` response header.
    """

    UNSAFE_NONE = "unsafe-none"
    REQUIRE_CORP = "require-corp"
class COOP(Enum):
    """Cross Origin Opener Policies.

    Enumerates the valid values of the
    ``Cross-Origin-Opener-Policy`` response header.
    """

    UNSAFE_NONE = "unsafe-none"
    SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
    SAME_ORIGIN = "same-origin"
def quote_header_value(
    value: t.Union[str, int], extra_chars: str = "", allow_token: bool = True
) -> str:
    """Quote a header value if necessary.

    .. versionadded:: 0.5

    :param value: the value to quote.
    :param extra_chars: a list of extra characters to skip quoting.
    :param allow_token: if this is enabled token values are returned
        unchanged.
    """
    # Bytes values are treated as latin1 text, matching WSGI conventions.
    if isinstance(value, bytes):
        value = value.decode("latin1")

    text = str(value)

    # Plain tokens may be emitted without quotes; everything else is
    # escaped and wrapped in double quotes.
    if allow_token and set(text) <= (_token_chars | set(extra_chars)):
        return text

    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'
def unquote_header_value(value: str, is_filename: bool = False) -> str:
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    .. versionadded:: 0.5

    :param value: the header value to unquote.
    :param is_filename: The value represents a filename or path.
    """
    # Unquoted values pass through untouched.
    if not value or value[0] != '"' or value[-1] != '"':
        return value

    # Deliberately mirror browser behaviour instead of the RFC: IE, for
    # example, uploads files with "C:\foo\bar.txt" as the filename.
    inner = value[1:-1]

    # A filename that looks like a UNC path must keep its escapes as-is;
    # collapsing them would mangle the leading double slash. See #458.
    if is_filename and inner[:2] == "\\\\":
        return inner

    return inner.replace("\\\\", "\\").replace('\\"', '"')
def dump_options_header(
    header: t.Optional[str], options: t.Mapping[str, t.Optional[t.Union[str, int]]]
) -> str:
    """The reverse function to :func:`parse_options_header`.

    :param header: the header to dump
    :param options: a dict of options to append.
    """
    parts = [] if header is None else [header]

    for key, value in options.items():
        # A ``None`` value means the option is a bare flag with no "=value".
        if value is None:
            parts.append(key)
        else:
            parts.append(f"{key}={quote_header_value(value)}")

    return "; ".join(parts)
def dump_header(
    iterable: t.Union[t.Dict[str, t.Union[str, int]], t.Iterable[str]],
    allow_token: bool = True,
) -> str:
    """Dump an HTTP header again. This is the reversal of
    :func:`parse_list_header`, :func:`parse_set_header` and
    :func:`parse_dict_header`. This also quotes strings that include an
    equals sign unless you pass it as dict of key, value pairs.

    >>> dump_header({'foo': 'bar baz'})
    'foo="bar baz"'
    >>> dump_header(('foo', 'bar baz'))
    'foo, "bar baz"'

    :param iterable: the iterable or dict of values to quote.
    :param allow_token: if set to `False` tokens as values are disallowed.
        See :func:`quote_header_value` for more details.
    """
    if isinstance(iterable, dict):
        # ``None`` values are dumped as bare keys without "=value".
        items = [
            key
            if value is None
            else f"{key}={quote_header_value(value, allow_token=allow_token)}"
            for key, value in iterable.items()
        ]
    else:
        items = [quote_header_value(v, allow_token=allow_token) for v in iterable]

    return ", ".join(items)
def dump_csp_header(header: "ds.ContentSecurityPolicy") -> str:
    """Dump a Content Security Policy header.

    These are structured into policies such as "default-src 'self';
    script-src 'self'".

    .. versionadded:: 1.0.0
       Support for Content Security Policy headers was added.
    """
    policies = (f"{directive} {value}" for directive, value in header.items())
    return "; ".join(policies)
def parse_list_header(value: str) -> t.List[str]:
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    """
    items = []
    for raw in _parse_list_header(value):
        # Quoted items survive the comma split intact; strip the quotes
        # and unescape them here.
        if raw[:1] == raw[-1:] == '"':
            raw = unquote_header_value(raw[1:-1])
        items.append(raw)
    return items
def parse_dict_header(value: str, cls: t.Type[dict] = dict) -> t.Dict[str, str]:
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict (or any other mapping object created from
    the type with a dict like interface provided by the `cls` argument):

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    .. versionchanged:: 0.9
       Added support for `cls` argument.

    :param value: a string with a dict header.
    :param cls: callable to use for storage of parsed results.
    :return: an instance of `cls`
    """
    result = cls()

    # Bytes headers are treated as latin1 text, matching WSGI conventions.
    if isinstance(value, bytes):
        value = value.decode("latin1")

    for item in _parse_list_header(value):
        name, sep, val = item.partition("=")

        if not sep:
            # No "=": the key is present without any value.
            result[item] = None
            continue

        if val[:1] == val[-1:] == '"':
            val = unquote_header_value(val[1:-1])

        result[name] = val

    return result
def parse_options_header(
    value: t.Optional[str], multiple: "te.Literal[None]" = None
) -> t.Tuple[str, t.Dict[str, str]]:
    """Parse a ``Content-Type``-like header into a tuple with the
    value and any options:

    >>> parse_options_header('text/html; charset=utf8')
    ('text/html', {'charset': 'utf8'})

    This should not be used for ``Cache-Control``-like headers, which
    use a different format. For those, use :func:`parse_dict_header`.

    :param value: The header value to parse.

    .. versionchanged:: 2.1
        The ``multiple`` parameter is deprecated and will be removed in
        Werkzeug 2.2.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5
    """
    if multiple is not None:
        # ``warnings`` is already imported at module level; the previous
        # redundant function-local import has been removed.
        warnings.warn(
            "The 'multiple' parameter of 'parse_options_header' is"
            " deprecated and will be removed in Werkzeug 2.2.",
            DeprecationWarning,
            stacklevel=2,
        )

    if not value:
        return "", {}

    result: t.List[t.Any] = []

    # Fold continuation lines into the comma-separated form the regexes
    # expect, and prefix a comma so the start regex anchors uniformly.
    value = "," + value.replace("\n", ",")
    while value:
        match = _option_header_start_mime_type.match(value)
        if not match:
            break

        result.append(match.group(1))  # mimetype
        options: t.Dict[str, str] = {}
        # Parse options
        rest = match.group(2)
        encoding: t.Optional[str]
        continued_encoding: t.Optional[str] = None
        while rest:
            optmatch = _option_header_piece_re.match(rest)
            if not optmatch:
                break

            option, count, encoding, language, option_value = optmatch.groups()
            # Continuations don't have to supply the encoding after the
            # first line. If we're in a continuation, track the current
            # encoding to use for subsequent lines. Reset it when the
            # continuation ends.
            if not count:
                continued_encoding = None
            else:
                if not encoding:
                    encoding = continued_encoding

                continued_encoding = encoding

            option = unquote_header_value(option)

            if option_value is not None:
                option_value = unquote_header_value(option_value, option == "filename")

                if encoding is not None:
                    # RFC 2231 extended value: percent-decoded bytes in the
                    # declared charset.
                    option_value = _unquote(option_value).decode(encoding)

            if count:
                # Continuations append to the existing value. For
                # simplicity, this ignores the possibility of
                # out-of-order indices, which shouldn't happen anyway.
                if option_value is not None:
                    options[option] = options.get(option, "") + option_value
            else:
                options[option] = option_value  # type: ignore[assignment]

            rest = rest[optmatch.end() :]
        result.append(options)
        if not multiple:
            return tuple(result)  # type: ignore[return-value]
        value = rest

    return tuple(result) if result else ("", {})  # type: ignore[return-value]
# Return-type variable for parse_accept_header: the result is whatever
# Accept subclass the caller passes as ``cls``.
_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")


# Typing-only overloads: without ``cls`` the result is a plain Accept,
# with ``cls`` the result has that (sub)class's type.
@typing.overload
def parse_accept_header(value: t.Optional[str]) -> "ds.Accept":
    ...


@typing.overload
def parse_accept_header(
    value: t.Optional[str], cls: t.Type[_TAnyAccept]
) -> _TAnyAccept:
    ...
def parse_accept_header(
    value: t.Optional[str], cls: t.Optional[t.Type[_TAnyAccept]] = None
) -> _TAnyAccept:
    """Parses an HTTP Accept-* header. This does not implement a complete
    valid algorithm but one that supports at least value and quality
    extraction.

    Returns a new :class:`Accept` object (basically a list of ``(value, quality)``
    tuples sorted by the quality with some additional accessor methods).

    The second parameter can be a subclass of :class:`Accept` that is created
    with the parsed values and returned.

    :param value: the accept header string to be parsed.
    :param cls: the wrapper class for the return value (can be
                         :class:`Accept` or a subclass thereof)
    :return: an instance of `cls`.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyAccept], ds.Accept)

    if not value:
        return cls(None)

    parsed = []
    for match in _accept_re.finditer(value):
        raw_quality = match.group(2)
        # No qvalue means full quality; otherwise clamp into [0, 1].
        quality = 1 if not raw_quality else max(min(float(raw_quality), 1), 0)
        parsed.append((match.group(1), quality))

    return cls(parsed)
# Return-type variable for parse_cache_control_header; bound to the
# internal cache-control base class.
_TAnyCC = t.TypeVar("_TAnyCC", bound="ds._CacheControl")
# Type of the optional ``on_update`` callback accepted by the parser.
_t_cc_update = t.Optional[t.Callable[[_TAnyCC], None]]


# Typing-only overloads: without ``cls`` the result is an immutable
# RequestCacheControl, with ``cls`` the result has that class's type.
@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: None = None
) -> "ds.RequestCacheControl":
    ...


@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: t.Type[_TAnyCC]
) -> _TAnyCC:
    ...
def parse_cache_control_header(
    value: t.Optional[str],
    on_update: _t_cc_update = None,
    cls: t.Optional[t.Type[_TAnyCC]] = None,
) -> _TAnyCC:
    """Parse a cache control header. The RFC differs between response and
    request cache control, this method does not. It's your responsibility
    to not use the wrong control statements.

    .. versionadded:: 0.5
       The `cls` was added. If not specified an immutable
       :class:`~werkzeug.datastructures.RequestCacheControl` is returned.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.CacheControl`
                      object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCC], ds.RequestCacheControl)

    # Missing/empty header yields an empty (but still typed) object.
    directives = parse_dict_header(value) if value else ()
    return cls(directives, on_update)
# Return-type variable for parse_csp_header.
_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")
# Type of the optional ``on_update`` callback accepted by the parser.
_t_csp_update = t.Optional[t.Callable[[_TAnyCSP], None]]


# Typing-only overloads: without ``cls`` the result is a
# ContentSecurityPolicy, with ``cls`` the result has that class's type.
@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: None = None
) -> "ds.ContentSecurityPolicy":
    ...


@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: t.Type[_TAnyCSP]
) -> _TAnyCSP:
    ...
def parse_csp_header(
    value: t.Optional[str],
    on_update: _t_csp_update = None,
    cls: t.Optional[t.Type[_TAnyCSP]] = None,
) -> _TAnyCSP:
    """Parse a Content Security Policy header.

    .. versionadded:: 1.0.0
       Support for Content Security Policy headers was added.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCSP], ds.ContentSecurityPolicy)

    if value is None:
        return cls((), on_update)

    directives = []
    for policy in value.split(";"):
        policy = policy.strip()

        # Ignore badly formatted policies (no space between the directive
        # name and its value).
        directive, sep, directive_value = policy.partition(" ")
        if sep:
            directives.append((directive.strip(), directive_value.strip()))

    return cls(directives, on_update)
def parse_set_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[["ds.HeaderSet"], None]] = None,
) -> "ds.HeaderSet":
    """Parse a set-like header and return a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    The return value is an object that treats the items case-insensitively
    and keeps the order of the items:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    To create a header from the :class:`HeaderSet` again, use the
    :func:`dump_header` function.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
                      value on the :class:`~werkzeug.datastructures.HeaderSet`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    # A missing/empty header produces an empty set.
    items = parse_list_header(value) if value else None
    return ds.HeaderSet(items, on_update)
def parse_authorization_header(
    value: t.Optional[str],
) -> t.Optional["ds.Authorization"]:
    """Parse an HTTP basic/digest authorization header transmitted by the web
    browser. The return value is either `None` if the header was invalid or
    not given, otherwise an :class:`~werkzeug.datastructures.Authorization`
    object.

    Only the ``basic`` and ``digest`` schemes are recognized; any other
    scheme, or malformed credentials, result in ``None``.

    :param value: the authorization header to parse.
    :return: a :class:`~werkzeug.datastructures.Authorization` object or `None`.
    """
    if not value:
        return None

    value = _wsgi_decoding_dance(value)

    try:
        # "<scheme> <credentials>"; the scheme is case-insensitive.
        auth_type, auth_info = value.split(None, 1)
        auth_type = auth_type.lower()
    except ValueError:
        return None

    if auth_type == "basic":
        # Basic credentials are base64("username:password") (RFC 7617).
        # A broad except covers both invalid base64 and a missing colon.
        try:
            username, password = base64.b64decode(auth_info).split(b":", 1)
        except Exception:
            return None

        try:
            return ds.Authorization(
                "basic",
                {
                    "username": _to_str(username, "utf-8"),
                    "password": _to_str(password, "utf-8"),
                },
            )
        except UnicodeDecodeError:
            # Credential bytes that are not valid UTF-8 are rejected.
            return None
    elif auth_type == "digest":
        auth_map = parse_dict_header(auth_info)

        # These digest parameters are mandatory; reject if any is missing.
        for key in "username", "realm", "nonce", "uri", "response":
            if key not in auth_map:
                return None

        if "qop" in auth_map:
            # When qop is sent, nc and cnonce are required as well.
            if not auth_map.get("nc") or not auth_map.get("cnonce"):
                return None

        return ds.Authorization("digest", auth_map)

    return None
def parse_www_authenticate_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[["ds.WWWAuthenticate"], None]] = None,
) -> "ds.WWWAuthenticate":
    """Parse an HTTP WWW-Authenticate header into a
    :class:`~werkzeug.datastructures.WWWAuthenticate` object.

    :param value: a WWW-Authenticate header to parse.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.WWWAuthenticate`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.WWWAuthenticate` object.
    """
    if not value:
        return ds.WWWAuthenticate(on_update=on_update)

    try:
        auth_type, auth_info = value.split(None, 1)
    except (ValueError, AttributeError):
        # A bare scheme with no parameters, e.g. just "Basic".
        return ds.WWWAuthenticate(value.strip().lower(), on_update=on_update)

    return ds.WWWAuthenticate(
        auth_type.lower(), parse_dict_header(auth_info), on_update
    )
def parse_if_range_header(value: t.Optional[str]) -> "ds.IfRange":
    """Parses an if-range header which can be an etag or a date. Returns
    a :class:`~werkzeug.datastructures.IfRange` object.

    .. versionchanged:: 2.0
        If the value represents a datetime, it is timezone-aware.

    .. versionadded:: 0.7
    """
    if not value:
        return ds.IfRange()

    # Try to interpret the value as an HTTP date first.
    timestamp = parse_date(value)
    if timestamp is not None:
        return ds.IfRange(date=timestamp)

    # Otherwise treat it as an etag, dropping any weakness marker.
    tag, _weak = unquote_etag(value)
    return ds.IfRange(tag)
def parse_range_header(
    value: t.Optional[str], make_inclusive: bool = True
) -> t.Optional["ds.Range"]:
    """Parses a range header into a :class:`~werkzeug.datastructures.Range`
    object. If the header is missing or malformed `None` is returned.
    `ranges` is a list of ``(start, stop)`` tuples where the ranges are
    non-inclusive.

    :param value: the ``Range`` header value, e.g. ``"bytes=0-499"``.
    :param make_inclusive: unused; kept for backwards compatibility.

    .. versionadded:: 0.7
    """
    if not value or "=" not in value:
        return None

    ranges = []
    # Tracks the exclusive end of the previous range so overlapping or
    # out-of-order ranges are rejected; -1 marks an open-ended range.
    last_end = 0
    units, rng = value.split("=", 1)
    units = units.strip().lower()

    for item in rng.split(","):
        item = item.strip()
        if "-" not in item:
            return None

        if item.startswith("-"):
            # Suffix range like "-500" (the last 500 bytes); not allowed
            # after an open-ended range.
            if last_end < 0:
                return None

            try:
                begin = int(item)
            except ValueError:
                return None

            end = None
            last_end = -1
        else:
            # "-" is guaranteed present here, so this split always yields
            # two parts. (Replaces a redundant ``elif "-" in item`` that
            # could never be false after the guard above.)
            begin_str, end_str = item.split("-", 1)
            begin_str = begin_str.strip()
            end_str = end_str.strip()

            if not begin_str.isdigit():
                return None

            begin = int(begin_str)
            if begin < last_end or last_end < 0:
                return None

            if end_str:
                if not end_str.isdigit():
                    return None

                # Stored ranges are non-inclusive, so shift the end by one.
                end = int(end_str) + 1

                if begin >= end:
                    return None
            else:
                end = None

            last_end = end if end is not None else -1

        ranges.append((begin, end))

    return ds.Range(units, ranges)
def parse_content_range_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[["ds.ContentRange"], None]] = None,
) -> t.Optional["ds.ContentRange"]:
    """Parses a range header into a
    :class:`~werkzeug.datastructures.ContentRange` object or `None` if
    parsing is not possible.

    .. versionadded:: 0.7

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.ContentRange`
                      object is changed.
    """
    if value is None:
        return None

    # "<units> <range>/<length>", e.g. "bytes 0-499/1234".
    try:
        units, rangedef = value.strip().split(None, 1)
    except ValueError:
        return None

    rng, slash, length_str = rangedef.partition("/")
    if not slash:
        return None

    if length_str == "*":
        # Total length unknown.
        length: t.Optional[int] = None
    elif length_str.isdigit():
        length = int(length_str)
    else:
        return None

    if rng == "*":
        # Unsatisfied range: only the length is known.
        return ds.ContentRange(units, None, None, length, on_update=on_update)

    start_str, dash, stop_str = rng.partition("-")
    if not dash:
        return None

    try:
        start = int(start_str)
        # Stored ranges are non-inclusive, so shift the end by one.
        stop = int(stop_str) + 1
    except ValueError:
        return None

    if is_byte_range_valid(start, stop, length):
        return ds.ContentRange(units, start, stop, length, on_update=on_update)

    return None
def quote_etag(etag: str, weak: bool = False) -> str:
    """Quote an etag.

    :param etag: the etag to quote.
    :param weak: set to `True` to tag it "weak".
    :raises ValueError: if the etag already contains a double quote.
    """
    if '"' in etag:
        raise ValueError("invalid etag")

    quoted = f'"{etag}"'
    return f"W/{quoted}" if weak else quoted
def unquote_etag(
    etag: t.Optional[str],
) -> t.Union[t.Tuple[str, bool], t.Tuple[None, None]]:
    """Unquote a single etag:

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple.
    """
    if not etag:
        return None, None

    tag = etag.strip()

    # A "W/" prefix (case-insensitive) marks a weak comparison etag.
    weak = tag.startswith(("W/", "w/"))
    if weak:
        tag = tag[2:]

    if tag[:1] == tag[-1:] == '"':
        tag = tag[1:-1]

    return tag, weak
def parse_etags(value: t.Optional[str]) -> "ds.ETags":
    """Parse an etag header.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ds.ETags()

    strong = []
    weak = []
    pos = 0
    end = len(value)

    while pos < end:
        match = _etag_re.match(value, pos)
        if match is None:
            break

        is_weak, quoted, raw = match.groups()

        # A bare "*" matches anything; no other tags matter then.
        if raw == "*":
            return ds.ETags(star_tag=True)

        tag = quoted or raw
        (weak if is_weak else strong).append(tag)
        pos = match.end()

    return ds.ETags(strong, weak)
def generate_etag(data: bytes) -> str:
    """Generate an etag for some data.

    :param data: the raw response body to derive the tag from.

    .. versionchanged:: 2.0
        Use SHA-1. MD5 may not be available in some environments.
    """
    digest = sha1(data)
    return digest.hexdigest()
def parse_date(value: t.Optional[str]) -> t.Optional[datetime]:
    """Parse an :rfc:`2822` date into a timezone-aware
    :class:`datetime.datetime` object, or ``None`` if parsing fails.

    This is a wrapper for :func:`email.utils.parsedate_to_datetime`. It
    returns ``None`` if parsing fails instead of raising an exception,
    and always returns a timezone-aware datetime object. If the string
    doesn't have timezone information, it is assumed to be UTC.

    :param value: A string with a supported date format.

    .. versionchanged:: 2.0
        Return a timezone-aware datetime object. Use
        ``email.utils.parsedate_to_datetime``.
    """
    if value is None:
        return None

    try:
        parsed = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Unparseable input; older Pythons raise TypeError, newer ValueError.
        return None

    # Treat a naive result as UTC so callers always get aware datetimes.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)

    return parsed
def http_date(
    timestamp: t.Optional[t.Union[datetime, date, int, float, struct_time]] = None
) -> str:
    """Format a datetime object or timestamp into an :rfc:`2822` date
    string.

    This is a wrapper for :func:`email.utils.format_datetime`. It
    assumes naive datetime objects are in UTC instead of raising an
    exception.

    :param timestamp: The datetime or timestamp to format. Defaults to
        the current time.

    .. versionchanged:: 2.0
        Use ``email.utils.format_datetime``. Accept ``date`` objects.
    """
    if isinstance(timestamp, date):
        if isinstance(timestamp, datetime):
            # Normalize to aware UTC; naive datetimes are assumed UTC.
            dt = _dt_as_utc(timestamp)
        else:
            # A plain date is taken to mean midnight UTC on that day.
            dt = datetime.combine(timestamp, time(), tzinfo=timezone.utc)

        return email.utils.format_datetime(dt, usegmt=True)

    if isinstance(timestamp, struct_time):
        timestamp = mktime(timestamp)

    # int/float timestamp, or None for "now".
    return email.utils.formatdate(timestamp, usegmt=True)
def parse_age(value: t.Optional[str] = None) -> t.Optional[timedelta]:
    """Parses a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None

    try:
        seconds = int(value)
    except ValueError:
        return None

    if seconds < 0:
        # Age is defined as a non-negative delta-seconds value.
        return None

    try:
        return timedelta(seconds=seconds)
    except OverflowError:
        # Larger than timedelta can represent.
        return None
def dump_age(age: t.Optional[t.Union[timedelta, int]] = None) -> t.Optional[str]:
    """Formats the duration as a base-10 integer.

    :param age: should be an integer number of seconds,
                a :class:`datetime.timedelta` object, or,
                if the age is unknown, `None` (default).
    :raises ValueError: if the resulting seconds value is negative.
    """
    if age is None:
        return None

    seconds = int(age.total_seconds()) if isinstance(age, timedelta) else int(age)

    if seconds < 0:
        raise ValueError("age cannot be negative")

    return str(seconds)
def is_resource_modified(
    environ: "WSGIEnvironment",
    etag: t.Optional[str] = None,
    data: t.Optional[bytes] = None,
    last_modified: t.Optional[t.Union[datetime, str]] = None,
    ignore_if_range: bool = True,
) -> bool:
    """Convenience method for conditional requests.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag for the response for comparison.
    :param data: or alternatively the data of the response to automatically
                 generate an etag using :func:`generate_etag`.
    :param last_modified: an optional date of the last modification.
    :param ignore_if_range: If `False`, `If-Range` header will be taken into
                            account.
    :return: `True` if the resource was modified, otherwise `False`.

    .. versionchanged:: 2.0
        SHA-1 is used to generate an etag value for the data. MD5 may
        not be available in some environments.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    # ``etag`` and ``data`` are mutually exclusive; data is only a
    # convenience for deriving the etag.
    if etag is None and data is not None:
        etag = generate_etag(data)
    elif data is not None:
        raise TypeError("both data and etag given")

    unmodified = False
    if isinstance(last_modified, str):
        last_modified = parse_date(last_modified)

    # HTTP doesn't use microsecond, remove it to avoid false positive
    # comparisons. Mark naive datetimes as UTC.
    if last_modified is not None:
        last_modified = _dt_as_utc(last_modified.replace(microsecond=0))

    if_range = None
    if not ignore_if_range and "HTTP_RANGE" in environ:
        # https://tools.ietf.org/html/rfc7233#section-3.2
        # A server MUST ignore an If-Range header field received in a request
        # that does not contain a Range header field.
        if_range = parse_if_range_header(environ.get("HTTP_IF_RANGE"))

    # If-Range with a date takes precedence over If-Modified-Since.
    if if_range is not None and if_range.date is not None:
        modified_since: t.Optional[datetime] = if_range.date
    else:
        modified_since = parse_date(environ.get("HTTP_IF_MODIFIED_SINCE"))

    if modified_since and last_modified and last_modified <= modified_since:
        unmodified = True

    if etag:
        etag, _ = unquote_etag(etag)
        etag = t.cast(str, etag)

        if if_range is not None and if_range.etag is not None:
            unmodified = parse_etags(if_range.etag).contains(etag)
        else:
            if_none_match = parse_etags(environ.get("HTTP_IF_NONE_MATCH"))
            if if_none_match:
                # https://tools.ietf.org/html/rfc7232#section-3.2
                # "A recipient MUST use the weak comparison function when comparing
                # entity-tags for If-None-Match"
                unmodified = if_none_match.contains_weak(etag)

            # https://tools.ietf.org/html/rfc7232#section-3.1
            # "Origin server MUST use the strong comparison function when
            # comparing entity-tags for If-Match"
            if_match = parse_etags(environ.get("HTTP_IF_MATCH"))
            if if_match:
                # If-Match failing means the resource counts as modified.
                unmodified = not if_match.is_strong(etag)

    return not unmodified
def remove_entity_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]],
    allowed: t.Iterable[str] = ("expires", "content-location"),
) -> None:
    """Strip entity headers from a list or :class:`Headers` object,
    modifying it in-place. ``Expires`` and ``Content-Location`` are kept
    by default because :rfc:`2616` section 10.3.5 requires some entity
    headers to be sent.

    .. versionchanged:: 0.5
        added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: a list of headers that should still be allowed even though
                    they are entity headers.
    """
    # Allowed names are matched case-insensitively.
    keep = {name.lower() for name in allowed}
    retained = []

    for name, value in headers:
        if name.lower() in keep or not is_entity_header(name):
            retained.append((name, value))

    # Slice assignment mutates the caller's object in-place.
    headers[:] = retained
def remove_hop_by_hop_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]]
) -> None:
    """Strip all HTTP/1.1 "Hop-by-Hop" headers from a list or
    :class:`Headers` object, modifying it in-place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    kept = []

    for name, value in headers:
        if not is_hop_by_hop_header(name):
            kept.append((name, value))

    # Slice assignment mutates the caller's object in-place.
    headers[:] = kept
def is_entity_header(header: str) -> bool:
    """Check if a header is an entity header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    # Header field names are case-insensitive; the lookup set is lowercase.
    normalized = header.lower()
    return normalized in _entity_headers
def is_hop_by_hop_header(header: str) -> bool:
    """Check if a header is an HTTP/1.1 "Hop-by-Hop" header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    # Header field names are case-insensitive; the lookup set is lowercase.
    normalized = header.lower()
    return normalized in _hop_by_hop_headers
def parse_cookie(
    header: t.Union["WSGIEnvironment", str, bytes, None],
    charset: str = "utf-8",
    errors: str = "replace",
    cls: t.Optional[t.Type["ds.MultiDict"]] = None,
) -> "ds.MultiDict[str, str]":
    """Parse a cookie from a string or WSGI environ.

    A key may appear more than once; values are kept in order. With the
    default :class:`MultiDict`, the first value is returned for plain
    access and all values are available via :meth:`MultiDict.getlist`.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key.
    :param charset: The charset for the cookie values.
    :param errors: The error behavior for the charset decoding.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.

    .. versionchanged:: 1.0.0
        Returns a :class:`MultiDict` instead of a
        ``TypeConversionDict``.

    .. versionchanged:: 0.5
        Returns a :class:`TypeConversionDict` instead of a regular dict.
        The ``cls`` parameter was added.
    """
    if isinstance(header, dict):
        cookie = header.get("HTTP_COOKIE", "")
    elif header is None:
        cookie = ""
    else:
        cookie = header

    # PEP 3333 tunnels header values through the environ as latin1-decoded
    # strings; turn them back into bytes before low-level parsing.
    if isinstance(cookie, str):
        cookie = cookie.encode("latin1", "replace")

    container_cls = ds.MultiDict if cls is None else cls

    def _iter_pairs() -> t.Iterator[t.Tuple[str, str]]:
        for raw_key, raw_value in _cookie_parse_impl(cookie):  # type: ignore
            name = _to_str(raw_key, charset, errors, allow_none_charset=True)

            # Pairs with an empty name are dropped entirely.
            if not name:
                continue

            yield name, _to_str(raw_value, charset, errors, allow_none_charset=True)

    return container_cls(_iter_pairs())
def dump_cookie(
    key: str,
    value: t.Union[bytes, str] = "",
    max_age: t.Optional[t.Union[timedelta, int]] = None,
    expires: t.Optional[t.Union[str, datetime, int, float]] = None,
    path: t.Optional[str] = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    charset: str = "utf-8",
    sync_expires: bool = True,
    max_size: int = 4093,
    samesite: t.Optional[str] = None,
) -> str:
    """Create a Set-Cookie header without the ``Set-Cookie`` prefix.

    The return value is usually restricted to ascii as the vast majority
    of values are properly escaped, but that is no guarantee. It's
    tunneled through latin1 as required by :pep:`3333`.

    The return value is not ASCII safe if the key contains unicode
    characters. This is technically against the specification but
    happens in the wild. It's strongly recommended to not use
    non-ASCII values for the keys.

    :param max_age: should be a number of seconds, or `None` (default) if
                    the cookie should last only as long as the client's
                    browser session. Additionally `timedelta` objects
                    are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
    :param path: limits the cookie to a given path, per default it will
                 span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
                   example, ``domain=".example.com"`` will set a cookie
                   that is readable by the domain ``www.example.com``,
                   ``foo.example.com`` etc. Otherwise, a cookie will only
                   be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie. This is an
                     extension to the cookie standard and probably not
                     supported by all browsers.
    :param charset: the encoding for string values.
    :param sync_expires: automatically set expires if max_age is defined
                         but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    # Everything below works on bytes; normalize the key/value first.
    key = _to_bytes(key, charset)
    value = _to_bytes(value, charset)

    if path is not None:
        # Imported here to avoid a circular import at module load time.
        from .urls import iri_to_uri

        path = iri_to_uri(path, charset)

    domain = _make_cookie_domain(domain)

    if isinstance(max_age, timedelta):
        max_age = int(max_age.total_seconds())

    if expires is not None:
        # Non-string values (datetime / timestamp) are formatted as an
        # HTTP date; strings are assumed to be pre-formatted.
        if not isinstance(expires, str):
            expires = http_date(expires)
    elif max_age is not None and sync_expires:
        # Derive Expires from Max-Age for clients that only honor Expires.
        expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)

    if samesite is not None:
        # Normalize case so 'strict'/'LAX'/'none' are all accepted.
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    # First element is always the key=value pair; attributes follow.
    buf = [key + b"=" + _cookie_quote(value)]

    # XXX: In theory all of these parameters that are not marked with `None`
    # should be quoted. Because stdlib did not quote it before I did not
    # want to introduce quoting there now.
    for k, v, q in (
        (b"Domain", domain, True),
        (b"Expires", expires, False),
        (b"Max-Age", max_age, False),
        (b"Secure", secure, None),
        (b"HttpOnly", httponly, None),
        (b"Path", path, False),
        (b"SameSite", samesite, False),
    ):
        # q is None -> boolean flag attribute: emitted bare when truthy,
        # omitted otherwise (e.g. "Secure" with no "=value").
        if q is None:
            if v:
                buf.append(k)
            continue

        # Valued attribute: skip entirely when unset.
        if v is None:
            continue

        tmp = bytearray(k)
        if not isinstance(v, (bytes, bytearray)):
            v = _to_bytes(str(v), charset)
        if q:
            # q is True -> value gets cookie-quoted (currently Domain only).
            v = _cookie_quote(v)
        tmp += b"=" + v
        buf.append(bytes(tmp))

    # The return value will be an incorrectly encoded latin1 header for
    # consistency with the headers object.
    rv = b"; ".join(buf)
    rv = rv.decode("latin1")

    # Warn if the final value of the cookie is larger than the limit. If the
    # cookie is too large, then it may be silently ignored by the browser,
    # which can be quite hard to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        warnings.warn(
            f"The {key.decode(charset)!r} cookie is too large: the value was"
            f" {value_size} bytes but the"
            f" header required {cookie_size - value_size} extra bytes. The final size"
            f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
            f" silently ignore cookies larger than this.",
            stacklevel=2,
        )

    return rv
def is_byte_range_valid(
    start: t.Optional[int], stop: t.Optional[int], length: t.Optional[int]
) -> bool:
    """Checks if a given byte content range is valid for the given length.

    .. versionadded:: 0.7
    """
    # start and stop must be given together, or both omitted.
    if (start is None) != (stop is None):
        return False

    if start is None:
        # No explicit range: valid for an unknown or non-negative length.
        return length is None or length >= 0

    # An explicit range must be non-negative and non-empty.
    if start < 0 or start >= stop:  # type: ignore
        return False

    # With a known length, the range must begin inside the resource.
    return length is None or start < length
1370# circular dependencies
1371from . import datastructures as ds