Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/datastructures/headers.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

284 statements  

1from __future__ import annotations 

2 

3import collections.abc as cabc 

4import re 

5import typing as t 

6 

7from .._internal import _missing 

8from ..exceptions import BadRequestKeyError 

9from ..http import dump_options_header 

10from .mixins import ImmutableHeadersMixin 

11from .structures import iter_multi_items 

12from .structures import MultiDict 

13 

14if t.TYPE_CHECKING: 

15 import typing_extensions as te 

16 from _typeshed.wsgi import WSGIEnvironment 

17 

18T = t.TypeVar("T") 

19 

20 

class Headers:
    """An object that stores some headers. It has a dict-like interface,
    but is ordered, can store the same key multiple times, and iterating
    yields ``(key, value)`` pairs instead of only keys.

    This data structure is useful if you want a nicer way to handle WSGI
    headers which are stored as tuples in a list.

    From Werkzeug 0.3 onwards, the :exc:`KeyError` raised by this class is
    also a subclass of the :class:`~exceptions.BadRequest` HTTP exception
    and will render a page for a ``400 BAD REQUEST`` if caught in a
    catch-all for HTTP exceptions.

    Headers is mostly compatible with the Python :class:`wsgiref.headers.Headers`
    class, with the exception of `__getitem__`. :mod:`wsgiref` will return
    `None` for ``headers['missing']``, whereas :class:`Headers` will raise
    a :class:`KeyError`.

    To create a new ``Headers`` object, pass it a list, dict, or
    other ``Headers`` object with default values. These values are
    validated the same way values added later are.

    :param defaults: The list of default values for the :class:`Headers`.

    .. versionchanged:: 3.1
        Implement ``|`` and ``|=`` operators.

    .. versionchanged:: 2.1.0
        Default values are validated the same as values added later.

    .. versionchanged:: 0.9
        This data structure now stores unicode values similar to how the
        multi dicts do it. The main difference is that bytes can be set as
        well which will automatically be latin1 decoded.

    .. versionchanged:: 0.9
        The :meth:`linked` function was removed without replacement as it
        was an API that does not support the changes to the encoding model.
    """

    def __init__(
        self,
        defaults: (
            Headers
            | MultiDict[str, t.Any]
            | cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]]
            | cabc.Iterable[tuple[str, t.Any]]
            | None
        ) = None,
    ) -> None:
        # Ordered storage of ``(key, value)`` pairs; keys keep the case
        # they were added with, lookups compare them lowercased.
        self._list: list[tuple[str, str]] = []

        if defaults is not None:
            self.extend(defaults)

    @t.overload
    def __getitem__(self, key: str) -> str: ...
    @t.overload
    def __getitem__(self, key: int) -> tuple[str, str]: ...
    @t.overload
    def __getitem__(self, key: slice) -> te.Self: ...
    def __getitem__(self, key: str | int | slice) -> str | tuple[str, str] | te.Self:
        """Look up by header name, list index, or slice.

        A string returns the first value stored for that key
        (case-insensitive), an integer returns the ``(key, value)`` tuple
        at that position, and a slice returns a new instance of the same
        class built from the sliced items.
        """
        if isinstance(key, str):
            return self._get_key(key)

        if isinstance(key, int):
            return self._list[key]

        return self.__class__(self._list[key])

    def _get_key(self, key: str) -> str:
        """Return the first value whose key matches ``key`` ignoring case.

        :raises BadRequestKeyError: if no stored key matches.
        """
        ikey = key.lower()

        for k, v in self._list:
            if k.lower() == ikey:
                return v

        raise BadRequestKeyError(key)

    def __eq__(self, other: object) -> bool:
        """Compare with another object of exactly the same class.

        Both sides are reduced to sets of ``(lowercased key, value)``
        tuples, so order and repeated identical items do not affect the
        result.
        """
        if other.__class__ is not self.__class__:
            return NotImplemented

        def lowered(item: tuple[str, ...]) -> tuple[str, ...]:
            return item[0].lower(), *item[1:]

        return set(map(lowered, other._list)) == set(map(lowered, self._list))

    # Instances are mutable and define ``__eq__``, so they are unhashable.
    __hash__ = None  # type: ignore[assignment]

    @t.overload
    def get(self, key: str) -> str | None: ...
    @t.overload
    def get(self, key: str, default: str) -> str: ...
    @t.overload
    def get(self, key: str, default: T) -> str | T: ...
    @t.overload
    def get(self, key: str, type: cabc.Callable[[str], T]) -> T | None: ...
    @t.overload
    def get(self, key: str, default: T, type: cabc.Callable[[str], T]) -> T: ...
    def get(  # type: ignore[misc]
        self,
        key: str,
        default: str | T | None = None,
        type: cabc.Callable[[str], T] | None = None,
    ) -> str | T | None:
        """Return the default value if the requested data doesn't exist.
        If `type` is provided and is a callable it should convert the value,
        return it or raise a :exc:`ValueError` if that is not possible. In
        this case the function will return the default as if the value was not
        found:

        >>> d = Headers([('Content-Length', '42')])
        >>> d.get('Content-Length', type=int)
        42

        :param key: The key to be looked up.
        :param default: The default value to be returned if the key can't
                        be looked up. If not further specified `None` is
                        returned.
        :param type: A callable that is used to cast the value in the
                     :class:`Headers`. If a :exc:`ValueError` is raised
                     by this callable the default value is returned.

        .. versionchanged:: 3.0
            The ``as_bytes`` parameter was removed.

        .. versionchanged:: 0.9
            The ``as_bytes`` parameter was added.
        """
        try:
            rv = self._get_key(key)
        except KeyError:
            return default

        if type is None:
            return rv

        try:
            return type(rv)
        except ValueError:
            return default

    @t.overload
    def getlist(self, key: str) -> list[str]: ...
    @t.overload
    def getlist(self, key: str, type: cabc.Callable[[str], T]) -> list[T]: ...
    def getlist(
        self, key: str, type: cabc.Callable[[str], T] | None = None
    ) -> list[str] | list[T]:
        """Return the list of items for a given key. If that key is not in the
        :class:`Headers`, the return value will be an empty list. Just like
        :meth:`get`, :meth:`getlist` accepts a `type` parameter. All items will
        be converted with the callable defined there.

        :param key: The key to be looked up.
        :param type: A callable that is used to cast the value in the
                     :class:`Headers`. If a :exc:`ValueError` is raised
                     by this callable the value will be removed from the list.
        :return: a :class:`list` of all the values for the key.

        .. versionchanged:: 3.0
            The ``as_bytes`` parameter was removed.

        .. versionchanged:: 0.9
            The ``as_bytes`` parameter was added.
        """
        ikey = key.lower()

        if type is not None:
            result = []

            for k, v in self:
                if k.lower() == ikey:
                    try:
                        result.append(type(v))
                    except ValueError:
                        # Values that fail conversion are skipped.
                        continue

            return result

        return [v for k, v in self if k.lower() == ikey]

    def get_all(self, name: str) -> list[str]:
        """Return a list of all the values for the named field.

        This method is compatible with the :mod:`wsgiref`
        :meth:`~wsgiref.headers.Headers.get_all` method.
        """
        return self.getlist(name)

    def items(self, lower: bool = False) -> t.Iterable[tuple[str, str]]:
        """Yield ``(key, value)`` pairs in order.

        :param lower: If true, keys are yielded lowercased.
        """
        for key, value in self:
            if lower:
                key = key.lower()
            yield key, value

    def keys(self, lower: bool = False) -> t.Iterable[str]:
        """Yield every key in order, once per stored item.

        :param lower: If true, keys are yielded lowercased.
        """
        for key, _ in self.items(lower):
            yield key

    def values(self) -> t.Iterable[str]:
        """Yield every value in order."""
        for _, value in self.items():
            yield value

    def extend(
        self,
        arg: (
            Headers
            | MultiDict[str, t.Any]
            | cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]]
            | cabc.Iterable[tuple[str, t.Any]]
            | None
        ) = None,
        /,
        **kwargs: str,
    ) -> None:
        """Extend headers in this object with items from another object
        containing header items as well as keyword arguments.

        To replace existing keys instead of extending, use
        :meth:`update` instead.

        If provided, the first argument can be another :class:`Headers`
        object, a :class:`MultiDict`, :class:`dict`, or iterable of
        pairs.

        .. versionchanged:: 1.0
            Support :class:`MultiDict`. Allow passing ``kwargs``.
        """
        if arg is not None:
            for key, value in iter_multi_items(arg):
                self.add(key, value)

        for key, value in iter_multi_items(kwargs):
            self.add(key, value)

    def __delitem__(self, key: str | int | slice) -> None:
        """Delete by header name (all matching items), index, or slice."""
        if isinstance(key, str):
            self._del_key(key)
            return

        del self._list[key]

    def _del_key(self, key: str) -> None:
        """Drop every item whose key matches ``key`` ignoring case.

        Does nothing if no item matches.
        """
        key = key.lower()
        # Assign through a full slice so the list object is updated in place.
        self._list[:] = [(k, v) for k, v in self._list if k.lower() != key]

    def remove(self, key: str) -> None:
        """Remove a key.

        :param key: The key to be removed.
        """
        return self._del_key(key)

    @t.overload
    def pop(self) -> tuple[str, str]: ...
    @t.overload
    def pop(self, key: str) -> str: ...
    @t.overload
    def pop(self, key: int | None = ...) -> tuple[str, str]: ...
    @t.overload
    def pop(self, key: str, default: str) -> str: ...
    @t.overload
    def pop(self, key: str, default: T) -> str | T: ...
    def pop(
        self,
        key: str | int | None = None,
        default: str | T = _missing,  # type: ignore[assignment]
    ) -> str | tuple[str, str] | T:
        """Removes and returns a key or index.

        :param key: The key to be popped.  If this is an integer the item at
                    that position is removed, if it's a string the value for
                    that key is.  If the key is omitted or `None` the last
                    item is removed.
        :param default: Returned instead of raising when a string key is
                        not present.
        :return: an item.
        """
        if key is None:
            return self._list.pop()

        if isinstance(key, int):
            return self._list.pop(key)

        try:
            rv = self._get_key(key)
        except KeyError:
            if default is not _missing:
                return default

            raise

        # The first value is returned, but every item for the key is removed.
        self.remove(key)
        return rv

    def popitem(self) -> tuple[str, str]:
        """Remove the last item and return it as a ``(key, value)`` tuple."""
        return self._list.pop()

    def __contains__(self, key: str) -> bool:
        """Check if a key is present."""
        try:
            self._get_key(key)
        except KeyError:
            return False

        return True

    def __iter__(self) -> t.Iterator[tuple[str, str]]:
        """Yield ``(key, value)`` tuples."""
        return iter(self._list)

    def __len__(self) -> int:
        """Return the number of stored ``(key, value)`` items."""
        return len(self._list)

    def add(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
        """Add a new header tuple to the list.

        Keyword arguments can specify additional parameters for the header
        value, with underscores converted to dashes::

        >>> d = Headers()
        >>> d.add('Content-Type', 'text/plain')
        >>> d.add('Content-Disposition', 'attachment', filename='foo.png')

        The keyword argument dumping uses :func:`dump_options_header`
        behind the scenes.

        .. versionchanged:: 0.4.1
            keyword arguments were added for :mod:`wsgiref` compatibility.
        """
        if kwargs:
            value = _options_header_vkw(value, kwargs)

        value_str = _str_header_value(value)
        self._list.append((key, value_str))

    def add_header(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
        """Add a new header tuple to the list.

        An alias for :meth:`add` for compatibility with the :mod:`wsgiref`
        :meth:`~wsgiref.headers.Headers.add_header` method.
        """
        self.add(key, value, **kwargs)

    def clear(self) -> None:
        """Clears all headers."""
        self._list.clear()

    def set(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
        """Remove all header tuples for `key` and add a new one.  The newly
        added key either appears at the end of the list if there was no
        entry or replaces the first one.

        Keyword arguments can specify additional parameters for the header
        value, with underscores converted to dashes.  See :meth:`add` for
        more information.

        .. versionchanged:: 0.6.1
            :meth:`set` now accepts the same arguments as :meth:`add`.

        :param key: The key to be inserted.
        :param value: The value to be inserted.
        """
        if kwargs:
            value = _options_header_vkw(value, kwargs)

        value_str = _str_header_value(value)

        if not self._list:
            self._list.append((key, value_str))
            return

        # One shared iterator: the loop consumes items up to and including
        # the first match, the comprehension below consumes the rest.
        iter_list = iter(self._list)
        ikey = key.lower()

        for idx, (old_key, _) in enumerate(iter_list):
            if old_key.lower() == ikey:
                # replace first occurrence
                self._list[idx] = (key, value_str)
                break
        else:
            # no existing occurrences
            self._list.append((key, value_str))
            return

        # remove remaining occurrences
        self._list[idx + 1 :] = [
            item for item in iter_list if item[0].lower() != ikey
        ]

    def setlist(self, key: str, values: cabc.Iterable[t.Any]) -> None:
        """Remove any existing values for a header and add new ones.

        If ``values`` yields no items the header is removed. This
        includes iterators and generators that turn out to be empty.

        :param key: The header key to set.
        :param values: An iterable of values to set for the key.

        .. versionadded:: 1.0
        """
        if not values:
            self.remove(key)
            return

        values_iter = iter(values)

        try:
            first = next(values_iter)
        except StopIteration:
            # Iterators and generators are truthy even when empty, so the
            # check above cannot catch them; treat them like an empty list.
            self.remove(key)
            return

        self.set(key, first)

        for value in values_iter:
            self.add(key, value)

    def setdefault(self, key: str, default: t.Any) -> str:
        """Return the first value for the key if it is in the headers,
        otherwise set the header to the value given by ``default`` and
        return that.

        :param key: The header key to get.
        :param default: The value to set for the key if it is not in the
            headers.
        """
        try:
            return self._get_key(key)
        except KeyError:
            pass

        self.set(key, default)
        # Read the value back so the stored string form is returned.
        return self._get_key(key)

    def setlistdefault(self, key: str, default: cabc.Iterable[t.Any]) -> list[str]:
        """Return the list of values for the key if it is in the
        headers, otherwise set the header to the list of values given
        by ``default`` and return that.

        Unlike :meth:`MultiDict.setlistdefault`, modifying the returned
        list will not affect the headers.

        :param key: The header key to get.
        :param default: An iterable of values to set for the key if it
            is not in the headers.

        .. versionadded:: 1.0
        """
        if key not in self:
            self.setlist(key, default)

        return self.getlist(key)

    @t.overload
    def __setitem__(self, key: str, value: t.Any) -> None: ...
    @t.overload
    def __setitem__(self, key: int, value: tuple[str, t.Any]) -> None: ...
    @t.overload
    def __setitem__(
        self, key: slice, value: cabc.Iterable[tuple[str, t.Any]]
    ) -> None: ...
    def __setitem__(
        self,
        key: str | int | slice,
        value: t.Any | tuple[str, t.Any] | cabc.Iterable[tuple[str, t.Any]],
    ) -> None:
        """Like :meth:`set` but also supports index/slice based setting."""
        if isinstance(key, str):
            self.set(key, value)
        elif isinstance(key, int):
            self._list[key] = value[0], _str_header_value(value[1])  # type: ignore[index]
        else:
            self._list[key] = [(k, _str_header_value(v)) for k, v in value]  # type: ignore[str-unpack]

    def update(
        self,
        arg: (
            Headers
            | MultiDict[str, t.Any]
            | cabc.Mapping[
                str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]
            ]
            | cabc.Iterable[tuple[str, t.Any]]
            | None
        ) = None,
        /,
        **kwargs: t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any],
    ) -> None:
        """Replace headers in this object with items from another
        headers object and keyword arguments.

        To extend existing keys instead of replacing, use :meth:`extend`
        instead.

        If provided, the first argument can be another :class:`Headers`
        object, a :class:`MultiDict`, :class:`dict`, or iterable of
        pairs.

        .. versionadded:: 1.0
        """
        if arg is not None:
            if isinstance(arg, (Headers, MultiDict)):
                for key in arg.keys():
                    self.setlist(key, arg.getlist(key))
            elif isinstance(arg, cabc.Mapping):
                for key, value in arg.items():
                    # Only list, tuple and set values are expanded into
                    # multiple headers; anything else is set as one value.
                    if isinstance(value, (list, tuple, set)):
                        self.setlist(key, value)
                    else:
                        self.set(key, value)
            else:
                for key, value in arg:
                    self.set(key, value)

        for key, value in kwargs.items():
            if isinstance(value, (list, tuple, set)):
                self.setlist(key, value)
            else:
                self.set(key, value)

    def __or__(
        self,
        other: cabc.Mapping[
            str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]
        ],
    ) -> te.Self:
        """Return a copy of these headers updated with the mapping ``other``."""
        if not isinstance(other, cabc.Mapping):
            return NotImplemented

        rv = self.copy()
        rv.update(other)
        return rv

    def __ior__(
        self,
        other: (
            cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]]
            | cabc.Iterable[tuple[str, t.Any]]
        ),
    ) -> te.Self:
        """Update these headers in place from a mapping or iterable of pairs."""
        if not isinstance(other, (cabc.Mapping, cabc.Iterable)):
            return NotImplemented

        self.update(other)
        return self

    def to_wsgi_list(self) -> list[tuple[str, str]]:
        """Convert the headers into a list suitable for WSGI.

        :return: list
        """
        return list(self)

    def copy(self) -> te.Self:
        """Return a new instance of the same class built from the stored items."""
        return self.__class__(self._list)

    def __copy__(self) -> te.Self:
        """Support :func:`copy.copy` by delegating to :meth:`copy`."""
        return self.copy()

    def __str__(self) -> str:
        """Returns formatted headers suitable for HTTP transmission."""
        strs = [f"{key}: {value}" for key, value in self.to_wsgi_list()]
        # Joining with a trailing "\r\n" element ends the output with a
        # blank line after the last header.
        strs.append("\r\n")
        return "\r\n".join(strs)

    def __repr__(self) -> str:
        return f"{type(self).__name__}({list(self)!r})"

585 

586 

def _options_header_vkw(value: str, kw: dict[str, t.Any]) -> str:
    """Dump ``value`` plus keyword options as a single header value.

    Underscores in the keyword names are replaced with dashes before the
    options are handed to :func:`dump_options_header`.
    """
    options: dict[str, t.Any] = {}

    for name, option in kw.items():
        options[name.replace("_", "-")] = option

    return dump_options_header(value, options)

589 

590 

591_newline_re = re.compile(r"[\r\n]") 

592 

593 

594def _str_header_value(value: t.Any) -> str: 

595 if not isinstance(value, str): 

596 value = str(value) 

597 

598 if _newline_re.search(value) is not None: 

599 raise ValueError("Header values must not contain newline characters.") 

600 

601 return value # type: ignore[no-any-return] 

602 

603 

class EnvironHeaders(ImmutableHeadersMixin, Headers):  # type: ignore[misc]
    """Read only version of the headers from a WSGI environment. This
    provides the same interface as `Headers` and is constructed from
    a WSGI environment.

    Header names are translated to environ keys on lookup: the name is
    uppercased, dashes become underscores, and every key except
    ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` gets an ``HTTP_`` prefix.
    A non-string key raises ``BadRequestKeyError``; for a string key the
    environ is indexed directly, so a missing header raises whatever the
    environ mapping raises for a missing key.
    """

    def __init__(self, environ: WSGIEnvironment) -> None:
        super().__init__()
        # Headers are read from this mapping on every access.
        self.environ = environ

    def __eq__(self, other: object) -> bool:
        """Two instances are equal only if they wrap the same environ object."""
        if isinstance(other, EnvironHeaders):
            return self.environ is other.environ

        return NotImplemented

    __hash__ = None

    def __getitem__(self, key: str) -> str:  # type: ignore[override]
        """Return the header value for ``key``; only string keys are supported."""
        return self._get_key(key)

    def _get_key(self, key: str) -> str:
        """Translate a header name to its environ key and return the value."""
        if not isinstance(key, str):
            raise BadRequestKeyError(key)

        environ_key = key.upper().replace("-", "_")

        # Content-Type and Content-Length are the only headers stored
        # without the ``HTTP_`` prefix.
        if environ_key not in {"CONTENT_TYPE", "CONTENT_LENGTH"}:
            environ_key = f"HTTP_{environ_key}"

        return self.environ[environ_key]  # type: ignore[no-any-return]

    def __len__(self) -> int:
        """Count the headers produced by iterating over the environ."""
        count = 0

        for _ in self:
            count += 1

        return count

    def __iter__(self) -> cabc.Iterator[tuple[str, str]]:
        """Yield ``(Title-Cased-Name, value)`` for each header in the environ."""
        for name, value in self.environ.items():
            if name.startswith("HTTP_"):
                # The unprefixed keys are the ones reported for these two.
                if name in {"HTTP_CONTENT_TYPE", "HTTP_CONTENT_LENGTH"}:
                    continue

                header = name[5:]
            elif name in {"CONTENT_TYPE", "CONTENT_LENGTH"} and value:
                # Empty values for these two keys are not reported.
                header = name
            else:
                continue

            yield header.replace("_", "-").title(), value

    def copy(self) -> t.NoReturn:
        """Copying is not supported; always raises :exc:`TypeError`."""
        raise TypeError(f"cannot create {type(self).__name__!r} copies")

    def __or__(self, other: t.Any) -> t.NoReturn:
        """``|`` would need a copy, so it always raises :exc:`TypeError`."""
        raise TypeError(f"cannot create {type(self).__name__!r} copies")