1from __future__ import annotations
2
3import collections.abc as cabc
4import re
5import typing as t
6
7from .._internal import _missing
8from ..exceptions import BadRequestKeyError
9from ..http import dump_options_header
10from .mixins import ImmutableHeadersMixin
11from .structures import iter_multi_items
12from .structures import MultiDict
13
14if t.TYPE_CHECKING:
15 import typing_extensions as te
16 from _typeshed.wsgi import WSGIEnvironment
17
18T = t.TypeVar("T")
19
20
21class Headers:
22 """An object that stores some headers. It has a dict-like interface,
23 but is ordered, can store the same key multiple times, and iterating
24 yields ``(key, value)`` pairs instead of only keys.
25
26 This data structure is useful if you want a nicer way to handle WSGI
27 headers which are stored as tuples in a list.
28
29 From Werkzeug 0.3 onwards, the :exc:`KeyError` raised by this class is
30 also a subclass of the :class:`~exceptions.BadRequest` HTTP exception
31 and will render a page for a ``400 BAD REQUEST`` if caught in a
32 catch-all for HTTP exceptions.
33
34 Headers is mostly compatible with the Python :class:`wsgiref.headers.Headers`
35 class, with the exception of `__getitem__`. :mod:`wsgiref` will return
36 `None` for ``headers['missing']``, whereas :class:`Headers` will raise
37 a :class:`KeyError`.
38
39 To create a new ``Headers`` object, pass it a list, dict, or
40 other ``Headers`` object with default values. These values are
41 validated the same way values added later are.
42
43 :param defaults: The list of default values for the :class:`Headers`.
44
45 .. versionchanged:: 3.1
46 Implement ``|`` and ``|=`` operators.
47
48 .. versionchanged:: 2.1.0
49 Default values are validated the same as values added later.
50
51 .. versionchanged:: 0.9
52 This data structure now stores unicode values similar to how the
53 multi dicts do it. The main difference is that bytes can be set as
54 well which will automatically be latin1 decoded.
55
56 .. versionchanged:: 0.9
57 The :meth:`linked` function was removed without replacement as it
58 was an API that does not support the changes to the encoding model.
59 """
60
61 def __init__(
62 self,
63 defaults: (
64 Headers
65 | MultiDict[str, t.Any]
66 | cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]]
67 | cabc.Iterable[tuple[str, t.Any]]
68 | None
69 ) = None,
70 ) -> None:
71 self._list: list[tuple[str, str]] = []
72
73 if defaults is not None:
74 self.extend(defaults)
75
76 @t.overload
77 def __getitem__(self, key: str) -> str: ...
78 @t.overload
79 def __getitem__(self, key: int) -> tuple[str, str]: ...
80 @t.overload
81 def __getitem__(self, key: slice) -> te.Self: ...
82 def __getitem__(self, key: str | int | slice) -> str | tuple[str, str] | te.Self:
83 if isinstance(key, str):
84 return self._get_key(key)
85
86 if isinstance(key, int):
87 return self._list[key]
88
89 return self.__class__(self._list[key])
90
91 def _get_key(self, key: str) -> str:
92 ikey = key.lower()
93
94 for k, v in self._list:
95 if k.lower() == ikey:
96 return v
97
98 raise BadRequestKeyError(key)
99
100 def __eq__(self, other: object) -> bool:
101 if other.__class__ is not self.__class__:
102 return NotImplemented
103
104 def lowered(item: tuple[str, ...]) -> tuple[str, ...]:
105 return item[0].lower(), *item[1:]
106
107 return set(map(lowered, other._list)) == set(map(lowered, self._list))
108
109 __hash__ = None # type: ignore[assignment]
110
111 @t.overload
112 def get(self, key: str) -> str | None: ...
113 @t.overload
114 def get(self, key: str, default: str) -> str: ...
115 @t.overload
116 def get(self, key: str, default: T) -> str | T: ...
117 @t.overload
118 def get(self, key: str, type: cabc.Callable[[str], T]) -> T | None: ...
119 @t.overload
120 def get(self, key: str, default: T, type: cabc.Callable[[str], T]) -> T: ...
121 def get( # type: ignore[misc]
122 self,
123 key: str,
124 default: str | T | None = None,
125 type: cabc.Callable[[str], T] | None = None,
126 ) -> str | T | None:
127 """Return the default value if the requested data doesn't exist.
128 If `type` is provided and is a callable it should convert the value,
129 return it or raise a :exc:`ValueError` if that is not possible. In
130 this case the function will return the default as if the value was not
131 found:
132
133 >>> d = Headers([('Content-Length', '42')])
134 >>> d.get('Content-Length', type=int)
135 42
136
137 :param key: The key to be looked up.
138 :param default: The default value to be returned if the key can't
139 be looked up. If not further specified `None` is
140 returned.
141 :param type: A callable that is used to cast the value in the
142 :class:`Headers`. If a :exc:`ValueError` is raised
143 by this callable the default value is returned.
144
145 .. versionchanged:: 3.0
146 The ``as_bytes`` parameter was removed.
147
148 .. versionchanged:: 0.9
149 The ``as_bytes`` parameter was added.
150 """
151 try:
152 rv = self._get_key(key)
153 except KeyError:
154 return default
155
156 if type is None:
157 return rv
158
159 try:
160 return type(rv)
161 except ValueError:
162 return default
163
164 @t.overload
165 def getlist(self, key: str) -> list[str]: ...
166 @t.overload
167 def getlist(self, key: str, type: cabc.Callable[[str], T]) -> list[T]: ...
168 def getlist(
169 self, key: str, type: cabc.Callable[[str], T] | None = None
170 ) -> list[str] | list[T]:
171 """Return the list of items for a given key. If that key is not in the
172 :class:`Headers`, the return value will be an empty list. Just like
173 :meth:`get`, :meth:`getlist` accepts a `type` parameter. All items will
174 be converted with the callable defined there.
175
176 :param key: The key to be looked up.
177 :param type: A callable that is used to cast the value in the
178 :class:`Headers`. If a :exc:`ValueError` is raised
179 by this callable the value will be removed from the list.
180 :return: a :class:`list` of all the values for the key.
181
182 .. versionchanged:: 3.0
183 The ``as_bytes`` parameter was removed.
184
185 .. versionchanged:: 0.9
186 The ``as_bytes`` parameter was added.
187 """
188 ikey = key.lower()
189
190 if type is not None:
191 result = []
192
193 for k, v in self:
194 if k.lower() == ikey:
195 try:
196 result.append(type(v))
197 except ValueError:
198 continue
199
200 return result
201
202 return [v for k, v in self if k.lower() == ikey]
203
204 def get_all(self, name: str) -> list[str]:
205 """Return a list of all the values for the named field.
206
207 This method is compatible with the :mod:`wsgiref`
208 :meth:`~wsgiref.headers.Headers.get_all` method.
209 """
210 return self.getlist(name)
211
212 def items(self, lower: bool = False) -> t.Iterable[tuple[str, str]]:
213 for key, value in self:
214 if lower:
215 key = key.lower()
216 yield key, value
217
218 def keys(self, lower: bool = False) -> t.Iterable[str]:
219 for key, _ in self.items(lower):
220 yield key
221
222 def values(self) -> t.Iterable[str]:
223 for _, value in self.items():
224 yield value
225
226 def extend(
227 self,
228 arg: (
229 Headers
230 | MultiDict[str, t.Any]
231 | cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | set[t.Any]]
232 | cabc.Iterable[tuple[str, t.Any]]
233 | None
234 ) = None,
235 /,
236 **kwargs: str,
237 ) -> None:
238 """Extend headers in this object with items from another object
239 containing header items as well as keyword arguments.
240
241 To replace existing keys instead of extending, use
242 :meth:`update` instead.
243
244 If provided, the first argument can be another :class:`Headers`
245 object, a :class:`MultiDict`, :class:`dict`, or iterable of
246 pairs.
247
248 .. versionchanged:: 1.0
249 Support :class:`MultiDict`. Allow passing ``kwargs``.
250 """
251 if arg is not None:
252 for key, value in iter_multi_items(arg):
253 self.add(key, value)
254
255 for key, value in iter_multi_items(kwargs):
256 self.add(key, value)
257
258 def __delitem__(self, key: str | int | slice) -> None:
259 if isinstance(key, str):
260 self._del_key(key)
261 return
262
263 del self._list[key]
264
265 def _del_key(self, key: str) -> None:
266 key = key.lower()
267 new = []
268
269 for k, v in self._list:
270 if k.lower() != key:
271 new.append((k, v))
272
273 self._list[:] = new
274
275 def remove(self, key: str) -> None:
276 """Remove a key.
277
278 :param key: The key to be removed.
279 """
280 return self._del_key(key)
281
282 @t.overload
283 def pop(self) -> tuple[str, str]: ...
284 @t.overload
285 def pop(self, key: str) -> str: ...
286 @t.overload
287 def pop(self, key: int | None = ...) -> tuple[str, str]: ...
288 @t.overload
289 def pop(self, key: str, default: str) -> str: ...
290 @t.overload
291 def pop(self, key: str, default: T) -> str | T: ...
292 def pop(
293 self,
294 key: str | int | None = None,
295 default: str | T = _missing, # type: ignore[assignment]
296 ) -> str | tuple[str, str] | T:
297 """Removes and returns a key or index.
298
299 :param key: The key to be popped. If this is an integer the item at
300 that position is removed, if it's a string the value for
301 that key is. If the key is omitted or `None` the last
302 item is removed.
303 :return: an item.
304 """
305 if key is None:
306 return self._list.pop()
307
308 if isinstance(key, int):
309 return self._list.pop(key)
310
311 try:
312 rv = self._get_key(key)
313 except KeyError:
314 if default is not _missing:
315 return default
316
317 raise
318
319 self.remove(key)
320 return rv
321
322 def popitem(self) -> tuple[str, str]:
323 """Removes a key or index and returns a (key, value) item."""
324 return self._list.pop()
325
326 def __contains__(self, key: str) -> bool:
327 """Check if a key is present."""
328 try:
329 self._get_key(key)
330 except KeyError:
331 return False
332
333 return True
334
335 def __iter__(self) -> t.Iterator[tuple[str, str]]:
336 """Yield ``(key, value)`` tuples."""
337 return iter(self._list)
338
339 def __len__(self) -> int:
340 return len(self._list)
341
342 def add(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
343 """Add a new header tuple to the list.
344
345 Keyword arguments can specify additional parameters for the header
346 value, with underscores converted to dashes::
347
348 >>> d = Headers()
349 >>> d.add('Content-Type', 'text/plain')
350 >>> d.add('Content-Disposition', 'attachment', filename='foo.png')
351
352 The keyword argument dumping uses :func:`dump_options_header`
353 behind the scenes.
354
355 .. versionchanged:: 0.4.1
356 keyword arguments were added for :mod:`wsgiref` compatibility.
357 """
358 if kwargs:
359 value = _options_header_vkw(value, kwargs)
360
361 value_str = _str_header_value(value)
362 self._list.append((key, value_str))
363
364 def add_header(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
365 """Add a new header tuple to the list.
366
367 An alias for :meth:`add` for compatibility with the :mod:`wsgiref`
368 :meth:`~wsgiref.headers.Headers.add_header` method.
369 """
370 self.add(key, value, **kwargs)
371
372 def clear(self) -> None:
373 """Clears all headers."""
374 self._list.clear()
375
376 def set(self, key: str, value: t.Any, /, **kwargs: t.Any) -> None:
377 """Remove all header tuples for `key` and add a new one. The newly
378 added key either appears at the end of the list if there was no
379 entry or replaces the first one.
380
381 Keyword arguments can specify additional parameters for the header
382 value, with underscores converted to dashes. See :meth:`add` for
383 more information.
384
385 .. versionchanged:: 0.6.1
386 :meth:`set` now accepts the same arguments as :meth:`add`.
387
388 :param key: The key to be inserted.
389 :param value: The value to be inserted.
390 """
391 if kwargs:
392 value = _options_header_vkw(value, kwargs)
393
394 value_str = _str_header_value(value)
395
396 if not self._list:
397 self._list.append((key, value_str))
398 return
399
400 iter_list = iter(self._list)
401 ikey = key.lower()
402
403 for idx, (old_key, _) in enumerate(iter_list):
404 if old_key.lower() == ikey:
405 # replace first occurrence
406 self._list[idx] = (key, value_str)
407 break
408 else:
409 # no existing occurrences
410 self._list.append((key, value_str))
411 return
412
413 # remove remaining occurrences
414 self._list[idx + 1 :] = [t for t in iter_list if t[0].lower() != ikey]
415
416 def setlist(self, key: str, values: cabc.Iterable[t.Any]) -> None:
417 """Remove any existing values for a header and add new ones.
418
419 :param key: The header key to set.
420 :param values: An iterable of values to set for the key.
421
422 .. versionadded:: 1.0
423 """
424 if values:
425 values_iter = iter(values)
426 self.set(key, next(values_iter))
427
428 for value in values_iter:
429 self.add(key, value)
430 else:
431 self.remove(key)
432
433 def setdefault(self, key: str, default: t.Any) -> str:
434 """Return the first value for the key if it is in the headers,
435 otherwise set the header to the value given by ``default`` and
436 return that.
437
438 :param key: The header key to get.
439 :param default: The value to set for the key if it is not in the
440 headers.
441 """
442 try:
443 return self._get_key(key)
444 except KeyError:
445 pass
446
447 self.set(key, default)
448 return self._get_key(key)
449
450 def setlistdefault(self, key: str, default: cabc.Iterable[t.Any]) -> list[str]:
451 """Return the list of values for the key if it is in the
452 headers, otherwise set the header to the list of values given
453 by ``default`` and return that.
454
455 Unlike :meth:`MultiDict.setlistdefault`, modifying the returned
456 list will not affect the headers.
457
458 :param key: The header key to get.
459 :param default: An iterable of values to set for the key if it
460 is not in the headers.
461
462 .. versionadded:: 1.0
463 """
464 if key not in self:
465 self.setlist(key, default)
466
467 return self.getlist(key)
468
469 @t.overload
470 def __setitem__(self, key: str, value: t.Any) -> None: ...
471 @t.overload
472 def __setitem__(self, key: int, value: tuple[str, t.Any]) -> None: ...
473 @t.overload
474 def __setitem__(
475 self, key: slice, value: cabc.Iterable[tuple[str, t.Any]]
476 ) -> None: ...
477 def __setitem__(
478 self,
479 key: str | int | slice,
480 value: t.Any | tuple[str, t.Any] | cabc.Iterable[tuple[str, t.Any]],
481 ) -> None:
482 """Like :meth:`set` but also supports index/slice based setting."""
483 if isinstance(key, str):
484 self.set(key, value)
485 elif isinstance(key, int):
486 self._list[key] = value[0], _str_header_value(value[1]) # type: ignore[index]
487 else:
488 self._list[key] = [(k, _str_header_value(v)) for k, v in value] # type: ignore[str-unpack]
489
490 def update(
491 self,
492 arg: (
493 Headers
494 | MultiDict[str, t.Any]
495 | cabc.Mapping[
496 str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]
497 ]
498 | cabc.Iterable[tuple[str, t.Any]]
499 | None
500 ) = None,
501 /,
502 **kwargs: t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any],
503 ) -> None:
504 """Replace headers in this object with items from another
505 headers object and keyword arguments.
506
507 To extend existing keys instead of replacing, use :meth:`extend`
508 instead.
509
510 If provided, the first argument can be another :class:`Headers`
511 object, a :class:`MultiDict`, :class:`dict`, or iterable of
512 pairs.
513
514 .. versionadded:: 1.0
515 """
516 if arg is not None:
517 if isinstance(arg, (Headers, MultiDict)):
518 for key in arg.keys():
519 self.setlist(key, arg.getlist(key))
520 elif isinstance(arg, cabc.Mapping):
521 for key, value in arg.items():
522 if isinstance(value, (list, tuple, set)):
523 self.setlist(key, value)
524 else:
525 self.set(key, value)
526 else:
527 for key, value in arg:
528 self.set(key, value)
529
530 for key, value in kwargs.items():
531 if isinstance(value, (list, tuple, set)):
532 self.setlist(key, value)
533 else:
534 self.set(key, value)
535
536 def __or__(
537 self,
538 other: cabc.Mapping[
539 str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]
540 ],
541 ) -> te.Self:
542 if not isinstance(other, cabc.Mapping):
543 return NotImplemented
544
545 rv = self.copy()
546 rv.update(other)
547 return rv
548
549 def __ior__(
550 self,
551 other: (
552 cabc.Mapping[str, t.Any | list[t.Any] | tuple[t.Any, ...] | cabc.Set[t.Any]]
553 | cabc.Iterable[tuple[str, t.Any]]
554 ),
555 ) -> te.Self:
556 if not isinstance(other, (cabc.Mapping, cabc.Iterable)):
557 return NotImplemented
558
559 self.update(other)
560 return self
561
562 def to_wsgi_list(self) -> list[tuple[str, str]]:
563 """Convert the headers into a list suitable for WSGI.
564
565 :return: list
566 """
567 return list(self)
568
569 def copy(self) -> te.Self:
570 return self.__class__(self._list)
571
572 def __copy__(self) -> te.Self:
573 return self.copy()
574
575 def __str__(self) -> str:
576 """Returns formatted headers suitable for HTTP transmission."""
577 strs = []
578 for key, value in self.to_wsgi_list():
579 strs.append(f"{key}: {value}")
580 strs.append("\r\n")
581 return "\r\n".join(strs)
582
583 def __repr__(self) -> str:
584 return f"{type(self).__name__}({list(self)!r})"
585
586
587def _options_header_vkw(value: str, kw: dict[str, t.Any]) -> str:
588 return dump_options_header(value, {k.replace("_", "-"): v for k, v in kw.items()})
589
590
591_newline_re = re.compile(r"[\r\n]")
592
593
594def _str_header_value(value: t.Any) -> str:
595 if not isinstance(value, str):
596 value = str(value)
597
598 if _newline_re.search(value) is not None:
599 raise ValueError("Header values must not contain newline characters.")
600
601 return value # type: ignore[no-any-return]
602
603
604class EnvironHeaders(ImmutableHeadersMixin, Headers): # type: ignore[misc]
605 """Read only version of the headers from a WSGI environment. This
606 provides the same interface as `Headers` and is constructed from
607 a WSGI environment.
608 From Werkzeug 0.3 onwards, the `KeyError` raised by this class is also a
609 subclass of the :exc:`~exceptions.BadRequest` HTTP exception and will
610 render a page for a ``400 BAD REQUEST`` if caught in a catch-all for
611 HTTP exceptions.
612 """
613
614 def __init__(self, environ: WSGIEnvironment) -> None:
615 super().__init__()
616 self.environ = environ
617
618 def __eq__(self, other: object) -> bool:
619 if not isinstance(other, EnvironHeaders):
620 return NotImplemented
621
622 return self.environ is other.environ
623
624 __hash__ = None
625
626 def __getitem__(self, key: str) -> str: # type: ignore[override]
627 return self._get_key(key)
628
629 def _get_key(self, key: str) -> str:
630 if not isinstance(key, str):
631 raise BadRequestKeyError(key)
632
633 key = key.upper().replace("-", "_")
634
635 if key in {"CONTENT_TYPE", "CONTENT_LENGTH"}:
636 return self.environ[key] # type: ignore[no-any-return]
637
638 return self.environ[f"HTTP_{key}"] # type: ignore[no-any-return]
639
640 def __len__(self) -> int:
641 return sum(1 for _ in self)
642
643 def __iter__(self) -> cabc.Iterator[tuple[str, str]]:
644 for key, value in self.environ.items():
645 if key.startswith("HTTP_") and key not in {
646 "HTTP_CONTENT_TYPE",
647 "HTTP_CONTENT_LENGTH",
648 }:
649 yield key[5:].replace("_", "-").title(), value
650 elif key in {"CONTENT_TYPE", "CONTENT_LENGTH"} and value:
651 yield key.replace("_", "-").title(), value
652
653 def copy(self) -> t.NoReturn:
654 raise TypeError(f"cannot create {type(self).__name__!r} copies")
655
656 def __or__(self, other: t.Any) -> t.NoReturn:
657 raise TypeError(f"cannot create {type(self).__name__!r} copies")