1from __future__ import annotations
2
3import re
4import typing as t
5
6from .._internal import _missing
7from ..exceptions import BadRequestKeyError
8from .mixins import ImmutableHeadersMixin
9from .structures import iter_multi_items
10from .structures import MultiDict
11
12
13class Headers:
14 """An object that stores some headers. It has a dict-like interface,
15 but is ordered, can store the same key multiple times, and iterating
16 yields ``(key, value)`` pairs instead of only keys.
17
18 This data structure is useful if you want a nicer way to handle WSGI
19 headers which are stored as tuples in a list.
20
21 From Werkzeug 0.3 onwards, the :exc:`KeyError` raised by this class is
22 also a subclass of the :class:`~exceptions.BadRequest` HTTP exception
23 and will render a page for a ``400 BAD REQUEST`` if caught in a
24 catch-all for HTTP exceptions.
25
26 Headers is mostly compatible with the Python :class:`wsgiref.headers.Headers`
27 class, with the exception of `__getitem__`. :mod:`wsgiref` will return
28 `None` for ``headers['missing']``, whereas :class:`Headers` will raise
29 a :class:`KeyError`.
30
31 To create a new ``Headers`` object, pass it a list, dict, or
32 other ``Headers`` object with default values. These values are
33 validated the same way values added later are.
34
35 :param defaults: The list of default values for the :class:`Headers`.
36
37 .. versionchanged:: 2.1.0
38 Default values are validated the same as values added later.
39
40 .. versionchanged:: 0.9
41 This data structure now stores unicode values similar to how the
42 multi dicts do it. The main difference is that bytes can be set as
43 well which will automatically be latin1 decoded.
44
45 .. versionchanged:: 0.9
46 The :meth:`linked` function was removed without replacement as it
47 was an API that does not support the changes to the encoding model.
48 """
49
50 def __init__(self, defaults=None):
51 self._list = []
52 if defaults is not None:
53 self.extend(defaults)
54
55 def __getitem__(self, key, _get_mode=False):
56 if not _get_mode:
57 if isinstance(key, int):
58 return self._list[key]
59 elif isinstance(key, slice):
60 return self.__class__(self._list[key])
61 if not isinstance(key, str):
62 raise BadRequestKeyError(key)
63 ikey = key.lower()
64 for k, v in self._list:
65 if k.lower() == ikey:
66 return v
67 # micro optimization: if we are in get mode we will catch that
68 # exception one stack level down so we can raise a standard
69 # key error instead of our special one.
70 if _get_mode:
71 raise KeyError()
72 raise BadRequestKeyError(key)
73
74 def __eq__(self, other):
75 def lowered(item):
76 return (item[0].lower(),) + item[1:]
77
78 return other.__class__ is self.__class__ and set(
79 map(lowered, other._list)
80 ) == set(map(lowered, self._list))
81
82 __hash__ = None
83
84 def get(self, key, default=None, type=None):
85 """Return the default value if the requested data doesn't exist.
86 If `type` is provided and is a callable it should convert the value,
87 return it or raise a :exc:`ValueError` if that is not possible. In
88 this case the function will return the default as if the value was not
89 found:
90
91 >>> d = Headers([('Content-Length', '42')])
92 >>> d.get('Content-Length', type=int)
93 42
94
95 :param key: The key to be looked up.
96 :param default: The default value to be returned if the key can't
97 be looked up. If not further specified `None` is
98 returned.
99 :param type: A callable that is used to cast the value in the
100 :class:`Headers`. If a :exc:`ValueError` is raised
101 by this callable the default value is returned.
102
103 .. versionchanged:: 3.0
104 The ``as_bytes`` parameter was removed.
105
106 .. versionchanged:: 0.9
107 The ``as_bytes`` parameter was added.
108 """
109 try:
110 rv = self.__getitem__(key, _get_mode=True)
111 except KeyError:
112 return default
113 if type is None:
114 return rv
115 try:
116 return type(rv)
117 except ValueError:
118 return default
119
120 def getlist(self, key, type=None):
121 """Return the list of items for a given key. If that key is not in the
122 :class:`Headers`, the return value will be an empty list. Just like
123 :meth:`get`, :meth:`getlist` accepts a `type` parameter. All items will
124 be converted with the callable defined there.
125
126 :param key: The key to be looked up.
127 :param type: A callable that is used to cast the value in the
128 :class:`Headers`. If a :exc:`ValueError` is raised
129 by this callable the value will be removed from the list.
130 :return: a :class:`list` of all the values for the key.
131
132 .. versionchanged:: 3.0
133 The ``as_bytes`` parameter was removed.
134
135 .. versionchanged:: 0.9
136 The ``as_bytes`` parameter was added.
137 """
138 ikey = key.lower()
139 result = []
140 for k, v in self:
141 if k.lower() == ikey:
142 if type is not None:
143 try:
144 v = type(v)
145 except ValueError:
146 continue
147 result.append(v)
148 return result
149
150 def get_all(self, name):
151 """Return a list of all the values for the named field.
152
153 This method is compatible with the :mod:`wsgiref`
154 :meth:`~wsgiref.headers.Headers.get_all` method.
155 """
156 return self.getlist(name)
157
158 def items(self, lower=False):
159 for key, value in self:
160 if lower:
161 key = key.lower()
162 yield key, value
163
164 def keys(self, lower=False):
165 for key, _ in self.items(lower):
166 yield key
167
168 def values(self):
169 for _, value in self.items():
170 yield value
171
172 def extend(self, *args, **kwargs):
173 """Extend headers in this object with items from another object
174 containing header items as well as keyword arguments.
175
176 To replace existing keys instead of extending, use
177 :meth:`update` instead.
178
179 If provided, the first argument can be another :class:`Headers`
180 object, a :class:`MultiDict`, :class:`dict`, or iterable of
181 pairs.
182
183 .. versionchanged:: 1.0
184 Support :class:`MultiDict`. Allow passing ``kwargs``.
185 """
186 if len(args) > 1:
187 raise TypeError(f"update expected at most 1 arguments, got {len(args)}")
188
189 if args:
190 for key, value in iter_multi_items(args[0]):
191 self.add(key, value)
192
193 for key, value in iter_multi_items(kwargs):
194 self.add(key, value)
195
196 def __delitem__(self, key, _index_operation=True):
197 if _index_operation and isinstance(key, (int, slice)):
198 del self._list[key]
199 return
200 key = key.lower()
201 new = []
202 for k, v in self._list:
203 if k.lower() != key:
204 new.append((k, v))
205 self._list[:] = new
206
207 def remove(self, key):
208 """Remove a key.
209
210 :param key: The key to be removed.
211 """
212 return self.__delitem__(key, _index_operation=False)
213
214 def pop(self, key=None, default=_missing):
215 """Removes and returns a key or index.
216
217 :param key: The key to be popped. If this is an integer the item at
218 that position is removed, if it's a string the value for
219 that key is. If the key is omitted or `None` the last
220 item is removed.
221 :return: an item.
222 """
223 if key is None:
224 return self._list.pop()
225 if isinstance(key, int):
226 return self._list.pop(key)
227 try:
228 rv = self[key]
229 self.remove(key)
230 except KeyError:
231 if default is not _missing:
232 return default
233 raise
234 return rv
235
236 def popitem(self):
237 """Removes a key or index and returns a (key, value) item."""
238 return self.pop()
239
240 def __contains__(self, key):
241 """Check if a key is present."""
242 try:
243 self.__getitem__(key, _get_mode=True)
244 except KeyError:
245 return False
246 return True
247
248 def __iter__(self):
249 """Yield ``(key, value)`` tuples."""
250 return iter(self._list)
251
252 def __len__(self):
253 return len(self._list)
254
255 def add(self, _key, _value, **kw):
256 """Add a new header tuple to the list.
257
258 Keyword arguments can specify additional parameters for the header
259 value, with underscores converted to dashes::
260
261 >>> d = Headers()
262 >>> d.add('Content-Type', 'text/plain')
263 >>> d.add('Content-Disposition', 'attachment', filename='foo.png')
264
265 The keyword argument dumping uses :func:`dump_options_header`
266 behind the scenes.
267
268 .. versionadded:: 0.4.1
269 keyword arguments were added for :mod:`wsgiref` compatibility.
270 """
271 if kw:
272 _value = _options_header_vkw(_value, kw)
273 _value = _str_header_value(_value)
274 self._list.append((_key, _value))
275
276 def add_header(self, _key, _value, **_kw):
277 """Add a new header tuple to the list.
278
279 An alias for :meth:`add` for compatibility with the :mod:`wsgiref`
280 :meth:`~wsgiref.headers.Headers.add_header` method.
281 """
282 self.add(_key, _value, **_kw)
283
284 def clear(self):
285 """Clears all headers."""
286 del self._list[:]
287
288 def set(self, _key, _value, **kw):
289 """Remove all header tuples for `key` and add a new one. The newly
290 added key either appears at the end of the list if there was no
291 entry or replaces the first one.
292
293 Keyword arguments can specify additional parameters for the header
294 value, with underscores converted to dashes. See :meth:`add` for
295 more information.
296
297 .. versionchanged:: 0.6.1
298 :meth:`set` now accepts the same arguments as :meth:`add`.
299
300 :param key: The key to be inserted.
301 :param value: The value to be inserted.
302 """
303 if kw:
304 _value = _options_header_vkw(_value, kw)
305 _value = _str_header_value(_value)
306 if not self._list:
307 self._list.append((_key, _value))
308 return
309 listiter = iter(self._list)
310 ikey = _key.lower()
311 for idx, (old_key, _old_value) in enumerate(listiter):
312 if old_key.lower() == ikey:
313 # replace first occurrence
314 self._list[idx] = (_key, _value)
315 break
316 else:
317 self._list.append((_key, _value))
318 return
319 self._list[idx + 1 :] = [t for t in listiter if t[0].lower() != ikey]
320
321 def setlist(self, key, values):
322 """Remove any existing values for a header and add new ones.
323
324 :param key: The header key to set.
325 :param values: An iterable of values to set for the key.
326
327 .. versionadded:: 1.0
328 """
329 if values:
330 values_iter = iter(values)
331 self.set(key, next(values_iter))
332
333 for value in values_iter:
334 self.add(key, value)
335 else:
336 self.remove(key)
337
338 def setdefault(self, key, default):
339 """Return the first value for the key if it is in the headers,
340 otherwise set the header to the value given by ``default`` and
341 return that.
342
343 :param key: The header key to get.
344 :param default: The value to set for the key if it is not in the
345 headers.
346 """
347 if key in self:
348 return self[key]
349
350 self.set(key, default)
351 return default
352
353 def setlistdefault(self, key, default):
354 """Return the list of values for the key if it is in the
355 headers, otherwise set the header to the list of values given
356 by ``default`` and return that.
357
358 Unlike :meth:`MultiDict.setlistdefault`, modifying the returned
359 list will not affect the headers.
360
361 :param key: The header key to get.
362 :param default: An iterable of values to set for the key if it
363 is not in the headers.
364
365 .. versionadded:: 1.0
366 """
367 if key not in self:
368 self.setlist(key, default)
369
370 return self.getlist(key)
371
372 def __setitem__(self, key, value):
373 """Like :meth:`set` but also supports index/slice based setting."""
374 if isinstance(key, (slice, int)):
375 if isinstance(key, int):
376 value = [value]
377 value = [(k, _str_header_value(v)) for (k, v) in value]
378 if isinstance(key, int):
379 self._list[key] = value[0]
380 else:
381 self._list[key] = value
382 else:
383 self.set(key, value)
384
385 def update(self, *args, **kwargs):
386 """Replace headers in this object with items from another
387 headers object and keyword arguments.
388
389 To extend existing keys instead of replacing, use :meth:`extend`
390 instead.
391
392 If provided, the first argument can be another :class:`Headers`
393 object, a :class:`MultiDict`, :class:`dict`, or iterable of
394 pairs.
395
396 .. versionadded:: 1.0
397 """
398 if len(args) > 1:
399 raise TypeError(f"update expected at most 1 arguments, got {len(args)}")
400
401 if args:
402 mapping = args[0]
403
404 if isinstance(mapping, (Headers, MultiDict)):
405 for key in mapping.keys():
406 self.setlist(key, mapping.getlist(key))
407 elif isinstance(mapping, dict):
408 for key, value in mapping.items():
409 if isinstance(value, (list, tuple)):
410 self.setlist(key, value)
411 else:
412 self.set(key, value)
413 else:
414 for key, value in mapping:
415 self.set(key, value)
416
417 for key, value in kwargs.items():
418 if isinstance(value, (list, tuple)):
419 self.setlist(key, value)
420 else:
421 self.set(key, value)
422
423 def to_wsgi_list(self):
424 """Convert the headers into a list suitable for WSGI.
425
426 :return: list
427 """
428 return list(self)
429
430 def copy(self):
431 return self.__class__(self._list)
432
433 def __copy__(self):
434 return self.copy()
435
436 def __str__(self):
437 """Returns formatted headers suitable for HTTP transmission."""
438 strs = []
439 for key, value in self.to_wsgi_list():
440 strs.append(f"{key}: {value}")
441 strs.append("\r\n")
442 return "\r\n".join(strs)
443
444 def __repr__(self):
445 return f"{type(self).__name__}({list(self)!r})"
446
447
448def _options_header_vkw(value: str, kw: dict[str, t.Any]):
449 return http.dump_options_header(
450 value, {k.replace("_", "-"): v for k, v in kw.items()}
451 )
452
453
454_newline_re = re.compile(r"[\r\n]")
455
456
457def _str_header_value(value: t.Any) -> str:
458 if not isinstance(value, str):
459 value = str(value)
460
461 if _newline_re.search(value) is not None:
462 raise ValueError("Header values must not contain newline characters.")
463
464 return value
465
466
467class EnvironHeaders(ImmutableHeadersMixin, Headers):
468 """Read only version of the headers from a WSGI environment. This
469 provides the same interface as `Headers` and is constructed from
470 a WSGI environment.
471 From Werkzeug 0.3 onwards, the `KeyError` raised by this class is also a
472 subclass of the :exc:`~exceptions.BadRequest` HTTP exception and will
473 render a page for a ``400 BAD REQUEST`` if caught in a catch-all for
474 HTTP exceptions.
475 """
476
477 def __init__(self, environ):
478 self.environ = environ
479
480 def __eq__(self, other):
481 return self.environ is other.environ
482
483 __hash__ = None
484
485 def __getitem__(self, key, _get_mode=False):
486 # _get_mode is a no-op for this class as there is no index but
487 # used because get() calls it.
488 if not isinstance(key, str):
489 raise KeyError(key)
490 key = key.upper().replace("-", "_")
491 if key in {"CONTENT_TYPE", "CONTENT_LENGTH"}:
492 return self.environ[key]
493 return self.environ[f"HTTP_{key}"]
494
495 def __len__(self):
496 # the iter is necessary because otherwise list calls our
497 # len which would call list again and so forth.
498 return len(list(iter(self)))
499
500 def __iter__(self):
501 for key, value in self.environ.items():
502 if key.startswith("HTTP_") and key not in {
503 "HTTP_CONTENT_TYPE",
504 "HTTP_CONTENT_LENGTH",
505 }:
506 yield key[5:].replace("_", "-").title(), value
507 elif key in {"CONTENT_TYPE", "CONTENT_LENGTH"} and value:
508 yield key.replace("_", "-").title(), value
509
510 def copy(self):
511 raise TypeError(f"cannot create {type(self).__name__!r} copies")
512
513
514# circular dependencies
515from .. import http