Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/http.py: 20%

456 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1import base64 

2import email.utils 

3import re 

4import typing 

5import typing as t 

6import warnings 

7from datetime import date 

8from datetime import datetime 

9from datetime import time 

10from datetime import timedelta 

11from datetime import timezone 

12from enum import Enum 

13from hashlib import sha1 

14from time import mktime 

15from time import struct_time 

16from urllib.parse import unquote_to_bytes as _unquote 

17from urllib.request import parse_http_list as _parse_list_header 

18 

19from ._internal import _cookie_quote 

20from ._internal import _dt_as_utc 

21from ._internal import _make_cookie_domain 

22from ._internal import _to_bytes 

23from ._internal import _to_str 

24from ._internal import _wsgi_decoding_dance 

25 

26if t.TYPE_CHECKING: 

27 from _typeshed.wsgi import WSGIEnvironment 

28 

29# for explanation of "media-range", etc. see Sections 5.3.{1,2} of RFC 7231 

# Matches one entry of an Accept-* header: group 1 is the media-range
# (type/subtype plus any non-q parameters), group 2 the optional qvalue.
_accept_re = re.compile(
    r"""
    (                       # media-range capturing-parenthesis
      [^\s;,]+              # type/subtype
      (?:[ \t]*;[ \t]*      # ";"
        (?:                 # parameter non-capturing-parenthesis
          [^\s;,q][^\s;,]*  # token that doesn't start with "q"
        |                   # or
          q[^\s;,=][^\s;,]* # token that is more than just "q"
        )
      )*                    # zero or more parameters
    )                       # end of media-range
    (?:[ \t]*;[ \t]*q=      # weight is a "q" parameter
       (\d*(?:\.\d+)?)      # qvalue capturing-parentheses
       [^,]*                # "extension" accept params: who cares?
    )?                      # accept params are optional
    """,
    re.VERBOSE,
)

# Characters allowed in an HTTP "token" (RFC 7230 section 3.2.6).
# Values made up solely of these characters need no quoting.
_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)
# One entry of a comma-separated ETag list: optional "W/" weakness
# prefix (group 1), then either a quoted tag (group 2) or a bare tag
# (group 3).
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
# One ";key=value" option of a Content-Type-like header, including the
# RFC 2231 "key*<n>" continuation and "key*=encoding'language'value"
# extended-parameter forms.
_option_header_piece_re = re.compile(
    r"""
    ;\s*,?\s* # newlines were replaced with commas
    (?P<key>
        "[^"\\]*(?:\\.[^"\\]*)*" # quoted string
    |
        [^\s;,=*]+ # token
    )
    (?:\*(?P<count>\d+))? # *1, optional continuation index
    \s*
    (?: # optionally followed by =value
        (?: # equals sign, possibly with encoding
            \*\s*=\s* # * indicates extended notation
            (?: # optional encoding
                (?P<encoding>[^\s]+?)
                '(?P<language>[^\s]*?)'
            )?
        |
            =\s* # basic notation
        )
        (?P<value>
            "[^"\\]*(?:\\.[^"\\]*)*" # quoted string
        |
            [^;,]+ # token
        )?
    )?
    \s*
    """,
    flags=re.VERBOSE,
)
# Leading mimetype of an option header (after the "," the parser
# prepends), plus everything that follows it.
_option_header_start_mime_type = re.compile(r",\s*([^;,\s]+)([;,]\s*.+)?")

# Entity headers (RFC 2616 section 7.1) — headers that describe the
# message body itself. Entries are stored lowercase.
_entity_headers = frozenset(
    [
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    ]
)
# Hop-by-hop headers (RFC 2616 section 13.5.1) — meaningful only for a
# single transport-level connection and not to be forwarded by proxies.
# Entries are stored lowercase.
_hop_by_hop_headers = frozenset(
    [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ]
)

# Mapping of HTTP status code -> canonical reason phrase, mostly from
# RFC 7231 and the IANA registry, with proprietary/historical entries
# noted inline.
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    # Fixed: RFC 6585 names 511 "Network Authentication Required",
    # not "Network Authentication Failed".
    511: "Network Authentication Required",
}

176 

177 

class COEP(Enum):
    """Cross Origin Embedder Policies.

    Enumerates the valid values of the
    ``Cross-Origin-Embedder-Policy`` response header.
    """

    UNSAFE_NONE = "unsafe-none"
    REQUIRE_CORP = "require-corp"

183 

184 

class COOP(Enum):
    """Cross Origin Opener Policies.

    Enumerates the valid values of the
    ``Cross-Origin-Opener-Policy`` response header.
    """

    UNSAFE_NONE = "unsafe-none"
    SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
    SAME_ORIGIN = "same-origin"

191 

192 

def quote_header_value(
    value: t.Union[str, int], extra_chars: str = "", allow_token: bool = True
) -> str:
    """Quote a header value if necessary.

    .. versionadded:: 0.5

    :param value: the value to quote.
    :param extra_chars: a list of extra characters to skip quoting.
    :param allow_token: if this is enabled token values are returned
        unchanged.
    """
    # Header bytes are latin-1 on the wire; anything else is coerced to
    # its string form.
    text = value.decode("latin1") if isinstance(value, bytes) else str(value)

    if allow_token:
        # A pure token (optionally extended with extra_chars) can be
        # sent without quoting.
        allowed = _token_chars.union(extra_chars)
        if set(text) <= allowed:
            return text

    # Escape backslashes before embedded quotes, then wrap in quotes.
    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'

214 

215 

def unquote_header_value(value: str, is_filename: bool = False) -> str:
    r"""Unquote a header value, reversing :func:`quote_header_value`.

    This deliberately mimics what browsers do rather than the strict
    RFC unquoting rules.

    .. versionadded:: 0.5

    :param value: the header value to unquote.
    :param is_filename: The value represents a filename or path.
    """
    if not value or value[:1] != '"' or value[-1:] != '"':
        # Not wrapped in double quotes: nothing to strip.
        return value

    # Not real RFC unquoting — following the RFC strictly breaks with
    # IE and some other browsers; IE e.g. uploads files with
    # "C:\foo\bar.txt" as the filename.
    inner = value[1:-1]

    # If this is a filename that starts like a UNC path, collapsing the
    # leading double backslash below would corrupt it (see #458), so
    # return it untouched.
    if is_filename and inner[:2] == "\\\\":
        return inner
    return inner.replace("\\\\", "\\").replace('\\"', '"')

241 

242 

def dump_options_header(
    header: t.Optional[str], options: t.Mapping[str, t.Optional[t.Union[str, int]]]
) -> str:
    """The reverse function to :func:`parse_options_header`.

    :param header: the header to dump
    :param options: a dict of options to append.
    """
    segments = [] if header is None else [header]
    for key, value in options.items():
        # A None value renders as a bare key without "=value".
        segments.append(key if value is None else f"{key}={quote_header_value(value)}")
    return "; ".join(segments)

260 

261 

def dump_header(
    iterable: t.Union[t.Dict[str, t.Union[str, int]], t.Iterable[str]],
    allow_token: bool = True,
) -> str:
    """Dump an HTTP header again. This reverses
    :func:`parse_list_header`, :func:`parse_set_header` and
    :func:`parse_dict_header`. Strings that include an equals sign are
    quoted unless passed as dict values.

    >>> dump_header({'foo': 'bar baz'})
    'foo="bar baz"'
    >>> dump_header(('foo', 'bar baz'))
    'foo, "bar baz"'

    :param iterable: the iterable or dict of values to quote.
    :param allow_token: if set to `False` tokens as values are disallowed.
        See :func:`quote_header_value` for more details.
    """
    if isinstance(iterable, dict):
        # key=value pairs; a None value renders as just the key.
        items = [
            key
            if value is None
            else f"{key}={quote_header_value(value, allow_token=allow_token)}"
            for key, value in iterable.items()
        ]
    else:
        items = [quote_header_value(v, allow_token=allow_token) for v in iterable]
    return ", ".join(items)

292 

293 

def dump_csp_header(header: "ds.ContentSecurityPolicy") -> str:
    """Dump a Content Security Policy header.

    These are structured into policies such as "default-src 'self';
    script-src 'self'".

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.

    """
    policies = [f"{directive} {value}" for directive, value in header.items()]
    return "; ".join(policies)

305 

306 

def parse_list_header(value: str) -> t.List[str]:
    """Parse a comma-separated list header as described by RFC 2068
    Section 2.

    Elements may be quoted strings (which may themselves contain
    commas); a non-quoted string may have quotes in the middle. Quotes
    are removed automatically after parsing.

    It works like :func:`parse_set_header` except that items may appear
    multiple times and case sensitivity is preserved.

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    Use :func:`dump_header` to turn the list back into a header value.

    :param value: a string with a list header.
    :return: :class:`list`
    """
    return [
        # Strip surrounding quotes and unescape quoted elements.
        unquote_header_value(item[1:-1]) if item[:1] == item[-1:] == '"' else item
        for item in _parse_list_header(value)
    ]

335 

336 

def parse_dict_header(value: str, cls: t.Type[dict] = dict) -> t.Dict[str, str]:
    """Parse a list of key, value pairs as described by RFC 2068
    Section 2 into a dict (or any dict-like mapping produced by the
    `cls` argument):

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    A key without a value maps to `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    Use :func:`dump_header` to turn the dict back into a header value.

    .. versionchanged:: 0.9
        Added support for `cls` argument.

    :param value: a string with a dict header.
    :param cls: callable to use for storage of parsed results.
    :return: an instance of `cls`
    """
    result = cls()
    # Header bytes are interpreted as latin-1.
    if isinstance(value, bytes):
        value = value.decode("latin1")
    for item in _parse_list_header(value):
        name, sep, val = item.partition("=")
        if not sep:
            # Bare key with no value.
            result[name] = None
        else:
            if val[:1] == val[-1:] == '"':
                val = unquote_header_value(val[1:-1])
            result[name] = val
    return result

375 

376 

def parse_options_header(value: t.Optional[str]) -> t.Tuple[str, t.Dict[str, str]]:
    """Parse a ``Content-Type``-like header into a tuple with the
    value and any options:

    >>> parse_options_header('text/html; charset=utf8')
    ('text/html', {'charset': 'utf8'})

    This is not for ``Cache-Control``-like headers, which use a
    different format. For those, use :func:`parse_dict_header`.

    :param value: The header value to parse.

    .. versionchanged:: 2.2
        Option names are always converted to lowercase.

    .. versionchanged:: 2.1
        The ``multiple`` parameter is deprecated and will be removed in
        Werkzeug 2.2.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5
    """
    if not value:
        return "", {}

    result: t.List[t.Any] = []

    # Replace folded-header newlines with commas and prefix a "," so
    # every mimetype is matched uniformly by the start regex below.
    value = "," + value.replace("\n", ",")
    while value:
        match = _option_header_start_mime_type.match(value)
        if not match:
            break
        result.append(match.group(1))  # mimetype
        options: t.Dict[str, str] = {}
        # Parse options
        rest = match.group(2)
        encoding: t.Optional[str]
        continued_encoding: t.Optional[str] = None
        while rest:
            optmatch = _option_header_piece_re.match(rest)
            if not optmatch:
                break
            option, count, encoding, language, option_value = optmatch.groups()
            # Continuations don't have to supply the encoding after the
            # first line. If we're in a continuation, track the current
            # encoding to use for subsequent lines. Reset it when the
            # continuation ends.
            if not count:
                continued_encoding = None
            else:
                if not encoding:
                    encoding = continued_encoding
                continued_encoding = encoding
            option = unquote_header_value(option).lower()

            if option_value is not None:
                option_value = unquote_header_value(option_value, option == "filename")

                if encoding is not None:
                    # RFC 2231 extended value: percent-encoded bytes in
                    # the declared charset.
                    option_value = _unquote(option_value).decode(encoding)

            if count:
                # Continuations append to the existing value. For
                # simplicity, this ignores the possibility of
                # out-of-order indices, which shouldn't happen anyway.
                if option_value is not None:
                    options[option] = options.get(option, "") + option_value
            else:
                options[option] = option_value  # type: ignore[assignment]

            rest = rest[optmatch.end() :]
        result.append(options)
        # NOTE: this returns from inside the while loop, so only the
        # first mimetype (and its options) is ever parsed.
        return tuple(result)  # type: ignore[return-value]

    return tuple(result) if result else ("", {})  # type: ignore[return-value]

454 

455 

# TypeVar so ``parse_accept_header`` can return the specific ``Accept``
# subclass it was given via ``cls``.
_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")


# Typing-only overloads for ``parse_accept_header``; the implementation
# follows below.
@typing.overload
def parse_accept_header(value: t.Optional[str]) -> "ds.Accept":
    ...


@typing.overload
def parse_accept_header(
    value: t.Optional[str], cls: t.Type[_TAnyAccept]
) -> _TAnyAccept:
    ...

469 

470 

def parse_accept_header(
    value: t.Optional[str], cls: t.Optional[t.Type[_TAnyAccept]] = None
) -> _TAnyAccept:
    """Parse an HTTP ``Accept-*`` header.

    This is not a complete implementation of the spec, but it supports
    at least value and quality extraction.

    Returns an :class:`Accept` instance (basically a list of
    ``(value, quality)`` tuples sorted by quality with some additional
    accessor methods). Pass a subclass of :class:`Accept` as ``cls`` to
    get that type back instead.

    :param value: the accept header string to be parsed.
    :param cls: the wrapper class for the return value (can be
        :class:`Accept` or a subclass thereof)
    :return: an instance of `cls`.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyAccept], ds.Accept)

    if not value:
        return cls(None)

    # Group 1 of each match is the media range, group 2 the optional
    # qvalue. Missing/empty qvalues default to 1; parsed qvalues are
    # clamped into [0, 1].
    parsed = [
        (m.group(1), max(min(float(m.group(2)), 1), 0) if m.group(2) else 1)
        for m in _accept_re.finditer(value)
    ]
    return cls(parsed)

504 

505 

# TypeVar so ``parse_cache_control_header`` can return the specific
# cache control class it was given via ``cls``.
_TAnyCC = t.TypeVar("_TAnyCC", bound="ds._CacheControl")
# Signature of the optional ``on_update`` callback.
_t_cc_update = t.Optional[t.Callable[[_TAnyCC], None]]


# Typing-only overloads for ``parse_cache_control_header``; the
# implementation follows below.
@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: None = None
) -> "ds.RequestCacheControl":
    ...


@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: t.Type[_TAnyCC]
) -> _TAnyCC:
    ...

522 

523 

def parse_cache_control_header(
    value: t.Optional[str],
    on_update: _t_cc_update = None,
    cls: t.Optional[t.Type[_TAnyCC]] = None,
) -> _TAnyCC:
    """Parse a cache control header. The RFC differs between response
    and request cache control; this function does not, so it's your
    responsibility not to use the wrong control statements.

    .. versionadded:: 0.5
        The `cls` was added. If not specified an immutable
        :class:`~werkzeug.datastructures.RequestCacheControl` is returned.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a value
        on the :class:`~werkzeug.datastructures.CacheControl`
        object is changed.
    :param cls: the class for the returned object. By default
        :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCC], ds.RequestCacheControl)

    # An absent/empty header produces an empty instance that still
    # carries the on_update callback.
    parsed = parse_dict_header(value) if value else ()
    return cls(parsed, on_update)

552 

553 

# TypeVar so ``parse_csp_header`` can return the specific CSP class it
# was given via ``cls``.
_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")
# Signature of the optional ``on_update`` callback.
_t_csp_update = t.Optional[t.Callable[[_TAnyCSP], None]]


# Typing-only overloads for ``parse_csp_header``; the implementation
# follows below.
@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: None = None
) -> "ds.ContentSecurityPolicy":
    ...


@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: t.Type[_TAnyCSP]
) -> _TAnyCSP:
    ...

570 

571 

def parse_csp_header(
    value: t.Optional[str],
    on_update: _t_csp_update = None,
    cls: t.Optional[t.Type[_TAnyCSP]] = None,
) -> _TAnyCSP:
    """Parse a Content Security Policy header.

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a value
        on the object is changed.
    :param cls: the class for the returned object. By default
        :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCSP], ds.ContentSecurityPolicy)

    if value is None:
        return cls((), on_update)

    directives = []
    for raw_policy in value.split(";"):
        stripped = raw_policy.strip()
        # Badly formatted policies (no space separator) are silently
        # ignored.
        directive, sep, policy_value = stripped.partition(" ")
        if sep:
            directives.append((directive.strip(), policy_value.strip()))

    return cls(directives, on_update)

606 

607 

def parse_set_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[[["ds.HeaderSet"]], None]] = None,
) -> "ds.HeaderSet":
    """Parse a set-like header into a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    Items are treated case-insensitively while their order is kept:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    Use :func:`dump_header` to turn the :class:`HeaderSet` back into a
    header value.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
        value on the :class:`~werkzeug.datastructures.HeaderSet`
        object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    items = parse_list_header(value) if value else None
    return ds.HeaderSet(items, on_update)

639 

640 

def parse_authorization_header(
    value: t.Optional[str],
) -> t.Optional["ds.Authorization"]:
    """Parse an HTTP basic/digest authorization header transmitted by
    the web browser. Returns `None` if the header was invalid or not
    given, otherwise an
    :class:`~werkzeug.datastructures.Authorization` object.

    :param value: the authorization header to parse.
    :return: a :class:`~werkzeug.datastructures.Authorization` object or `None`.
    """
    if not value:
        return None

    decoded = _wsgi_decoding_dance(value)
    parts = decoded.split(None, 1)
    if len(parts) != 2:
        # Scheme without any credentials.
        return None
    auth_type = parts[0].lower()
    auth_info = parts[1]

    if auth_type == "basic":
        try:
            username, password = base64.b64decode(auth_info).split(b":", 1)
        except Exception:
            # Invalid base64 or missing ":" separator.
            return None
        try:
            credentials = {
                "username": _to_str(username, "utf-8"),
                "password": _to_str(password, "utf-8"),
            }
        except UnicodeDecodeError:
            return None
        return ds.Authorization("basic", credentials)

    if auth_type == "digest":
        auth_map = parse_dict_header(auth_info)
        # These fields are mandatory for a digest response.
        for key in ("username", "realm", "nonce", "uri", "response"):
            if key not in auth_map:
                return None
        # When qop is sent, nc and cnonce must be present as well.
        if "qop" in auth_map and not (
            auth_map.get("nc") and auth_map.get("cnonce")
        ):
            return None
        return ds.Authorization("digest", auth_map)

    return None

685 

686 

def parse_www_authenticate_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[[["ds.WWWAuthenticate"]], None]] = None,
) -> "ds.WWWAuthenticate":
    """Parse an HTTP WWW-Authenticate header into a
    :class:`~werkzeug.datastructures.WWWAuthenticate` object.

    :param value: a WWW-Authenticate header to parse.
    :param on_update: an optional callable that is called every time a value
        on the :class:`~werkzeug.datastructures.WWWAuthenticate`
        object is changed.
    :return: a :class:`~werkzeug.datastructures.WWWAuthenticate` object.
    """
    if not value:
        return ds.WWWAuthenticate(on_update=on_update)
    try:
        scheme, params = value.split(None, 1)
    except (ValueError, AttributeError):
        # No parameters present: treat the whole value as the scheme.
        return ds.WWWAuthenticate(value.strip().lower(), on_update=on_update)
    return ds.WWWAuthenticate(scheme.lower(), parse_dict_header(params), on_update)

708 

709 

def parse_if_range_header(value: t.Optional[str]) -> "ds.IfRange":
    """Parse an ``If-Range`` header, which may hold either an etag or a
    date, into a :class:`~werkzeug.datastructures.IfRange` object.

    .. versionchanged:: 2.0
        If the value represents a datetime, it is timezone-aware.

    .. versionadded:: 0.7
    """
    if not value:
        return ds.IfRange()
    # Try the date interpretation first; fall back to an etag.
    parsed = parse_date(value)
    if parsed is not None:
        return ds.IfRange(date=parsed)
    # Weakness information ("W/") is dropped for etags.
    return ds.IfRange(unquote_etag(value)[0])

726 

727 

def parse_range_header(
    value: t.Optional[str], make_inclusive: bool = True
) -> t.Optional["ds.Range"]:
    """Parses a range header into a :class:`~werkzeug.datastructures.Range`
    object. If the header is missing or malformed `None` is returned.
    `ranges` is a list of ``(start, stop)`` tuples where the ranges are
    non-inclusive.

    .. versionadded:: 0.7
    """
    # ``make_inclusive`` is accepted for backward compatibility but is
    # not used; parsed ranges are always stored exclusive of the stop.
    if not value or "=" not in value:
        return None

    ranges = []
    # Exclusive end of the previous range. -1 flags that the previous
    # range was open-ended or a suffix range, after which no further
    # ranges are accepted.
    last_end = 0
    units, rng = value.split("=", 1)
    units = units.strip().lower()

    for item in rng.split(","):
        item = item.strip()
        if "-" not in item:
            return None
        if item.startswith("-"):
            # Suffix range like "-500" (the last 500 bytes).
            if last_end < 0:
                return None
            try:
                begin = int(item)
            except ValueError:
                return None
            end = None
            last_end = -1
        elif "-" in item:
            begin_str, end_str = item.split("-", 1)
            begin_str = begin_str.strip()
            end_str = end_str.strip()

            try:
                begin = int(begin_str)
            except ValueError:
                return None

            # Ranges must be ascending and must not follow an
            # open-ended range.
            if begin < last_end or last_end < 0:
                return None
            if end_str:
                try:
                    # Header stop values are inclusive; store exclusive.
                    end = int(end_str) + 1
                except ValueError:
                    return None

                if begin >= end:
                    return None
            else:
                end = None
            last_end = end if end is not None else -1
        ranges.append((begin, end))

    return ds.Range(units, ranges)

785 

786 

def parse_content_range_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[[["ds.ContentRange"]], None]] = None,
) -> t.Optional["ds.ContentRange"]:
    """Parse a ``Content-Range`` header into a
    :class:`~werkzeug.datastructures.ContentRange` object, or `None` if
    parsing is not possible.

    .. versionadded:: 0.7

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a value
        on the :class:`~werkzeug.datastructures.ContentRange`
        object is changed.
    """
    if value is None:
        return None

    # "<units> <range>/<length>"
    parts = value.strip().split(None, 1)
    if len(parts) != 2:
        return None
    units, rangedef = parts

    rng, sep, length_str = rangedef.partition("/")
    if not sep:
        return None

    if length_str == "*":
        # Total length unknown.
        length = None
    else:
        try:
            length = int(length_str)
        except ValueError:
            return None

    if rng == "*":
        # Unsatisfied range: only the length is known.
        return ds.ContentRange(units, None, None, length, on_update=on_update)

    start_str, sep, stop_str = rng.partition("-")
    if not sep:
        return None
    try:
        start = int(start_str)
        # Header stop values are inclusive; store exclusive.
        stop = int(stop_str) + 1
    except ValueError:
        return None

    if is_byte_range_valid(start, stop, length):
        return ds.ContentRange(units, start, stop, length, on_update=on_update)

    return None

836 

837 

def quote_etag(etag: str, weak: bool = False) -> str:
    """Quote an etag.

    :param etag: the etag to quote.
    :param weak: set to `True` to tag it "weak".
    """
    # An etag must not itself contain a double quote.
    if '"' in etag:
        raise ValueError("invalid etag")
    quoted = f'"{etag}"'
    return f"W/{quoted}" if weak else quoted

850 

851 

def unquote_etag(
    etag: t.Optional[str],
) -> t.Union[t.Tuple[str, bool], t.Tuple[None, None]]:
    """Unquote a single etag:

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple.
    """
    if not etag:
        return None, None
    tag = etag.strip()
    # A "W/" (or "w/") prefix marks a weak comparison etag.
    weak = tag[:2] in ("W/", "w/")
    if weak:
        tag = tag[2:]
    if tag[:1] == tag[-1:] == '"':
        tag = tag[1:-1]
    return tag, weak

875 

876 

def parse_etags(value: t.Optional[str]) -> "ds.ETags":
    """Parse an etag header.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ds.ETags()

    strong: t.List[str] = []
    weak: t.List[str] = []
    pos = 0
    length = len(value)

    while pos < length:
        match = _etag_re.match(value, pos)
        if match is None:
            break
        is_weak, quoted, raw = match.groups()
        # "*" matches everything, so nothing else matters.
        if raw == "*":
            return ds.ETags(star_tag=True)
        tag = quoted if quoted else raw
        (weak if is_weak else strong).append(tag)
        pos = match.end()

    return ds.ETags(strong, weak)

904 

905 

def generate_etag(data: bytes) -> str:
    """Generate an etag for some data.

    .. versionchanged:: 2.0
        Use SHA-1. MD5 may not be available in some environments.
    """
    # The digest only needs to be cheap and stable, not
    # cryptographically secure, so SHA-1 is fine here.
    digest = sha1(data)
    return digest.hexdigest()

913 

914 

def parse_date(value: t.Optional[str]) -> t.Optional[datetime]:
    """Parse an :rfc:`2822` date into a timezone-aware
    :class:`datetime.datetime` object, or ``None`` if parsing fails.

    This wraps :func:`email.utils.parsedate_to_datetime`, returning
    ``None`` instead of raising on failure, and always producing an
    aware datetime. Strings without timezone information are assumed
    to be UTC.

    :param value: A string with a supported date format.

    .. versionchanged:: 2.0
        Return a timezone-aware datetime object. Use
        ``email.utils.parsedate_to_datetime``.
    """
    if value is None:
        return None

    try:
        parsed = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # Unparseable input raises TypeError on older Pythons and
        # ValueError on 3.10+.
        return None

    if parsed.tzinfo is not None:
        return parsed
    # No timezone info: assume UTC.
    return parsed.replace(tzinfo=timezone.utc)

942 

943 

def http_date(
    timestamp: t.Optional[t.Union[datetime, date, int, float, struct_time]] = None
) -> str:
    """Format a datetime object or timestamp into an :rfc:`2822` date
    string.

    This wraps :func:`email.utils.format_datetime`, treating naive
    datetime objects as UTC instead of raising an exception.

    :param timestamp: The datetime or timestamp to format. Defaults to
        the current time.

    .. versionchanged:: 2.0
        Use ``email.utils.format_datetime``. Accept ``date`` objects.
    """
    if isinstance(timestamp, datetime):
        # Normalize to an aware UTC datetime (naive values are assumed
        # to already be UTC).
        return email.utils.format_datetime(_dt_as_utc(timestamp), usegmt=True)

    if isinstance(timestamp, date):
        # A plain date means midnight UTC on that day.
        as_dt = datetime.combine(timestamp, time(), tzinfo=timezone.utc)
        return email.utils.format_datetime(as_dt, usegmt=True)

    if isinstance(timestamp, struct_time):
        timestamp = mktime(timestamp)

    # int/float seconds since the epoch, or None for "now".
    return email.utils.formatdate(timestamp, usegmt=True)

974 

975 

def parse_age(value: t.Optional[str] = None) -> t.Optional[timedelta]:
    """Parse a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None
    try:
        seconds = int(value)
        if seconds < 0:
            # Negative ages are invalid.
            return None
        return timedelta(seconds=seconds)
    except (ValueError, OverflowError):
        # Non-numeric input or a value too large for timedelta.
        return None

996 

997 

def dump_age(age: t.Optional[t.Union[timedelta, int]] = None) -> t.Optional[str]:
    """Format the duration as a base-10 integer string.

    :param age: should be an integer number of seconds,
        a :class:`datetime.timedelta` object, or,
        if the age is unknown, `None` (default).
    """
    if age is None:
        return None

    seconds = int(age.total_seconds()) if isinstance(age, timedelta) else int(age)
    if seconds < 0:
        raise ValueError("age cannot be negative")
    return str(seconds)

1016 

1017 

def is_resource_modified(
    environ: "WSGIEnvironment",
    etag: t.Optional[str] = None,
    data: t.Optional[bytes] = None,
    last_modified: t.Optional[t.Union[datetime, str]] = None,
    ignore_if_range: bool = True,
) -> bool:
    """Convenience method for conditional requests.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag for the response for comparison.
    :param data: or alternatively the data of the response to automatically
                 generate an etag using :func:`generate_etag`.
    :param last_modified: an optional date of the last modification.
    :param ignore_if_range: If `False`, `If-Range` header will be taken into
                            account.
    :return: `True` if the resource was modified, otherwise `False`.

    .. versionchanged:: 2.0
        SHA-1 is used to generate an etag value for the data. MD5 may
        not be available in some environments.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    # Extract the conditional request headers from the WSGI environ and
    # delegate the actual comparison to the sansio implementation.
    conditional_headers = {
        "http_range": environ.get("HTTP_RANGE"),
        "http_if_range": environ.get("HTTP_IF_RANGE"),
        "http_if_modified_since": environ.get("HTTP_IF_MODIFIED_SINCE"),
        "http_if_none_match": environ.get("HTTP_IF_NONE_MATCH"),
        "http_if_match": environ.get("HTTP_IF_MATCH"),
    }
    return _sansio_http.is_resource_modified(
        etag=etag,
        data=data,
        last_modified=last_modified,
        ignore_if_range=ignore_if_range,
        **conditional_headers,
    )

1054 

1055 

def remove_entity_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]],
    allowed: t.Iterable[str] = ("expires", "content-location"),
) -> None:
    """Remove all entity headers from a list or :class:`Headers` object. This
    operation works in-place. `Expires` and `Content-Location` headers are
    by default not removed. The reason for this is :rfc:`2616` section
    10.3.5 which specifies some entity headers that should be sent.

    .. versionchanged:: 0.5
       added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: a list of headers that should still be allowed even though
                    they are entity headers.
    """
    # Header names are matched case-insensitively.
    keep = {name.lower() for name in allowed}
    retained = []

    for key, value in headers:
        if key.lower() in keep or not is_entity_header(key):
            retained.append((key, value))

    # Slice assignment mutates the caller's object in place.
    headers[:] = retained

1078 

1079 

def remove_hop_by_hop_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]]
) -> None:
    """Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or
    :class:`Headers` object. This operation works in-place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    retained = []

    for key, value in headers:
        if not is_hop_by_hop_header(key):
            retained.append((key, value))

    # Slice assignment mutates the caller's object in place.
    headers[:] = retained

1093 

1094 

def is_entity_header(header: str) -> bool:
    """Check if a header is an entity header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    # Membership test against the module-level frozenset of known
    # entity header names; comparison is case-insensitive.
    normalized = header.lower()
    return normalized in _entity_headers

1104 

1105 

def is_hop_by_hop_header(header: str) -> bool:
    """Check if a header is an HTTP/1.1 "Hop-by-Hop" header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    # Membership test against the module-level frozenset of hop-by-hop
    # header names; comparison is case-insensitive.
    normalized = header.lower()
    return normalized in _hop_by_hop_headers

1115 

1116 

def parse_cookie(
    header: t.Union["WSGIEnvironment", str, bytes, None],
    charset: str = "utf-8",
    errors: str = "replace",
    cls: t.Optional[t.Type["ds.MultiDict"]] = None,
) -> "ds.MultiDict[str, str]":
    """Parse a cookie from a string or WSGI environ.

    The same key can be provided multiple times, the values are stored
    in-order. The default :class:`MultiDict` will have the first value
    first, and all values can be retrieved with
    :meth:`MultiDict.getlist`.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key.
    :param charset: The charset for the cookie values.
    :param errors: The error behavior for the charset decoding.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.

    .. versionchanged:: 1.0.0
        Returns a :class:`MultiDict` instead of a
        ``TypeConversionDict``.

    .. versionchanged:: 0.5
        Returns a :class:`TypeConversionDict` instead of a regular dict.
        The ``cls`` parameter was added.
    """
    # Normalize the three accepted input forms to a single cookie string
    # (or bytes) before delegating to the sansio parser.
    if header is None:
        cookie = ""
    elif isinstance(header, dict):
        cookie = header.get("HTTP_COOKIE", "")
    else:
        cookie = header

    return _sansio_http.parse_cookie(
        cookie=cookie, charset=charset, errors=errors, cls=cls
    )

1155 

1156 

def dump_cookie(
    key: str,
    value: t.Union[bytes, str] = "",
    max_age: t.Optional[t.Union[timedelta, int]] = None,
    expires: t.Optional[t.Union[str, datetime, int, float]] = None,
    path: t.Optional[str] = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    charset: str = "utf-8",
    sync_expires: bool = True,
    max_size: int = 4093,
    samesite: t.Optional[str] = None,
) -> str:
    """Create a Set-Cookie header without the ``Set-Cookie`` prefix.

    The return value is usually restricted to ascii as the vast majority
    of values are properly escaped, but that is no guarantee. It's
    tunneled through latin1 as required by :pep:`3333`.

    The return value is not ASCII safe if the key contains unicode
    characters. This is technically against the specification but
    happens in the wild. It's strongly recommended to not use
    non-ASCII values for the keys.

    :param max_age: should be a number of seconds, or `None` (default) if
                    the cookie should last only as long as the client's
                    browser session. Additionally `timedelta` objects
                    are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
    :param path: limits the cookie to a given path, per default it will
                 span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
                   example, ``domain=".example.com"`` will set a cookie
                   that is readable by the domain ``www.example.com``,
                   ``foo.example.com`` etc. Otherwise, a cookie will only
                   be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie. This is an
                     extension to the cookie standard and probably not
                     supported by all browsers.
    :param charset: the encoding for string values.
    :param sync_expires: automatically set expires if max_age is defined
                         but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    # Work in bytes internally; the header is assembled as bytes and
    # decoded to latin1 at the end.
    key = _to_bytes(key, charset)
    value = _to_bytes(value, charset)

    if path is not None:
        # Imported here to avoid a circular import at module load time.
        from .urls import iri_to_uri

        path = iri_to_uri(path, charset)

    domain = _make_cookie_domain(domain)

    if isinstance(max_age, timedelta):
        max_age = int(max_age.total_seconds())

    if expires is not None:
        if not isinstance(expires, str):
            expires = http_date(expires)
    elif max_age is not None and sync_expires:
        # Derive Expires from Max-Age for clients that only honor Expires.
        expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)

    if samesite is not None:
        # Accept any casing ("lax", "LAX", ...) but emit the canonical form.
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    # The key=value pair always comes first; attributes are appended below.
    buf = [key + b"=" + _cookie_quote(value)]

    # XXX: In theory all of these parameters that are not marked with `None`
    # should be quoted. Because stdlib did not quote it before I did not
    # want to introduce quoting there now.
    # q is a three-way flag: True -> quote the value, False -> emit as-is,
    # None -> boolean attribute emitted with no "=value" part.
    for k, v, q in (
        (b"Domain", domain, True),
        (b"Expires", expires, False),
        (b"Max-Age", max_age, False),
        (b"Secure", secure, None),
        (b"HttpOnly", httponly, None),
        (b"Path", path, False),
        (b"SameSite", samesite, False),
    ):
        if q is None:
            # Boolean attribute: only its name appears, and only if truthy.
            if v:
                buf.append(k)
            continue

        if v is None:
            # Unset attribute: omit it entirely.
            continue

        tmp = bytearray(k)
        if not isinstance(v, (bytes, bytearray)):
            # e.g. Max-Age int or SameSite str -> encode to bytes.
            v = _to_bytes(str(v), charset)
        if q:
            v = _cookie_quote(v)
        tmp += b"=" + v
        buf.append(bytes(tmp))

    # The return value will be an incorrectly encoded latin1 header for
    # consistency with the headers object.
    rv = b"; ".join(buf)
    rv = rv.decode("latin1")

    # Warn if the final value of the cookie is larger than the limit. If the
    # cookie is too large, then it may be silently ignored by the browser,
    # which can be quite hard to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        warnings.warn(
            f"The {key.decode(charset)!r} cookie is too large: the value was"
            f" {value_size} bytes but the"
            f" header required {cookie_size - value_size} extra bytes. The final size"
            f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
            f" silently ignore cookies larger than this.",
            stacklevel=2,
        )

    return rv

1289 

1290 

def is_byte_range_valid(
    start: t.Optional[int], stop: t.Optional[int], length: t.Optional[int]
) -> bool:
    """Check if a given byte content range is valid for the given length.

    .. versionadded:: 0.7
    """
    # start and stop must both be given or both be omitted.
    if (start is None) != (stop is None):
        return False

    if start is None:
        # Open range: valid unless a negative length is given.
        return length is None or length >= 0

    # A concrete range must be non-empty.
    if start >= stop:  # type: ignore
        return False

    if length is None:
        # Without a known length only a non-negative start is required.
        return 0 <= start

    return 0 <= start < length

1307 

1308 

1309# circular dependencies 

1310from . import datastructures as ds 

1311from .sansio import http as _sansio_http