Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/h11/_headers.py: 71%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

101 statements  

1import re 

2from typing import AnyStr, cast, List, overload, Sequence, Tuple, TYPE_CHECKING, Union 

3 

4from ._abnf import field_name, field_value 

5from ._util import bytesify, LocalProtocolError, validate 

6 

7if TYPE_CHECKING: 

8 from ._events import Request 

9 

10try: 

11 from typing import Literal 

12except ImportError: 

13 from typing_extensions import Literal # type: ignore 

14 

15CONTENT_LENGTH_MAX_DIGITS = 20 # allow up to 1 billion TB - 1 

16 

17 

18# Facts 

19# ----- 

20# 

21# Headers are: 

22# keys: case-insensitive ascii 

23# values: mixture of ascii and raw bytes 

24# 

25# "Historically, HTTP has allowed field content with text in the ISO-8859-1 

26# charset [ISO-8859-1], supporting other charsets only through use of 

27# [RFC2047] encoding. In practice, most HTTP header field values use only a 

28# subset of the US-ASCII charset [USASCII]. Newly defined header fields SHOULD 

29# limit their field values to US-ASCII octets. A recipient SHOULD treat other 

30# octets in field content (obs-text) as opaque data." 

31# And it deprecates all non-ascii values 

32# 

33# Leading/trailing whitespace in header names is forbidden 

34# 

35# Values get leading/trailing whitespace stripped 

36# 

37# Content-Disposition actually needs to contain unicode semantically; to 

38# accomplish this it has a terrifically weird way of encoding the filename 

39# itself as ascii (and even this still has lots of cross-browser 

40# incompatibilities) 

41# 

42# Order is important: 

43# "a proxy MUST NOT change the order of these field values when forwarding a 

44# message" 

45# (and there are several headers where the order indicates a preference) 

46# 

47# Multiple occurrences of the same header: 

48# "A sender MUST NOT generate multiple header fields with the same field name 

49# in a message unless either the entire field value for that header field is 

50# defined as a comma-separated list [or the header is Set-Cookie which gets a 

51# special exception]" - RFC 7230. (cookies are in RFC 6265) 

52# 

53# So every header aside from Set-Cookie can be merged by b", ".join if it 

54# occurs repeatedly. But, of course, they can't necessarily be split by 

55# .split(b","), because quoting. 

56# 

57# Given all this mess (case insensitive, duplicates allowed, order is 

58# important, ...), there doesn't appear to be any standard way to handle 

59# headers in Python -- they're almost like dicts, but... actually just 

60# aren't. For now we punt and just use a super simple representation: headers 

61# are a list of pairs 

62# 

63# [(name1, value1), (name2, value2), ...] 

64# 

65# where all entries are bytestrings, names are lowercase and have no 

66# leading/trailing whitespace, and values are bytestrings with no 

67# leading/trailing whitespace. Searching and updating are done via naive O(n) 

68# methods. 

69# 

70# Maybe a dict-of-lists would be better? 

71 

# Pre-compiled validators.  Content-Length must be 1*DIGIT; header field
# names and values are checked against the ABNF grammars imported from
# ._abnf (encoded to bytes since headers are handled as bytes throughout).
_content_length_re = re.compile(rb"[0-9]+")
_field_name_re = re.compile(field_name.encode("ascii"))
_field_value_re = re.compile(field_value.encode("ascii"))

75 

76 

class Headers(Sequence[Tuple[bytes, bytes]]):
    """
    A list-like, read-only view of headers as (lowercased-name, value)
    byte pairs.

    Each entry is stored internally as a three-tuple
    (raw_name, lowercased_name, value): the raw name preserves the original
    casing for over-the-wire reproduction via raw_items(), while the
    lowercased name supports case-insensitive comparisons.

    r = Request(
        method="GET",
        target="/",
        headers=[("Host", "example.org"), ("Connection", "keep-alive")],
        http_version="1.1",
    )
    assert r.headers == [
        (b"host", b"example.org"),
        (b"connection", b"keep-alive")
    ]
    assert r.headers.raw_items() == [
        (b"Host", b"example.org"),
        (b"Connection", b"keep-alive")
    ]
    """

    __slots__ = "_full_items"

    def __init__(self, full_items: List[Tuple[bytes, bytes, bytes]]) -> None:
        self._full_items = full_items

    def __bool__(self) -> bool:
        # Truthy iff at least one header is present.
        return len(self._full_items) > 0

    def __eq__(self, other: object) -> bool:
        # Compare as plain (name, value) pair lists, so equality works
        # against ordinary lists of tuples as well as other Headers.
        return list(self) == list(other)  # type: ignore

    def __len__(self) -> int:
        return len(self._full_items)

    def __repr__(self) -> str:
        return "<Headers(%s)>" % repr(list(self))

    def __getitem__(self, idx: int) -> Tuple[bytes, bytes]:  # type: ignore[override]
        # Expose only the (lowercased_name, value) portion of the entry.
        entry = self._full_items[idx]
        return (entry[1], entry[2])

    def raw_items(self) -> List[Tuple[bytes, bytes]]:
        # Pairs with the original over-the-wire casing of each name.
        return [(entry[0], entry[2]) for entry in self._full_items]

125 

126 

# The header shapes accepted from user code: a list of (name, value) pairs
# where name and value may independently be ``bytes`` or ``str``
# (normalize_and_validate bytesifies them).
HeaderTypes = Union[
    List[Tuple[bytes, bytes]],
    List[Tuple[bytes, str]],
    List[Tuple[str, bytes]],
    List[Tuple[str, str]],
]

133 

134 

@overload
def normalize_and_validate(headers: Headers, _parsed: Literal[True]) -> Headers:
    ...


@overload
def normalize_and_validate(headers: HeaderTypes, _parsed: Literal[False]) -> Headers:
    ...


@overload
def normalize_and_validate(
    headers: Union[Headers, HeaderTypes], _parsed: bool = False
) -> Headers:
    ...


def normalize_and_validate(
    headers: Union[Headers, HeaderTypes], _parsed: bool = False
) -> Headers:
    """Validate a sequence of (name, value) header pairs and normalize them
    into a :class:`Headers` object.

    Names/values are bytesified and checked against the field-name/value
    grammars; names are lowercased (raw casing is preserved alongside).
    Content-Length and Transfer-Encoding additionally get per-header
    validation.

    :param headers: iterable of (name, value) pairs (bytes or str).
    :param _parsed: True when the pairs come from our own parser, which
        already produced bytes and ran these regexes, so those steps are
        skipped.
    :raises LocalProtocolError: for an illegal name/value, conflicting
        Content-Length values, a repeated Transfer-Encoding header, or any
        Transfer-Encoding other than "chunked".
    """
    new_headers: List[Tuple[bytes, bytes, bytes]] = []
    seen_content_length = None
    saw_transfer_encoding = False
    for name, value in headers:
        # For headers coming out of the parser, we can safely skip some steps,
        # because it always returns bytes and has already run these regexes
        # over the data:
        if not _parsed:
            name = bytesify(name)
            value = bytesify(value)
            validate(_field_name_re, name, "Illegal header name {!r}", name)
            validate(_field_value_re, value, "Illegal header value {!r}", value)
        assert isinstance(name, bytes)
        assert isinstance(value, bytes)

        raw_name = name
        name = name.lower()
        if name == b"content-length":
            # A value like "5, 5" is treated the same as repeated identical
            # Content-Length headers: collapse to the single distinct value,
            # and reject if there is more than one distinct value.
            lengths = {length.strip() for length in value.split(b",")}
            if len(lengths) != 1:
                raise LocalProtocolError("conflicting Content-Length headers")
            value = lengths.pop()
            validate(_content_length_re, value, "bad Content-Length")
            # Cap the digit count (CONTENT_LENGTH_MAX_DIGITS = 20) so absurd
            # lengths can't reach downstream arithmetic.
            if len(value) > CONTENT_LENGTH_MAX_DIGITS:
                raise LocalProtocolError("bad Content-Length")
            if seen_content_length is None:
                seen_content_length = value
                new_headers.append((raw_name, name, value))
            elif seen_content_length != value:
                raise LocalProtocolError("conflicting Content-Length headers")
            # NOTE: an exact duplicate Content-Length header is dropped here,
            # so only the first copy survives in the output.
        elif name == b"transfer-encoding":
            # "A server that receives a request message with a transfer coding
            # it does not understand SHOULD respond with 501 (Not
            # Implemented)."
            # https://tools.ietf.org/html/rfc7230#section-3.3.1
            if saw_transfer_encoding:
                raise LocalProtocolError(
                    "multiple Transfer-Encoding headers", error_status_hint=501
                )
            # "All transfer-coding names are case-insensitive"
            # -- https://tools.ietf.org/html/rfc7230#section-4
            value = value.lower()
            if value != b"chunked":
                raise LocalProtocolError(
                    "Only Transfer-Encoding: chunked is supported",
                    error_status_hint=501,
                )
            saw_transfer_encoding = True
            new_headers.append((raw_name, name, value))
        else:
            new_headers.append((raw_name, name, value))
    return Headers(new_headers)

207 

208 

def get_comma_header(headers: Headers, name: bytes) -> List[bytes]:
    # Should only be used for headers whose value is defined as a
    # comma-separated list of case-insensitive items.
    #
    # The header name `name` is expected to be lower-case bytes.
    #
    # Connection: meets these criteria (including case insensitivity).
    #
    # Content-Length: technically is just a single value (1*DIGIT), but the
    # standard makes reference to implementations that do multiple values,
    # and using this doesn't hurt. Ditto, case insensitivity doesn't matter
    # either way.
    #
    # Transfer-Encoding: is more complex (allows for quoted strings), so
    # splitting on , is actually wrong. For example, this is legal:
    #
    #    Transfer-Encoding: foo; options="1,2", chunked
    #
    # and should be parsed as
    #
    #    foo; options="1,2"
    #    chunked
    #
    # but this naive function will parse it as
    #
    #    foo; options="1
    #    2"
    #    chunked
    #
    # However, this is okay because the only thing we are going to do with
    # any Transfer-Encoding is reject ones that aren't just "chunked", so
    # both of these will be treated the same anyway.
    #
    # Expect: the only legal value is the literal string
    # "100-continue". Splitting on commas is harmless. Case insensitive.
    #
    values: List[bytes] = []
    for _, lowered_name, raw_value in headers._full_items:
        if lowered_name != name:
            continue
        # Lowercase once, split on commas, and keep the non-empty pieces
        # with surrounding whitespace removed.
        values.extend(
            piece
            for piece in (part.strip() for part in raw_value.lower().split(b","))
            if piece
        )
    return values

254 

255 

def set_comma_header(headers: Headers, name: bytes, new_values: List[bytes]) -> Headers:
    # The header name `name` is expected to be lower-case bytes.
    #
    # Drops every existing occurrence of `name`, then appends one header per
    # entry in `new_values`, re-running the full normalization/validation
    # pass over the result.
    #
    # Stored names use `.title()` casing to match conventional HTTP header
    # style.  That is a blunt transform, but it is correct for the headers
    # this function is actually used with: Connection, Content-Length,
    # Transfer-Encoding.
    kept: List[Tuple[bytes, bytes]] = [
        (raw_name, raw_value)
        for raw_name, lowered_name, raw_value in headers._full_items
        if lowered_name != name
    ]
    kept.extend((name.title(), value) for value in new_values)
    return normalize_and_validate(kept)

273 

274 

def has_expect_100_continue(request: "Request") -> bool:
    # https://tools.ietf.org/html/rfc7231#section-5.1.1
    # "A server that receives a 100-continue expectation in an HTTP/1.0
    # request MUST ignore that expectation."
    if request.http_version < b"1.1":
        return False
    return b"100-continue" in get_comma_header(request.headers, b"expect")