import re
from typing import AnyStr, cast, List, overload, Sequence, Tuple, TYPE_CHECKING, Union

from ._abnf import field_name, field_value
from ._util import bytesify, LocalProtocolError, validate

if TYPE_CHECKING:
    from ._events import Request

try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal  # type: ignore

CONTENT_LENGTH_MAX_DIGITS = 20  # allow up to 1 billion TB - 1


# Facts
# -----
#
# Headers are:
#   keys: case-insensitive ascii
#   values: mixture of ascii and raw bytes
#
# "Historically, HTTP has allowed field content with text in the ISO-8859-1
# charset [ISO-8859-1], supporting other charsets only through use of
# [RFC2047] encoding. In practice, most HTTP header field values use only a
# subset of the US-ASCII charset [USASCII]. Newly defined header fields SHOULD
# limit their field values to US-ASCII octets. A recipient SHOULD treat other
# octets in field content (obs-text) as opaque data."
# And it deprecates all non-ascii values
#
# Leading/trailing whitespace in header names is forbidden
#
# Values get leading/trailing whitespace stripped
#
# Content-Disposition actually needs to contain unicode semantically; to
# accomplish this it has a terrifically weird way of encoding the filename
# itself as ascii (and even this still has lots of cross-browser
# incompatibilities)
#
# Order is important:
# "a proxy MUST NOT change the order of these field values when forwarding a
# message"
# (and there are several headers where the order indicates a preference)
#
# Multiple occurrences of the same header:
# "A sender MUST NOT generate multiple header fields with the same field name
# in a message unless either the entire field value for that header field is
# defined as a comma-separated list [or the header is Set-Cookie which gets a
# special exception]" - RFC 7230. (cookies are in RFC 6265)
#
# So every header aside from Set-Cookie can be merged by b", ".join if it
# occurs repeatedly. But, of course, they can't necessarily be split by
# .split(b","), because quoting.
#
# Given all this mess (case insensitive, duplicates allowed, order is
# important, ...), there doesn't appear to be any standard way to handle
# headers in Python -- they're almost like dicts, but... actually just
# aren't. For now we punt and just use a super simple representation: headers
# are a list of pairs
#
#   [(name1, value1), (name2, value2), ...]
#
# where all entries are bytestrings, names are lowercase and have no
# leading/trailing whitespace, and values are bytestrings with no
# leading/trailing whitespace. Searching and updating are done via naive O(n)
# methods.
#
# Maybe a dict-of-lists would be better?
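
# To make the list-of-pairs representation concrete, here is a small
# illustration of the merge rule quoted above (a sketch for exposition only,
# not part of this module's API):
#
#     headers = [(b"accept", b"text/html"), (b"accept", b"application/json")]
#     merged = b", ".join(value for _, value in headers)
#     assert merged == b"text/html, application/json"
#
# The reverse split is lossy once quoting is involved, which is why
# Transfer-Encoding is handled so conservatively further down.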

_content_length_re = re.compile(rb"[0-9]+")
_field_name_re = re.compile(field_name.encode("ascii"))
_field_value_re = re.compile(field_value.encode("ascii"))


class Headers(Sequence[Tuple[bytes, bytes]]):
    """
    A list-like interface that allows iterating over headers as byte-pairs
    of (lowercased-name, value).

    Internally we actually store the representation as three-tuples,
    including both the raw original casing, in order to preserve casing
    over-the-wire, and the lowercased name, for case-insensitive comparisons.

    r = Request(
        method="GET",
        target="/",
        headers=[("Host", "example.org"), ("Connection", "keep-alive")],
        http_version="1.1",
    )
    assert r.headers == [
        (b"host", b"example.org"),
        (b"connection", b"keep-alive")
    ]
    assert r.headers.raw_items() == [
        (b"Host", b"example.org"),
        (b"Connection", b"keep-alive")
    ]
    """

    __slots__ = "_full_items"

    def __init__(self, full_items: List[Tuple[bytes, bytes, bytes]]) -> None:
        self._full_items = full_items

    def __bool__(self) -> bool:
        return bool(self._full_items)

    def __eq__(self, other: object) -> bool:
        return list(self) == list(other)  # type: ignore

    def __len__(self) -> int:
        return len(self._full_items)

    def __repr__(self) -> str:
        return "<Headers(%s)>" % repr(list(self))

    def __getitem__(self, idx: int) -> Tuple[bytes, bytes]:  # type: ignore[override]
        _, name, value = self._full_items[idx]
        return (name, value)

    def raw_items(self) -> List[Tuple[bytes, bytes]]:
        return [(raw_name, value) for raw_name, _, value in self._full_items]


HeaderTypes = Union[
    List[Tuple[bytes, bytes]],
    List[Tuple[bytes, str]],
    List[Tuple[str, bytes]],
    List[Tuple[str, str]],
]


@overload
def normalize_and_validate(headers: Headers, _parsed: Literal[True]) -> Headers:
    ...


@overload
def normalize_and_validate(headers: HeaderTypes, _parsed: Literal[False]) -> Headers:
    ...


@overload
def normalize_and_validate(
    headers: Union[Headers, HeaderTypes], _parsed: bool = False
) -> Headers:
    ...


def normalize_and_validate(
    headers: Union[Headers, HeaderTypes], _parsed: bool = False
) -> Headers:
    new_headers = []
    seen_content_length = None
    saw_transfer_encoding = False
    for name, value in headers:
        # For headers coming out of the parser, we can safely skip some steps,
        # because it always returns bytes and has already run these regexes
        # over the data:
        if not _parsed:
            name = bytesify(name)
            value = bytesify(value)
            validate(_field_name_re, name, "Illegal header name {!r}", name)
            validate(_field_value_re, value, "Illegal header value {!r}", value)
        assert isinstance(name, bytes)
        assert isinstance(value, bytes)

        raw_name = name
        name = name.lower()
        if name == b"content-length":
            lengths = {length.strip() for length in value.split(b",")}
            if len(lengths) != 1:
                raise LocalProtocolError("conflicting Content-Length headers")
            value = lengths.pop()
            validate(_content_length_re, value, "bad Content-Length")
            if len(value) > CONTENT_LENGTH_MAX_DIGITS:
                raise LocalProtocolError("bad Content-Length")
            if seen_content_length is None:
                seen_content_length = value
                new_headers.append((raw_name, name, value))
            elif seen_content_length != value:
                raise LocalProtocolError("conflicting Content-Length headers")
        elif name == b"transfer-encoding":
            # "A server that receives a request message with a transfer coding
            # it does not understand SHOULD respond with 501 (Not
            # Implemented)."
            # https://tools.ietf.org/html/rfc7230#section-3.3.1
            if saw_transfer_encoding:
                raise LocalProtocolError(
                    "multiple Transfer-Encoding headers", error_status_hint=501
                )
            # "All transfer-coding names are case-insensitive"
            # -- https://tools.ietf.org/html/rfc7230#section-4
            value = value.lower()
            if value != b"chunked":
                raise LocalProtocolError(
                    "Only Transfer-Encoding: chunked is supported",
                    error_status_hint=501,
                )
            saw_transfer_encoding = True
            new_headers.append((raw_name, name, value))
        else:
            new_headers.append((raw_name, name, value))
    return Headers(new_headers)
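

# A usage sketch (illustrative only, not part of the module): strings are
# bytesified, names are lowercased, and conflicting Content-Length values are
# rejected with LocalProtocolError.
#
#     h = normalize_and_validate([("Content-Length", "100")])
#     assert list(h) == [(b"content-length", b"100")]
#     assert h.raw_items() == [(b"Content-Length", b"100")]
#
#     try:
#         normalize_and_validate(
#             [("Content-Length", "100"), ("Content-Length", "200")]
#         )
#     except LocalProtocolError:
#         pass  # conflicting values are rejected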


def get_comma_header(headers: Headers, name: bytes) -> List[bytes]:
    # Should only be used for headers whose value is a list of
    # comma-separated, case-insensitive values.
    #
    # The header name `name` is expected to be lower-case bytes.
    #
    # Connection: meets these criteria (including case insensitivity).
    #
    # Content-Length: technically is just a single value (1*DIGIT), but the
    # standard makes reference to implementations that do multiple values, and
    # using this doesn't hurt. Ditto, case insensitivity doesn't matter either
    # way.
    #
    # Transfer-Encoding: is more complex (allows for quoted strings), so
    # splitting on , is actually wrong. For example, this is legal:
    #
    #    Transfer-Encoding: foo; options="1,2", chunked
    #
    # and should be parsed as
    #
    #    foo; options="1,2"
    #    chunked
    #
    # but this naive function will parse it as
    #
    #    foo; options="1
    #    2"
    #    chunked
    #
    # However, this is okay because the only thing we are going to do with
    # any Transfer-Encoding is reject ones that aren't just "chunked", so
    # both of these will be treated the same anyway.
    #
    # Expect: the only legal value is the literal string
    # "100-continue". Splitting on commas is harmless. Case insensitive.
    #
    out: List[bytes] = []
    for _, found_name, found_raw_value in headers._full_items:
        if found_name == name:
            found_raw_value = found_raw_value.lower()
            for found_split_value in found_raw_value.split(b","):
                found_split_value = found_split_value.strip()
                if found_split_value:
                    out.append(found_split_value)
    return out
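

# For example (a sketch for exposition; behavior follows from the code above):
#
#     h = normalize_and_validate([("Connection", "Keep-Alive, Upgrade")])
#     assert get_comma_header(h, b"connection") == [b"keep-alive", b"upgrade"]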


def set_comma_header(headers: Headers, name: bytes, new_values: List[bytes]) -> Headers:
    # The header name `name` is expected to be lower-case bytes.
    #
    # Note that when we store the header we use title casing for the header
    # names, in order to match the conventional HTTP header style.
    #
    # Simply calling `.title()` is a blunt approach, but it's correct
    # here given the cases where we're using `set_comma_header`...
    #
    # Connection, Content-Length, Transfer-Encoding.
    new_headers: List[Tuple[bytes, bytes]] = []
    for found_raw_name, found_name, found_raw_value in headers._full_items:
        if found_name != name:
            new_headers.append((found_raw_name, found_raw_value))
    for new_value in new_values:
        new_headers.append((name.title(), new_value))
    return normalize_and_validate(new_headers)
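

# For example (again just a sketch; note that set_comma_header returns a new
# Headers object rather than mutating in place, and re-runs validation):
#
#     h = normalize_and_validate([("Connection", "close"), ("Host", "a")])
#     h = set_comma_header(h, b"connection", [b"keep-alive"])
#     assert list(h) == [(b"host", b"a"), (b"connection", b"keep-alive")]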


def has_expect_100_continue(request: "Request") -> bool:
    # https://tools.ietf.org/html/rfc7231#section-5.1.1
    # "A server that receives a 100-continue expectation in an HTTP/1.0 request
    # MUST ignore that expectation."
    if request.http_version < b"1.1":
        return False
    expect = get_comma_header(request.headers, b"expect")
    return b"100-continue" in expect
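

# For example (a sketch assuming h11's public Request event, which this module
# only imports under TYPE_CHECKING):
#
#     r = Request(
#         method="POST",
#         target="/upload",
#         headers=[("Host", "example.org"), ("Expect", "100-continue")],
#         http_version="1.1",
#     )
#     assert has_expect_100_continue(r)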