Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/http.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

425 statements  

1from __future__ import annotations 

2 

3import email.utils 

4import re 

5import typing as t 

6import warnings 

7from datetime import date 

8from datetime import datetime 

9from datetime import time 

10from datetime import timedelta 

11from datetime import timezone 

12from enum import Enum 

13from hashlib import sha1 

14from time import mktime 

15from time import struct_time 

16from urllib.parse import quote 

17from urllib.parse import unquote 

18from urllib.request import parse_http_list as _parse_list_header 

19 

20from ._internal import _dt_as_utc 

21from ._internal import _plain_int 

22 

23if t.TYPE_CHECKING: 

24 from _typeshed.wsgi import WSGIEnvironment 

25 

26_token_chars = frozenset( 

27 "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~" 

28) 

29_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)') 

30_entity_headers = frozenset( 

31 [ 

32 "allow", 

33 "content-encoding", 

34 "content-language", 

35 "content-length", 

36 "content-location", 

37 "content-md5", 

38 "content-range", 

39 "content-type", 

40 "expires", 

41 "last-modified", 

42 ] 

43) 

44_hop_by_hop_headers = frozenset( 

45 [ 

46 "connection", 

47 "keep-alive", 

48 "proxy-authenticate", 

49 "proxy-authorization", 

50 "te", 

51 "trailer", 

52 "transfer-encoding", 

53 "upgrade", 

54 ] 

55) 

56HTTP_STATUS_CODES = { 

57 100: "Continue", 

58 101: "Switching Protocols", 

59 102: "Processing", 

60 103: "Early Hints", # see RFC 8297 

61 200: "OK", 

62 201: "Created", 

63 202: "Accepted", 

64 203: "Non Authoritative Information", 

65 204: "No Content", 

66 205: "Reset Content", 

67 206: "Partial Content", 

68 207: "Multi Status", 

69 208: "Already Reported", # see RFC 5842 

70 226: "IM Used", # see RFC 3229 

71 300: "Multiple Choices", 

72 301: "Moved Permanently", 

73 302: "Found", 

74 303: "See Other", 

75 304: "Not Modified", 

76 305: "Use Proxy", 

77 306: "Switch Proxy", # unused 

78 307: "Temporary Redirect", 

79 308: "Permanent Redirect", 

80 400: "Bad Request", 

81 401: "Unauthorized", 

82 402: "Payment Required", # unused 

83 403: "Forbidden", 

84 404: "Not Found", 

85 405: "Method Not Allowed", 

86 406: "Not Acceptable", 

87 407: "Proxy Authentication Required", 

88 408: "Request Timeout", 

89 409: "Conflict", 

90 410: "Gone", 

91 411: "Length Required", 

92 412: "Precondition Failed", 

93 413: "Request Entity Too Large", 

94 414: "Request URI Too Long", 

95 415: "Unsupported Media Type", 

96 416: "Requested Range Not Satisfiable", 

97 417: "Expectation Failed", 

98 418: "I'm a teapot", # see RFC 2324 

99 421: "Misdirected Request", # see RFC 7540 

100 422: "Unprocessable Entity", 

101 423: "Locked", 

102 424: "Failed Dependency", 

103 425: "Too Early", # see RFC 8470 

104 426: "Upgrade Required", 

105 428: "Precondition Required", # see RFC 6585 

106 429: "Too Many Requests", 

107 431: "Request Header Fields Too Large", 

108 449: "Retry With", # proprietary MS extension 

109 451: "Unavailable For Legal Reasons", 

110 500: "Internal Server Error", 

111 501: "Not Implemented", 

112 502: "Bad Gateway", 

113 503: "Service Unavailable", 

114 504: "Gateway Timeout", 

115 505: "HTTP Version Not Supported", 

116 506: "Variant Also Negotiates", # see RFC 2295 

117 507: "Insufficient Storage", 

118 508: "Loop Detected", # see RFC 5842 

119 510: "Not Extended", 

120 511: "Network Authentication Failed", 

121} 

122 

123 

124class COEP(Enum): 

125 """Cross Origin Embedder Policies""" 

126 

127 UNSAFE_NONE = "unsafe-none" 

128 REQUIRE_CORP = "require-corp" 

129 

130 

131class COOP(Enum): 

132 """Cross Origin Opener Policies""" 

133 

134 UNSAFE_NONE = "unsafe-none" 

135 SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups" 

136 SAME_ORIGIN = "same-origin" 

137 

138 

139def quote_header_value(value: t.Any, allow_token: bool = True) -> str: 

140 """Add double quotes around a header value. If the header contains only ASCII token 

141 characters, it will be returned unchanged. If the header contains ``"`` or ``\\`` 

142 characters, they will be escaped with an additional ``\\`` character. 

143 

144 This is the reverse of :func:`unquote_header_value`. 

145 

146 :param value: The value to quote. Will be converted to a string. 

147 :param allow_token: Disable to quote the value even if it only has token characters. 

148 

149 .. versionchanged:: 3.0 

150 Passing bytes is not supported. 

151 

152 .. versionchanged:: 3.0 

153 The ``extra_chars`` parameter is removed. 

154 

155 .. versionchanged:: 2.3 

156 The value is quoted if it is the empty string. 

157 

158 .. versionadded:: 0.5 

159 """ 

160 value_str = str(value) 

161 

162 if not value_str: 

163 return '""' 

164 

165 if allow_token: 

166 token_chars = _token_chars 

167 

168 if token_chars.issuperset(value_str): 

169 return value_str 

170 

171 value_str = value_str.replace("\\", "\\\\").replace('"', '\\"') 

172 return f'"{value_str}"' 

173 

174 

175def unquote_header_value(value: str) -> str: 

176 """Remove double quotes and decode slash-escaped ``"`` and ``\\`` characters in a 

177 header value. 

178 

179 This is the reverse of :func:`quote_header_value`. 

180 

181 :param value: The header value to unquote. 

182 

183 .. versionchanged:: 3.0 

184 The ``is_filename`` parameter is removed. 

185 """ 

186 if len(value) >= 2 and value[0] == value[-1] == '"': 

187 value = value[1:-1] 

188 return value.replace("\\\\", "\\").replace('\\"', '"') 

189 

190 return value 

191 

192 

193def dump_options_header(header: str | None, options: t.Mapping[str, t.Any]) -> str: 

194 """Produce a header value and ``key=value`` parameters separated by semicolons 

195 ``;``. For example, the ``Content-Type`` header. 

196 

197 .. code-block:: python 

198 

199 dump_options_header("text/html", {"charset": "UTF-8"}) 

200 'text/html; charset=UTF-8' 

201 

202 This is the reverse of :func:`parse_options_header`. 

203 

204 If a value contains non-token characters, it will be quoted. 

205 

206 If a value is ``None``, the parameter is skipped. 

207 

208 In some keys for some headers, a UTF-8 value can be encoded using a special 

209 ``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will 

210 not produce that format automatically, but if a given key ends with an asterisk 

211 ``*``, the value is assumed to have that form and will not be quoted further. 

212 

213 :param header: The primary header value. 

214 :param options: Parameters to encode as ``key=value`` pairs. 

215 

216 .. versionchanged:: 2.3 

217 Keys with ``None`` values are skipped rather than treated as a bare key. 

218 

219 .. versionchanged:: 2.2.3 

220 If a key ends with ``*``, its value will not be quoted. 

221 """ 

222 segments = [] 

223 

224 if header is not None: 

225 segments.append(header) 

226 

227 for key, value in options.items(): 

228 if value is None: 

229 continue 

230 

231 if key[-1] == "*": 

232 segments.append(f"{key}={value}") 

233 else: 

234 segments.append(f"{key}={quote_header_value(value)}") 

235 

236 return "; ".join(segments) 

237 

238 

239def dump_header(iterable: dict[str, t.Any] | t.Iterable[t.Any]) -> str: 

240 """Produce a header value from a list of items or ``key=value`` pairs, separated by 

241 commas ``,``. 

242 

243 This is the reverse of :func:`parse_list_header`, :func:`parse_dict_header`, and 

244 :func:`parse_set_header`. 

245 

246 If a value contains non-token characters, it will be quoted. 

247 

248 If a value is ``None``, the key is output alone. 

249 

250 In some keys for some headers, a UTF-8 value can be encoded using a special 

251 ``key*=UTF-8''value`` form, where ``value`` is percent encoded. This function will 

252 not produce that format automatically, but if a given key ends with an asterisk 

253 ``*``, the value is assumed to have that form and will not be quoted further. 

254 

255 .. code-block:: python 

256 

257 dump_header(["foo", "bar baz"]) 

258 'foo, "bar baz"' 

259 

260 dump_header({"foo": "bar baz"}) 

261 'foo="bar baz"' 

262 

263 :param iterable: The items to create a header from. 

264 

265 .. versionchanged:: 3.0 

266 The ``allow_token`` parameter is removed. 

267 

268 .. versionchanged:: 2.2.3 

269 If a key ends with ``*``, its value will not be quoted. 

270 """ 

271 if isinstance(iterable, dict): 

272 items = [] 

273 

274 for key, value in iterable.items(): 

275 if value is None: 

276 items.append(key) 

277 elif key[-1] == "*": 

278 items.append(f"{key}={value}") 

279 else: 

280 items.append(f"{key}={quote_header_value(value)}") 

281 else: 

282 items = [quote_header_value(x) for x in iterable] 

283 

284 return ", ".join(items) 

285 

286 

287def dump_csp_header(header: ds.ContentSecurityPolicy) -> str: 

288 """Dump a Content Security Policy header. 

289 

290 These are structured into policies such as "default-src 'self'; 

291 script-src 'self'". 

292 

293 .. versionadded:: 1.0.0 

294 Support for Content Security Policy headers was added. 

295 

296 """ 

297 return "; ".join(f"{key} {value}" for key, value in header.items()) 

298 

299 

300def parse_list_header(value: str) -> list[str]: 

301 """Parse a header value that consists of a list of comma separated items according 

302 to `RFC 9110 <https://httpwg.org/specs/rfc9110.html#abnf.extension>`__. 

303 

304 This extends :func:`urllib.request.parse_http_list` to remove surrounding quotes 

305 from values. 

306 

307 .. code-block:: python 

308 

309 parse_list_header('token, "quoted value"') 

310 ['token', 'quoted value'] 

311 

312 This is the reverse of :func:`dump_header`. 

313 

314 :param value: The header value to parse. 

315 """ 

316 result = [] 

317 

318 for item in _parse_list_header(value): 

319 if len(item) >= 2 and item[0] == item[-1] == '"': 

320 item = item[1:-1] 

321 

322 result.append(item) 

323 

324 return result 

325 

326 

327def parse_dict_header(value: str) -> dict[str, str | None]: 

328 """Parse a list header using :func:`parse_list_header`, then parse each item as a 

329 ``key=value`` pair. 

330 

331 .. code-block:: python 

332 

333 parse_dict_header('a=b, c="d, e", f') 

334 {"a": "b", "c": "d, e", "f": None} 

335 

336 This is the reverse of :func:`dump_header`. 

337 

338 If a key does not have a value, it is ``None``. 

339 

340 This handles charsets for values as described in 

341 `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__. Only ASCII, UTF-8, 

342 and ISO-8859-1 charsets are accepted, otherwise the value remains quoted. 

343 

344 :param value: The header value to parse. 

345 

346 .. versionchanged:: 3.0 

347 Passing bytes is not supported. 

348 

349 .. versionchanged:: 3.0 

350 The ``cls`` argument is removed. 

351 

352 .. versionchanged:: 2.3 

353 Added support for ``key*=charset''value`` encoded items. 

354 

355 .. versionchanged:: 0.9 

356 The ``cls`` argument was added. 

357 """ 

358 result: dict[str, str | None] = {} 

359 

360 for item in parse_list_header(value): 

361 key, has_value, value = item.partition("=") 

362 key = key.strip() 

363 

364 if not has_value: 

365 result[key] = None 

366 continue 

367 

368 value = value.strip() 

369 encoding: str | None = None 

370 

371 if key[-1] == "*": 

372 # key*=charset''value becomes key=value, where value is percent encoded 

373 # adapted from parse_options_header, without the continuation handling 

374 key = key[:-1] 

375 match = _charset_value_re.match(value) 

376 

377 if match: 

378 # If there is a charset marker in the value, split it off. 

379 encoding, value = match.groups() 

380 encoding = encoding.lower() 

381 

382 # A safe list of encodings. Modern clients should only send ASCII or UTF-8. 

383 # This list will not be extended further. An invalid encoding will leave the 

384 # value quoted. 

385 if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}: 

386 # invalid bytes are replaced during unquoting 

387 value = unquote(value, encoding=encoding) 

388 

389 if len(value) >= 2 and value[0] == value[-1] == '"': 

390 value = value[1:-1] 

391 

392 result[key] = value 

393 

394 return result 

395 

396 

397# https://httpwg.org/specs/rfc9110.html#parameter 

398_parameter_re = re.compile( 

399 r""" 

400 # don't match multiple empty parts, that causes backtracking 

401 \s*;\s* # find the part delimiter 

402 (?: 

403 ([\w!#$%&'*+\-.^`|~]+) # key, one or more token chars 

404 = # equals, with no space on either side 

405 ( # value, token or quoted string 

406 [\w!#$%&'*+\-.^`|~]+ # one or more token chars 

407 | 

408 "(?:\\\\|\\"|.)*?" # quoted string, consuming slash escapes 

409 ) 

410 )? # optionally match key=value, to account for empty parts 

411 """, 

412 re.ASCII | re.VERBOSE, 

413) 

414# https://www.rfc-editor.org/rfc/rfc2231#section-4 

415_charset_value_re = re.compile( 

416 r""" 

417 ([\w!#$%&*+\-.^`|~]*)' # charset part, could be empty 

418 [\w!#$%&*+\-.^`|~]*' # don't care about language part, usually empty 

419 ([\w!#$%&'*+\-.^`|~]+) # one or more token chars with percent encoding 

420 """, 

421 re.ASCII | re.VERBOSE, 

422) 

423# https://www.rfc-editor.org/rfc/rfc2231#section-3 

424_continuation_re = re.compile(r"\*(\d+)$", re.ASCII) 

425 

426 

427def parse_options_header(value: str | None) -> tuple[str, dict[str, str]]: 

428 """Parse a header that consists of a value with ``key=value`` parameters separated 

429 by semicolons ``;``. For example, the ``Content-Type`` header. 

430 

431 .. code-block:: python 

432 

433 parse_options_header("text/html; charset=UTF-8") 

434 ('text/html', {'charset': 'UTF-8'}) 

435 

436 parse_options_header("") 

437 ("", {}) 

438 

439 This is the reverse of :func:`dump_options_header`. 

440 

441 This parses valid parameter parts as described in 

442 `RFC 9110 <https://httpwg.org/specs/rfc9110.html#parameter>`__. Invalid parts are 

443 skipped. 

444 

445 This handles continuations and charsets as described in 

446 `RFC 2231 <https://www.rfc-editor.org/rfc/rfc2231#section-3>`__, although not as 

447 strictly as the RFC. Only ASCII, UTF-8, and ISO-8859-1 charsets are accepted, 

448 otherwise the value remains quoted. 

449 

450 Clients may not be consistent in how they handle a quote character within a quoted 

451 value. The `HTML Standard <https://html.spec.whatwg.org/#multipart-form-data>`__ 

452 replaces it with ``%22`` in multipart form data. 

453 `RFC 9110 <https://httpwg.org/specs/rfc9110.html#quoted.strings>`__ uses backslash 

454 escapes in HTTP headers. Both are decoded to the ``"`` character. 

455 

456 Clients may not be consistent in how they handle non-ASCII characters. HTML 

457 documents must declare ``<meta charset=UTF-8>``, otherwise browsers may replace with 

458 HTML character references, which can be decoded using :func:`html.unescape`. 

459 

460 :param value: The header value to parse. 

461 :return: ``(value, options)``, where ``options`` is a dict 

462 

463 .. versionchanged:: 2.3 

464 Invalid parts, such as keys with no value, quoted keys, and incorrectly quoted 

465 values, are discarded instead of treating as ``None``. 

466 

467 .. versionchanged:: 2.3 

468 Only ASCII, UTF-8, and ISO-8859-1 are accepted for charset values. 

469 

470 .. versionchanged:: 2.3 

471 Escaped quotes in quoted values, like ``%22`` and ``\\"``, are handled. 

472 

473 .. versionchanged:: 2.2 

474 Option names are always converted to lowercase. 

475 

476 .. versionchanged:: 2.2 

477 The ``multiple`` parameter was removed. 

478 

479 .. versionchanged:: 0.15 

480 :rfc:`2231` parameter continuations are handled. 

481 

482 .. versionadded:: 0.5 

483 """ 

484 if value is None: 

485 return "", {} 

486 

487 value, _, rest = value.partition(";") 

488 value = value.strip() 

489 rest = rest.strip() 

490 

491 if not value or not rest: 

492 # empty (invalid) value, or value without options 

493 return value, {} 

494 

495 rest = f";{rest}" 

496 options: dict[str, str] = {} 

497 encoding: str | None = None 

498 continued_encoding: str | None = None 

499 

500 for pk, pv in _parameter_re.findall(rest): 

501 if not pk: 

502 # empty or invalid part 

503 continue 

504 

505 pk = pk.lower() 

506 

507 if pk[-1] == "*": 

508 # key*=charset''value becomes key=value, where value is percent encoded 

509 pk = pk[:-1] 

510 match = _charset_value_re.match(pv) 

511 

512 if match: 

513 # If there is a valid charset marker in the value, split it off. 

514 encoding, pv = match.groups() 

515 # This might be the empty string, handled next. 

516 encoding = encoding.lower() 

517 

518 # No charset marker, or marker with empty charset value. 

519 if not encoding: 

520 encoding = continued_encoding 

521 

522 # A safe list of encodings. Modern clients should only send ASCII or UTF-8. 

523 # This list will not be extended further. An invalid encoding will leave the 

524 # value quoted. 

525 if encoding in {"ascii", "us-ascii", "utf-8", "iso-8859-1"}: 

526 # Continuation parts don't require their own charset marker. This is 

527 # looser than the RFC, it will persist across different keys and allows 

528 # changing the charset during a continuation. But this implementation is 

529 # much simpler than tracking the full state. 

530 continued_encoding = encoding 

531 # invalid bytes are replaced during unquoting 

532 pv = unquote(pv, encoding=encoding) 

533 

534 # Remove quotes. At this point the value cannot be empty or a single quote. 

535 if pv[0] == pv[-1] == '"': 

536 # HTTP headers use slash, multipart form data uses percent 

537 pv = pv[1:-1].replace("\\\\", "\\").replace('\\"', '"').replace("%22", '"') 

538 

539 match = _continuation_re.search(pk) 

540 

541 if match: 

542 # key*0=a; key*1=b becomes key=ab 

543 pk = pk[: match.start()] 

544 options[pk] = options.get(pk, "") + pv 

545 else: 

546 options[pk] = pv 

547 

548 return value, options 

549 

550 

551_q_value_re = re.compile(r"-?\d+(\.\d+)?", re.ASCII) 

552_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept") 

553 

554 

555@t.overload 

556def parse_accept_header(value: str | None) -> ds.Accept: ... 

557 

558 

559@t.overload 

560def parse_accept_header(value: str | None, cls: type[_TAnyAccept]) -> _TAnyAccept: ... 

561 

562 

563def parse_accept_header( 

564 value: str | None, cls: type[_TAnyAccept] | None = None 

565) -> _TAnyAccept: 

566 """Parse an ``Accept`` header according to 

567 `RFC 9110 <https://httpwg.org/specs/rfc9110.html#field.accept>`__. 

568 

569 Returns an :class:`.Accept` instance, which can sort and inspect items based on 

570 their quality parameter. When parsing ``Accept-Charset``, ``Accept-Encoding``, or 

571 ``Accept-Language``, pass the appropriate :class:`.Accept` subclass. 

572 

573 :param value: The header value to parse. 

574 :param cls: The :class:`.Accept` class to wrap the result in. 

575 :return: An instance of ``cls``. 

576 

577 .. versionchanged:: 2.3 

578 Parse according to RFC 9110. Items with invalid ``q`` values are skipped. 

579 """ 

580 if cls is None: 

581 cls = t.cast(t.Type[_TAnyAccept], ds.Accept) 

582 

583 if not value: 

584 return cls(None) 

585 

586 result = [] 

587 

588 for item in parse_list_header(value): 

589 item, options = parse_options_header(item) 

590 

591 if "q" in options: 

592 # pop q, remaining options are reconstructed 

593 q_str = options.pop("q").strip() 

594 

595 if _q_value_re.fullmatch(q_str) is None: 

596 # ignore an invalid q 

597 continue 

598 

599 q = float(q_str) 

600 

601 if q < 0 or q > 1: 

602 # ignore an invalid q 

603 continue 

604 else: 

605 q = 1 

606 

607 if options: 

608 # reconstruct the media type with any options 

609 item = dump_options_header(item, options) 

610 

611 result.append((item, q)) 

612 

613 return cls(result) 

614 

615 

616_TAnyCC = t.TypeVar("_TAnyCC", bound="ds.cache_control._CacheControl") 

617 

618 

619@t.overload 

620def parse_cache_control_header( 

621 value: str | None, 

622 on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None, 

623) -> ds.RequestCacheControl: ... 

624 

625 

626@t.overload 

627def parse_cache_control_header( 

628 value: str | None, 

629 on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None, 

630 cls: type[_TAnyCC] = ..., 

631) -> _TAnyCC: ... 

632 

633 

634def parse_cache_control_header( 

635 value: str | None, 

636 on_update: t.Callable[[ds.cache_control._CacheControl], None] | None = None, 

637 cls: type[_TAnyCC] | None = None, 

638) -> _TAnyCC: 

639 """Parse a cache control header. The RFC differs between response and 

640 request cache control, this method does not. It's your responsibility 

641 to not use the wrong control statements. 

642 

643 .. versionadded:: 0.5 

644 The `cls` was added. If not specified an immutable 

645 :class:`~werkzeug.datastructures.RequestCacheControl` is returned. 

646 

647 :param value: a cache control header to be parsed. 

648 :param on_update: an optional callable that is called every time a value 

649 on the :class:`~werkzeug.datastructures.CacheControl` 

650 object is changed. 

651 :param cls: the class for the returned object. By default 

652 :class:`~werkzeug.datastructures.RequestCacheControl` is used. 

653 :return: a `cls` object. 

654 """ 

655 if cls is None: 

656 cls = t.cast("type[_TAnyCC]", ds.RequestCacheControl) 

657 

658 if not value: 

659 return cls((), on_update) 

660 

661 return cls(parse_dict_header(value), on_update) 

662 

663 

664_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy") 

665 

666 

667@t.overload 

668def parse_csp_header( 

669 value: str | None, 

670 on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None, 

671) -> ds.ContentSecurityPolicy: ... 

672 

673 

674@t.overload 

675def parse_csp_header( 

676 value: str | None, 

677 on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None, 

678 cls: type[_TAnyCSP] = ..., 

679) -> _TAnyCSP: ... 

680 

681 

682def parse_csp_header( 

683 value: str | None, 

684 on_update: t.Callable[[ds.ContentSecurityPolicy], None] | None = None, 

685 cls: type[_TAnyCSP] | None = None, 

686) -> _TAnyCSP: 

687 """Parse a Content Security Policy header. 

688 

689 .. versionadded:: 1.0.0 

690 Support for Content Security Policy headers was added. 

691 

692 :param value: a csp header to be parsed. 

693 :param on_update: an optional callable that is called every time a value 

694 on the object is changed. 

695 :param cls: the class for the returned object. By default 

696 :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used. 

697 :return: a `cls` object. 

698 """ 

699 if cls is None: 

700 cls = t.cast("type[_TAnyCSP]", ds.ContentSecurityPolicy) 

701 

702 if value is None: 

703 return cls((), on_update) 

704 

705 items = [] 

706 

707 for policy in value.split(";"): 

708 policy = policy.strip() 

709 

710 # Ignore badly formatted policies (no space) 

711 if " " in policy: 

712 directive, value = policy.strip().split(" ", 1) 

713 items.append((directive.strip(), value.strip())) 

714 

715 return cls(items, on_update) 

716 

717 

718def parse_set_header( 

719 value: str | None, 

720 on_update: t.Callable[[ds.HeaderSet], None] | None = None, 

721) -> ds.HeaderSet: 

722 """Parse a set-like header and return a 

723 :class:`~werkzeug.datastructures.HeaderSet` object: 

724 

725 >>> hs = parse_set_header('token, "quoted value"') 

726 

727 The return value is an object that treats the items case-insensitively 

728 and keeps the order of the items: 

729 

730 >>> 'TOKEN' in hs 

731 True 

732 >>> hs.index('quoted value') 

733 1 

734 >>> hs 

735 HeaderSet(['token', 'quoted value']) 

736 

737 To create a header from the :class:`HeaderSet` again, use the 

738 :func:`dump_header` function. 

739 

740 :param value: a set header to be parsed. 

741 :param on_update: an optional callable that is called every time a 

742 value on the :class:`~werkzeug.datastructures.HeaderSet` 

743 object is changed. 

744 :return: a :class:`~werkzeug.datastructures.HeaderSet` 

745 """ 

746 if not value: 

747 return ds.HeaderSet(None, on_update) 

748 return ds.HeaderSet(parse_list_header(value), on_update) 

749 

750 

751def parse_if_range_header(value: str | None) -> ds.IfRange: 

752 """Parses an if-range header which can be an etag or a date. Returns 

753 a :class:`~werkzeug.datastructures.IfRange` object. 

754 

755 .. versionchanged:: 2.0 

756 If the value represents a datetime, it is timezone-aware. 

757 

758 .. versionadded:: 0.7 

759 """ 

760 if not value: 

761 return ds.IfRange() 

762 date = parse_date(value) 

763 if date is not None: 

764 return ds.IfRange(date=date) 

765 # drop weakness information 

766 return ds.IfRange(unquote_etag(value)[0]) 

767 

768 

769def parse_range_header( 

770 value: str | None, make_inclusive: bool = True 

771) -> ds.Range | None: 

772 """Parses a range header into a :class:`~werkzeug.datastructures.Range` 

773 object. If the header is missing or malformed `None` is returned. 

774 `ranges` is a list of ``(start, stop)`` tuples where the ranges are 

775 non-inclusive. 

776 

777 .. versionadded:: 0.7 

778 """ 

779 if not value or "=" not in value: 

780 return None 

781 

782 ranges = [] 

783 last_end = 0 

784 units, rng = value.split("=", 1) 

785 units = units.strip().lower() 

786 

787 for item in rng.split(","): 

788 item = item.strip() 

789 if "-" not in item: 

790 return None 

791 if item.startswith("-"): 

792 if last_end < 0: 

793 return None 

794 try: 

795 begin = _plain_int(item) 

796 except ValueError: 

797 return None 

798 end = None 

799 last_end = -1 

800 elif "-" in item: 

801 begin_str, end_str = item.split("-", 1) 

802 begin_str = begin_str.strip() 

803 end_str = end_str.strip() 

804 

805 try: 

806 begin = _plain_int(begin_str) 

807 except ValueError: 

808 return None 

809 

810 if begin < last_end or last_end < 0: 

811 return None 

812 if end_str: 

813 try: 

814 end = _plain_int(end_str) + 1 

815 except ValueError: 

816 return None 

817 

818 if begin >= end: 

819 return None 

820 else: 

821 end = None 

822 last_end = end if end is not None else -1 

823 ranges.append((begin, end)) 

824 

825 return ds.Range(units, ranges) 

826 

827 

828def parse_content_range_header( 

829 value: str | None, 

830 on_update: t.Callable[[ds.ContentRange], None] | None = None, 

831) -> ds.ContentRange | None: 

832 """Parses a range header into a 

833 :class:`~werkzeug.datastructures.ContentRange` object or `None` if 

834 parsing is not possible. 

835 

836 .. versionadded:: 0.7 

837 

838 :param value: a content range header to be parsed. 

839 :param on_update: an optional callable that is called every time a value 

840 on the :class:`~werkzeug.datastructures.ContentRange` 

841 object is changed. 

842 """ 

843 if value is None: 

844 return None 

845 try: 

846 units, rangedef = (value or "").strip().split(None, 1) 

847 except ValueError: 

848 return None 

849 

850 if "/" not in rangedef: 

851 return None 

852 rng, length_str = rangedef.split("/", 1) 

853 if length_str == "*": 

854 length = None 

855 else: 

856 try: 

857 length = _plain_int(length_str) 

858 except ValueError: 

859 return None 

860 

861 if rng == "*": 

862 if not is_byte_range_valid(None, None, length): 

863 return None 

864 

865 return ds.ContentRange(units, None, None, length, on_update=on_update) 

866 elif "-" not in rng: 

867 return None 

868 

869 start_str, stop_str = rng.split("-", 1) 

870 try: 

871 start = _plain_int(start_str) 

872 stop = _plain_int(stop_str) + 1 

873 except ValueError: 

874 return None 

875 

876 if is_byte_range_valid(start, stop, length): 

877 return ds.ContentRange(units, start, stop, length, on_update=on_update) 

878 

879 return None 

880 

881 

882def quote_etag(etag: str, weak: bool = False) -> str: 

883 """Quote an etag. 

884 

885 :param etag: the etag to quote. 

886 :param weak: set to `True` to tag it "weak". 

887 """ 

888 if '"' in etag: 

889 raise ValueError("invalid etag") 

890 etag = f'"{etag}"' 

891 if weak: 

892 etag = f"W/{etag}" 

893 return etag 

894 

895 

896def unquote_etag( 

897 etag: str | None, 

898) -> tuple[str, bool] | tuple[None, None]: 

899 """Unquote a single etag: 

900 

901 >>> unquote_etag('W/"bar"') 

902 ('bar', True) 

903 >>> unquote_etag('"bar"') 

904 ('bar', False) 

905 

906 :param etag: the etag identifier to unquote. 

907 :return: a ``(etag, weak)`` tuple. 

908 """ 

909 if not etag: 

910 return None, None 

911 etag = etag.strip() 

912 weak = False 

913 if etag.startswith(("W/", "w/")): 

914 weak = True 

915 etag = etag[2:] 

916 if etag[:1] == etag[-1:] == '"': 

917 etag = etag[1:-1] 

918 return etag, weak 

919 

920 

921def parse_etags(value: str | None) -> ds.ETags: 

922 """Parse an etag header. 

923 

924 :param value: the tag header to parse 

925 :return: an :class:`~werkzeug.datastructures.ETags` object. 

926 """ 

927 if not value: 

928 return ds.ETags() 

929 strong = [] 

930 weak = [] 

931 end = len(value) 

932 pos = 0 

933 while pos < end: 

934 match = _etag_re.match(value, pos) 

935 if match is None: 

936 break 

937 is_weak, quoted, raw = match.groups() 

938 if raw == "*": 

939 return ds.ETags(star_tag=True) 

940 elif quoted: 

941 raw = quoted 

942 if is_weak: 

943 weak.append(raw) 

944 else: 

945 strong.append(raw) 

946 pos = match.end() 

947 return ds.ETags(strong, weak) 

948 

949 

950def generate_etag(data: bytes) -> str: 

951 """Generate an etag for some data. 

952 

953 .. versionchanged:: 2.0 

954 Use SHA-1. MD5 may not be available in some environments. 

955 """ 

956 return sha1(data).hexdigest() 

957 

958 

959def parse_date(value: str | None) -> datetime | None: 

960 """Parse an :rfc:`2822` date into a timezone-aware 

961 :class:`datetime.datetime` object, or ``None`` if parsing fails. 

962 

963 This is a wrapper for :func:`email.utils.parsedate_to_datetime`. It 

964 returns ``None`` if parsing fails instead of raising an exception, 

965 and always returns a timezone-aware datetime object. If the string 

966 doesn't have timezone information, it is assumed to be UTC. 

967 

968 :param value: A string with a supported date format. 

969 

970 .. versionchanged:: 2.0 

971 Return a timezone-aware datetime object. Use 

972 ``email.utils.parsedate_to_datetime``. 

973 """ 

974 if value is None: 

975 return None 

976 

977 try: 

978 dt = email.utils.parsedate_to_datetime(value) 

979 except (TypeError, ValueError): 

980 return None 

981 

982 if dt.tzinfo is None: 

983 return dt.replace(tzinfo=timezone.utc) 

984 

985 return dt 

986 

987 

988def http_date( 

989 timestamp: datetime | date | int | float | struct_time | None = None, 

990) -> str: 

991 """Format a datetime object or timestamp into an :rfc:`2822` date 

992 string. 

993 

994 This is a wrapper for :func:`email.utils.format_datetime`. It 

995 assumes naive datetime objects are in UTC instead of raising an 

996 exception. 

997 

998 :param timestamp: The datetime or timestamp to format. Defaults to 

999 the current time. 

1000 

1001 .. versionchanged:: 2.0 

1002 Use ``email.utils.format_datetime``. Accept ``date`` objects. 

1003 """ 

1004 if isinstance(timestamp, date): 

1005 if not isinstance(timestamp, datetime): 

1006 # Assume plain date is midnight UTC. 

1007 timestamp = datetime.combine(timestamp, time(), tzinfo=timezone.utc) 

1008 else: 

1009 # Ensure datetime is timezone-aware. 

1010 timestamp = _dt_as_utc(timestamp) 

1011 

1012 return email.utils.format_datetime(timestamp, usegmt=True) 

1013 

1014 if isinstance(timestamp, struct_time): 

1015 timestamp = mktime(timestamp) 

1016 

1017 return email.utils.formatdate(timestamp, usegmt=True) 

1018 

1019 

1020def parse_age(value: str | None = None) -> timedelta | None: 

1021 """Parses a base-10 integer count of seconds into a timedelta. 

1022 

1023 If parsing fails, the return value is `None`. 

1024 

1025 :param value: a string consisting of an integer represented in base-10 

1026 :return: a :class:`datetime.timedelta` object or `None`. 

1027 """ 

1028 if not value: 

1029 return None 

1030 try: 

1031 seconds = int(value) 

1032 except ValueError: 

1033 return None 

1034 if seconds < 0: 

1035 return None 

1036 try: 

1037 return timedelta(seconds=seconds) 

1038 except OverflowError: 

1039 return None 

1040 

1041 

1042def dump_age(age: timedelta | int | None = None) -> str | None: 

1043 """Formats the duration as a base-10 integer. 

1044 

1045 :param age: should be an integer number of seconds, 

1046 a :class:`datetime.timedelta` object, or, 

1047 if the age is unknown, `None` (default). 

1048 """ 

1049 if age is None: 

1050 return None 

1051 if isinstance(age, timedelta): 

1052 age = int(age.total_seconds()) 

1053 else: 

1054 age = int(age) 

1055 

1056 if age < 0: 

1057 raise ValueError("age cannot be negative") 

1058 

1059 return str(age) 

1060 

1061 

1062def is_resource_modified( 

1063 environ: WSGIEnvironment, 

1064 etag: str | None = None, 

1065 data: bytes | None = None, 

1066 last_modified: datetime | str | None = None, 

1067 ignore_if_range: bool = True, 

1068) -> bool: 

1069 """Convenience method for conditional requests. 

1070 

1071 :param environ: the WSGI environment of the request to be checked. 

1072 :param etag: the etag for the response for comparison. 

1073 :param data: or alternatively the data of the response to automatically 

1074 generate an etag using :func:`generate_etag`. 

1075 :param last_modified: an optional date of the last modification. 

1076 :param ignore_if_range: If `False`, `If-Range` header will be taken into 

1077 account. 

1078 :return: `True` if the resource was modified, otherwise `False`. 

1079 

1080 .. versionchanged:: 2.0 

1081 SHA-1 is used to generate an etag value for the data. MD5 may 

1082 not be available in some environments. 

1083 

1084 .. versionchanged:: 1.0.0 

1085 The check is run for methods other than ``GET`` and ``HEAD``. 

1086 """ 

1087 return _sansio_http.is_resource_modified( 

1088 http_range=environ.get("HTTP_RANGE"), 

1089 http_if_range=environ.get("HTTP_IF_RANGE"), 

1090 http_if_modified_since=environ.get("HTTP_IF_MODIFIED_SINCE"), 

1091 http_if_none_match=environ.get("HTTP_IF_NONE_MATCH"), 

1092 http_if_match=environ.get("HTTP_IF_MATCH"), 

1093 etag=etag, 

1094 data=data, 

1095 last_modified=last_modified, 

1096 ignore_if_range=ignore_if_range, 

1097 ) 

1098 

1099 

1100def remove_entity_headers( 

1101 headers: ds.Headers | list[tuple[str, str]], 

1102 allowed: t.Iterable[str] = ("expires", "content-location"), 

1103) -> None: 

1104 """Remove all entity headers from a list or :class:`Headers` object. This 

1105 operation works in-place. `Expires` and `Content-Location` headers are 

1106 by default not removed. The reason for this is :rfc:`2616` section 

1107 10.3.5 which specifies some entity headers that should be sent. 

1108 

1109 .. versionchanged:: 0.5 

1110 added `allowed` parameter. 

1111 

1112 :param headers: a list or :class:`Headers` object. 

1113 :param allowed: a list of headers that should still be allowed even though 

1114 they are entity headers. 

1115 """ 

1116 allowed = {x.lower() for x in allowed} 

1117 headers[:] = [ 

1118 (key, value) 

1119 for key, value in headers 

1120 if not is_entity_header(key) or key.lower() in allowed 

1121 ] 

1122 

1123 

1124def remove_hop_by_hop_headers(headers: ds.Headers | list[tuple[str, str]]) -> None: 

1125 """Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or 

1126 :class:`Headers` object. This operation works in-place. 

1127 

1128 .. versionadded:: 0.5 

1129 

1130 :param headers: a list or :class:`Headers` object. 

1131 """ 

1132 headers[:] = [ 

1133 (key, value) for key, value in headers if not is_hop_by_hop_header(key) 

1134 ] 

1135 

1136 

1137def is_entity_header(header: str) -> bool: 

1138 """Check if a header is an entity header. 

1139 

1140 .. versionadded:: 0.5 

1141 

1142 :param header: the header to test. 

1143 :return: `True` if it's an entity header, `False` otherwise. 

1144 """ 

1145 return header.lower() in _entity_headers 

1146 

1147 

1148def is_hop_by_hop_header(header: str) -> bool: 

1149 """Check if a header is an HTTP/1.1 "Hop-by-Hop" header. 

1150 

1151 .. versionadded:: 0.5 

1152 

1153 :param header: the header to test. 

1154 :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise. 

1155 """ 

1156 return header.lower() in _hop_by_hop_headers 

1157 

1158 

1159def parse_cookie( 

1160 header: WSGIEnvironment | str | None, 

1161 cls: type[ds.MultiDict[str, str]] | None = None, 

1162) -> ds.MultiDict[str, str]: 

1163 """Parse a cookie from a string or WSGI environ. 

1164 

1165 The same key can be provided multiple times, the values are stored 

1166 in-order. The default :class:`MultiDict` will have the first value 

1167 first, and all values can be retrieved with 

1168 :meth:`MultiDict.getlist`. 

1169 

1170 :param header: The cookie header as a string, or a WSGI environ dict 

1171 with a ``HTTP_COOKIE`` key. 

1172 :param cls: A dict-like class to store the parsed cookies in. 

1173 Defaults to :class:`MultiDict`. 

1174 

1175 .. versionchanged:: 3.0 

1176 Passing bytes, and the ``charset`` and ``errors`` parameters, were removed. 

1177 

1178 .. versionchanged:: 1.0 

1179 Returns a :class:`MultiDict` instead of a ``TypeConversionDict``. 

1180 

1181 .. versionchanged:: 0.5 

1182 Returns a :class:`TypeConversionDict` instead of a regular dict. The ``cls`` 

1183 parameter was added. 

1184 """ 

1185 if isinstance(header, dict): 

1186 cookie = header.get("HTTP_COOKIE") 

1187 else: 

1188 cookie = header 

1189 

1190 if cookie: 

1191 cookie = cookie.encode("latin1").decode() 

1192 

1193 return _sansio_http.parse_cookie(cookie=cookie, cls=cls) 

1194 

1195 

1196_cookie_no_quote_re = re.compile(r"[\w!#$%&'()*+\-./:<=>?@\[\]^`{|}~]*", re.A) 

1197_cookie_slash_re = re.compile(rb"[\x00-\x19\",;\\\x7f-\xff]", re.A) 

1198_cookie_slash_map = {b'"': b'\\"', b"\\": b"\\\\"} 

1199_cookie_slash_map.update( 

1200 (v.to_bytes(1, "big"), b"\\%03o" % v) 

1201 for v in [*range(0x20), *b",;", *range(0x7F, 256)] 

1202) 

1203 

1204 

1205def dump_cookie( 

1206 key: str, 

1207 value: str = "", 

1208 max_age: timedelta | int | None = None, 

1209 expires: str | datetime | int | float | None = None, 

1210 path: str | None = "/", 

1211 domain: str | None = None, 

1212 secure: bool = False, 

1213 httponly: bool = False, 

1214 sync_expires: bool = True, 

1215 max_size: int = 4093, 

1216 samesite: str | None = None, 

1217) -> str: 

1218 """Create a Set-Cookie header without the ``Set-Cookie`` prefix. 

1219 

1220 The return value is usually restricted to ascii as the vast majority 

1221 of values are properly escaped, but that is no guarantee. It's 

1222 tunneled through latin1 as required by :pep:`3333`. 

1223 

1224 The return value is not ASCII safe if the key contains unicode 

1225 characters. This is technically against the specification but 

1226 happens in the wild. It's strongly recommended to not use 

1227 non-ASCII values for the keys. 

1228 

1229 :param max_age: should be a number of seconds, or `None` (default) if 

1230 the cookie should last only as long as the client's 

1231 browser session. Additionally `timedelta` objects 

1232 are accepted, too. 

1233 :param expires: should be a `datetime` object or unix timestamp. 

1234 :param path: limits the cookie to a given path, per default it will 

1235 span the whole domain. 

1236 :param domain: Use this if you want to set a cross-domain cookie. For 

1237 example, ``domain="example.com"`` will set a cookie 

1238 that is readable by the domain ``www.example.com``, 

1239 ``foo.example.com`` etc. Otherwise, a cookie will only 

1240 be readable by the domain that set it. 

1241 :param secure: The cookie will only be available via HTTPS 

1242 :param httponly: disallow JavaScript to access the cookie. This is an 

1243 extension to the cookie standard and probably not 

1244 supported by all browsers. 

1245 :param charset: the encoding for string values. 

1246 :param sync_expires: automatically set expires if max_age is defined 

1247 but expires not. 

1248 :param max_size: Warn if the final header value exceeds this size. The 

1249 default, 4093, should be safely `supported by most browsers 

1250 <cookie_>`_. Set to 0 to disable this check. 

1251 :param samesite: Limits the scope of the cookie such that it will 

1252 only be attached to requests if those requests are same-site. 

1253 

1254 .. _`cookie`: http://browsercookielimits.squawky.net/ 

1255 

1256 .. versionchanged:: 3.0 

1257 Passing bytes, and the ``charset`` parameter, were removed. 

1258 

1259 .. versionchanged:: 2.3.3 

1260 The ``path`` parameter is ``/`` by default. 

1261 

1262 .. versionchanged:: 2.3.1 

1263 The value allows more characters without quoting. 

1264 

1265 .. versionchanged:: 2.3 

1266 ``localhost`` and other names without a dot are allowed for the domain. A 

1267 leading dot is ignored. 

1268 

1269 .. versionchanged:: 2.3 

1270 The ``path`` parameter is ``None`` by default. 

1271 

1272 .. versionchanged:: 1.0.0 

1273 The string ``'None'`` is accepted for ``samesite``. 

1274 """ 

1275 if path is not None: 

1276 # safe = https://url.spec.whatwg.org/#url-path-segment-string 

1277 # as well as percent for things that are already quoted 

1278 # excluding semicolon since it's part of the header syntax 

1279 path = quote(path, safe="%!$&'()*+,/:=@") 

1280 

1281 if domain: 

1282 domain = domain.partition(":")[0].lstrip(".").encode("idna").decode("ascii") 

1283 

1284 if isinstance(max_age, timedelta): 

1285 max_age = int(max_age.total_seconds()) 

1286 

1287 if expires is not None: 

1288 if not isinstance(expires, str): 

1289 expires = http_date(expires) 

1290 elif max_age is not None and sync_expires: 

1291 expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age) 

1292 

1293 if samesite is not None: 

1294 samesite = samesite.title() 

1295 

1296 if samesite not in {"Strict", "Lax", "None"}: 

1297 raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.") 

1298 

1299 # Quote value if it contains characters not allowed by RFC 6265. Slash-escape with 

1300 # three octal digits, which matches http.cookies, although the RFC suggests base64. 

1301 if not _cookie_no_quote_re.fullmatch(value): 

1302 # Work with bytes here, since a UTF-8 character could be multiple bytes. 

1303 value = _cookie_slash_re.sub( 

1304 lambda m: _cookie_slash_map[m.group()], value.encode() 

1305 ).decode("ascii") 

1306 value = f'"{value}"' 

1307 

1308 # Send a non-ASCII key as mojibake. Everything else should already be ASCII. 

1309 # TODO Remove encoding dance, it seems like clients accept UTF-8 keys 

1310 buf = [f"{key.encode().decode('latin1')}={value}"] 

1311 

1312 for k, v in ( 

1313 ("Domain", domain), 

1314 ("Expires", expires), 

1315 ("Max-Age", max_age), 

1316 ("Secure", secure), 

1317 ("HttpOnly", httponly), 

1318 ("Path", path), 

1319 ("SameSite", samesite), 

1320 ): 

1321 if v is None or v is False: 

1322 continue 

1323 

1324 if v is True: 

1325 buf.append(k) 

1326 continue 

1327 

1328 buf.append(f"{k}={v}") 

1329 

1330 rv = "; ".join(buf) 

1331 

1332 # Warn if the final value of the cookie is larger than the limit. If the cookie is 

1333 # too large, then it may be silently ignored by the browser, which can be quite hard 

1334 # to debug. 

1335 cookie_size = len(rv) 

1336 

1337 if max_size and cookie_size > max_size: 

1338 value_size = len(value) 

1339 warnings.warn( 

1340 f"The '{key}' cookie is too large: the value was {value_size} bytes but the" 

1341 f" header required {cookie_size - value_size} extra bytes. The final size" 

1342 f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may" 

1343 " silently ignore cookies larger than this.", 

1344 stacklevel=2, 

1345 ) 

1346 

1347 return rv 

1348 

1349 

1350def is_byte_range_valid( 

1351 start: int | None, stop: int | None, length: int | None 

1352) -> bool: 

1353 """Checks if a given byte content range is valid for the given length. 

1354 

1355 .. versionadded:: 0.7 

1356 """ 

1357 if (start is None) != (stop is None): 

1358 return False 

1359 elif start is None: 

1360 return length is None or length >= 0 

1361 elif length is None: 

1362 return 0 <= start < stop # type: ignore 

1363 elif start >= stop: # type: ignore 

1364 return False 

1365 return 0 <= start < length 

1366 

1367 

1368# circular dependencies 

1369from . import datastructures as ds 

1370from .sansio import http as _sansio_http