Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/http.py: 21%

433 statements  

coverage.py v7.3.2, created at 2023-12-09 07:17 +0000

from __future__ import annotations

import email.utils
import re
import typing as t
import warnings
from datetime import date
from datetime import datetime
from datetime import time
from datetime import timedelta
from datetime import timezone
from enum import Enum
from hashlib import sha1
from time import mktime
from time import struct_time
from urllib.parse import quote
from urllib.parse import unquote
from urllib.request import parse_http_list as _parse_list_header

from ._internal import _dt_as_utc
from ._internal import _plain_int

if t.TYPE_CHECKING:
    from _typeshed.wsgi import WSGIEnvironment

_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
_entity_headers = frozenset(
    [
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    ]
)
_hop_by_hop_headers = frozenset(
    [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ]
)
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    511: "Network Authentication Required",
}


class COEP(Enum):
    """Cross Origin Embedder Policies"""

    UNSAFE_NONE = "unsafe-none"
    REQUIRE_CORP = "require-corp"


class COOP(Enum):
    """Cross Origin Opener Policies"""

    UNSAFE_NONE = "unsafe-none"
    SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
    SAME_ORIGIN = "same-origin"


def quote_header_value(value: t.Any, allow_token: bool = True) -> str:
    """Add double quotes around a header value. If the header contains only ASCII token
    characters, it will be returned unchanged. If the header contains ``"`` or ``\\``
    characters, they will be escaped with an additional ``\\`` character.

    This is the reverse of :func:`unquote_header_value`.

    :param value: The value to quote. Will be converted to a string.
    :param allow_token: Disable to quote the value even if it only has token characters.

    .. versionchanged:: 3.0
        Passing bytes is not supported.

    .. versionchanged:: 3.0
        The ``extra_chars`` parameter is removed.

    .. versionchanged:: 2.3
        The value is quoted if it is the empty string.

    .. versionadded:: 0.5
    """
    value = str(value)

    if not value:
        return '""'

    if allow_token:
        token_chars = _token_chars

        if token_chars.issuperset(value):
            return value

    value = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{value}"'


def unquote_header_value(value: str) -> str:
    """Remove double quotes and decode slash-escaped ``"`` and ``\\`` characters in a
    header value.

    This is the reverse of :func:`quote_header_value`.

    :param value: The header value to unquote.

    .. versionchanged:: 3.0
        The ``is_filename`` parameter is removed.
    """
    if len(value) >= 2 and value[0] == value[-1] == '"':
        value = value[1:-1]
        return value.replace("\\\\", "\\").replace('\\"', '"')

    return value
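
# Illustrative round trip for the two helpers above (an editor's sketch; the literal
# values are invented for demonstration and verified against the code above):
#
#     quote_header_value("UTF-8")           # 'UTF-8'  (token characters, unchanged)
#     quote_header_value('a "b"')           # '"a \\"b\\""'
#     unquote_header_value('"a \\"b\\""')   # 'a "b"'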


def dump_options_header(header: str | None, options: t.Mapping[str, t.Any]) -> str:
    """Produce a header value and ``key=value`` parameters separated by semicolons
    ``;``. For example, the ``Content-Type`` header.

    .. code-block:: python

        dump_options_header("text/html", {"charset": "UTF-8"})
        'text/html; charset=UTF-8'

    This is the reverse of :func:`parse_options_header`.

    If a value contains non-token characters, it will be quoted.

    If a value is ``None``, the parameter is skipped.

    In some keys for some headers, a UTF-8 value can be encoded using a special
    ``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will
    not produce that format automatically, but if a given key ends with an asterisk
    ``*``, the value is assumed to have that form and will not be quoted further.

    :param header: The primary header value.
    :param options: Parameters to encode as ``key=value`` pairs.

    .. versionchanged:: 2.3
        Keys with ``None`` values are skipped rather than treated as a bare key.

    .. versionchanged:: 2.2.3
        If a key ends with ``*``, its value will not be quoted.
    """
    segments = []

    if header is not None:
        segments.append(header)

    for key, value in options.items():
        if value is None:
            continue

        if key[-1] == "*":
            segments.append(f"{key}={value}")
        else:
            segments.append(f"{key}={quote_header_value(value)}")

    return "; ".join(segments)
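
# Editor's sketch of the two special cases described in the docstring above; the
# header and parameter values are hypothetical:
#
#     dump_options_header("form-data", {"name": "f", "x": None})
#     # 'form-data; name=f'  -- None values are skipped
#     dump_options_header("attachment", {"filename*": "UTF-8''b%C3%BCcher.txt"})
#     # "attachment; filename*=UTF-8''b%C3%BCcher.txt"  -- trailing * skips quoting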


def dump_header(iterable: dict[str, t.Any] | t.Iterable[t.Any]) -> str:
    """Produce a header value from a list of items or ``key=value`` pairs, separated by
    commas ``,``.

    This is the reverse of :func:`parse_list_header`, :func:`parse_dict_header`, and
    :func:`parse_set_header`.

    If a value contains non-token characters, it will be quoted.

    If a value is ``None``, the key is output alone.

    In some keys for some headers, a UTF-8 value can be encoded using a special
    ``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will
    not produce that format automatically, but if a given key ends with an asterisk
    ``*``, the value is assumed to have that form and will not be quoted further.

    .. code-block:: python

        dump_header(["foo", "bar baz"])
        'foo, "bar baz"'

        dump_header({"foo": "bar baz"})
        'foo="bar baz"'

    :param iterable: The items to create a header from.

    .. versionchanged:: 3.0
        The ``allow_token`` parameter is removed.

    .. versionchanged:: 2.2.3
        If a key ends with ``*``, its value will not be quoted.
    """
    if isinstance(iterable, dict):
        items = []

        for key, value in iterable.items():
            if value is None:
                items.append(key)
            elif key[-1] == "*":
                items.append(f"{key}={value}")
            else:
                items.append(f"{key}={quote_header_value(value)}")
    else:
        items = [quote_header_value(x) for x in iterable]

    return ", ".join(items)


def dump_csp_header(header: ds.ContentSecurityPolicy) -> str:
    """Dump a Content Security Policy header.

    These are structured into policies such as "default-src 'self';
    script-src 'self'".

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.

    """
    return "; ".join(f"{key} {value}" for key, value in header.items())
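
# Editor's sketch of how this pairs with parse_csp_header further below; the policy
# string is invented and the round trip assumes the ContentSecurityPolicy
# datastructure preserves insertion order:
#
#     csp = parse_csp_header("default-src 'self'; img-src *")
#     dump_csp_header(csp)
#     # "default-src 'self'; img-src *"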


def parse_list_header(value: str) -> list[str]:
    """Parse a header value that consists of a list of comma separated items according
    to `RFC 9110 <https://httpwg.org/specs/rfc9110.html#abnf.extension>`__.

    This extends :func:`urllib.request.parse_http_list` to remove surrounding quotes
    from values.

    .. code-block:: python

        parse_list_header('token, "quoted value"')
        ['token', 'quoted value']

    This is the reverse of :func:`dump_header`.

    :param value: The header value to parse.
    """
    result = []

    for item in _parse_list_header(value):
        if len(item) >= 2 and item[0] == item[-1] == '"':
            item = item[1:-1]

        result.append(item)

    return result


def parse_dict_header(value: str) -> dict[str, str | None]:
    """Parse a list header using :func:`parse_list_header`, then parse each item as a
    ``key=value`` pair.

    .. code-block:: python

        parse_dict_header('a=b, c="d, e", f')
        {"a": "b", "c": "d, e", "f": None}

    This is the reverse of :func:`dump_header`.

    If a key does not have a value, it is ``None``.

    This handles charsets for values as described in
    `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__. Only ASCII, UTF-8,
    and ISO-8859-1 charsets are accepted, otherwise the value remains quoted.

    :param value: The header value to parse.

    .. versionchanged:: 3.0
        Passing bytes is not supported.

    .. versionchanged:: 3.0
        The ``cls`` argument is removed.

    .. versionchanged:: 2.3
        Added support for ``key*=charset''value`` encoded items.

    .. versionchanged:: 0.9
        The ``cls`` argument was added.
    """
    result: dict[str, str | None] = {}

    for item in parse_list_header(value):
        key, has_value, value = item.partition("=")
        key = key.strip()

        if not has_value:
            result[key] = None
            continue

        value = value.strip()
        encoding: str | None = None

        if key[-1] == "*":
            # key*=charset''value becomes key=value, where value is percent encoded
            # adapted from parse_options_header, without the continuation handling
            key = key[:-1]
            match = _charset_value_re.match(value)

            if match:
                # If there is a charset marker in the value, split it off.
                encoding, value = match.groups()
                encoding = encoding.lower()

            # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
            # This list will not be extended further. An invalid encoding will leave the
            # value quoted.
            if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                # invalid bytes are replaced during unquoting
                value = unquote(value, encoding=encoding)

        if len(value) >= 2 and value[0] == value[-1] == '"':
            value = value[1:-1]

        result[key] = value

    return result
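
# Editor's sketch of the RFC 2231 charset handling described above; the item is a
# made-up example:
#
#     parse_dict_header("a*=UTF-8''b%C3%BCcher")
#     # {'a': 'bücher'}  -- the key loses its trailing *, the value is percent decoded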


# https://httpwg.org/specs/rfc9110.html#parameter
_parameter_re = re.compile(
    r"""
    # don't match multiple empty parts, that causes backtracking
    \s*;\s*  # find the part delimiter
    (?:
        ([\w!#$%&'*+\-.^`|~]+)  # key, one or more token chars
        =  # equals, with no space on either side
        (  # value, token or quoted string
            [\w!#$%&'*+\-.^`|~]+  # one or more token chars
        |
            "(?:\\\\|\\"|.)*?"  # quoted string, consuming slash escapes
        )
    )?  # optionally match key=value, to account for empty parts
    """,
    re.ASCII | re.VERBOSE,
)
# https://www.rfc-editor.org/rfc/rfc2231#section-4
_charset_value_re = re.compile(
    r"""
    ([\w!#$%&*+\-.^`|~]*)'  # charset part, could be empty
    [\w!#$%&*+\-.^`|~]*'  # don't care about language part, usually empty
    ([\w!#$%&'*+\-.^`|~]+)  # one or more token chars with percent encoding
    """,
    re.ASCII | re.VERBOSE,
)
# https://www.rfc-editor.org/rfc/rfc2231#section-3
_continuation_re = re.compile(r"\*(\d+)$", re.ASCII)


def parse_options_header(value: str | None) -> tuple[str, dict[str, str]]:
    """Parse a header that consists of a value with ``key=value`` parameters separated
    by semicolons ``;``. For example, the ``Content-Type`` header.

    .. code-block:: python

        parse_options_header("text/html; charset=UTF-8")
        ('text/html', {'charset': 'UTF-8'})

        parse_options_header("")
        ("", {})

    This is the reverse of :func:`dump_options_header`.

    This parses valid parameter parts as described in
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#parameter>`__. Invalid parts are
    skipped.

    This handles continuations and charsets as described in
    `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__, although not as
    strictly as the RFC. Only ASCII, UTF-8, and ISO-8859-1 charsets are accepted,
    otherwise the value remains quoted.

    Clients may not be consistent in how they handle a quote character within a quoted
    value. The `HTML Standard <https://html.spec.whatwg.org/#multipart-form-data>`__
    replaces it with ``%22`` in multipart form data.
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#quoted.strings>`__ uses backslash
    escapes in HTTP headers. Both are decoded to the ``"`` character.

    Clients may not be consistent in how they handle non-ASCII characters. HTML
    documents must declare ``<meta charset=UTF-8>``, otherwise browsers may replace
    non-ASCII characters with HTML character references, which can be decoded using
    :func:`html.unescape`.

    :param value: The header value to parse.
    :return: ``(value, options)``, where ``options`` is a dict

    .. versionchanged:: 2.3
        Invalid parts, such as keys with no value, quoted keys, and incorrectly quoted
        values, are discarded instead of treating as ``None``.

    .. versionchanged:: 2.3
        Only ASCII, UTF-8, and ISO-8859-1 are accepted for charset values.

    .. versionchanged:: 2.3
        Escaped quotes in quoted values, like ``%22`` and ``\\"``, are handled.

    .. versionchanged:: 2.2
        Option names are always converted to lowercase.

    .. versionchanged:: 2.2
        The ``multiple`` parameter was removed.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5
    """
    if value is None:
        return "", {}

    value, _, rest = value.partition(";")
    value = value.strip()
    rest = rest.strip()

    if not value or not rest:
        # empty (invalid) value, or value without options
        return value, {}

    rest = f";{rest}"
    options: dict[str, str] = {}
    encoding: str | None = None
    continued_encoding: str | None = None

    for pk, pv in _parameter_re.findall(rest):
        if not pk:
            # empty or invalid part
            continue

        pk = pk.lower()

        if pk[-1] == "*":
            # key*=charset''value becomes key=value, where value is percent encoded
            pk = pk[:-1]
            match = _charset_value_re.match(pv)

            if match:
                # If there is a valid charset marker in the value, split it off.
                encoding, pv = match.groups()
                # This might be the empty string, handled next.
                encoding = encoding.lower()

            # No charset marker, or marker with empty charset value.
            if not encoding:
                encoding = continued_encoding

            # A safe list of encodings. Modern clients should only send ASCII or UTF-8.
            # This list will not be extended further. An invalid encoding will leave the
            # value quoted.
            if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}:
                # Continuation parts don't require their own charset marker. This is
                # looser than the RFC, it will persist across different keys and allows
                # changing the charset during a continuation. But this implementation is
                # much simpler than tracking the full state.
                continued_encoding = encoding
                # invalid bytes are replaced during unquoting
                pv = unquote(pv, encoding=encoding)

        # Remove quotes. At this point the value cannot be empty or a single quote.
        if pv[0] == pv[-1] == '"':
            # HTTP headers use slash, multipart form data uses percent
            pv = pv[1:-1].replace("\\\\", "\\").replace('\\"', '"').replace("%22", '"')

        match = _continuation_re.search(pk)

        if match:
            # key*0=a; key*1=b becomes key=ab
            pk = pk[: match.start()]
            options[pk] = options.get(pk, "") + pv
        else:
            options[pk] = pv

    return value, options
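
# Editor's sketch of the RFC 2231 continuation and charset handling implemented
# above; the header value is invented for illustration:
#
#     parse_options_header(
#         "attachment; filename*0*=UTF-8''b%C3%BC; filename*1*=cher.txt"
#     )
#     # ('attachment', {'filename': 'bücher.txt'})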


_q_value_re = re.compile(r"-?\d+(\.\d+)?", re.ASCII)
_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")


@t.overload
def parse_accept_header(value: str | None) -> ds.Accept:
    ...


@t.overload
def parse_accept_header(value: str | None, cls: type[_TAnyAccept]) -> _TAnyAccept:
    ...


def parse_accept_header(
    value: str | None, cls: type[_TAnyAccept] | None = None
) -> _TAnyAccept:
    """Parse an ``Accept`` header according to
    `RFC 9110 <https://httpwg.org/specs/rfc9110.html#field.accept>`__.

    Returns an :class:`.Accept` instance, which can sort and inspect items based on
    their quality parameter. When parsing ``Accept-Charset``, ``Accept-Encoding``, or
    ``Accept-Language``, pass the appropriate :class:`.Accept` subclass.

    :param value: The header value to parse.
    :param cls: The :class:`.Accept` class to wrap the result in.
    :return: An instance of ``cls``.

    .. versionchanged:: 2.3
        Parse according to RFC 9110. Items with invalid ``q`` values are skipped.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyAccept], ds.Accept)

    if not value:
        return cls(None)

    result = []

    for item in parse_list_header(value):
        item, options = parse_options_header(item)

        if "q" in options:
            # pop q, remaining options are reconstructed
            q_str = options.pop("q").strip()

            if _q_value_re.fullmatch(q_str) is None:
                # ignore an invalid q
                continue

            q = float(q_str)

            if q < 0 or q > 1:
                # ignore an invalid q
                continue
        else:
            q = 1

        if options:
            # reconstruct the media type with any options
            item = dump_options_header(item, options)

        result.append((item, q))

    return cls(result)
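
# Editor's sketch of the quality handling above; the header value is hypothetical
# and ``best`` is assumed from the Accept datastructure:
#
#     accept = parse_accept_header("text/html, application/json;q=0.5, */*;q=bad")
#     accept.best
#     # 'text/html'  -- items without q default to 1; the invalid-q item is skipped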


_TAnyCC = t.TypeVar("_TAnyCC", bound="ds.cache_control._CacheControl")
_t_cc_update = t.Optional[t.Callable[[_TAnyCC], None]]


@t.overload
def parse_cache_control_header(
    value: str | None, on_update: _t_cc_update, cls: None = None
) -> ds.RequestCacheControl:
    ...


@t.overload
def parse_cache_control_header(
    value: str | None, on_update: _t_cc_update, cls: type[_TAnyCC]
) -> _TAnyCC:
    ...


def parse_cache_control_header(
    value: str | None,
    on_update: _t_cc_update = None,
    cls: type[_TAnyCC] | None = None,
) -> _TAnyCC:
    """Parse a cache control header. The RFC distinguishes between response and
    request cache control; this function does not, so it is your responsibility not
    to use control statements that are invalid for the given context.

    .. versionadded:: 0.5
        The `cls` parameter was added. If not specified, an immutable
        :class:`~werkzeug.datastructures.RequestCacheControl` is returned.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.CacheControl`
                      object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCC], ds.RequestCacheControl)

    if not value:
        return cls((), on_update)

    return cls(parse_dict_header(value), on_update)
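
# Editor's sketch; the header value is invented and the attribute access assumes the
# RequestCacheControl datastructure:
#
#     cc = parse_cache_control_header("no-store, max-age=0")
#     cc.no_store   # True
#     cc.max_age    # 0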


_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")
_t_csp_update = t.Optional[t.Callable[[_TAnyCSP], None]]


@t.overload
def parse_csp_header(
    value: str | None, on_update: _t_csp_update, cls: None = None
) -> ds.ContentSecurityPolicy:
    ...


@t.overload
def parse_csp_header(
    value: str | None, on_update: _t_csp_update, cls: type[_TAnyCSP]
) -> _TAnyCSP:
    ...


def parse_csp_header(
    value: str | None,
    on_update: _t_csp_update = None,
    cls: type[_TAnyCSP] | None = None,
) -> _TAnyCSP:
    """Parse a Content Security Policy header.

    .. versionadded:: 1.0.0
        Support for Content Security Policy headers was added.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCSP], ds.ContentSecurityPolicy)

    if value is None:
        return cls((), on_update)

    items = []

    for policy in value.split(";"):
        policy = policy.strip()

        # Ignore badly formatted policies (no space)
        if " " in policy:
            directive, value = policy.strip().split(" ", 1)
            items.append((directive.strip(), value.strip()))

    return cls(items, on_update)
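
# Editor's sketch; the policy string is invented and the attribute names are assumed
# from the ContentSecurityPolicy datastructure:
#
#     csp = parse_csp_header("default-src 'self'; img-src https://example.com")
#     csp.default_src   # "'self'"
#     csp.img_src       # 'https://example.com'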


def parse_set_header(
    value: str | None,
    on_update: t.Callable[[ds.HeaderSet], None] | None = None,
) -> ds.HeaderSet:
    """Parse a set-like header and return a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    The return value is an object that treats the items case-insensitively
    and keeps the order of the items:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    To create a header from the :class:`HeaderSet` again, use the
    :func:`dump_header` function.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
                      value on the :class:`~werkzeug.datastructures.HeaderSet`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    if not value:
        return ds.HeaderSet(None, on_update)
    return ds.HeaderSet(parse_list_header(value), on_update)


def parse_if_range_header(value: str | None) -> ds.IfRange:
    """Parses an if-range header which can be an etag or a date. Returns
    a :class:`~werkzeug.datastructures.IfRange` object.

    .. versionchanged:: 2.0
        If the value represents a datetime, it is timezone-aware.

    .. versionadded:: 0.7
    """
    if not value:
        return ds.IfRange()
    date = parse_date(value)
    if date is not None:
        return ds.IfRange(date=date)
    # drop weakness information
    return ds.IfRange(unquote_etag(value)[0])
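
# Editor's sketch: the same header slot can carry either an etag or a date (values
# are invented):
#
#     parse_if_range_header('"abc"').etag
#     # 'abc'
#     parse_if_range_header("Sat, 09 Dec 2023 07:17:00 GMT").date
#     # datetime.datetime(2023, 12, 9, 7, 17, tzinfo=datetime.timezone.utc)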


def parse_range_header(
    value: str | None, make_inclusive: bool = True
) -> ds.Range | None:
    """Parses a range header into a :class:`~werkzeug.datastructures.Range`
    object. If the header is missing or malformed `None` is returned.
    `ranges` is a list of ``(start, stop)`` tuples where the ranges are
    non-inclusive.

    .. versionadded:: 0.7
    """
    if not value or "=" not in value:
        return None

    ranges = []
    last_end = 0
    units, rng = value.split("=", 1)
    units = units.strip().lower()

    for item in rng.split(","):
        item = item.strip()
        if "-" not in item:
            return None
        if item.startswith("-"):
            if last_end < 0:
                return None
            try:
                begin = _plain_int(item)
            except ValueError:
                return None
            end = None
            last_end = -1
        elif "-" in item:
            begin_str, end_str = item.split("-", 1)
            begin_str = begin_str.strip()
            end_str = end_str.strip()

            try:
                begin = _plain_int(begin_str)
            except ValueError:
                return None

            if begin < last_end or last_end < 0:
                return None
            if end_str:
                try:
                    end = _plain_int(end_str) + 1
                except ValueError:
                    return None

                if begin >= end:
                    return None
            else:
                end = None
            last_end = end if end is not None else -1
        ranges.append((begin, end))

    return ds.Range(units, ranges)
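
# Editor's sketch of the non-inclusive representation noted in the docstring; header
# values are invented:
#
#     parse_range_header("bytes=0-499").ranges
#     # [(0, 500)]     -- "0-499" becomes an exclusive stop of 500
#     parse_range_header("bytes=500-").ranges
#     # [(500, None)]  -- open-ended range
#     parse_range_header("bytes=abc")
#     # None           -- malformed headers return None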


def parse_content_range_header(
    value: str | None,
    on_update: t.Callable[[ds.ContentRange], None] | None = None,
) -> ds.ContentRange | None:
    """Parses a range header into a
    :class:`~werkzeug.datastructures.ContentRange` object or `None` if
    parsing is not possible.

    .. versionadded:: 0.7

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.ContentRange`
                      object is changed.
    """
    if value is None:
        return None
    try:
        units, rangedef = (value or "").strip().split(None, 1)
    except ValueError:
        return None

    if "/" not in rangedef:
        return None
    rng, length_str = rangedef.split("/", 1)
    if length_str == "*":
        length = None
    else:
        try:
            length = _plain_int(length_str)
        except ValueError:
            return None

    if rng == "*":
        if not is_byte_range_valid(None, None, length):
            return None

        return ds.ContentRange(units, None, None, length, on_update=on_update)
    elif "-" not in rng:
        return None

    start_str, stop_str = rng.split("-", 1)
    try:
        start = _plain_int(start_str)
        stop = _plain_int(stop_str) + 1
    except ValueError:
        return None

    if is_byte_range_valid(start, stop, length):
        return ds.ContentRange(units, start, stop, length, on_update=on_update)

    return None
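
# Editor's sketch; header values are invented, and stop is exclusive, matching the
# Range sketch above:
#
#     cr = parse_content_range_header("bytes 0-499/1234")
#     (cr.start, cr.stop, cr.length)
#     # (0, 500, 1234)
#     parse_content_range_header("bytes */1234").length
#     # 1234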


def quote_etag(etag: str, weak: bool = False) -> str:
    """Quote an etag.

    :param etag: the etag to quote.
    :param weak: set to `True` to tag it "weak".
    """
    if '"' in etag:
        raise ValueError("invalid etag")
    etag = f'"{etag}"'
    if weak:
        etag = f"W/{etag}"
    return etag


def unquote_etag(
    etag: str | None,
) -> tuple[str, bool] | tuple[None, None]:
    """Unquote a single etag:

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple.
    """
    if not etag:
        return None, None
    etag = etag.strip()
    weak = False
    if etag.startswith(("W/", "w/")):
        weak = True
        etag = etag[2:]
    if etag[:1] == etag[-1:] == '"':
        etag = etag[1:-1]
    return etag, weak


def parse_etags(value: str | None) -> ds.ETags:
    """Parse an etag header.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ds.ETags()
    strong = []
    weak = []
    end = len(value)
    pos = 0
    while pos < end:
        match = _etag_re.match(value, pos)
        if match is None:
            break
        is_weak, quoted, raw = match.groups()
        if raw == "*":
            return ds.ETags(star_tag=True)
        elif quoted:
            raw = quoted
        if is_weak:
            weak.append(raw)
        else:
            strong.append(raw)
        pos = match.end()
    return ds.ETags(strong, weak)
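
# Editor's sketch; the etag values are invented and the methods are assumed from the
# ETags datastructure:
#
#     etags = parse_etags('"abc", W/"def"')
#     "abc" in etags               # True  -- strong tags match by containment
#     etags.contains_weak("def")   # True
#     parse_etags("*").star_tag    # True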


def generate_etag(data: bytes) -> str:
    """Generate an etag for some data.

    .. versionchanged:: 2.0
        Use SHA-1. MD5 may not be available in some environments.
    """
    return sha1(data).hexdigest()


def parse_date(value: str | None) -> datetime | None:
    """Parse an :rfc:`2822` date into a timezone-aware
    :class:`datetime.datetime` object, or ``None`` if parsing fails.

    This is a wrapper for :func:`email.utils.parsedate_to_datetime`. It
    returns ``None`` if parsing fails instead of raising an exception,
    and always returns a timezone-aware datetime object. If the string
    doesn't have timezone information, it is assumed to be UTC.

    :param value: A string with a supported date format.

    .. versionchanged:: 2.0
        Return a timezone-aware datetime object. Use
        ``email.utils.parsedate_to_datetime``.
    """
    if value is None:
        return None

    try:
        dt = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None

    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)

    return dt


def http_date(
    timestamp: datetime | date | int | float | struct_time | None = None,
) -> str:
    """Format a datetime object or timestamp into an :rfc:`2822` date
    string.

    This is a wrapper for :func:`email.utils.format_datetime`. It
    assumes naive datetime objects are in UTC instead of raising an
    exception.

    :param timestamp: The datetime or timestamp to format. Defaults to
        the current time.

    .. versionchanged:: 2.0
        Use ``email.utils.format_datetime``. Accept ``date`` objects.
    """
    if isinstance(timestamp, date):
        if not isinstance(timestamp, datetime):
            # Assume plain date is midnight UTC.
            timestamp = datetime.combine(timestamp, time(), tzinfo=timezone.utc)
        else:
            # Ensure datetime is timezone-aware.
            timestamp = _dt_as_utc(timestamp)

        return email.utils.format_datetime(timestamp, usegmt=True)

    if isinstance(timestamp, struct_time):
        timestamp = mktime(timestamp)

    return email.utils.formatdate(timestamp, usegmt=True)
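
# Editor's sketch of the round trip between the two helpers above; the datetime is an
# arbitrary example:
#
#     http_date(datetime(2023, 12, 9, 7, 17, tzinfo=timezone.utc))
#     # 'Sat, 09 Dec 2023 07:17:00 GMT'
#     parse_date("Sat, 09 Dec 2023 07:17:00 GMT")
#     # datetime.datetime(2023, 12, 9, 7, 17, tzinfo=datetime.timezone.utc)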


def parse_age(value: str | None = None) -> timedelta | None:
    """Parses a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None
    try:
        seconds = int(value)
    except ValueError:
        return None
    if seconds < 0:
        return None
    try:
        return timedelta(seconds=seconds)
    except OverflowError:
        return None


def dump_age(age: timedelta | int | None = None) -> str | None:
    """Formats the duration as a base-10 integer.

    :param age: should be an integer number of seconds,
                a :class:`datetime.timedelta` object, or,
                if the age is unknown, `None` (default).
    """
    if age is None:
        return None
    if isinstance(age, timedelta):
        age = int(age.total_seconds())
    else:
        age = int(age)

    if age < 0:
        raise ValueError("age cannot be negative")

    return str(age)
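
# Editor's sketch; values are invented:
#
#     parse_age("3600")             # datetime.timedelta(seconds=3600)
#     parse_age("-1")               # None  -- negative ages are rejected
#     dump_age(timedelta(hours=1))  # '3600'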


def is_resource_modified(
    environ: WSGIEnvironment,
    etag: str | None = None,
    data: bytes | None = None,
    last_modified: datetime | str | None = None,
    ignore_if_range: bool = True,
) -> bool:
    """Convenience method for conditional requests.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag for the response for comparison.
    :param data: or alternatively the data of the response to automatically
                 generate an etag using :func:`generate_etag`.
    :param last_modified: an optional date of the last modification.
    :param ignore_if_range: If `False`, `If-Range` header will be taken into
                            account.
    :return: `True` if the resource was modified, otherwise `False`.

    .. versionchanged:: 2.0
        SHA-1 is used to generate an etag value for the data. MD5 may
        not be available in some environments.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    return _sansio_http.is_resource_modified(
        http_range=environ.get("HTTP_RANGE"),
        http_if_range=environ.get("HTTP_IF_RANGE"),
        http_if_modified_since=environ.get("HTTP_IF_MODIFIED_SINCE"),
        http_if_none_match=environ.get("HTTP_IF_NONE_MATCH"),
        http_if_match=environ.get("HTTP_IF_MATCH"),
        etag=etag,
        data=data,
        last_modified=last_modified,
        ignore_if_range=ignore_if_range,
    )
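
# Editor's sketch: with a matching If-None-Match header the resource is treated as
# unmodified. The environ dict and etag are invented, and the result relies on the
# sansio implementation delegated to above:
#
#     is_resource_modified({"HTTP_IF_NONE_MATCH": '"abc"'}, etag="abc")
#     # False
#     is_resource_modified({}, etag="abc")
#     # True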


def remove_entity_headers(
    headers: ds.Headers | list[tuple[str, str]],
    allowed: t.Iterable[str] = ("expires", "content-location"),
) -> None:
    """Remove all entity headers from a list or :class:`Headers` object. This
    operation works in-place. `Expires` and `Content-Location` headers are
    by default not removed. The reason for this is :rfc:`2616` section
    10.3.5 which specifies some entity headers that should be sent.

    .. versionchanged:: 0.5
        added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: a list of headers that should still be allowed even though
                    they are entity headers.
    """
    allowed = {x.lower() for x in allowed}
    headers[:] = [
        (key, value)
        for key, value in headers
        if not is_entity_header(key) or key.lower() in allowed
    ]


def remove_hop_by_hop_headers(headers: ds.Headers | list[tuple[str, str]]) -> None:
    """Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or
    :class:`Headers` object. This operation works in-place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    headers[:] = [
        (key, value) for key, value in headers if not is_hop_by_hop_header(key)
    ]


def is_entity_header(header: str) -> bool:
    """Check if a header is an entity header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    return header.lower() in _entity_headers


def is_hop_by_hop_header(header: str) -> bool:
    """Check if a header is an HTTP/1.1 "Hop-by-Hop" header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    return header.lower() in _hop_by_hop_headers


def parse_cookie(
    header: WSGIEnvironment | str | None,
    cls: type[ds.MultiDict] | None = None,
) -> ds.MultiDict[str, str]:
    """Parse a cookie from a string or WSGI environ.

    The same key can be provided multiple times, the values are stored
    in-order. The default :class:`MultiDict` will have the first value
    first, and all values can be retrieved with
    :meth:`MultiDict.getlist`.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.

    .. versionchanged:: 3.0
        Passing bytes, and the ``charset`` and ``errors`` parameters, were removed.

    .. versionchanged:: 1.0
        Returns a :class:`MultiDict` instead of a ``TypeConversionDict``.

    .. versionchanged:: 0.5
        Returns a :class:`TypeConversionDict` instead of a regular dict. The ``cls``
        parameter was added.
    """
    if isinstance(header, dict):
        cookie = header.get("HTTP_COOKIE")
    else:
        cookie = header

    if cookie:
        cookie = cookie.encode("latin1").decode()

    return _sansio_http.parse_cookie(cookie=cookie, cls=cls)
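
# Editor's sketch; the cookie string is invented, and the parsing itself happens in
# the sansio helper delegated to above:
#
#     d = parse_cookie("a=b; a=c; theme=dark")
#     d["a"]           # 'b'  -- first value wins for item access
#     d.getlist("a")   # ['b', 'c']
#     d["theme"]       # 'dark'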


_cookie_no_quote_re = re.compile(r"[\w!#$%&'()*+\-./:<=>?@\[\]^`{|}~]*", re.A)
_cookie_slash_re = re.compile(rb"[\x00-\x19\",;\\\x7f-\xff]", re.A)
_cookie_slash_map = {b'"': b'\\"', b"\\": b"\\\\"}
_cookie_slash_map.update(
    (v.to_bytes(1, "big"), b"\\%03o" % v)
    for v in [*range(0x20), *b",;", *range(0x7F, 256)]
)


def dump_cookie(
    key: str,
    value: str = "",
    max_age: timedelta | int | None = None,
    expires: str | datetime | int | float | None = None,
    path: str | None = "/",
    domain: str | None = None,
    secure: bool = False,
    httponly: bool = False,
    sync_expires: bool = True,
    max_size: int = 4093,
    samesite: str | None = None,
) -> str:
    """Create a Set-Cookie header without the ``Set-Cookie`` prefix.

    The return value is usually restricted to ascii as the vast majority
    of values are properly escaped, but that is no guarantee. It's
    tunneled through latin1 as required by :pep:`3333`.

    The return value is not ASCII safe if the key contains unicode
    characters. This is technically against the specification but
    happens in the wild. It's strongly recommended to not use
    non-ASCII values for the keys.

    :param max_age: should be a number of seconds, or `None` (default) if
                    the cookie should last only as long as the client's
                    browser session. Additionally `timedelta` objects
                    are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
    :param path: limits the cookie to a given path, per default it will
                 span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
                   example, ``domain="example.com"`` will set a cookie
                   that is readable by the domain ``www.example.com``,
                   ``foo.example.com`` etc. Otherwise, a cookie will only
                   be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie. This is an
                     extension to the cookie standard and probably not
                     supported by all browsers.
    :param sync_expires: automatically set expires if max_age is defined
                         but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 3.0
        Passing bytes, and the ``charset`` parameter, were removed.

    .. versionchanged:: 2.3.3
        The ``path`` parameter is ``/`` by default.

    .. versionchanged:: 2.3.1
        The value allows more characters without quoting.

    .. versionchanged:: 2.3
        ``localhost`` and other names without a dot are allowed for the domain. A
        leading dot is ignored.

    .. versionchanged:: 2.3
        The ``path`` parameter is ``None`` by default.

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    if path is not None:
        # safe = https://url.spec.whatwg.org/#url-path-segment-string
        # as well as percent for things that are already quoted
        # excluding semicolon since it's part of the header syntax
        path = quote(path, safe="%!$&'()*+,/:=@")

    if domain:
        domain = domain.partition(":")[0].lstrip(".").encode("idna").decode("ascii")

    if isinstance(max_age, timedelta):
        max_age = int(max_age.total_seconds())

    if expires is not None:
        if not isinstance(expires, str):
            expires = http_date(expires)
    elif max_age is not None and sync_expires:
        expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)

    if samesite is not None:
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    # Quote value if it contains characters not allowed by RFC 6265. Slash-escape with
    # three octal digits, which matches http.cookies, although the RFC suggests base64.
    if not _cookie_no_quote_re.fullmatch(value):
        # Work with bytes here, since a UTF-8 character could be multiple bytes.
        value = _cookie_slash_re.sub(
            lambda m: _cookie_slash_map[m.group()], value.encode()
        ).decode("ascii")
        value = f'"{value}"'

    # Send a non-ASCII key as mojibake. Everything else should already be ASCII.
    # TODO Remove encoding dance, it seems like clients accept UTF-8 keys
    buf = [f"{key.encode().decode('latin1')}={value}"]

    for k, v in (
        ("Domain", domain),
        ("Expires", expires),
        ("Max-Age", max_age),
        ("Secure", secure),
        ("HttpOnly", httponly),
        ("Path", path),
        ("SameSite", samesite),
    ):
        if v is None or v is False:
            continue

        if v is True:
            buf.append(k)
            continue

        buf.append(f"{k}={v}")

    rv = "; ".join(buf)

    # Warn if the final value of the cookie is larger than the limit. If the cookie is
    # too large, then it may be silently ignored by the browser, which can be quite hard
    # to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        warnings.warn(
            f"The '{key}' cookie is too large: the value was {value_size} bytes but the"
            f" header required {cookie_size - value_size} extra bytes. The final size"
            f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
            " silently ignore cookies larger than this.",
            stacklevel=2,
        )

    return rv
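
# Editor's sketch; the key, value, and attributes are invented, and the Expires value
# depends on the current time because sync_expires defaults to True:
#
#     dump_cookie("session", "a,b", max_age=3600, httponly=True, samesite="lax")
#     # 'session="a\\054b"; Expires=<RFC 2822 date>; Max-Age=3600; HttpOnly;'
#     # ' Path=/; SameSite=Lax'
#     # (the comma is octal-escaped because RFC 6265 does not allow it bare)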


def is_byte_range_valid(
    start: int | None, stop: int | None, length: int | None
) -> bool:
    """Checks if a given byte content range is valid for the given length.

    .. versionadded:: 0.7
    """
    if (start is None) != (stop is None):
        return False
    elif start is None:
        return length is None or length >= 0
    elif length is None:
        return 0 <= start < stop  # type: ignore
    elif start >= stop:  # type: ignore
        return False
    return 0 <= start < length


# circular dependencies
from . import datastructures as ds
from .sansio import http as _sansio_http