Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/werkzeug/datastructures/headers.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

284 statements  

1from __future__ import annotations 

2 

3import collections.abc as cabc 

4import re 

5import typing as t 

6 

7from .._internal import _missing 

8from ..exceptions import BadRequestKeyError 

9from .mixins import ImmutableHeadersMixin 

10from .structures import iter_multi_items 

11from .structures import MultiDict 

12 

13if t.TYPE_CHECKING: 

14 import typing_extensions as te 

15 from _typeshed.wsgi import WSGIEnvironment 

16 

17T = t.TypeVar("T") 

18 

19 

20class Headers: 

21 """An object that stores some headers. It has a dict-like interface, 

22 but is ordered, can store the same key multiple times, and iterating 

23 yields ``(key, value)`` pairs instead of only keys. 

24 

25 This data structure is useful if you want a nicer way to handle WSGI 

26 headers which are stored as tuples in a list. 

27 

28 From Werkzeug 0.3 onwards, the :exc:`KeyError` raised by this class is 

29 also a subclass of the :class:`~exceptions.BadRequest` HTTP exception 

30 and will render a page for a ``400 BAD REQUEST`` if caught in a 

31 catch-all for HTTP exceptions. 

32 

33 Headers is mostly compatible with the Python :class:`wsgiref.headers.Headers` 

34 class, with the exception of `__getitem__`. :mod:`wsgiref` will return 

35 `None` for ``headers['missing']``, whereas :class:`Headers` will raise 

36 a :class:`KeyError`. 

37 

38 To create a new ``Headers`` object, pass it a list, dict, or 

39 other ``Headers`` object with default values. These values are 

40 validated the same way values added later are. 

41 

42 :param defaults: The list of default values for the :class:`Headers`. 

43 

44 .. versionchanged:: 3.1 

45 Implement ``|`` and ``|=`` operators. 

46 

47 .. versionchanged:: 2.1.0 

48 Default values are validated the same as values added later. 

49 

50 .. versionchanged:: 0.9 

51 This data structure now stores unicode values similar to how the 

52 multi dicts do it. The main difference is that bytes can be set as 

53 well which will automatically be latin1 decoded. 

54 

55 .. versionchanged:: 0.9 

56 The :meth:`linked` function was removed without replacement as it 

57 was an API that does not support the changes to the encoding model. 

58 """ 

59 

60 def __init__( 

61 self, 

62 defaults: ( 

63 Headers 

64 | MultiDict[str, t.Any] 

65 | cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]] 

66 | cabc.Iterable[tuple[str, t.Any]] 

67 | None 

68 ) = None, 

69 ) -> None: 

70 self._list: list[tuple[str, str]] = [] 

71 

72 if defaults is not None: 

73 self.extend(defaults) 

74 

75 @t.overload 

76 def __getitem__(self, key: str) -> str: ... 

77 @t.overload 

78 def __getitem__(self, key: int) -> tuple[str, str]: ... 

79 @t.overload 

80 def __getitem__(self, key: slice) -> te.Self: ... 

81 def __getitem__(self, key: str | int | slice) -> str | tuple[str, str] | te.Self: 

82 if isinstance(key, str): 

83 return self._get_key(key) 

84 

85 if isinstance(key, int): 

86 return self._list[key] 

87 

88 return self.__class__(self._list[key]) 

89 

90 def _get_key(self, key: str) -> str: 

91 ikey = key.lower() 

92 

93 for k, v in self._list: 

94 if k.lower() == ikey: 

95 return v 

96 

97 raise BadRequestKeyError(key) 

98 

99 def __eq__(self, other: object) -> bool: 

100 if other.__class__ is not self.__class__: 

101 return NotImplemented 

102 

103 def lowered(item: tuple[str, ...]) -> tuple[str, ...]: 

104 return item[0].lower(), *item[1:] 

105 

106 return set(map(lowered, other._list)) == set(map(lowered, self._list)) # type: ignore[attr-defined] 

107 

108 __hash__ = None # type: ignore[assignment] 

109 

110 @t.overload 

111 def get(self, key: str) -> str | None: ... 

112 @t.overload 

113 def get(self, key: str, default: str) -> str: ... 

114 @t.overload 

115 def get(self, key: str, default: T) -> str | T: ... 

116 @t.overload 

117 def get(self, key: str, type: cabc.Callable[[str], T]) -> T | None: ... 

118 @t.overload 

119 def get(self, key: str, default: T, type: cabc.Callable[[str], T]) -> T: ... 

120 def get( # type: ignore[misc] 

121 self, 

122 key: str, 

123 default: str | T | None = None, 

124 type: cabc.Callable[[str], T] | None = None, 

125 ) -> str | T | None: 

126 """Return the default value if the requested data doesn't exist. 

127 If `type` is provided and is a callable it should convert the value, 

128 return it or raise a :exc:`ValueError` if that is not possible. In 

129 this case the function will return the default as if the value was not 

130 found: 

131 

132 >>> d = Headers([('Content-Length', '42')]) 

133 >>> d.get('Content-Length', type=int) 

134 42 

135 

136 :param key: The key to be looked up. 

137 :param default: The default value to be returned if the key can't 

138 be looked up. If not further specified `None` is 

139 returned. 

140 :param type: A callable that is used to cast the value in the 

141 :class:`Headers`. If a :exc:`ValueError` is raised 

142 by this callable the default value is returned. 

143 

144 .. versionchanged:: 3.0 

145 The ``as_bytes`` parameter was removed. 

146 

147 .. versionchanged:: 0.9 

148 The ``as_bytes`` parameter was added. 

149 """ 

150 try: 

151 rv = self._get_key(key) 

152 except KeyError: 

153 return default 

154 

155 if type is None: 

156 return rv 

157 

158 try: 

159 return type(rv) 

160 except ValueError: 

161 return default 

162 

163 @t.overload 

164 def getlist(self, key: str) -> list[str]: ... 

165 @t.overload 

166 def getlist(self, key: str, type: cabc.Callable[[str], T]) -> list[T]: ... 

167 def getlist( 

168 self, key: str, type: cabc.Callable[[str], T] | None = None 

169 ) -> list[str] | list[T]: 

170 """Return the list of items for a given key. If that key is not in the 

171 :class:`Headers`, the return value will be an empty list. Just like 

172 :meth:`get`, :meth:`getlist` accepts a `type` parameter. All items will 

173 be converted with the callable defined there. 

174 

175 :param key: The key to be looked up. 

176 :param type: A callable that is used to cast the value in the 

177 :class:`Headers`. If a :exc:`ValueError` is raised 

178 by this callable the value will be removed from the list. 

179 :return: a :class:`list` of all the values for the key. 

180 

181 .. versionchanged:: 3.0 

182 The ``as_bytes`` parameter was removed. 

183 

184 .. versionchanged:: 0.9 

185 The ``as_bytes`` parameter was added. 

186 """ 

187 ikey = key.lower() 

188 

189 if type is not None: 

190 result = [] 

191 

192 for k, v in self: 

193 if k.lower() == ikey: 

194 try: 

195 result.append(type(v)) 

196 except ValueError: 

197 continue 

198 

199 return result 

200 

201 return [v for k, v in self if k.lower() == ikey] 

202 

203 def get_all(self, name: str) -> list[str]: 

204 """Return a list of all the values for the named field. 

205 

206 This method is compatible with the :mod:`wsgiref` 

207 :meth:`~wsgiref.headers.Headers.get_all` method. 

208 """ 

209 return self.getlist(name) 

210 

211 def items(self, lower: bool = False) -> t.Iterable[tuple[str, str]]: 

212 for key, value in self: 

213 if lower: 

214 key = key.lower() 

215 yield key, value 

216 

217 def keys(self, lower: bool = False) -> t.Iterable[str]: 

218 for key, _ in self.items(lower): 

219 yield key 

220 

221 def values(self) -> t.Iterable[str]: 

222 for _, value in self.items(): 

223 yield value 

224 

225 def extend( 

226 self, 

227 arg: ( 

228 Headers 

229 | MultiDict[str, t.Any] 

230 | cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]] 

231 | cabc.Iterable[tuple[str, t.Any]] 

232 | None 

233 ) = None, 

234 /, 

235 **kwargs: str, 

236 ) -> None: 

237 """Extend headers in this object with items from another object 

238 containing header items as well as keyword arguments. 

239 

240 To replace existing keys instead of extending, use 

241 :meth:`update` instead. 

242 

243 If provided, the first argument can be another :class:`Headers` 

244 object, a :class:`MultiDict`, :class:`dict`, or iterable of 

245 pairs. 

246 

247 .. versionchanged:: 1.0 

248 Support :class:`MultiDict`. Allow passing ``kwargs``. 

249 """ 

250 if arg is not None: 

251 for key, value in iter_multi_items(arg): 

252 self.add(key, value) 

253 

254 for key, value in iter_multi_items(kwargs): 

255 self.add(key, value) 

256 

257 def __delitem__(self, key: str | int | slice) -> None: 

258 if isinstance(key, str): 

259 self._del_key(key) 

260 return 

261 

262 del self._list[key] 

263 

264 def _del_key(self, key: str) -> None: 

265 key = key.lower() 

266 new = [] 

267 

268 for k, v in self._list: 

269 if k.lower() != key: 

270 new.append((k, v)) 

271 

272 self._list[:] = new 

273 

274 def remove(self, key: str) -> None: 

275 """Remove a key. 

276 

277 :param key: The key to be removed. 

278 """ 

279 return self._del_key(key) 

280 

281 @t.overload 

282 def pop(self) -> tuple[str, str]: ... 

283 @t.overload 

284 def pop(self, key: str) -> str: ... 

285 @t.overload 

286 def pop(self, key: int | None = ...) -> tuple[str, str]: ... 

287 @t.overload 

288 def pop(self, key: str, default: str) -> str: ... 

289 @t.overload 

290 def pop(self, key: str, default: T) -> str | T: ... 

291 def pop( 

292 self, 

293 key: str | int | None = None, 

294 default: str | T = _missing, # type: ignore[assignment] 

295 ) -> str | tuple[str, str] | T: 

296 """Removes and returns a key or index. 

297 

298 :param key: The key to be popped. If this is an integer the item at 

299 that position is removed, if it's a string the value for 

300 that key is. If the key is omitted or `None` the last 

301 item is removed. 

302 :return: an item. 

303 """ 

304 if key is None: 

305 return self._list.pop() 

306 

307 if isinstance(key, int): 

308 return self._list.pop(key) 

309 

310 try: 

311 rv = self._get_key(key) 

312 except KeyError: 

313 if default is not _missing: 

314 return default 

315 

316 raise 

317 

318 self.remove(key) 

319 return rv 

320 

321 def popitem(self) -> tuple[str, str]: 

322 """Removes a key or index and returns a (key, value) item.""" 

323 return self._list.pop() 

324 

325 def __contains__(self, key: str) -> bool: 

326 """Check if a key is present.""" 

327 try: 

328 self._get_key(key) 

329 except KeyError: 

330 return False 

331 

332 return True 

333 

334 def __iter__(self) -> t.Iterator[tuple[str, str]]: 

335 """Yield ``(key, value)`` tuples.""" 

336 return iter(self._list) 

337 

338 def __len__(self) -> int: 

339 return len(self._list) 

340 

341 def add(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None: 

342 """Add a new header tuple to the list. 

343 

344 Keyword arguments can specify additional parameters for the header 

345 value, with underscores converted to dashes:: 

346 

347 >>> d = Headers() 

348 >>> d.add('Content-Type', 'text/plain') 

349 >>> d.add('Content-Disposition', 'attachment', filename='foo.png') 

350 

351 The keyword argument dumping uses :func:`dump_options_header` 

352 behind the scenes. 

353 

354 .. versionchanged:: 0.4.1 

355 keyword arguments were added for :mod:`wsgiref` compatibility. 

356 """ 

357 if kwargs: 

358 value = _options_header_vkw(value, kwargs) 

359 

360 value_str = _str_header_value(value) 

361 self._list.append((key, value_str)) 

362 

363 def add_header(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None: 

364 """Add a new header tuple to the list. 

365 

366 An alias for :meth:`add` for compatibility with the :mod:`wsgiref` 

367 :meth:`~wsgiref.headers.Headers.add_header` method. 

368 """ 

369 self.add(key, value, **kwargs) 

370 

371 def clear(self) -> None: 

372 """Clears all headers.""" 

373 self._list.clear() 

374 

375 def set(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None: 

376 """Remove all header tuples for `key` and add a new one. The newly 

377 added key either appears at the end of the list if there was no 

378 entry or replaces the first one. 

379 

380 Keyword arguments can specify additional parameters for the header 

381 value, with underscores converted to dashes. See :meth:`add` for 

382 more information. 

383 

384 .. versionchanged:: 0.6.1 

385 :meth:`set` now accepts the same arguments as :meth:`add`. 

386 

387 :param key: The key to be inserted. 

388 :param value: The value to be inserted. 

389 """ 

390 if kwargs: 

391 value = _options_header_vkw(value, kwargs) 

392 

393 value_str = _str_header_value(value) 

394 

395 if not self._list: 

396 self._list.append((key, value_str)) 

397 return 

398 

399 iter_list = iter(self._list) 

400 ikey = key.lower() 

401 

402 for idx, (old_key, _) in enumerate(iter_list): 

403 if old_key.lower() == ikey: 

404 # replace first occurrence 

405 self._list[idx] = (key, value_str) 

406 break 

407 else: 

408 # no existing occurrences 

409 self._list.append((key, value_str)) 

410 return 

411 

412 # remove remaining occurrences 

413 self._list[idx + 1 :] = [t for t in iter_list if t[0].lower() != ikey] 

414 

415 def setlist(self, key: str, values: cabc.Iterable[t.Any]) -> None: 

416 """Remove any existing values for a header and add new ones. 

417 

418 :param key: The header key to set. 

419 :param values: An iterable of values to set for the key. 

420 

421 .. versionadded:: 1.0 

422 """ 

423 if values: 

424 values_iter = iter(values) 

425 self.set(key, next(values_iter)) 

426 

427 for value in values_iter: 

428 self.add(key, value) 

429 else: 

430 self.remove(key) 

431 

432 def setdefault(self, key: str, default: t.Any) -> str: 

433 """Return the first value for the key if it is in the headers, 

434 otherwise set the header to the value given by ``default`` and 

435 return that. 

436 

437 :param key: The header key to get. 

438 :param default: The value to set for the key if it is not in the 

439 headers. 

440 """ 

441 try: 

442 return self._get_key(key) 

443 except KeyError: 

444 pass 

445 

446 self.set(key, default) 

447 return self._get_key(key) 

448 

449 def setlistdefault(self, key: str, default: cabc.Iterable[t.Any]) -> list[str]: 

450 """Return the list of values for the key if it is in the 

451 headers, otherwise set the header to the list of values given 

452 by ``default`` and return that. 

453 

454 Unlike :meth:`MultiDict.setlistdefault`, modifying the returned 

455 list will not affect the headers. 

456 

457 :param key: The header key to get. 

458 :param default: An iterable of values to set for the key if it 

459 is not in the headers. 

460 

461 .. versionadded:: 1.0 

462 """ 

463 if key not in self: 

464 self.setlist(key, default) 

465 

466 return self.getlist(key) 

467 

468 @t.overload 

469 def __setitem__(self, key: str, value: t.Any) -> None: ... 

470 @t.overload 

471 def __setitem__(self, key: int, value: tuple[str, t.Any]) -> None: ... 

472 @t.overload 

473 def __setitem__( 

474 self, key: slice, value: cabc.Iterable[tuple[str, t.Any]] 

475 ) -> None: ... 

476 def __setitem__( 

477 self, 

478 key: str | int | slice, 

479 value: t.Any | tuple[str, t.Any] | cabc.Iterable[tuple[str, t.Any]], 

480 ) -> None: 

481 """Like :meth:`set` but also supports index/slice based setting.""" 

482 if isinstance(key, str): 

483 self.set(key, value) 

484 elif isinstance(key, int): 

485 self._list[key] = value[0], _str_header_value(value[1]) # type: ignore[index] 

486 else: 

487 self._list[key] = [(k, _str_header_value(v)) for k, v in value] # type: ignore[misc] 

488 

489 def update( 

490 self, 

491 arg: ( 

492 Headers 

493 | MultiDict[str, t.Any] 

494 | cabc.Mapping[ 

495 str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any] 

496 ] 

497 | cabc.Iterable[tuple[str, t.Any]] 

498 | None 

499 ) = None, 

500 /, 

501 **kwargs: t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any], 

502 ) -> None: 

503 """Replace headers in this object with items from another 

504 headers object and keyword arguments. 

505 

506 To extend existing keys instead of replacing, use :meth:`extend` 

507 instead. 

508 

509 If provided, the first argument can be another :class:`Headers` 

510 object, a :class:`MultiDict`, :class:`dict`, or iterable of 

511 pairs. 

512 

513 .. versionadded:: 1.0 

514 """ 

515 if arg is not None: 

516 if isinstance(arg, (Headers, MultiDict)): 

517 for key in arg.keys(): 

518 self.setlist(key, arg.getlist(key)) 

519 elif isinstance(arg, cabc.Mapping): 

520 for key, value in arg.items(): 

521 if isinstance(value, (list, tuple, set)): 

522 self.setlist(key, value) 

523 else: 

524 self.set(key, value) 

525 else: 

526 for key, value in arg: 

527 self.set(key, value) 

528 

529 for key, value in kwargs.items(): 

530 if isinstance(value, (list, tuple, set)): 

531 self.setlist(key, value) 

532 else: 

533 self.set(key, value) 

534 

535 def __or__( 

536 self, 

537 other: cabc.Mapping[ 

538 str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any] 

539 ], 

540 ) -> te.Self: 

541 if not isinstance(other, cabc.Mapping): 

542 return NotImplemented 

543 

544 rv = self.copy() 

545 rv.update(other) 

546 return rv 

547 

548 def __ior__( 

549 self, 

550 other: ( 

551 cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]] 

552 | cabc.Iterable[tuple[str, t.Any]] 

553 ), 

554 ) -> te.Self: 

555 if not isinstance(other, (cabc.Mapping, cabc.Iterable)): 

556 return NotImplemented 

557 

558 self.update(other) 

559 return self 

560 

561 def to_wsgi_list(self) -> list[tuple[str, str]]: 

562 """Convert the headers into a list suitable for WSGI. 

563 

564 :return: list 

565 """ 

566 return list(self) 

567 

568 def copy(self) -> te.Self: 

569 return self.__class__(self._list) 

570 

571 def __copy__(self) -> te.Self: 

572 return self.copy() 

573 

574 def __str__(self) -> str: 

575 """Returns formatted headers suitable for HTTP transmission.""" 

576 strs = [] 

577 for key, value in self.to_wsgi_list(): 

578 strs.append(f"{key}: {value}") 

579 strs.append("\r\n") 

580 return "\r\n".join(strs) 

581 

582 def __repr__(self) -> str: 

583 return f"{type(self).__name__}({list(self)!r})" 

584 

585 

586def _options_header_vkw(value: str, kw: dict[str, t.Any]) -> str: 

587 return http.dump_options_header( 

588 value, {k.replace("_", "-"): v for k, v in kw.items()} 

589 ) 

590 

591 

592_newline_re = re.compile(r"[\r\n]") 

593 

594 

595def _str_header_value(value: t.Any) -> str: 

596 if not isinstance(value, str): 

597 value = str(value) 

598 

599 if _newline_re.search(value) is not None: 

600 raise ValueError("Header values must not contain newline characters.") 

601 

602 return value # type: ignore[no-any-return] 

603 

604 

605class EnvironHeaders(ImmutableHeadersMixin, Headers): # type: ignore[misc] 

606 """Read only version of the headers from a WSGI environment. This 

607 provides the same interface as `Headers` and is constructed from 

608 a WSGI environment. 

609 From Werkzeug 0.3 onwards, the `KeyError` raised by this class is also a 

610 subclass of the :exc:`~exceptions.BadRequest` HTTP exception and will 

611 render a page for a ``400 BAD REQUEST`` if caught in a catch-all for 

612 HTTP exceptions. 

613 """ 

614 

615 def __init__(self, environ: WSGIEnvironment) -> None: 

616 super().__init__() 

617 self.environ = environ 

618 

619 def __eq__(self, other: object) -> bool: 

620 if not isinstance(other, EnvironHeaders): 

621 return NotImplemented 

622 

623 return self.environ is other.environ 

624 

625 __hash__ = None # type: ignore[assignment] 

626 

627 def __getitem__(self, key: str) -> str: # type: ignore[override] 

628 return self._get_key(key) 

629 

630 def _get_key(self, key: str) -> str: 

631 if not isinstance(key, str): 

632 raise BadRequestKeyError(key) 

633 

634 key = key.upper().replace("-", "_") 

635 

636 if key in {"CONTENT_TYPE", "CONTENT_LENGTH"}: 

637 return self.environ[key] # type: ignore[no-any-return] 

638 

639 return self.environ[f"HTTP_{key}"] # type: ignore[no-any-return] 

640 

641 def __len__(self) -> int: 

642 return sum(1 for _ in self) 

643 

644 def __iter__(self) -> cabc.Iterator[tuple[str, str]]: 

645 for key, value in self.environ.items(): 

646 if key.startswith("HTTP_") and key not in { 

647 "HTTP_CONTENT_TYPE", 

648 "HTTP_CONTENT_LENGTH", 

649 }: 

650 yield key[5:].replace("_", "-").title(), value 

651 elif key in {"CONTENT_TYPE", "CONTENT_LENGTH"} and value: 

652 yield key.replace("_", "-").title(), value 

653 

654 def copy(self) -> t.NoReturn: 

655 raise TypeError(f"cannot create {type(self).__name__!r} copies") 

656 

657 def __or__(self, other: t.Any) -> t.NoReturn: 

658 raise TypeError(f"cannot create {type(self).__name__!r} copies") 

659 

660 

661# circular dependencies 

662from .. import http