Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/http.py: 20%

464 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import base64 

2import email.utils 

3import re 

4import typing 

5import typing as t 

6import warnings 

7from datetime import date 

8from datetime import datetime 

9from datetime import time 

10from datetime import timedelta 

11from datetime import timezone 

12from enum import Enum 

13from hashlib import sha1 

14from time import mktime 

15from time import struct_time 

16from urllib.parse import unquote_to_bytes as _unquote 

17from urllib.request import parse_http_list as _parse_list_header 

18 

19from ._internal import _cookie_quote 

20from ._internal import _dt_as_utc 

21from ._internal import _make_cookie_domain 

22from ._internal import _to_bytes 

23from ._internal import _to_str 

24from ._internal import _wsgi_decoding_dance 

25 

26if t.TYPE_CHECKING: 

27 from _typeshed.wsgi import WSGIEnvironment 

28 

# for explanation of "media-range", etc. see Sections 5.3.{1,2} of RFC 7231
_accept_re = re.compile(
    r"""
    (                       # media-range capturing-parenthesis
      [^\s;,]+              # type/subtype
      (?:[ \t]*;[ \t]*      # ";"
        (?:                 # parameter non-capturing-parenthesis
          [^\s;,q][^\s;,]*  # token that doesn't start with "q"
        |                   # or
          q[^\s;,=][^\s;,]* # token that is more than just "q"
        )
      )*                    # zero or more parameters
    )                       # end of media-range
    (?:[ \t]*;[ \t]*q=      # weight is a "q" parameter
       (\d*(?:\.\d+)?)      # qvalue capturing-parentheses
       [^,]*                # "extension" accept params: who cares?
    )?                      # accept params are optional
    """,
    re.VERBOSE,
)

# Characters that may appear in an HTTP "token" (RFC 7230 tchar); values
# consisting solely of these need no quoting (see quote_header_value below).
_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)

# One (optionally weak, "W/"/"w/"-prefixed) etag in a comma separated list;
# group 2 captures a quoted tag, group 3 a bare one.
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')

# One ";key=value" piece of an option header, including RFC 2231
# continuation indices ("key*1") and extended "key*=encoding'language'value"
# notation.
_option_header_piece_re = re.compile(
    r"""
    ;\s*,?\s* # newlines were replaced with commas
    (?P<key>
        "[^"\\]*(?:\\.[^"\\]*)*" # quoted string
    |
        [^\s;,=*]+ # token
    )
    (?:\*(?P<count>\d+))? # *1, optional continuation index
    \s*
    (?: # optionally followed by =value
        (?: # equals sign, possibly with encoding
            \*\s*=\s* # * indicates extended notation
            (?: # optional encoding
                (?P<encoding>[^\s]+?)
                '(?P<language>[^\s]*?)'
            )?
        |
            =\s* # basic notation
        )
        (?P<value>
            "[^"\\]*(?:\\.[^"\\]*)*" # quoted string
        |
            [^;,]+ # token
        )?
    )?
    \s*
    """,
    flags=re.VERBOSE,
)

# Grabs the leading mimetype of an option header (after the value has been
# normalized to start with ","), leaving any parameters in group 2.
_option_header_start_mime_type = re.compile(r",\s*([^;,\s]+)([;,]\s*.+)?")

# Headers that describe the entity body rather than the message itself
# (lowercased for case-insensitive lookups).
_entity_headers = frozenset(
    [
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    ]
)

# Hop-by-hop headers apply to a single transport-level connection and must
# not be re-forwarded by proxies.
_hop_by_hop_headers = frozenset(
    [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ]
)

# Mapping of HTTP status code -> default reason phrase. Some phrases keep
# historical Werkzeug spellings (e.g. 413 "Request Entity Too Large") for
# backward compatibility with emitted status lines.
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    # Fixed: RFC 6585 names 511 "Network Authentication Required",
    # not "Network Authentication Failed".
    511: "Network Authentication Required",  # see RFC 6585
}

176 

177 

class COEP(Enum):
    """Cross Origin Embedder Policies.

    Member values are the exact token strings serialized into the
    ``Cross-Origin-Embedder-Policy`` response header.
    """

    UNSAFE_NONE = "unsafe-none"
    REQUIRE_CORP = "require-corp"

183 

184 

class COOP(Enum):
    """Cross Origin Opener Policies.

    Member values are the exact token strings serialized into the
    ``Cross-Origin-Opener-Policy`` response header.
    """

    UNSAFE_NONE = "unsafe-none"
    SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
    SAME_ORIGIN = "same-origin"

191 

192 

193def _is_extended_parameter(key: str) -> bool: 

194 """Per RFC 5987/8187, "extended" values may *not* be quoted. 

195 This is in keeping with browser implementations. So we test 

196 using this function to see if the key indicates this parameter 

197 follows the `ext-parameter` syntax (using a trailing '*'). 

198 """ 

199 return key.strip().endswith("*") 

200 

201 

def quote_header_value(
    value: t.Union[str, int], extra_chars: str = "", allow_token: bool = True
) -> str:
    """Wrap a header value in double quotes unless it is a plain token.

    Bytes are decoded as latin1; anything else is converted with
    ``str()``. Backslashes and double quotes inside the value are
    escaped before quoting.

    .. versionadded:: 0.5

    :param value: the value to quote.
    :param extra_chars: additional characters accepted inside an
        unquoted token.
    :param allow_token: when true, values made up solely of token
        characters are returned unquoted.
    """
    if isinstance(value, bytes):
        value = value.decode("latin1")

    value = str(value)

    if allow_token:
        allowed = _token_chars.union(extra_chars)

        if all(ch in allowed for ch in value):
            return value

    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'

223 

224 

def unquote_header_value(value: str, is_filename: bool = False) -> str:
    r"""Undo :func:`quote_header_value`.

    Uses the loose unquoting rules that browsers actually apply rather
    than strict RFC unquoting — IE, for example, uploads files with
    ``"C:\foo\bar.txt"`` as the filename.

    .. versionadded:: 0.5

    :param value: the header value to unquote.
    :param is_filename: The value represents a filename or path.
    """
    if not value or value[0] != '"' or value[-1] != '"':
        return value

    value = value[1:-1]

    # A quoted UNC path (leading "\\") must be returned untouched when
    # unquoting a filename: collapsing the doubled backslashes would
    # turn the leading double slash into a single one. See #458.
    if is_filename and value[:2] == "\\\\":
        return value

    return value.replace("\\\\", "\\").replace('\\"', '"')

250 

251 

def dump_options_header(
    header: t.Optional[str], options: t.Mapping[str, t.Optional[t.Union[str, int]]]
) -> str:
    """The reverse of :func:`parse_options_header`.

    Joins the header value and its options with ``"; "``; options with a
    ``None`` value are emitted as a bare key, and RFC 8187 extended
    parameters (``key*``) are never quoted.

    :param header: the header to dump, or ``None`` for options only.
    :param options: a dict of options to append.
    """
    parts = [] if header is None else [header]

    for name, val in options.items():
        if val is None:
            parts.append(name)
        elif _is_extended_parameter(name):
            parts.append(f"{name}={val}")
        else:
            parts.append(f"{name}={quote_header_value(val)}")

    return "; ".join(parts)

271 

272 

def dump_header(
    iterable: t.Union[t.Dict[str, t.Union[str, int]], t.Iterable[str]],
    allow_token: bool = True,
) -> str:
    """Dump an HTTP header again. This is the reversal of
    :func:`parse_list_header`, :func:`parse_set_header` and
    :func:`parse_dict_header`. This also quotes strings that include an
    equals sign unless you pass it as dict of key, value pairs.

    >>> dump_header({'foo': 'bar baz'})
    'foo="bar baz"'
    >>> dump_header(('foo', 'bar baz'))
    'foo, "bar baz"'

    :param iterable: the iterable or dict of values to quote.
    :param allow_token: if set to `False` tokens as values are disallowed.
        See :func:`quote_header_value` for more details.
    """
    if not isinstance(iterable, dict):
        return ", ".join(
            quote_header_value(item, allow_token=allow_token) for item in iterable
        )

    parts = []

    for key, value in iterable.items():
        if value is None:
            # Bare key with no value.
            parts.append(key)
        elif _is_extended_parameter(key):
            # RFC 8187 extended parameter values are never quoted.
            parts.append(f"{key}={value}")
        else:
            parts.append(f"{key}={quote_header_value(value, allow_token=allow_token)}")

    return ", ".join(parts)

305 

306 

def dump_csp_header(header: "ds.ContentSecurityPolicy") -> str:
    """Serialize a Content Security Policy mapping into header form,
    e.g. ``"default-src 'self'; script-src 'self'"``.

    .. versionadded:: 1.0.0
       Support for Content Security Policy headers was added.

    """
    directives = [f"{name} {value}" for name, value in header.items()]
    return "; ".join(directives)

318 

319 

def parse_list_header(value: str) -> t.List[str]:
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    """
    items = []

    for piece in _parse_list_header(value):
        # Quoted items keep their commas intact; strip and unquote them.
        if piece[:1] == '"' and piece[-1:] == '"':
            piece = unquote_header_value(piece[1:-1])

        items.append(piece)

    return items

348 

349 

def parse_dict_header(value: str, cls: t.Type[dict] = dict) -> t.Dict[str, str]:
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict (or any other mapping object created from
    the type with a dict like interface provided by the `cls` argument):

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    .. versionchanged:: 0.9
       Added support for `cls` argument.

    :param value: a string with a dict header.
    :param cls: callable to use for storage of parsed results.
    :return: an instance of `cls`
    """
    # Defensive: accept latin1-encoded bytes as well as text.
    if isinstance(value, bytes):
        value = value.decode("latin1")

    result = cls()

    for item in _parse_list_header(value):
        if "=" not in item:
            # Valueless keys map to None.
            result[item] = None
            continue

        name, _, val = item.partition("=")

        if val[:1] == '"' and val[-1:] == '"':
            val = unquote_header_value(val[1:-1])

        result[name] = val

    return result

388 

389 

def parse_options_header(value: t.Optional[str]) -> t.Tuple[str, t.Dict[str, str]]:
    """Parse a ``Content-Type``-like header into a tuple with the
    value and any options:

    >>> parse_options_header('text/html; charset=utf8')
    ('text/html', {'charset': 'utf8'})

    This is not for ``Cache-Control``-like headers, which use a
    different format. For those, use :func:`parse_dict_header`.

    :param value: The header value to parse.

    .. versionchanged:: 2.2
        Option names are always converted to lowercase.

    .. versionchanged:: 2.1
        The ``multiple`` parameter is deprecated and will be removed in
        Werkzeug 2.2.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5
    """
    if not value:
        return "", {}

    result: t.List[t.Any] = []

    # Normalize so the value starts with "," and embedded newlines become
    # commas; this lets _option_header_start_mime_type anchor on ",".
    value = "," + value.replace("\n", ",")
    while value:
        match = _option_header_start_mime_type.match(value)
        if not match:
            break
        result.append(match.group(1))  # mimetype
        options: t.Dict[str, str] = {}
        # Parse options
        rest = match.group(2)
        encoding: t.Optional[str]
        continued_encoding: t.Optional[str] = None
        while rest:
            optmatch = _option_header_piece_re.match(rest)
            if not optmatch:
                break
            # ``language`` (RFC 2231) is captured but intentionally unused.
            option, count, encoding, language, option_value = optmatch.groups()
            # Continuations don't have to supply the encoding after the
            # first line. If we're in a continuation, track the current
            # encoding to use for subsequent lines. Reset it when the
            # continuation ends.
            if not count:
                continued_encoding = None
            else:
                if not encoding:
                    encoding = continued_encoding
                continued_encoding = encoding
            option = unquote_header_value(option).lower()

            if option_value is not None:
                # Filenames get the browser-compatible loose unquoting.
                option_value = unquote_header_value(option_value, option == "filename")

                if encoding is not None:
                    # Extended notation: percent-decode then decode with
                    # the declared charset.
                    option_value = _unquote(option_value).decode(encoding)

            if count:
                # Continuations append to the existing value. For
                # simplicity, this ignores the possibility of
                # out-of-order indices, which shouldn't happen anyway.
                if option_value is not None:
                    options[option] = options.get(option, "") + option_value
            else:
                options[option] = option_value  # type: ignore[assignment]

            rest = rest[optmatch.end() :]
        result.append(options)
        # NOTE(review): this return sits inside ``while value``, so only
        # the first mimetype is ever parsed; the loop and the fallback
        # below appear to be leftovers from the removed ``multiple``
        # parameter (see versionchanged 2.1 above).
        return tuple(result)  # type: ignore[return-value]

    return tuple(result) if result else ("", {})  # type: ignore[return-value]

467 

468 

_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")


@typing.overload
def parse_accept_header(value: t.Optional[str]) -> "ds.Accept":
    ...


@typing.overload
def parse_accept_header(
    value: t.Optional[str], cls: t.Type[_TAnyAccept]
) -> _TAnyAccept:
    ...


def parse_accept_header(
    value: t.Optional[str], cls: t.Optional[t.Type[_TAnyAccept]] = None
) -> _TAnyAccept:
    """Parse an HTTP ``Accept-*`` header.

    Not a complete implementation of the spec, but enough to extract
    each offered value together with its quality.

    Returns a new :class:`Accept` instance (a list of ``(value,
    quality)`` tuples sorted by quality, with extra accessor methods).
    Pass a subclass of :class:`Accept` as ``cls`` to get that type back
    instead.

    :param value: the accept header string to be parsed.
    :param cls: the wrapper class for the return value (can be
        :class:`Accept` or a subclass thereof).
    :return: an instance of `cls`.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyAccept], ds.Accept)

    if not value:
        return cls(None)

    parsed = []

    for match in _accept_re.finditer(value):
        q = match.group(2)

        if q:
            # Clamp the declared quality into [0, 1].
            quality: float = min(max(float(q), 0), 1)
        else:
            # Missing q parameter defaults to full quality.
            quality = 1

        parsed.append((match.group(1), quality))

    return cls(parsed)

517 

518 

_TAnyCC = t.TypeVar("_TAnyCC", bound="ds._CacheControl")
_t_cc_update = t.Optional[t.Callable[[_TAnyCC], None]]


@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: None = None
) -> "ds.RequestCacheControl":
    ...


@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: t.Type[_TAnyCC]
) -> _TAnyCC:
    ...


def parse_cache_control_header(
    value: t.Optional[str],
    on_update: _t_cc_update = None,
    cls: t.Optional[t.Type[_TAnyCC]] = None,
) -> _TAnyCC:
    """Parse a cache control header.

    The RFC distinguishes request from response cache control; this
    function does not, so it is your responsibility not to use the
    wrong control statements.

    .. versionadded:: 0.5
       The `cls` was added. If not specified an immutable
       :class:`~werkzeug.datastructures.RequestCacheControl` is returned.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a
        value on the :class:`~werkzeug.datastructures.CacheControl`
        object is changed.
    :param cls: the class for the returned object. By default
        :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCC], ds.RequestCacheControl)

    if value:
        return cls(parse_dict_header(value), on_update)

    return cls((), on_update)

565 

566 

_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")
_t_csp_update = t.Optional[t.Callable[[_TAnyCSP], None]]


@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: None = None
) -> "ds.ContentSecurityPolicy":
    ...


@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: t.Type[_TAnyCSP]
) -> _TAnyCSP:
    ...


def parse_csp_header(
    value: t.Optional[str],
    on_update: _t_csp_update = None,
    cls: t.Optional[t.Type[_TAnyCSP]] = None,
) -> _TAnyCSP:
    """Parse a Content Security Policy header.

    .. versionadded:: 1.0.0
       Support for Content Security Policy headers was added.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a
        value on the object is changed.
    :param cls: the class for the returned object. By default
        :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
    :return: a `cls` object.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCSP], ds.ContentSecurityPolicy)

    if value is None:
        return cls((), on_update)

    policies = []

    for part in value.split(";"):
        part = part.strip()

        # A directive must have a space separating its name from its
        # value; anything else is badly formatted and skipped.
        if " " not in part:
            continue

        name, directive_value = part.split(" ", 1)
        policies.append((name.strip(), directive_value.strip()))

    return cls(policies, on_update)

619 

620 

def parse_set_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[[["ds.HeaderSet"][0]], None]] = None,
) -> "ds.HeaderSet":
    """Parse a set-like header and return a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    The return value is an object that treats the items case-insensitively
    and keeps the order of the items:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    To create a header from the :class:`HeaderSet` again, use the
    :func:`dump_header` function.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
        value on the :class:`~werkzeug.datastructures.HeaderSet`
        object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    if value:
        return ds.HeaderSet(parse_list_header(value), on_update)

    return ds.HeaderSet(None, on_update)

652 

653 

def parse_authorization_header(
    value: t.Optional[str],
) -> t.Optional["ds.Authorization"]:
    """Parse an HTTP basic/digest ``Authorization`` header sent by the
    browser.

    :param value: the authorization header to parse.
    :return: an :class:`~werkzeug.datastructures.Authorization` object,
        or `None` if the header was missing or invalid.
    """
    if not value:
        return None

    value = _wsgi_decoding_dance(value)

    try:
        auth_type, auth_info = value.split(None, 1)
    except ValueError:
        # No scheme/credentials separation.
        return None

    auth_type = auth_type.lower()

    if auth_type == "basic":
        try:
            username, password = base64.b64decode(auth_info).split(b":", 1)
        except Exception:
            # Bad base64 or no ":" separator.
            return None

        try:
            credentials = {
                "username": _to_str(username, "utf-8"),
                "password": _to_str(password, "utf-8"),
            }
        except UnicodeDecodeError:
            return None

        return ds.Authorization("basic", credentials)

    if auth_type == "digest":
        auth_map = parse_dict_header(auth_info)

        # These fields are mandatory in a digest response.
        for key in ("username", "realm", "nonce", "uri", "response"):
            if key not in auth_map:
                return None

        # When qop is present, nc and cnonce are required as well.
        if "qop" in auth_map:
            if not auth_map.get("nc") or not auth_map.get("cnonce"):
                return None

        return ds.Authorization("digest", auth_map)

    return None

698 

699 

def parse_www_authenticate_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[[["ds.WWWAuthenticate"][0]], None]] = None,
) -> "ds.WWWAuthenticate":
    """Parse an HTTP WWW-Authenticate header into a
    :class:`~werkzeug.datastructures.WWWAuthenticate` object.

    :param value: a WWW-Authenticate header to parse.
    :param on_update: an optional callable that is called every time a
        value on the :class:`~werkzeug.datastructures.WWWAuthenticate`
        object is changed.
    :return: a :class:`~werkzeug.datastructures.WWWAuthenticate` object.
    """
    if not value:
        return ds.WWWAuthenticate(on_update=on_update)

    try:
        auth_type, auth_info = value.split(None, 1)
    except (ValueError, AttributeError):
        # Scheme only, or a non-string value: keep just the scheme.
        return ds.WWWAuthenticate(value.strip().lower(), on_update=on_update)

    return ds.WWWAuthenticate(auth_type.lower(), parse_dict_header(auth_info), on_update)

721 

722 

def parse_if_range_header(value: t.Optional[str]) -> "ds.IfRange":
    """Parse an ``If-Range`` header, which holds either an etag or a
    date, into a :class:`~werkzeug.datastructures.IfRange` object.

    .. versionchanged:: 2.0
        If the value represents a datetime, it is timezone-aware.

    .. versionadded:: 0.7
    """
    if not value:
        return ds.IfRange()

    timestamp = parse_date(value)

    if timestamp is not None:
        return ds.IfRange(date=timestamp)

    # Not a date: treat as an etag, discarding any weakness marker.
    etag, _weak = unquote_etag(value)
    return ds.IfRange(etag)

739 

740 

def parse_range_header(
    value: t.Optional[str], make_inclusive: bool = True
) -> t.Optional["ds.Range"]:
    """Parses a range header into a :class:`~werkzeug.datastructures.Range`
    object. If the header is missing or malformed `None` is returned.
    `ranges` is a list of ``(start, stop)`` tuples where the ranges are
    non-inclusive.

    .. versionadded:: 0.7
    """
    # NOTE(review): ``make_inclusive`` is unused in this body; presumably
    # kept for signature backward compatibility -- TODO confirm.
    if not value or "=" not in value:
        return None

    ranges = []
    # Exclusive end of the previous range, used to reject overlapping or
    # out-of-order ranges. -1 marks an open-ended or suffix range, after
    # which no further range is accepted.
    last_end = 0
    units, rng = value.split("=", 1)
    units = units.strip().lower()

    for item in rng.split(","):
        item = item.strip()
        if "-" not in item:
            return None
        if item.startswith("-"):
            # Suffix range like "-500" (the final N bytes); int(item)
            # yields the negative begin value directly.
            if last_end < 0:
                return None
            try:
                begin = int(item)
            except ValueError:
                return None
            end = None
            last_end = -1
        elif "-" in item:
            begin_str, end_str = item.split("-", 1)
            begin_str = begin_str.strip()
            end_str = end_str.strip()

            try:
                begin = int(begin_str)
            except ValueError:
                return None

            if begin < last_end or last_end < 0:
                return None
            if end_str:
                try:
                    # Header end positions are inclusive; stored exclusive.
                    end = int(end_str) + 1
                except ValueError:
                    return None

                if begin >= end:
                    return None
            else:
                # Open-ended range like "500-".
                end = None
            last_end = end if end is not None else -1
        ranges.append((begin, end))

    return ds.Range(units, ranges)

798 

799 

def parse_content_range_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[[["ds.ContentRange"][0]], None]] = None,
) -> t.Optional["ds.ContentRange"]:
    """Parse a ``Content-Range`` header into a
    :class:`~werkzeug.datastructures.ContentRange` object, or `None` if
    parsing is not possible.

    .. versionadded:: 0.7

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a
        value on the :class:`~werkzeug.datastructures.ContentRange`
        object is changed.
    """
    if value is None:
        return None

    try:
        units, rangedef = (value or "").strip().split(None, 1)
    except ValueError:
        # No space between units and the range definition.
        return None

    if "/" not in rangedef:
        return None

    rng, length_str = rangedef.split("/", 1)

    if length_str == "*":
        # Total length unknown.
        length = None
    else:
        try:
            length = int(length_str)
        except ValueError:
            return None

    if rng == "*":
        # Unsatisfied range: only the total length is communicated.
        if not is_byte_range_valid(None, None, length):
            return None

        return ds.ContentRange(units, None, None, length, on_update=on_update)

    if "-" not in rng:
        return None

    start_str, stop_str = rng.split("-", 1)

    try:
        start = int(start_str)
        # The header's end position is inclusive; store it exclusive.
        stop = int(stop_str) + 1
    except ValueError:
        return None

    if is_byte_range_valid(start, stop, length):
        return ds.ContentRange(units, start, stop, length, on_update=on_update)

    return None

852 

853 

def quote_etag(etag: str, weak: bool = False) -> str:
    """Quote an etag.

    :param etag: the etag to quote.
    :param weak: set to `True` to tag it "weak".
    :raises ValueError: if the etag already contains a double quote.
    """
    if '"' in etag:
        raise ValueError("invalid etag")

    prefix = "W/" if weak else ""
    return f'{prefix}"{etag}"'

866 

867 

def unquote_etag(
    etag: t.Optional[str],
) -> t.Union[t.Tuple[str, bool], t.Tuple[None, None]]:
    """Unquote a single etag:

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple.
    """
    if not etag:
        return None, None

    etag = etag.strip()

    # A "W/" (or "w/") prefix marks a weak validator.
    weak = etag[:2] in ("W/", "w/")
    if weak:
        etag = etag[2:]

    if etag.startswith('"') and etag.endswith('"'):
        etag = etag[1:-1]

    return etag, weak

891 

892 

def parse_etags(value: t.Optional[str]) -> "ds.ETags":
    """Parse an etag header.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ds.ETags()

    strong = []
    weak = []
    pos = 0
    stop = len(value)

    while pos < stop:
        match = _etag_re.match(value, pos)

        if match is None:
            break

        is_weak, quoted, raw = match.groups()

        # A bare (unquoted) "*" matches anything.
        if raw == "*":
            return ds.ETags(star_tag=True)

        tag = quoted if quoted else raw
        (weak if is_weak else strong).append(tag)
        pos = match.end()

    return ds.ETags(strong, weak)

920 

921 

def generate_etag(data: bytes) -> str:
    """Generate an etag for some data.

    .. versionchanged:: 2.0
        Use SHA-1. MD5 may not be available in some environments.
    """
    digest = sha1(data)
    return digest.hexdigest()

929 

930 

def parse_date(value: t.Optional[str]) -> t.Optional[datetime]:
    """Parse an :rfc:`2822` date into a timezone-aware
    :class:`datetime.datetime` object, or ``None`` if parsing fails.

    Wraps :func:`email.utils.parsedate_to_datetime`, returning ``None``
    on failure instead of raising, and always producing a timezone-aware
    result (naive results are assumed to be UTC).

    :param value: A string with a supported date format.

    .. versionchanged:: 2.0
        Return a timezone-aware datetime object. Use
        ``email.utils.parsedate_to_datetime``.
    """
    if value is None:
        return None

    try:
        parsed = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        # parsedate_to_datetime raises on unparseable input.
        return None

    if parsed.tzinfo is not None:
        return parsed

    # No timezone in the string: assume UTC.
    return parsed.replace(tzinfo=timezone.utc)

958 

959 

def http_date(
    timestamp: t.Optional[t.Union[datetime, date, int, float, struct_time]] = None
) -> str:
    """Format a datetime object or timestamp into an :rfc:`2822` date
    string.

    Wraps :func:`email.utils.format_datetime` /
    :func:`email.utils.formatdate`; naive datetime objects are assumed
    to be UTC instead of raising an exception.

    :param timestamp: The datetime or timestamp to format. Defaults to
        the current time.

    .. versionchanged:: 2.0
        Use ``email.utils.format_datetime``. Accept ``date`` objects.
    """
    if isinstance(timestamp, date):
        if isinstance(timestamp, datetime):
            # Make the datetime timezone-aware (naive values become UTC).
            timestamp = _dt_as_utc(timestamp)
        else:
            # A plain date means midnight UTC on that day.
            timestamp = datetime.combine(timestamp, time(), tzinfo=timezone.utc)

        return email.utils.format_datetime(timestamp, usegmt=True)

    if isinstance(timestamp, struct_time):
        timestamp = mktime(timestamp)

    return email.utils.formatdate(timestamp, usegmt=True)

990 

991 

def parse_age(value: t.Optional[str] = None) -> t.Optional[timedelta]:
    """Parses a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None

    try:
        seconds = int(value)
    except ValueError:
        return None

    if seconds >= 0:
        try:
            return timedelta(seconds=seconds)
        except OverflowError:
            # Value too large to represent as a timedelta.
            pass

    return None

1012 

1013 

def dump_age(age: t.Optional[t.Union[timedelta, int]] = None) -> t.Optional[str]:
    """Formats the duration as a base-10 integer.

    :param age: should be an integer number of seconds,
                a :class:`datetime.timedelta` object, or,
                if the age is unknown, `None` (default).
    """
    if age is None:
        return None

    # Normalize both accepted input types to a whole number of seconds.
    seconds = int(age.total_seconds()) if isinstance(age, timedelta) else int(age)

    if seconds < 0:
        raise ValueError("age cannot be negative")

    return str(seconds)

1032 

1033 

def is_resource_modified(
    environ: "WSGIEnvironment",
    etag: t.Optional[str] = None,
    data: t.Optional[bytes] = None,
    last_modified: t.Optional[t.Union[datetime, str]] = None,
    ignore_if_range: bool = True,
) -> bool:
    """Convenience method for conditional requests.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag for the response for comparison.
    :param data: or alternatively the data of the response to automatically
                 generate an etag using :func:`generate_etag`.
    :param last_modified: an optional date of the last modification.
    :param ignore_if_range: If `False`, `If-Range` header will be taken into
                            account.
    :return: `True` if the resource was modified, otherwise `False`.

    .. versionchanged:: 2.0
        SHA-1 is used to generate an etag value for the data. MD5 may
        not be available in some environments.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    # This is a thin WSGI adapter: pull the relevant conditional headers
    # out of the environ and delegate all logic to the sans-IO helper.
    get_header = environ.get

    return _sansio_http.is_resource_modified(
        http_range=get_header("HTTP_RANGE"),
        http_if_range=get_header("HTTP_IF_RANGE"),
        http_if_modified_since=get_header("HTTP_IF_MODIFIED_SINCE"),
        http_if_none_match=get_header("HTTP_IF_NONE_MATCH"),
        http_if_match=get_header("HTTP_IF_MATCH"),
        etag=etag,
        data=data,
        last_modified=last_modified,
        ignore_if_range=ignore_if_range,
    )

1070 

1071 

def remove_entity_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]],
    allowed: t.Iterable[str] = ("expires", "content-location"),
) -> None:
    """Remove all entity headers from a list or :class:`Headers` object. This
    operation works in-place. `Expires` and `Content-Location` headers are
    by default not removed. The reason for this is :rfc:`2616` section
    10.3.5 which specifies some entity headers that should be sent.

    .. versionchanged:: 0.5
        added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: a list of headers that should still be allowed even though
                    they are entity headers.
    """
    # Comparison is case-insensitive, so normalize the allow-list once.
    keep = {name.lower() for name in allowed}
    remaining = []

    for key, value in headers:
        if key.lower() in keep or not is_entity_header(key):
            remaining.append((key, value))

    # Slice assignment mutates the caller's object in place.
    headers[:] = remaining

1094 

1095 

def remove_hop_by_hop_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]]
) -> None:
    """Remove all HTTP/1.1 "Hop-by-Hop" headers from a list or
    :class:`Headers` object. This operation works in-place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    remaining = []

    for key, value in headers:
        if not is_hop_by_hop_header(key):
            remaining.append((key, value))

    # Slice assignment mutates the caller's object in place.
    headers[:] = remaining

1109 

1110 

def is_entity_header(header: str) -> bool:
    """Check if a header is an entity header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    # The lookup set holds lowercase names, so normalize before testing.
    normalized = header.lower()
    return normalized in _entity_headers

1120 

1121 

def is_hop_by_hop_header(header: str) -> bool:
    """Check if a header is an HTTP/1.1 "Hop-by-Hop" header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    # The lookup set holds lowercase names, so normalize before testing.
    normalized = header.lower()
    return normalized in _hop_by_hop_headers

1131 

1132 

def parse_cookie(
    header: t.Union["WSGIEnvironment", str, bytes, None],
    charset: str = "utf-8",
    errors: str = "replace",
    cls: t.Optional[t.Type["ds.MultiDict"]] = None,
) -> "ds.MultiDict[str, str]":
    """Parse a cookie from a string or WSGI environ.

    The same key can be provided multiple times, the values are stored
    in-order. The default :class:`MultiDict` will have the first value
    first, and all values can be retrieved with
    :meth:`MultiDict.getlist`.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key.
    :param charset: The charset for the cookie values.
    :param errors: The error behavior for the charset decoding.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.

    .. versionchanged:: 1.0.0
        Returns a :class:`MultiDict` instead of a
        ``TypeConversionDict``.

    .. versionchanged:: 0.5
        Returns a :class:`TypeConversionDict` instead of a regular dict.
        The ``cls`` parameter was added.
    """
    # Normalize the three accepted header forms to a single cookie value,
    # then delegate the actual parsing to the sans-IO implementation.
    if header is None:
        cookie: t.Union[str, bytes] = ""
    elif isinstance(header, dict):
        cookie = header.get("HTTP_COOKIE", "")
    else:
        cookie = header

    return _sansio_http.parse_cookie(
        cookie=cookie, charset=charset, errors=errors, cls=cls
    )

1171 

1172 

def dump_cookie(
    key: str,
    value: t.Union[bytes, str] = "",
    max_age: t.Optional[t.Union[timedelta, int]] = None,
    expires: t.Optional[t.Union[str, datetime, int, float]] = None,
    path: t.Optional[str] = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    charset: str = "utf-8",
    sync_expires: bool = True,
    max_size: int = 4093,
    samesite: t.Optional[str] = None,
) -> str:
    """Create a Set-Cookie header without the ``Set-Cookie`` prefix.

    The return value is usually restricted to ascii as the vast majority
    of values are properly escaped, but that is no guarantee. It's
    tunneled through latin1 as required by :pep:`3333`.

    The return value is not ASCII safe if the key contains unicode
    characters.  This is technically against the specification but
    happens in the wild.  It's strongly recommended to not use
    non-ASCII values for the keys.

    :param max_age: should be a number of seconds, or `None` (default) if
                    the cookie should last only as long as the client's
                    browser session.  Additionally `timedelta` objects
                    are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
    :param path: limits the cookie to a given path, per default it will
                 span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
                   example, ``domain=".example.com"`` will set a cookie
                   that is readable by the domain ``www.example.com``,
                   ``foo.example.com`` etc. Otherwise, a cookie will only
                   be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie.  This is an
                     extension to the cookie standard and probably not
                     supported by all browsers.
    :param charset: the encoding for string values.
    :param sync_expires: automatically set expires if max_age is defined
                         but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    # All assembly below works on bytes; decode back to latin1 at the end.
    key = _to_bytes(key, charset)
    value = _to_bytes(value, charset)

    if path is not None:
        # Local import — presumably avoids a circular import with .urls;
        # TODO(review): confirm.
        from .urls import iri_to_uri

        path = iri_to_uri(path, charset)

    domain = _make_cookie_domain(domain)

    if isinstance(max_age, timedelta):
        # Max-Age is serialized as a whole number of seconds.
        max_age = int(max_age.total_seconds())

    if expires is not None:
        if not isinstance(expires, str):
            # datetime / numeric timestamps become an HTTP date string.
            expires = http_date(expires)
    elif max_age is not None and sync_expires:
        # Derive Expires from Max-Age for clients that only honor Expires.
        expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)

    if samesite is not None:
        # Normalize case so "lax", "LAX", etc. are all accepted.
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    # The header always starts with the key=value pair; attributes follow.
    buf = [key + b"=" + _cookie_quote(value)]

    # XXX: In theory all of these parameters that are not marked with `None`
    # should be quoted. Because stdlib did not quote it before I did not
    # want to introduce quoting there now.
    #
    # q == True: value is quoted; q == False: value appended verbatim;
    # q is None: boolean flag attribute emitted without a value.
    for k, v, q in (
        (b"Domain", domain, True),
        (b"Expires", expires, False),
        (b"Max-Age", max_age, False),
        (b"Secure", secure, None),
        (b"HttpOnly", httponly, None),
        (b"Path", path, False),
        (b"SameSite", samesite, False),
    ):
        if q is None:
            # Flag attribute: present when truthy, no "=value" part.
            if v:
                buf.append(k)
            continue

        if v is None:
            # Unset attribute: omit entirely.
            continue

        tmp = bytearray(k)
        if not isinstance(v, (bytes, bytearray)):
            v = _to_bytes(str(v), charset)
        if q:
            v = _cookie_quote(v)
        tmp += b"=" + v
        buf.append(bytes(tmp))

    # The return value will be an incorrectly encoded latin1 header for
    # consistency with the headers object.
    rv = b"; ".join(buf)
    rv = rv.decode("latin1")

    # Warn if the final value of the cookie is larger than the limit. If the
    # cookie is too large, then it may be silently ignored by the browser,
    # which can be quite hard to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        warnings.warn(
            f"The {key.decode(charset)!r} cookie is too large: the value was"
            f" {value_size} bytes but the"
            f" header required {cookie_size - value_size} extra bytes. The final size"
            f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
            f" silently ignore cookies larger than this.",
            stacklevel=2,
        )

    return rv

1305 

1306 

def is_byte_range_valid(
    start: t.Optional[int], stop: t.Optional[int], length: t.Optional[int]
) -> bool:
    """Checks if a given byte content range is valid for the given length.

    .. versionadded:: 0.7
    """
    if (start is None) != (stop is None):
        # Exactly one endpoint given: malformed range.
        return False

    if start is None:
        # No range at all; any known length just has to be non-negative.
        return length is None or length >= 0

    # Both endpoints are set: the slice must be non-negative and non-empty.
    if not 0 <= start < stop:  # type: ignore
        return False

    # With a known length, start must also fall inside the resource.
    return length is None or start < length

1323 

1324 

1325# circular dependencies 

1326from . import datastructures as ds 

1327from .sansio import http as _sansio_http