Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/h11/_headers.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

98 statements  

1import re 

2from typing import AnyStr, cast, List, overload, Sequence, Tuple, TYPE_CHECKING, Union 

3 

4from ._abnf import field_name, field_value 

5from ._util import bytesify, LocalProtocolError, validate 

6 

7if TYPE_CHECKING: 

8 from ._events import Request 

9 

10try: 

11 from typing import Literal 

12except ImportError: 

13 from typing_extensions import Literal # type: ignore 

14 

15 

16# Facts 

17# ----- 

18# 

19# Headers are: 

20# keys: case-insensitive ascii 

21# values: mixture of ascii and raw bytes 

22# 

23# "Historically, HTTP has allowed field content with text in the ISO-8859-1 

24# charset [ISO-8859-1], supporting other charsets only through use of 

25# [RFC2047] encoding. In practice, most HTTP header field values use only a 

26# subset of the US-ASCII charset [USASCII]. Newly defined header fields SHOULD 

27# limit their field values to US-ASCII octets. A recipient SHOULD treat other 

28# octets in field content (obs-text) as opaque data." 

29# And it deprecates all non-ascii values 

30# 

31# Leading/trailing whitespace in header names is forbidden 

32# 

33# Values get leading/trailing whitespace stripped 

34# 

35# Content-Disposition actually needs to contain unicode semantically; to 

36# accomplish this it has a terrifically weird way of encoding the filename 

37# itself as ascii (and even this still has lots of cross-browser 

38# incompatibilities) 

39# 

40# Order is important: 

41# "a proxy MUST NOT change the order of these field values when forwarding a 

42# message" 

43# (and there are several headers where the order indicates a preference) 

44# 

45# Multiple occurrences of the same header: 

46# "A sender MUST NOT generate multiple header fields with the same field name 

47# in a message unless either the entire field value for that header field is 

48# defined as a comma-separated list [or the header is Set-Cookie which gets a 

49# special exception]" - RFC 7230. (cookies are in RFC 6265) 

50# 

51# So every header aside from Set-Cookie can be merged by b", ".join if it 

52# occurs repeatedly. But, of course, they can't necessarily be split by 

53# .split(b","), because quoting. 

54# 

55# Given all this mess (case insensitive, duplicates allowed, order is 

56# important, ...), there doesn't appear to be any standard way to handle 

57# headers in Python -- they're almost like dicts, but... actually just 

58# aren't. For now we punt and just use a super simple representation: headers 

59# are a list of pairs 

60# 

61# [(name1, value1), (name2, value2), ...] 

62# 

63# where all entries are bytestrings, names are lowercase and have no 

64# leading/trailing whitespace, and values are bytestrings with no 

65# leading/trailing whitespace. Searching and updating are done via naive O(n) 

66# methods. 

67# 

68# Maybe a dict-of-lists would be better? 

69 

# Pre-compiled validators. Content-Length must be all ASCII digits; header
# names and values must match the RFC ABNF patterns imported from ._abnf
# (encoded to bytes, since all header data is handled as bytes).
_content_length_re = re.compile(rb"[0-9]+")
_field_name_re = re.compile(field_name.encode("ascii"))
_field_value_re = re.compile(field_value.encode("ascii"))

73 

74 

class Headers(Sequence[Tuple[bytes, bytes]]):
    """
    A list-like interface that allows iterating over headers as byte-pairs
    of (lowercased-name, value).

    Internally we actually store the representation as three-tuples,
    including both the raw original casing, in order to preserve casing
    over-the-wire, and the lowercased name, for case-insensitive comparisons.

    r = Request(
        method="GET",
        target="/",
        headers=[("Host", "example.org"), ("Connection", "keep-alive")],
        http_version="1.1",
    )
    assert r.headers == [
        (b"host", b"example.org"),
        (b"connection", b"keep-alive")
    ]
    assert r.headers.raw_items() == [
        (b"Host", b"example.org"),
        (b"Connection", b"keep-alive")
    ]
    """

    # Tuple form is the conventional __slots__ spelling; a bare string
    # happens to work for a single slot but is fragile and reads like a bug.
    __slots__ = ("_full_items",)

    def __init__(self, full_items: List[Tuple[bytes, bytes, bytes]]) -> None:
        # Each entry is (raw_name, lowercased_name, value).
        self._full_items = full_items

    def __bool__(self) -> bool:
        return bool(self._full_items)

    def __eq__(self, other: object) -> bool:
        # Compare as plain lists of (lowercased_name, value) pairs, so any
        # iterable of such pairs can compare equal to a Headers object.
        return list(self) == list(other)  # type: ignore

    def __len__(self) -> int:
        return len(self._full_items)

    def __repr__(self) -> str:
        return "<Headers(%s)>" % repr(list(self))

    def __getitem__(self, idx: int) -> Tuple[bytes, bytes]:  # type: ignore[override]
        _, name, value = self._full_items[idx]
        return (name, value)

    def raw_items(self) -> List[Tuple[bytes, bytes]]:
        # Pairs using the original over-the-wire casing of each name.
        return [(raw_name, value) for raw_name, _, value in self._full_items]

123 

124 

# Every list-of-pairs shape accepted by normalize_and_validate before
# normalization: names and values may each independently be bytes or str.
HeaderTypes = Union[
    List[Tuple[bytes, bytes]],
    List[Tuple[bytes, str]],
    List[Tuple[str, bytes]],
    List[Tuple[str, str]],
]

131 

132 

# typing.overload stubs for normalize_and_validate: with _parsed=True the
# input is an already-normalized Headers; with _parsed=False it is any of the
# HeaderTypes list shapes. The final stub covers a dynamically-typed _parsed.
# The real implementation follows below.
@overload
def normalize_and_validate(headers: Headers, _parsed: Literal[True]) -> Headers:
    ...


@overload
def normalize_and_validate(headers: HeaderTypes, _parsed: Literal[False]) -> Headers:
    ...


@overload
def normalize_and_validate(
    headers: Union[Headers, HeaderTypes], _parsed: bool = False
) -> Headers:
    ...

148 

149 

def normalize_and_validate(
    headers: Union[Headers, HeaderTypes], _parsed: bool = False
) -> Headers:
    """
    Lower-case, validate, and de-conflict a sequence of header pairs,
    returning a Headers object.

    When ``_parsed`` is True the input came from our own parser, which has
    already produced bytes and run the name/value regexes, so those steps
    are skipped.

    Raises LocalProtocolError on malformed names or values, on conflicting
    Content-Length values, and (with error_status_hint=501) on any
    Transfer-Encoding other than a single "chunked".
    """
    out: List[Tuple[bytes, bytes, bytes]] = []
    prev_content_length = None
    have_transfer_encoding = False
    for name, value in headers:
        if not _parsed:
            name = bytesify(name)
            value = bytesify(value)
            validate(_field_name_re, name, "Illegal header name {!r}", name)
            validate(_field_value_re, value, "Illegal header value {!r}", value)
        assert isinstance(name, bytes)
        assert isinstance(value, bytes)

        raw_name = name
        name = name.lower()
        if name == b"content-length":
            # A comma-separated list is tolerated as long as every element
            # is the same digit string.
            distinct = {piece.strip() for piece in value.split(b",")}
            if len(distinct) != 1:
                raise LocalProtocolError("conflicting Content-Length headers")
            value = distinct.pop()
            validate(_content_length_re, value, "bad Content-Length")
            if prev_content_length is None:
                prev_content_length = value
                out.append((raw_name, name, value))
            elif prev_content_length != value:
                raise LocalProtocolError("conflicting Content-Length headers")
            # An exact repeat of an earlier Content-Length is dropped.
        elif name == b"transfer-encoding":
            # "A server that receives a request message with a transfer
            # coding it does not understand SHOULD respond with 501 (Not
            # Implemented)."
            # https://tools.ietf.org/html/rfc7230#section-3.3.1
            if have_transfer_encoding:
                raise LocalProtocolError(
                    "multiple Transfer-Encoding headers", error_status_hint=501
                )
            # "All transfer-coding names are case-insensitive"
            # -- https://tools.ietf.org/html/rfc7230#section-4
            value = value.lower()
            if value != b"chunked":
                raise LocalProtocolError(
                    "Only Transfer-Encoding: chunked is supported",
                    error_status_hint=501,
                )
            have_transfer_encoding = True
            out.append((raw_name, name, value))
        else:
            out.append((raw_name, name, value))
    return Headers(out)

203 

204 

def get_comma_header(headers: Headers, name: bytes) -> List[bytes]:
    # Collect every comma-separated element of all headers matching `name`
    # (which must be lower-case bytes), lowercased and whitespace-stripped.
    #
    # Only appropriate for headers defined as comma-separated lists of
    # case-insensitive values:
    #
    # - Connection: meets these criteria exactly.
    #
    # - Content-Length: formally a single 1*DIGIT value, but the standard
    #   makes reference to implementations that send multiple values, and
    #   treating it as a list doesn't hurt. Ditto, case insensitivity
    #   doesn't hurt things either way.
    #
    # - Transfer-Encoding: allows quoted strings, so naive comma-splitting
    #   is technically wrong. For example, this is legal:
    #
    #     Transfer-Encoding: foo; options="1,2", chunked
    #
    #   and should parse as [foo; options="1,2", chunked], while this naive
    #   function produces [foo; options="1, 2", chunked]. That's okay here
    #   because the only thing we do with Transfer-Encoding is reject
    #   anything that isn't exactly "chunked", and both parses are rejected
    #   the same way.
    #
    # - Expect: the only legal value is the literal string "100-continue";
    #   splitting on commas is harmless, and it is case-insensitive.
    values: List[bytes] = []
    for _, lowered_name, raw_value in headers._full_items:
        if lowered_name != name:
            continue
        for piece in raw_value.lower().split(b","):
            piece = piece.strip()
            if piece:
                values.append(piece)
    return values

250 

251 

def set_comma_header(headers: Headers, name: bytes, new_values: List[bytes]) -> Headers:
    # Drop every header matching `name` (lower-case bytes) and append the
    # replacement values at the end.
    #
    # Stored names are title-cased to match conventional HTTP header style;
    # bare `.title()` is a blunt instrument, but it is correct for the
    # headers this function is actually used with: Connection,
    # Content-Length, Transfer-Encoding.
    kept: List[Tuple[bytes, bytes]] = [
        (raw_name, raw_value)
        for raw_name, lowered_name, raw_value in headers._full_items
        if lowered_name != name
    ]
    kept.extend((name.title(), value) for value in new_values)
    return normalize_and_validate(kept)

269 

270 

def has_expect_100_continue(request: "Request") -> bool:
    """Return whether `request` asks for a 100 Continue interim response."""
    # https://tools.ietf.org/html/rfc7231#section-5.1.1
    # "A server that receives a 100-continue expectation in an HTTP/1.0
    # request MUST ignore that expectation." -- so only HTTP/1.1+ can match.
    is_http_11_or_later = request.http_version >= b"1.1"
    return is_http_11_or_later and b"100-continue" in get_comma_header(
        request.headers, b"expect"
    )