Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/http.py: 19%

497 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:03 +0000

1import base64 

2import email.utils 

3import re 

4import typing 

5import typing as t 

6import warnings 

7from datetime import date 

8from datetime import datetime 

9from datetime import time 

10from datetime import timedelta 

11from datetime import timezone 

12from enum import Enum 

13from hashlib import sha1 

14from time import mktime 

15from time import struct_time 

16from urllib.parse import unquote_to_bytes as _unquote 

17from urllib.request import parse_http_list as _parse_list_header 

18 

19from ._internal import _cookie_parse_impl 

20from ._internal import _cookie_quote 

21from ._internal import _make_cookie_domain 

22from ._internal import _to_bytes 

23from ._internal import _to_str 

24from ._internal import _wsgi_decoding_dance 

25from werkzeug._internal import _dt_as_utc 

26 

27if t.TYPE_CHECKING: 

28 import typing_extensions as te 

29 from _typeshed.wsgi import WSGIEnvironment 

30 

# for explanation of "media-range", etc. see Sections 5.3.{1,2} of RFC 7231
# Matches one entry of an Accept-style header: group 1 is the media
# range, group 2 the optional qvalue.
_accept_re = re.compile(
    r"""
    (                       # media-range capturing-parenthesis
      [^\s;,]+              # type/subtype
      (?:[ \t]*;[ \t]*      # ";"
        (?:                 # parameter non-capturing-parenthesis
          [^\s;,q][^\s;,]*  # token that doesn't start with "q"
        |                   # or
          q[^\s;,=][^\s;,]* # token that is more than just "q"
        )
      )*                    # zero or more parameters
    )                       # end of media-range
    (?:[ \t]*;[ \t]*q=      # weight is a "q" parameter
      (\d*(?:\.\d+)?)       # qvalue capturing-parentheses
      [^,]*                 # "extension" accept params: who cares?
    )?                      # accept params are optional
    """,
    re.VERBOSE,
)

# Characters allowed in an HTTP "token" (unquoted header values are
# returned as-is by quote_header_value when they use only these).
_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)
# Matches one ETag in a comma-separated list: optional "W/" weakness
# prefix (group 1), then a quoted (group 2) or unquoted (group 3) tag.
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')

# Matches a single ";key=value" option after the mime type, including
# RFC 2231 extended ("key*=enc'lang'value") and continued ("key*1=")
# parameter forms.
_option_header_piece_re = re.compile(
    r"""
    ;\s*,?\s*  # newlines were replaced with commas
    (?P<key>
        "[^"\\]*(?:\\.[^"\\]*)*"  # quoted string
    |
        [^\s;,=*]+  # token
    )
    (?:\*(?P<count>\d+))?  # *1, optional continuation index
    \s*
    (?:  # optionally followed by =value
        (?:  # equals sign, possibly with encoding
            \*\s*=\s*  # * indicates extended notation
            (?:  # optional encoding
                (?P<encoding>[^\s]+?)
                '(?P<language>[^\s]*?)'
            )?
        |
            =\s*  # basic notation
        )
        (?P<value>
            "[^"\\]*(?:\\.[^"\\]*)*"  # quoted string
        |
            [^;,]+  # token
        )?
    )?
    \s*
    """,
    flags=re.VERBOSE,
)
# Matches ",<mime-type>" at the start of a comma-prefixed options
# header; group 1 is the mime type, group 2 the remaining options.
_option_header_start_mime_type = re.compile(r",\s*([^;,\s]+)([;,]\s*.+)?")

# Header names (lowercased) that describe the entity body itself.
_entity_headers = frozenset(
    [
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    ]
)
# Header names (lowercased) that apply only to a single transport
# connection and are not forwarded by proxies.
_hop_by_hop_headers = frozenset(
    [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ]
)

# Mapping of HTTP status code -> reason phrase, used when building
# response status lines.
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    511: "Network Authentication Failed",
}

178 

179 

class COEP(Enum):
    """Cross Origin Embedder Policies.

    Valid string values for the ``Cross-Origin-Embedder-Policy``
    response header.
    """

    UNSAFE_NONE = "unsafe-none"
    REQUIRE_CORP = "require-corp"

185 

186 

class COOP(Enum):
    """Cross Origin Opener Policies.

    Valid string values for the ``Cross-Origin-Opener-Policy``
    response header.
    """

    UNSAFE_NONE = "unsafe-none"
    SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
    SAME_ORIGIN = "same-origin"

193 

194 

def quote_header_value(
    value: t.Union[str, int], extra_chars: str = "", allow_token: bool = True
) -> str:
    """Wrap a header value in double quotes if it needs quoting.

    .. versionadded:: 0.5

    :param value: the value to quote.
    :param extra_chars: a list of extra characters to skip quoting.
    :param allow_token: if this is enabled token values are returned
        unchanged.
    """
    # Bytes arrive from WSGI as latin1; normalize everything to str.
    if isinstance(value, bytes):
        value = value.decode("latin1")

    value = str(value)

    if allow_token:
        allowed = _token_chars.union(extra_chars)

        # A pure token needs no quoting at all.
        if set(value) <= allowed:
            return value

    # Escape backslashes first, then embedded quotes, then wrap.
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'

216 

217 

def unquote_header_value(value: str, is_filename: bool = False) -> str:
    r"""Remove surrounding quotes from a header value and undo the
    quoting browsers actually apply (reversal of
    :func:`quote_header_value`). This deliberately differs from strict
    RFC unquoting.

    .. versionadded:: 0.5

    :param value: the header value to unquote.
    :param is_filename: The value represents a filename or path.
    """
    # Only values wrapped in double quotes are touched.
    if not value or value[0] != '"' or value[-1] != '"':
        return value

    # Strict RFC unquoting would break Internet Explorer, which uploads
    # files with a full path such as "C:\foo\bar.txt" as the filename.
    inner = value[1:-1]

    # A UNC path starts with two backslashes; collapsing them below
    # would turn the leading double slash into a single slash and break
    # _fix_ie_filename(). See #458.
    if is_filename and inner[:2] == "\\\\":
        return inner

    return inner.replace("\\\\", "\\").replace('\\"', '"')

243 

244 

def dump_options_header(
    header: t.Optional[str], options: t.Mapping[str, t.Optional[t.Union[str, int]]]
) -> str:
    """Serialize a header value with ``key=value`` options, the inverse
    of :func:`parse_options_header`.

    :param header: the header to dump
    :param options: a dict of options to append.
    """
    parts = [] if header is None else [header]

    for name, val in options.items():
        # A None value means a bare option with no "=value" part.
        if val is None:
            parts.append(name)
        else:
            parts.append(f"{name}={quote_header_value(val)}")

    return "; ".join(parts)

262 

263 

def dump_header(
    iterable: t.Union[t.Dict[str, t.Union[str, int]], t.Iterable[str]],
    allow_token: bool = True,
) -> str:
    """Serialize a list or dict back into an HTTP header value. This is
    the reversal of :func:`parse_list_header`, :func:`parse_set_header`
    and :func:`parse_dict_header`. Strings containing an equals sign
    are quoted unless the input is a dict of key, value pairs.

    >>> dump_header({'foo': 'bar baz'})
    'foo="bar baz"'
    >>> dump_header(('foo', 'bar baz'))
    'foo, "bar baz"'

    :param iterable: the iterable or dict of values to quote.
    :param allow_token: if set to `False` tokens as values are disallowed.
        See :func:`quote_header_value` for more details.
    """
    if not isinstance(iterable, dict):
        parts = [quote_header_value(v, allow_token=allow_token) for v in iterable]
    else:
        parts = []

        for key, value in iterable.items():
            # None values are serialized as a bare key.
            if value is None:
                parts.append(key)
            else:
                quoted = quote_header_value(value, allow_token=allow_token)
                parts.append(f"{key}={quoted}")

    return ", ".join(parts)

294 

295 

def dump_csp_header(header: "ds.ContentSecurityPolicy") -> str:
    """Serialize a Content Security Policy header.

    These are structured into policies such as "default-src 'self';
    script-src 'self'".

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.

    """
    policies = [f"{directive} {value}" for directive, value in header.items()]
    return "; ".join(policies)

307 

308 

def parse_list_header(value: str) -> t.List[str]:
    """Parse a comma-separated list header as described by RFC 2068
    Section 2.

    List elements may be quoted-strings (which can themselves contain
    commas); surrounding quotes are removed after parsing. A non-quoted
    string could have quotes in the middle. Unlike
    :func:`parse_set_header`, duplicate items are kept and case is
    preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    """
    items = []

    for element in _parse_list_header(value):
        # Strip and unescape quoted-string elements.
        if element[:1] == element[-1:] == '"':
            element = unquote_header_value(element[1:-1])

        items.append(element)

    return items

337 

338 

def parse_dict_header(value: str, cls: t.Type[dict] = dict) -> t.Dict[str, str]:
    """Parse a list of key, value pairs as described by RFC 2068
    Section 2 into a python dict (or any other mapping object created
    from the type with a dict like interface provided by the `cls`
    argument):

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    .. versionchanged:: 0.9
       Added support for `cls` argument.

    :param value: a string with a dict header.
    :param cls: callable to use for storage of parsed results.
    :return: an instance of `cls`
    """
    parsed = cls()

    # Bytes arrive from WSGI as latin1; normalize to str first.
    if isinstance(value, bytes):
        value = value.decode("latin1")

    for item in _parse_list_header(value):
        # Keys without "=" map to None.
        if "=" not in item:
            parsed[item] = None
            continue

        name, _, val = item.partition("=")

        # Strip and unescape quoted-string values.
        if val[:1] == val[-1:] == '"':
            val = unquote_header_value(val[1:-1])

        parsed[name] = val

    return parsed

377 

378 

def parse_options_header(
    value: t.Optional[str], multiple: "te.Literal[None]" = None
) -> t.Tuple[str, t.Dict[str, str]]:
    """Parse a ``Content-Type``-like header into a tuple with the
    value and any options:

    >>> parse_options_header('text/html; charset=utf8')
    ('text/html', {'charset': 'utf8'})

    This is not for ``Cache-Control``-like headers, which use a
    different format. For those, use :func:`parse_dict_header`.

    :param value: The header value to parse.

    .. versionchanged:: 2.1
        The ``multiple`` parameter is deprecated and will be removed in
        Werkzeug 2.2.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5
    """
    if multiple is not None:
        import warnings

        warnings.warn(
            "The 'multiple' parameter of 'parse_options_header' is"
            " deprecated and will be removed in Werkzeug 2.2.",
            DeprecationWarning,
            stacklevel=2,
        )

    if not value:
        return "", {}

    result: t.List[t.Any] = []

    # Folded header lines were joined with newlines; turn them into
    # commas and add a leading comma so the mime-type regex, which is
    # anchored on ",<type>", matches the first entry too.
    value = "," + value.replace("\n", ",")
    while value:
        match = _option_header_start_mime_type.match(value)
        if not match:
            break
        result.append(match.group(1))  # mimetype
        options: t.Dict[str, str] = {}
        # Parse options
        rest = match.group(2)
        encoding: t.Optional[str]
        continued_encoding: t.Optional[str] = None
        while rest:
            optmatch = _option_header_piece_re.match(rest)
            if not optmatch:
                break
            option, count, encoding, language, option_value = optmatch.groups()
            # Continuations don't have to supply the encoding after the
            # first line. If we're in a continuation, track the current
            # encoding to use for subsequent lines. Reset it when the
            # continuation ends.
            if not count:
                continued_encoding = None
            else:
                if not encoding:
                    encoding = continued_encoding
                continued_encoding = encoding
            option = unquote_header_value(option)

            if option_value is not None:
                # "filename" values get the browser-compatible path
                # handling in unquote_header_value.
                option_value = unquote_header_value(option_value, option == "filename")

                if encoding is not None:
                    # RFC 2231 extended value: percent-decode, then
                    # decode using the declared charset.
                    option_value = _unquote(option_value).decode(encoding)

            if count:
                # Continuations append to the existing value. For
                # simplicity, this ignores the possibility of
                # out-of-order indices, which shouldn't happen anyway.
                if option_value is not None:
                    options[option] = options.get(option, "") + option_value
            else:
                options[option] = option_value  # type: ignore[assignment]

            rest = rest[optmatch.end() :]
        result.append(options)
        if not multiple:
            # Default (deprecated `multiple` unset): only the first
            # mimetype/options pair is returned.
            return tuple(result)  # type: ignore[return-value]
        value = rest

    return tuple(result) if result else ("", {})  # type: ignore[return-value]

467 

468 

_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")


@typing.overload
def parse_accept_header(value: t.Optional[str]) -> "ds.Accept":
    ...


@typing.overload
def parse_accept_header(
    value: t.Optional[str], cls: t.Type[_TAnyAccept]
) -> _TAnyAccept:
    ...


def parse_accept_header(
    value: t.Optional[str], cls: t.Optional[t.Type[_TAnyAccept]] = None
) -> _TAnyAccept:
    """Parse an HTTP ``Accept-*`` header. Only value and quality
    extraction are implemented, not the full validation algorithm.

    Returns a new :class:`Accept` object (basically a list of
    ``(value, quality)`` tuples sorted by the quality with some
    additional accessor methods).

    The second parameter can be a subclass of :class:`Accept` that is
    created with the parsed values and returned.

    :param value: the accept header string to be parsed.
    :param cls: the wrapper class for the return value (can be
                :class:`Accept` or a subclass thereof)
    :return: an instance of `cls`.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyAccept], ds.Accept)

    if not value:
        return cls(None)

    parsed = []

    for match in _accept_re.finditer(value):
        q_str = match.group(2)

        if q_str:
            # Clamp the quality into the valid [0, 1] range.
            quality = max(min(float(q_str), 1), 0)
        else:
            # Missing qvalue means full quality.
            quality: float = 1

        parsed.append((match.group(1), quality))

    return cls(parsed)

517 

518 

_TAnyCC = t.TypeVar("_TAnyCC", bound="ds._CacheControl")
_t_cc_update = t.Optional[t.Callable[[_TAnyCC], None]]


@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: None = None
) -> "ds.RequestCacheControl":
    ...


@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: t.Type[_TAnyCC]
) -> _TAnyCC:
    ...


def parse_cache_control_header(
    value: t.Optional[str],
    on_update: _t_cc_update = None,
    cls: t.Optional[t.Type[_TAnyCC]] = None,
) -> _TAnyCC:
    """Parse a cache control header. The RFC differs between response
    and request cache control; this method does not, so it is your
    responsibility to not use the wrong control statements.

    .. versionadded:: 0.5
       The `cls` was added. If not specified an immutable
       :class:`~werkzeug.datastructures.RequestCacheControl` is returned.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.CacheControl`
                      object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCC], ds.RequestCacheControl)

    if value:
        return cls(parse_dict_header(value), on_update)

    # Missing/empty header yields an empty cache control object.
    return cls((), on_update)

565 

566 

_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")
_t_csp_update = t.Optional[t.Callable[[_TAnyCSP], None]]


@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: None = None
) -> "ds.ContentSecurityPolicy":
    ...


@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: t.Type[_TAnyCSP]
) -> _TAnyCSP:
    ...


def parse_csp_header(
    value: t.Optional[str],
    on_update: _t_csp_update = None,
    cls: t.Optional[t.Type[_TAnyCSP]] = None,
) -> _TAnyCSP:
    """Parse a Content Security Policy header.

    .. versionadded:: 1.0.0
       Support for Content Security Policy headers was added.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCSP], ds.ContentSecurityPolicy)

    if value is None:
        return cls((), on_update)

    policies = []

    for chunk in value.split(";"):
        chunk = chunk.strip()

        # Policies without a space between directive and value are
        # badly formatted and silently skipped.
        if " " not in chunk:
            continue

        directive, _, policy_value = chunk.partition(" ")
        policies.append((directive.strip(), policy_value.strip()))

    return cls(policies, on_update)

619 

620 

def parse_set_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[["ds.HeaderSet"], None]] = None,
) -> "ds.HeaderSet":
    """Parse a set-like header and return a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    The return value is an object that treats the items case-insensitively
    and keeps the order of the items:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    To create a header from the :class:`HeaderSet` again, use the
    :func:`dump_header` function.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
                      value on the :class:`~werkzeug.datastructures.HeaderSet`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    if value:
        return ds.HeaderSet(parse_list_header(value), on_update)

    # Missing/empty header yields an empty set.
    return ds.HeaderSet(None, on_update)

652 

653 

def parse_authorization_header(
    value: t.Optional[str],
) -> t.Optional["ds.Authorization"]:
    """Parse an HTTP basic/digest authorization header transmitted by the web
    browser. The return value is either `None` if the header was invalid or
    not given, otherwise an :class:`~werkzeug.datastructures.Authorization`
    object.

    :param value: the authorization header to parse.
    :return: a :class:`~werkzeug.datastructures.Authorization` object or `None`.
    """
    if not value:
        return None
    # Undo the WSGI latin-1 round trip before interpreting the value.
    value = _wsgi_decoding_dance(value)
    try:
        # Header shape is "<scheme> <credentials>"; schemes compare
        # case-insensitively.
        auth_type, auth_info = value.split(None, 1)
        auth_type = auth_type.lower()
    except ValueError:
        return None
    if auth_type == "basic":
        try:
            # Credentials are base64("username:password"). The password
            # may contain ":" itself, so split only on the first one.
            username, password = base64.b64decode(auth_info).split(b":", 1)
        except Exception:
            # Any decode/split failure means a malformed header.
            return None
        try:
            return ds.Authorization(
                "basic",
                {
                    "username": _to_str(username, "utf-8"),
                    "password": _to_str(password, "utf-8"),
                },
            )
        except UnicodeDecodeError:
            return None
    elif auth_type == "digest":
        auth_map = parse_dict_header(auth_info)
        # These parameters are required for a usable digest response.
        for key in "username", "realm", "nonce", "uri", "response":
            if key not in auth_map:
                return None
        if "qop" in auth_map:
            # When qop is present, nc and cnonce must be present too.
            if not auth_map.get("nc") or not auth_map.get("cnonce"):
                return None
        return ds.Authorization("digest", auth_map)
    # Unknown auth scheme.
    return None

698 

699 

def parse_www_authenticate_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[["ds.WWWAuthenticate"], None]] = None,
) -> "ds.WWWAuthenticate":
    """Parse an HTTP WWW-Authenticate header into a
    :class:`~werkzeug.datastructures.WWWAuthenticate` object.

    :param value: a WWW-Authenticate header to parse.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.WWWAuthenticate`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.WWWAuthenticate` object.
    """
    if not value:
        return ds.WWWAuthenticate(on_update=on_update)

    try:
        auth_type, auth_info = value.split(None, 1)
    except (ValueError, AttributeError):
        # No parameters: the whole value is the auth scheme.
        return ds.WWWAuthenticate(value.strip().lower(), on_update=on_update)

    return ds.WWWAuthenticate(
        auth_type.lower(), parse_dict_header(auth_info), on_update
    )

721 

722 

def parse_if_range_header(value: t.Optional[str]) -> "ds.IfRange":
    """Parse an ``If-Range`` header, which may hold either an etag or a
    date. Returns a :class:`~werkzeug.datastructures.IfRange` object.

    .. versionchanged:: 2.0
        If the value represents a datetime, it is timezone-aware.

    .. versionadded:: 0.7
    """
    if not value:
        return ds.IfRange()

    timestamp = parse_date(value)

    if timestamp is None:
        # Not a date: treat as an etag, dropping weakness information.
        return ds.IfRange(unquote_etag(value)[0])

    return ds.IfRange(date=timestamp)

739 

740 

def parse_range_header(
    value: t.Optional[str], make_inclusive: bool = True
) -> t.Optional["ds.Range"]:
    """Parse a ``Range`` header into a
    :class:`~werkzeug.datastructures.Range` object. If the header is
    missing or malformed `None` is returned. `ranges` is a list of
    ``(start, stop)`` tuples where the ranges are non-inclusive.

    .. versionadded:: 0.7
    """
    if not value or "=" not in value:
        return None

    ranges = []
    last_end = 0
    units, rng = value.split("=", 1)
    units = units.strip().lower()

    for item in rng.split(","):
        item = item.strip()

        if "-" not in item:
            return None

        if item.startswith("-"):
            # Suffix range ("-500"): the last N bytes. A suffix range
            # may not follow an open-ended or suffix range.
            if last_end < 0:
                return None

            try:
                begin = int(item)
            except ValueError:
                return None

            end = None
            last_end = -1
        else:
            # "begin-end" or open-ended "begin-" range. (This was
            # previously guarded by a redundant ``elif "-" in item``
            # which always held: items without "-" are rejected above.)
            begin_str, end_str = item.split("-", 1)
            begin_str = begin_str.strip()
            end_str = end_str.strip()

            if not begin_str.isdigit():
                return None

            begin = int(begin_str)

            # Ranges must be ascending and may not follow an
            # open-ended or suffix range.
            if begin < last_end or last_end < 0:
                return None

            if end_str:
                if not end_str.isdigit():
                    return None

                # The header's end is inclusive; store it exclusive.
                end = int(end_str) + 1

                if begin >= end:
                    return None
            else:
                end = None

            last_end = end if end is not None else -1

        ranges.append((begin, end))

    return ds.Range(units, ranges)

793 

794 

def parse_content_range_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[["ds.ContentRange"], None]] = None,
) -> t.Optional["ds.ContentRange"]:
    """Parse a ``Content-Range`` header into a
    :class:`~werkzeug.datastructures.ContentRange` object, or `None` if
    parsing is not possible.

    .. versionadded:: 0.7

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.ContentRange`
                      object is changed.
    """
    if value is None:
        return None

    try:
        # Header shape: "<units> <range>/<length>".
        units, range_def = (value or "").strip().split(None, 1)
    except ValueError:
        return None

    if "/" not in range_def:
        return None

    span, length_str = range_def.split("/", 1)

    # "*" means the total length is unknown.
    if length_str == "*":
        length = None
    elif length_str.isdigit():
        length = int(length_str)
    else:
        return None

    # "*" range means the range is unsatisfied/unknown.
    if span == "*":
        return ds.ContentRange(units, None, None, length, on_update=on_update)

    if "-" not in span:
        return None

    start_str, stop_str = span.split("-", 1)

    try:
        start = int(start_str)
        # Header stop is inclusive; stored stop is exclusive.
        stop = int(stop_str) + 1
    except ValueError:
        return None

    if not is_byte_range_valid(start, stop, length):
        return None

    return ds.ContentRange(units, start, stop, length, on_update=on_update)

843 

844 

def quote_etag(etag: str, weak: bool = False) -> str:
    """Quote an etag.

    :param etag: the etag to quote.
    :param weak: set to `True` to tag it "weak".
    """
    # An etag containing a quote cannot be represented.
    if '"' in etag:
        raise ValueError("invalid etag")

    quoted = f'"{etag}"'

    if weak:
        return f"W/{quoted}"

    return quoted

857 

858 

def unquote_etag(
    etag: t.Optional[str],
) -> t.Union[t.Tuple[str, bool], t.Tuple[None, None]]:
    """Unquote a single etag:

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple.
    """
    if not etag:
        return None, None

    tag = etag.strip()
    # A "W/" (or "w/") prefix marks a weak comparison etag.
    weak = tag.startswith(("W/", "w/"))

    if weak:
        tag = tag[2:]

    # Strip surrounding quotes if present.
    if tag[:1] == tag[-1:] == '"':
        tag = tag[1:-1]

    return tag, weak

882 

883 

def parse_etags(value: t.Optional[str]) -> "ds.ETags":
    """Parse an etag header.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ds.ETags()

    strong_tags = []
    weak_tags = []
    pos = 0
    length = len(value)

    while pos < length:
        match = _etag_re.match(value, pos)

        if match is None:
            break

        is_weak, quoted, raw = match.groups()

        # A bare "*" matches any entity.
        if raw == "*":
            return ds.ETags(star_tag=True)

        tag = quoted if quoted else raw
        (weak_tags if is_weak else strong_tags).append(tag)
        pos = match.end()

    return ds.ETags(strong_tags, weak_tags)

911 

912 

def generate_etag(data: bytes) -> str:
    """Generate an etag for some data.

    .. versionchanged:: 2.0
        Use SHA-1. MD5 may not be available in some environments.
    """
    digest = sha1(data)
    return digest.hexdigest()

920 

921 

def parse_date(value: t.Optional[str]) -> t.Optional[datetime]:
    """Parse an :rfc:`2822` date string into a timezone-aware
    :class:`datetime.datetime` object, or ``None`` if parsing fails.

    This wraps :func:`email.utils.parsedate_to_datetime`, returning
    ``None`` on failure instead of raising an exception, and assuming
    UTC when the string carries no timezone information.

    :param value: A string with a supported date format.

    .. versionchanged:: 2.0
        Return a timezone-aware datetime object. Use
        ``email.utils.parsedate_to_datetime``.
    """
    if value is None:
        return None

    try:
        parsed = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None

    # Naive datetimes are assumed to be UTC.
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)

    return parsed

949 

950 

def http_date(
    timestamp: t.Optional[t.Union[datetime, date, int, float, struct_time]] = None
) -> str:
    """Format a datetime object or timestamp into an :rfc:`2822` date
    string.

    This wraps :func:`email.utils.format_datetime` and
    :func:`email.utils.formatdate`, assuming naive datetime objects are
    in UTC instead of raising an exception.

    :param timestamp: The datetime or timestamp to format. Defaults to
        the current time.

    .. versionchanged:: 2.0
        Use ``email.utils.format_datetime``. Accept ``date`` objects.
    """
    if isinstance(timestamp, date):
        if isinstance(timestamp, datetime):
            # Make sure the datetime is timezone-aware (naive -> UTC).
            timestamp = _dt_as_utc(timestamp)
        else:
            # A plain date is taken to mean midnight UTC on that day.
            timestamp = datetime.combine(timestamp, time(), tzinfo=timezone.utc)

        return email.utils.format_datetime(timestamp, usegmt=True)

    # struct_time values are converted to a POSIX timestamp first.
    if isinstance(timestamp, struct_time):
        timestamp = mktime(timestamp)

    return email.utils.formatdate(timestamp, usegmt=True)

981 

982 

def parse_age(value: t.Optional[str] = None) -> t.Optional[timedelta]:
    """Parse a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None

    try:
        seconds = int(value)
    except ValueError:
        return None

    # Negative ages are invalid; huge values overflow timedelta.
    if seconds >= 0:
        try:
            return timedelta(seconds=seconds)
        except OverflowError:
            pass

    return None

1003 

1004 

def dump_age(age: t.Optional[t.Union[timedelta, int]] = None) -> t.Optional[str]:
    """Formats the duration as a base-10 integer.

    :param age: should be an integer number of seconds,
                a :class:`datetime.timedelta` object, or,
                if the age is unknown, `None` (default).
    :raises ValueError: if the resulting number of seconds is negative.
    """
    if age is None:
        return None

    # Normalize both accepted forms to a whole number of seconds.
    seconds = int(age.total_seconds()) if isinstance(age, timedelta) else int(age)

    if seconds < 0:
        raise ValueError("age cannot be negative")

    return str(seconds)

1023 

1024 

def is_resource_modified(
    environ: "WSGIEnvironment",
    etag: t.Optional[str] = None,
    data: t.Optional[bytes] = None,
    last_modified: t.Optional[t.Union[datetime, str]] = None,
    ignore_if_range: bool = True,
) -> bool:
    """Convenience method for conditional requests.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag for the response for comparison.
    :param data: or alternatively the data of the response to automatically
        generate an etag using :func:`generate_etag`.
    :param last_modified: an optional date of the last modification.
    :param ignore_if_range: If `False`, `If-Range` header will be taken into
        account.
    :return: `True` if the resource was modified, otherwise `False`.
    :raises TypeError: if both ``data`` and ``etag`` are given.

    .. versionchanged:: 2.0
        SHA-1 is used to generate an etag value for the data. MD5 may
        not be available in some environments.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    # ``data`` is a convenience alternative to ``etag``; supplying both
    # is ambiguous and rejected.
    if etag is None and data is not None:
        etag = generate_etag(data)
    elif data is not None:
        raise TypeError("both data and etag given")

    unmodified = False
    if isinstance(last_modified, str):
        last_modified = parse_date(last_modified)

    # HTTP doesn't use microsecond, remove it to avoid false positive
    # comparisons. Mark naive datetimes as UTC.
    if last_modified is not None:
        last_modified = _dt_as_utc(last_modified.replace(microsecond=0))

    if_range = None
    if not ignore_if_range and "HTTP_RANGE" in environ:
        # https://tools.ietf.org/html/rfc7233#section-3.2
        # A server MUST ignore an If-Range header field received in a request
        # that does not contain a Range header field.
        if_range = parse_if_range_header(environ.get("HTTP_IF_RANGE"))

    # A date in If-Range takes precedence over If-Modified-Since.
    if if_range is not None and if_range.date is not None:
        modified_since: t.Optional[datetime] = if_range.date
    else:
        modified_since = parse_date(environ.get("HTTP_IF_MODIFIED_SINCE"))

    if modified_since and last_modified and last_modified <= modified_since:
        unmodified = True

    # Etag comparisons only run when an etag for the resource is known;
    # they can override the date-based result computed above.
    if etag:
        etag, _ = unquote_etag(etag)
        etag = t.cast(str, etag)

        if if_range is not None and if_range.etag is not None:
            unmodified = parse_etags(if_range.etag).contains(etag)
        else:
            if_none_match = parse_etags(environ.get("HTTP_IF_NONE_MATCH"))
            if if_none_match:
                # https://tools.ietf.org/html/rfc7232#section-3.2
                # "A recipient MUST use the weak comparison function when comparing
                # entity-tags for If-None-Match"
                unmodified = if_none_match.contains_weak(etag)

            # https://tools.ietf.org/html/rfc7232#section-3.1
            # "Origin server MUST use the strong comparison function when
            # comparing entity-tags for If-Match"
            if_match = parse_etags(environ.get("HTTP_IF_MATCH"))
            if if_match:
                unmodified = not if_match.is_strong(etag)

    return not unmodified

1101 

1102 

def remove_entity_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]],
    allowed: t.Iterable[str] = ("expires", "content-location"),
) -> None:
    """Remove all entity headers from a list or :class:`Headers` object. This
    operation works in-place. `Expires` and `Content-Location` headers are
    by default not removed. The reason for this is :rfc:`2616` section
    10.3.5 which specifies some entity headers that should be sent.

    .. versionchanged:: 0.5
        added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: a list of headers that should still be allowed even though
        they are entity headers.
    """
    # Normalize the allow-list once for case-insensitive matching.
    keep = frozenset(name.lower() for name in allowed)
    headers[:] = [
        (name, value)
        for name, value in headers
        if name.lower() in keep or not is_entity_header(name)
    ]

1125 

1126 

def remove_hop_by_hop_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]]
) -> None:
    """Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or
    :class:`Headers` object. This operation works in-place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    # Build the filtered list first, then overwrite in-place via slice
    # assignment so the caller's object is mutated.
    retained = []

    for name, value in headers:
        if not is_hop_by_hop_header(name):
            retained.append((name, value))

    headers[:] = retained

1140 

1141 

def is_entity_header(header: str) -> bool:
    """Check if a header is an entity header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    # Header names are case-insensitive; the lookup set is lowercase.
    normalized = header.lower()
    return normalized in _entity_headers

1151 

1152 

def is_hop_by_hop_header(header: str) -> bool:
    """Check if a header is an HTTP/1.1 "Hop-by-Hop" header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    # Header names are case-insensitive; the lookup set is lowercase.
    normalized = header.lower()
    return normalized in _hop_by_hop_headers

1162 

1163 

def parse_cookie(
    header: t.Union["WSGIEnvironment", str, bytes, None],
    charset: str = "utf-8",
    errors: str = "replace",
    cls: t.Optional[t.Type["ds.MultiDict"]] = None,
) -> "ds.MultiDict[str, str]":
    """Parse a cookie from a string or WSGI environ.

    The same key can be provided multiple times, the values are stored
    in-order. The default :class:`MultiDict` will have the first value
    first, and all values can be retrieved with
    :meth:`MultiDict.getlist`.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key.
    :param charset: The charset for the cookie values.
    :param errors: The error behavior for the charset decoding.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.

    .. versionchanged:: 1.0.0
        Returns a :class:`MultiDict` instead of a
        ``TypeConversionDict``.

    .. versionchanged:: 0.5
        Returns a :class:`TypeConversionDict` instead of a regular dict.
        The ``cls`` parameter was added.
    """
    # Accept a WSGI environ dict, a header string/bytes, or ``None``.
    if isinstance(header, dict):
        header = header.get("HTTP_COOKIE", "")
    elif header is None:
        header = ""

    # PEP 3333 sends headers through the environ as latin1 decoded
    # strings. Encode strings back to bytes for parsing.
    if isinstance(header, str):
        header = header.encode("latin1", "replace")

    container = ds.MultiDict if cls is None else cls

    def _iter_pairs() -> t.Iterator[t.Tuple[str, str]]:
        for raw_key, raw_value in _cookie_parse_impl(header):  # type: ignore
            name = _to_str(raw_key, charset, errors, allow_none_charset=True)

            # Skip nameless pairs entirely.
            if not name:
                continue

            yield name, _to_str(raw_value, charset, errors, allow_none_charset=True)

    return container(_iter_pairs())

1216 

1217 

def dump_cookie(
    key: str,
    value: t.Union[bytes, str] = "",
    max_age: t.Optional[t.Union[timedelta, int]] = None,
    expires: t.Optional[t.Union[str, datetime, int, float]] = None,
    path: t.Optional[str] = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    charset: str = "utf-8",
    sync_expires: bool = True,
    max_size: int = 4093,
    samesite: t.Optional[str] = None,
) -> str:
    """Create a Set-Cookie header without the ``Set-Cookie`` prefix.

    The return value is usually restricted to ascii as the vast majority
    of values are properly escaped, but that is no guarantee. It's
    tunneled through latin1 as required by :pep:`3333`.

    The return value is not ASCII safe if the key contains unicode
    characters. This is technically against the specification but
    happens in the wild. It's strongly recommended to not use
    non-ASCII values for the keys.

    :param max_age: should be a number of seconds, or `None` (default) if
                    the cookie should last only as long as the client's
                    browser session. Additionally `timedelta` objects
                    are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
    :param path: limits the cookie to a given path, per default it will
                 span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
                   example, ``domain=".example.com"`` will set a cookie
                   that is readable by the domain ``www.example.com``,
                   ``foo.example.com`` etc. Otherwise, a cookie will only
                   be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie. This is an
                     extension to the cookie standard and probably not
                     supported by all browsers.
    :param charset: the encoding for string values.
    :param sync_expires: automatically set expires if max_age is defined
                         but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    key = _to_bytes(key, charset)
    value = _to_bytes(value, charset)

    if path is not None:
        # Imported here to avoid a circular import at module load time.
        from .urls import iri_to_uri

        path = iri_to_uri(path, charset)

    domain = _make_cookie_domain(domain)

    # Normalize a timedelta Max-Age to a whole number of seconds.
    if isinstance(max_age, timedelta):
        max_age = int(max_age.total_seconds())

    if expires is not None:
        if not isinstance(expires, str):
            expires = http_date(expires)
    elif max_age is not None and sync_expires:
        # Derive Expires from Max-Age for clients that only honor Expires.
        expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)

    if samesite is not None:
        # Normalize case ("strict" -> "Strict") before validating.
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    # The cookie pair itself always comes first; attributes follow.
    buf = [key + b"=" + _cookie_quote(value)]

    # XXX: In theory all of these parameters that are not marked with `None`
    # should be quoted. Because stdlib did not quote it before I did not
    # want to introduce quoting there now.
    # ``q`` is three-valued: True -> quote the value, False -> emit the
    # value as-is, None -> boolean flag attribute with no value.
    for k, v, q in (
        (b"Domain", domain, True),
        (b"Expires", expires, False),
        (b"Max-Age", max_age, False),
        (b"Secure", secure, None),
        (b"HttpOnly", httponly, None),
        (b"Path", path, False),
        (b"SameSite", samesite, False),
    ):
        if q is None:
            # Flag attribute: emit the bare name only when truthy.
            if v:
                buf.append(k)
            continue

        if v is None:
            continue

        tmp = bytearray(k)
        if not isinstance(v, (bytes, bytearray)):
            v = _to_bytes(str(v), charset)
        if q:
            v = _cookie_quote(v)
        tmp += b"=" + v
        buf.append(bytes(tmp))

    # The return value will be an incorrectly encoded latin1 header for
    # consistency with the headers object.
    rv = b"; ".join(buf)
    rv = rv.decode("latin1")

    # Warn if the final value of the cookie is larger than the limit. If the
    # cookie is too large, then it may be silently ignored by the browser,
    # which can be quite hard to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        warnings.warn(
            f"The {key.decode(charset)!r} cookie is too large: the value was"
            f" {value_size} bytes but the"
            f" header required {cookie_size - value_size} extra bytes. The final size"
            f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
            f" silently ignore cookies larger than this.",
            stacklevel=2,
        )

    return rv

1350 

1351 

def is_byte_range_valid(
    start: t.Optional[int], stop: t.Optional[int], length: t.Optional[int]
) -> bool:
    """Checks if a given byte content range is valid for the given length.

    .. versionadded:: 0.7
    """
    # ``start`` and ``stop`` must both be set or both be unset.
    if (start is None) != (stop is None):
        return False

    if start is None:
        # An open range is valid for any non-negative (or unknown) length.
        return length is None or length >= 0

    if length is None:
        return 0 <= start < stop  # type: ignore

    # With a known length, the range must be non-empty and begin inside it.
    return start < stop and 0 <= start < length  # type: ignore

1368 

1369 

1370# circular dependencies 

1371from . import datastructures as ds