1from __future__ import annotations
2
3import collections.abc as cabc
4import re
5import typing as t
6
7from .._internal import _missing
8from ..exceptions import BadRequestKeyError
9from .mixins import ImmutableHeadersMixin
10from .structures import iter_multi_items
11from .structures import MultiDict
12
13if t.TYPE_CHECKING:
14 import typing_extensions as te
15 from _typeshed.wsgi import WSGIEnvironment
16
17T = t.TypeVar("T")
18
19
class Headers:
    """An object that stores some headers. It has a dict-like interface,
    but is ordered, can store the same key multiple times, and iterating
    yields ``(key, value)`` pairs instead of only keys.

    This data structure is useful if you want a nicer way to handle WSGI
    headers which are stored as tuples in a list.

    From Werkzeug 0.3 onwards, the :exc:`KeyError` raised by this class is
    also a subclass of the :class:`~exceptions.BadRequest` HTTP exception
    and will render a page for a ``400 BAD REQUEST`` if caught in a
    catch-all for HTTP exceptions.

    Headers is mostly compatible with the Python :class:`wsgiref.headers.Headers`
    class, with the exception of `__getitem__`. :mod:`wsgiref` will return
    `None` for ``headers['missing']``, whereas :class:`Headers` will raise
    a :class:`KeyError`.

    To create a new ``Headers`` object, pass it a list, dict, or
    other ``Headers`` object with default values. These values are
    validated the same way values added later are.

    :param defaults: The list of default values for the :class:`Headers`.

    .. versionchanged:: 3.1
        Implement ``|`` and ``|=`` operators.

    .. versionchanged:: 2.1.0
        Default values are validated the same as values added later.

    .. versionchanged:: 0.9
        This data structure now stores unicode values similar to how the
        multi dicts do it. The main difference is that bytes can be set as
        well which will automatically be latin1 decoded.

    .. versionchanged:: 0.9
        The :meth:`linked` function was removed without replacement as it
        was an API that does not support the changes to the encoding model.
    """

    def __init__(
        self,
        defaults: (
            Headers
            | MultiDict[str, t.Any]
            | cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]]
            | cabc.Iterable[tuple[str, t.Any]]
            | None
        ) = None,
    ) -> None:
        # Ordered storage of ``(key, value)`` pairs; duplicates are allowed.
        self._list: list[tuple[str, str]] = []

        if defaults is not None:
            # Route defaults through ``extend`` so they are validated by
            # ``add`` exactly like values added later.
            self.extend(defaults)

    @t.overload
    def __getitem__(self, key: str) -> str: ...
    @t.overload
    def __getitem__(self, key: int) -> tuple[str, str]: ...
    @t.overload
    def __getitem__(self, key: slice) -> te.Self: ...
    def __getitem__(self, key: str | int | slice) -> str | tuple[str, str] | te.Self:
        """Look up by header name, list index, or slice.

        A string returns the first value stored for that name (compared
        case-insensitively), an integer returns the ``(key, value)`` pair
        at that position, and a slice returns a new instance of the same
        class built from the selected pairs.

        :raises BadRequestKeyError: if a string key is not present.
        """
        if isinstance(key, str):
            return self._get_key(key)

        if isinstance(key, int):
            return self._list[key]

        return self.__class__(self._list[key])

    def _get_key(self, key: str) -> str:
        """Return the first value whose key equals ``key`` ignoring case.

        :raises BadRequestKeyError: if no stored key matches.
        """
        ikey = key.lower()

        for k, v in self._list:
            if k.lower() == ikey:
                return v

        raise BadRequestKeyError(key)

    def __eq__(self, other: object) -> bool:
        """Compare with another object of exactly the same class.

        Both sides are reduced to sets of ``(lowercased key, value)``
        pairs, so ordering and repeated identical pairs do not affect the
        result. Any other class yields ``NotImplemented``.
        """
        if other.__class__ is not self.__class__:
            return NotImplemented

        def lowered(item: tuple[str, ...]) -> tuple[str, ...]:
            return item[0].lower(), *item[1:]

        return set(map(lowered, other._list)) == set(map(lowered, self._list))  # type: ignore[attr-defined]

    # Defining ``__eq__`` on a mutable container: instances are unhashable.
    __hash__ = None  # type: ignore[assignment]

    @t.overload
    def get(self, key: str) -> str | None: ...
    @t.overload
    def get(self, key: str, default: str) -> str: ...
    @t.overload
    def get(self, key: str, default: T) -> str | T: ...
    @t.overload
    def get(self, key: str, type: cabc.Callable[[str], T]) -> T | None: ...
    @t.overload
    def get(self, key: str, default: T, type: cabc.Callable[[str], T]) -> T: ...
    def get(  # type: ignore[misc]
        self,
        key: str,
        default: str | T | None = None,
        type: cabc.Callable[[str], T] | None = None,
    ) -> str | T | None:
        """Return the default value if the requested data doesn't exist.
        If `type` is provided and is a callable it should convert the value,
        return it or raise a :exc:`ValueError` if that is not possible. In
        this case the function will return the default as if the value was not
        found:

        >>> d = Headers([('Content-Length', '42')])
        >>> d.get('Content-Length', type=int)
        42

        :param key: The key to be looked up.
        :param default: The default value to be returned if the key can't
                        be looked up. If not further specified `None` is
                        returned.
        :param type: A callable that is used to cast the value in the
                     :class:`Headers`. If a :exc:`ValueError` is raised
                     by this callable the default value is returned.

        .. versionchanged:: 3.0
            The ``as_bytes`` parameter was removed.

        .. versionchanged:: 0.9
            The ``as_bytes`` parameter was added.
        """
        try:
            rv = self._get_key(key)
        except KeyError:
            return default

        if type is None:
            return rv

        try:
            return type(rv)
        except ValueError:
            return default

    @t.overload
    def getlist(self, key: str) -> list[str]: ...
    @t.overload
    def getlist(self, key: str, type: cabc.Callable[[str], T]) -> list[T]: ...
    def getlist(
        self, key: str, type: cabc.Callable[[str], T] | None = None
    ) -> list[str] | list[T]:
        """Return the list of items for a given key. If that key is not in the
        :class:`Headers`, the return value will be an empty list. Just like
        :meth:`get`, :meth:`getlist` accepts a `type` parameter. All items will
        be converted with the callable defined there.

        :param key: The key to be looked up.
        :param type: A callable that is used to cast the value in the
                     :class:`Headers`. If a :exc:`ValueError` is raised
                     by this callable the value will be removed from the list.
        :return: a :class:`list` of all the values for the key.

        .. versionchanged:: 3.0
            The ``as_bytes`` parameter was removed.

        .. versionchanged:: 0.9
            The ``as_bytes`` parameter was added.
        """
        ikey = key.lower()

        if type is not None:
            result = []

            for k, v in self:
                if k.lower() == ikey:
                    try:
                        result.append(type(v))
                    except ValueError:
                        # Values that fail conversion are skipped.
                        continue

            return result

        return [v for k, v in self if k.lower() == ikey]

    def get_all(self, name: str) -> list[str]:
        """Return a list of all the values for the named field.

        This method is compatible with the :mod:`wsgiref`
        :meth:`~wsgiref.headers.Headers.get_all` method.
        """
        return self.getlist(name)

    def items(self, lower: bool = False) -> t.Iterable[tuple[str, str]]:
        """Yield ``(key, value)`` pairs in stored order.

        :param lower: If true, keys are yielded lowercased.
        """
        for key, value in self:
            if lower:
                key = key.lower()
            yield key, value

    def keys(self, lower: bool = False) -> t.Iterable[str]:
        """Yield every stored key in order, once per stored pair.

        :param lower: If true, keys are yielded lowercased.
        """
        for key, _ in self.items(lower):
            yield key

    def values(self) -> t.Iterable[str]:
        """Yield every stored value in order."""
        for _, value in self.items():
            yield value

    def extend(
        self,
        arg: (
            Headers
            | MultiDict[str, t.Any]
            | cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]]
            | cabc.Iterable[tuple[str, t.Any]]
            | None
        ) = None,
        /,
        **kwargs: str,
    ) -> None:
        """Extend headers in this object with items from another object
        containing header items as well as keyword arguments.

        To replace existing keys instead of extending, use
        :meth:`update` instead.

        If provided, the first argument can be another :class:`Headers`
        object, a :class:`MultiDict`, :class:`dict`, or iterable of
        pairs.

        .. versionchanged:: 1.0
            Support :class:`MultiDict`. Allow passing ``kwargs``.
        """
        if arg is not None:
            for key, value in iter_multi_items(arg):
                self.add(key, value)

        for key, value in iter_multi_items(kwargs):
            self.add(key, value)

    def __delitem__(self, key: str | int | slice) -> None:
        """Delete by header name, list index, or slice.

        A string removes every pair stored under that name (compared
        case-insensitively); an integer or slice is deleted from the
        underlying list directly.
        """
        if isinstance(key, str):
            self._del_key(key)
            return

        del self._list[key]

    def _del_key(self, key: str) -> None:
        """Drop every pair whose key equals ``key`` ignoring case.

        Nothing happens if no pair matches.
        """
        ikey = key.lower()
        # Slice-assign so the same list object is kept and mutated in place.
        self._list[:] = [(k, v) for k, v in self._list if k.lower() != ikey]

    def remove(self, key: str) -> None:
        """Remove a key.

        All pairs stored under the key are removed; a key that is not
        present is ignored.

        :param key: The key to be removed.
        """
        return self._del_key(key)

    @t.overload
    def pop(self) -> tuple[str, str]: ...
    @t.overload
    def pop(self, key: str) -> str: ...
    @t.overload
    def pop(self, key: int | None = ...) -> tuple[str, str]: ...
    @t.overload
    def pop(self, key: str, default: str) -> str: ...
    @t.overload
    def pop(self, key: str, default: T) -> str | T: ...
    def pop(
        self,
        key: str | int | None = None,
        default: str | T = _missing,  # type: ignore[assignment]
    ) -> str | tuple[str, str] | T:
        """Removes and returns a key or index.

        :param key: The key to be popped. If this is an integer the item at
                    that position is removed, if it's a string the value for
                    that key is. If the key is omitted or `None` the last
                    item is removed.
        :param default: Returned instead of raising when a string key is
                        not present.
        :return: an item.
        :raises BadRequestKeyError: if a string key is not present and no
            default was given.
        """
        if key is None:
            return self._list.pop()

        if isinstance(key, int):
            return self._list.pop(key)

        try:
            rv = self._get_key(key)
        except KeyError:
            if default is not _missing:
                return default

            raise

        # The first value is returned, but every pair for the key is removed.
        self.remove(key)
        return rv

    def popitem(self) -> tuple[str, str]:
        """Remove and return the last ``(key, value)`` item."""
        return self._list.pop()

    def __contains__(self, key: str) -> bool:
        """Check if a key is present."""
        try:
            self._get_key(key)
        except KeyError:
            return False

        return True

    def __iter__(self) -> t.Iterator[tuple[str, str]]:
        """Yield ``(key, value)`` tuples."""
        return iter(self._list)

    def __len__(self) -> int:
        """Return the number of stored ``(key, value)`` pairs."""
        return len(self._list)

    def add(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
        """Add a new header tuple to the list.

        Keyword arguments can specify additional parameters for the header
        value, with underscores converted to dashes::

        >>> d = Headers()
        >>> d.add('Content-Type', 'text/plain')
        >>> d.add('Content-Disposition', 'attachment', filename='foo.png')

        The keyword argument dumping uses :func:`dump_options_header`
        behind the scenes.

        .. versionchanged:: 0.4.1
            keyword arguments were added for :mod:`wsgiref` compatibility.
        """
        if kwargs:
            value = _options_header_vkw(value, kwargs)

        value_str = _str_header_value(value)
        self._list.append((key, value_str))

    def add_header(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
        """Add a new header tuple to the list.

        An alias for :meth:`add` for compatibility with the :mod:`wsgiref`
        :meth:`~wsgiref.headers.Headers.add_header` method.
        """
        self.add(key, value, **kwargs)

    def clear(self) -> None:
        """Clears all headers."""
        self._list.clear()

    def set(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
        """Remove all header tuples for `key` and add a new one. The newly
        added key either appears at the end of the list if there was no
        entry or replaces the first one.

        Keyword arguments can specify additional parameters for the header
        value, with underscores converted to dashes. See :meth:`add` for
        more information.

        .. versionchanged:: 0.6.1
            :meth:`set` now accepts the same arguments as :meth:`add`.

        :param key: The key to be inserted.
        :param value: The value to be inserted.
        """
        if kwargs:
            value = _options_header_vkw(value, kwargs)

        value_str = _str_header_value(value)

        if not self._list:
            self._list.append((key, value_str))
            return

        # A single shared iterator: the loop consumes items up to and
        # including the first match, the comprehension below consumes the rest.
        iter_list = iter(self._list)
        ikey = key.lower()

        for idx, (old_key, _) in enumerate(iter_list):
            if old_key.lower() == ikey:
                # replace first occurrence
                self._list[idx] = (key, value_str)
                break
        else:
            # no existing occurrences
            self._list.append((key, value_str))
            return

        # remove remaining occurrences
        self._list[idx + 1 :] = [
            item for item in iter_list if item[0].lower() != ikey
        ]

    def setlist(self, key: str, values: cabc.Iterable[t.Any]) -> None:
        """Remove any existing values for a header and add new ones.

        If ``values`` yields no items (an empty collection or an
        exhausted iterator), the header is removed.

        :param key: The header key to set.
        :param values: An iterable of values to set for the key.

        .. versionadded:: 1.0
        """
        # Iterators and generators are always truthy, so emptiness cannot be
        # decided with ``if values``; pull the first item instead. ``or ()``
        # keeps accepting falsy non-iterables such as ``None``.
        values_iter = iter(values or ())

        try:
            first = next(values_iter)
        except StopIteration:
            self.remove(key)
            return

        self.set(key, first)

        for value in values_iter:
            self.add(key, value)

    def setdefault(self, key: str, default: t.Any) -> str:
        """Return the first value for the key if it is in the headers,
        otherwise set the header to the value given by ``default`` and
        return that.

        :param key: The header key to get.
        :param default: The value to set for the key if it is not in the
            headers.
        """
        try:
            return self._get_key(key)
        except KeyError:
            pass

        self.set(key, default)
        # Read the value back so the stored (string-converted) form is returned.
        return self._get_key(key)

    def setlistdefault(self, key: str, default: cabc.Iterable[t.Any]) -> list[str]:
        """Return the list of values for the key if it is in the
        headers, otherwise set the header to the list of values given
        by ``default`` and return that.

        Unlike :meth:`MultiDict.setlistdefault`, modifying the returned
        list will not affect the headers.

        :param key: The header key to get.
        :param default: An iterable of values to set for the key if it
            is not in the headers.

        .. versionadded:: 1.0
        """
        if key not in self:
            self.setlist(key, default)

        return self.getlist(key)

    @t.overload
    def __setitem__(self, key: str, value: t.Any) -> None: ...
    @t.overload
    def __setitem__(self, key: int, value: tuple[str, t.Any]) -> None: ...
    @t.overload
    def __setitem__(
        self, key: slice, value: cabc.Iterable[tuple[str, t.Any]]
    ) -> None: ...
    def __setitem__(
        self,
        key: str | int | slice,
        value: t.Any | tuple[str, t.Any] | cabc.Iterable[tuple[str, t.Any]],
    ) -> None:
        """Like :meth:`set` but also supports index/slice based setting.

        For an integer key, ``value`` is a ``(key, value)`` pair that
        replaces the item at that position. For a slice, ``value`` is an
        iterable of such pairs. Header values are validated in both cases.
        """
        if isinstance(key, str):
            self.set(key, value)
        elif isinstance(key, int):
            self._list[key] = value[0], _str_header_value(value[1])  # type: ignore[index]
        else:
            self._list[key] = [(k, _str_header_value(v)) for k, v in value]  # type: ignore[misc]

    def update(
        self,
        arg: (
            Headers
            | MultiDict[str, t.Any]
            | cabc.Mapping[
                str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]
            ]
            | cabc.Iterable[tuple[str, t.Any]]
            | None
        ) = None,
        /,
        **kwargs: t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any],
    ) -> None:
        """Replace headers in this object with items from another
        headers object and keyword arguments.

        To extend existing keys instead of replacing, use :meth:`extend`
        instead.

        If provided, the first argument can be another :class:`Headers`
        object, a :class:`MultiDict`, :class:`dict`, or iterable of
        pairs.

        .. versionadded:: 1.0
        """
        if arg is not None:
            if isinstance(arg, (Headers, MultiDict)):
                for key in arg.keys():
                    self.setlist(key, arg.getlist(key))
            elif isinstance(arg, cabc.Mapping):
                for key, value in arg.items():
                    # list/tuple/set values are treated as multiple values
                    # for the key; anything else is a single value.
                    if isinstance(value, (list, tuple, set)):
                        self.setlist(key, value)
                    else:
                        self.set(key, value)
            else:
                for key, value in arg:
                    self.set(key, value)

        for key, value in kwargs.items():
            if isinstance(value, (list, tuple, set)):
                self.setlist(key, value)
            else:
                self.set(key, value)

    def __or__(
        self,
        other: cabc.Mapping[
            str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]
        ],
    ) -> te.Self:
        """Return a copy of these headers updated with ``other``.

        Only :class:`collections.abc.Mapping` operands are accepted;
        anything else yields ``NotImplemented``.
        """
        if not isinstance(other, cabc.Mapping):
            return NotImplemented

        rv = self.copy()
        rv.update(other)
        return rv

    def __ior__(
        self,
        other: (
            cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]]
            | cabc.Iterable[tuple[str, t.Any]]
        ),
    ) -> te.Self:
        """Update these headers in place with ``other`` and return ``self``.

        Mappings and iterables are accepted; anything else yields
        ``NotImplemented``.
        """
        if not isinstance(other, (cabc.Mapping, cabc.Iterable)):
            return NotImplemented

        self.update(other)
        return self

    def to_wsgi_list(self) -> list[tuple[str, str]]:
        """Convert the headers into a list suitable for WSGI.

        :return: list
        """
        return list(self)

    def copy(self) -> te.Self:
        """Return a new instance of the same class with the same pairs."""
        return self.__class__(self._list)

    def __copy__(self) -> te.Self:
        """Support :func:`copy.copy` by delegating to :meth:`copy`."""
        return self.copy()

    def __str__(self) -> str:
        """Returns formatted headers suitable for HTTP transmission."""
        strs = [f"{key}: {value}" for key, value in self.to_wsgi_list()]
        # The trailing "\r\n" element makes the joined text end with a blank
        # line after the last header.
        strs.append("\r\n")
        return "\r\n".join(strs)

    def __repr__(self) -> str:
        return f"{type(self).__name__}({list(self)!r})"
584
585
def _options_header_vkw(value: str, kw: dict[str, t.Any]) -> str:
    """Render ``value`` plus keyword options as one header value.

    Underscores in each option name are replaced with dashes, then the
    value and the renamed options are handed to
    ``http.dump_options_header``, whose result is returned.
    """
    options: dict[str, t.Any] = {}

    for name, option in kw.items():
        options[name.replace("_", "-")] = option

    return http.dump_options_header(value, options)
590
591
592_newline_re = re.compile(r"[\r\n]")
593
594
595def _str_header_value(value: t.Any) -> str:
596 if not isinstance(value, str):
597 value = str(value)
598
599 if _newline_re.search(value) is not None:
600 raise ValueError("Header values must not contain newline characters.")
601
602 return value # type: ignore[no-any-return]
603
604
class EnvironHeaders(ImmutableHeadersMixin, Headers):  # type: ignore[misc]
    """Read only view of the headers stored in a WSGI environment.

    It offers the same interface as `Headers` but reads every value
    directly from the ``environ`` mapping it was constructed with instead
    of keeping its own list of pairs.

    A non-string key raises :exc:`~exceptions.BadRequestKeyError`. A
    header that is absent raises whatever the ``environ`` lookup raises.

    NOTE(review): earlier documentation said every ``KeyError`` raised by
    this class is also a ``BadRequest`` subclass; the code here only
    guarantees that for non-string keys. Confirm the intended contract.
    """

    def __init__(self, environ: WSGIEnvironment) -> None:
        super().__init__()
        self.environ = environ

    def __eq__(self, other: object) -> bool:
        """Two instances are equal only when they wrap the same environ object."""
        if isinstance(other, EnvironHeaders):
            return self.environ is other.environ

        return NotImplemented

    __hash__ = None  # type: ignore[assignment]

    def __getitem__(self, key: str) -> str:  # type: ignore[override]
        """Return the value for the header named ``key``."""
        return self._get_key(key)

    def _get_key(self, key: str) -> str:
        """Translate a header name to its environ key and look it up.

        The name is uppercased and dashes become underscores. Every name
        except ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` is then prefixed
        with ``HTTP_``.
        """
        if not isinstance(key, str):
            raise BadRequestKeyError(key)

        environ_key = key.upper().replace("-", "_")

        if environ_key not in {"CONTENT_TYPE", "CONTENT_LENGTH"}:
            environ_key = f"HTTP_{environ_key}"

        return self.environ[environ_key]  # type: ignore[no-any-return]

    def __len__(self) -> int:
        """Count the headers produced by iterating over the environ."""
        count = 0

        for _ in self:
            count += 1

        return count

    def __iter__(self) -> cabc.Iterator[tuple[str, str]]:
        """Yield ``(Header-Name, value)`` pairs found in the environ.

        ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are yielded only when
        their value is truthy. Their ``HTTP_``-prefixed variants are
        skipped; every other ``HTTP_*`` key is yielded.
        """
        unprefixed = {"CONTENT_TYPE", "CONTENT_LENGTH"}
        skipped = {"HTTP_CONTENT_TYPE", "HTTP_CONTENT_LENGTH"}

        for env_key, value in self.environ.items():
            if env_key in unprefixed:
                if value:
                    yield env_key.replace("_", "-").title(), value
            elif env_key.startswith("HTTP_") and env_key not in skipped:
                # Strip the five-character "HTTP_" prefix before formatting.
                yield env_key[5:].replace("_", "-").title(), value

    def copy(self) -> t.NoReturn:
        """Copying is not supported; always raises :exc:`TypeError`."""
        cls_name = type(self).__name__
        raise TypeError(f"cannot create {cls_name!r} copies")

    def __or__(self, other: t.Any) -> t.NoReturn:
        """``|`` would need a copy, so it always raises :exc:`TypeError`."""
        cls_name = type(self).__name__
        raise TypeError(f"cannot create {cls_name!r} copies")
659
660
661# circular dependencies
662from .. import http