Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tornado/httputil.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

501 statements  

1# 

2# Copyright 2009 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""HTTP utility code shared by clients and servers. 

17 

18This module also defines the `HTTPServerRequest` class which is exposed 

19via `tornado.web.RequestHandler.request`. 

20""" 

21 

22import calendar 

23import collections.abc 

24import copy 

25import dataclasses 

26import datetime 

27import email.utils 

28from functools import lru_cache 

29from http.client import responses 

30import http.cookies 

31import re 

32from ssl import SSLError 

33import time 

34import unicodedata 

35from urllib.parse import urlencode, urlparse, urlunparse, parse_qsl 

36 

37from tornado.escape import native_str, parse_qs_bytes, utf8, to_unicode 

38from tornado.util import ObjectDict, unicode_type 

39 

40 

41# responses is unused in this file, but we re-export it to other files. 

42# Reference it so pyflakes doesn't complain. 

43responses 

44 

45import typing 

46from typing import ( 

47 Tuple, 

48 Iterable, 

49 List, 

50 Mapping, 

51 Iterator, 

52 Dict, 

53 Union, 

54 Optional, 

55 Awaitable, 

56 Generator, 

57 AnyStr, 

58) 

59 

if typing.TYPE_CHECKING:
    from typing import Deque  # noqa: F401
    from asyncio import Future  # noqa: F401
    import unittest  # noqa: F401

    # This can be done unconditionally in the base class of HTTPHeaders
    # after we drop support for Python 3.8.
    StrMutableMapping = collections.abc.MutableMapping[str, str]
else:
    # At runtime the unparameterized ABC is used (subscripting it is only
    # needed for type checking; see the Python 3.8 note above).
    StrMutableMapping = collections.abc.MutableMapping

# To be used with str.strip() and related methods.
HTTP_WHITESPACE = " \t"

# Roughly the inverse of RequestHandler._VALID_HEADER_CHARS, but permits
# chars greater than \xFF (which may appear after decoding utf8).
_FORBIDDEN_HEADER_CHARS_RE = re.compile(r"[\x00-\x08\x0A-\x1F\x7F]")

77 

78 

class _ABNF:
    """Class that holds a subset of ABNF rules from RFC 9110 and friends.

    Class attributes are re.Pattern objects, with the same name as in the RFC
    (with hyphens changed to underscores). Currently contains only the subset
    we use (which is why this class is not public). Unfortunately the fields
    cannot be alphabetized as they are in the RFCs because of dependencies.
    """

    # RFC 3986 (URI)
    # The URI hostname ABNF is both complex (including detailed validation of IPv4 and IPv6
    # literals) and not strict enough (a lot of punctuation is allowed by the ABNF even though
    # it is not allowed by DNS). We simplify it by allowing square brackets and colons in any
    # position, not only for their use in IPv6 literals.
    uri_unreserved = re.compile(r"[A-Za-z0-9\-._~]")
    uri_sub_delims = re.compile(r"[!$&'()*+,;=]")
    uri_pct_encoded = re.compile(r"%[0-9A-Fa-f]{2}")
    uri_host = re.compile(
        rf"(?:[\[\]:]|{uri_unreserved.pattern}|{uri_sub_delims.pattern}|{uri_pct_encoded.pattern})*"
    )
    uri_port = re.compile(r"[0-9]*")

    # RFC 5234 (ABNF)
    # VCHAR: the visible (printing) ASCII characters.
    VCHAR = re.compile(r"[\x21-\x7E]")

    # RFC 9110 (HTTP Semantics)
    obs_text = re.compile(r"[\x80-\xFF]")
    field_vchar = re.compile(rf"(?:{VCHAR.pattern}|{obs_text.pattern})")
    # Not exactly from the RFC to simplify and combine field-content and field-value.
    # Matches the empty string, a single field_vchar, or a run of field_vchars
    # that may contain interior spaces/tabs but no leading or trailing ones.
    field_value = re.compile(
        rf"|"
        rf"{field_vchar.pattern}|"
        rf"{field_vchar.pattern}(?:{field_vchar.pattern}| |\t)*{field_vchar.pattern}"
    )
    tchar = re.compile(r"[!#$%&'*+\-.^_`|~0-9A-Za-z]")
    token = re.compile(rf"{tchar.pattern}+")
    field_name = token
    method = token
    host = re.compile(rf"(?:{uri_host.pattern})(?::{uri_port.pattern})?")

    # RFC 9112 (HTTP/1.1)
    HTTP_version = re.compile(r"HTTP/[0-9]\.[0-9]")
    reason_phrase = re.compile(rf"(?:[\t ]|{VCHAR.pattern}|{obs_text.pattern})+")
    # request_target delegates to the URI RFC 3986, which is complex and may be
    # too restrictive (for example, the WHATWG version of the URL spec allows non-ASCII
    # characters). Instead, we allow everything but control chars and whitespace.
    request_target = re.compile(rf"{field_vchar.pattern}+")
    request_line = re.compile(
        rf"({method.pattern}) ({request_target.pattern}) ({HTTP_version.pattern})"
    )
    status_code = re.compile(r"[0-9]{3}")
    status_line = re.compile(
        rf"({HTTP_version.pattern}) ({status_code.pattern}) ({reason_phrase.pattern})?"
    )

133 

134 

135@lru_cache(1000) 

136def _normalize_header(name: str) -> str: 

137 """Map a header name to Http-Header-Case. 

138 

139 >>> _normalize_header("coNtent-TYPE") 

140 'Content-Type' 

141 """ 

142 return "-".join([w.capitalize() for w in name.split("-")]) 

143 

144 

class HTTPHeaders(StrMutableMapping):
    """A dictionary that maintains ``Http-Header-Case`` for all keys.

    Supports multiple values per key via a pair of new methods,
    `add()` and `get_list()`. The regular dictionary interface
    returns a single value per key, with multiple values joined by a
    comma.

    >>> h = HTTPHeaders({"content-type": "text/html"})
    >>> list(h.keys())
    ['Content-Type']
    >>> h["Content-Type"]
    'text/html'

    >>> h.add("Set-Cookie", "A=B")
    >>> h.add("Set-Cookie", "C=D")
    >>> h["set-cookie"]
    'A=B,C=D'
    >>> h.get_list("set-cookie")
    ['A=B', 'C=D']

    >>> for (k,v) in sorted(h.get_all()):
    ...    print('%s: %s' % (k,v))
    ...
    Content-Type: text/html
    Set-Cookie: A=B
    Set-Cookie: C=D
    """

    @typing.overload
    def __init__(self, __arg: Mapping[str, List[str]]) -> None:
        pass

    @typing.overload  # noqa: F811
    def __init__(self, __arg: Mapping[str, str]) -> None:
        pass

    @typing.overload  # noqa: F811
    def __init__(self, *args: Tuple[str, str]) -> None:
        pass

    @typing.overload  # noqa: F811
    def __init__(self, **kwargs: str) -> None:
        pass

    def __init__(self, *args: typing.Any, **kwargs: str) -> None:  # noqa: F811
        # Formally, HTTP headers are a mapping from a field name to a "combined field value",
        # which may be constructed from multiple field lines by joining them with commas.
        # In practice, however, some headers (notably Set-Cookie) do not follow this convention,
        # so we maintain a mapping from field name to a list of field lines in self._as_list.
        # self._combined_cache is a cache of the combined field values derived from self._as_list
        # on demand (and cleared whenever the list is modified).
        self._as_list: dict[str, list[str]] = {}
        self._combined_cache: dict[str, str] = {}
        # Remembers the most recently added (normalized) header name so that
        # obsolete line folding in parse_line knows which value to extend.
        self._last_key = None  # type: Optional[str]
        if len(args) == 1 and len(kwargs) == 0 and isinstance(args[0], HTTPHeaders):
            # Copy constructor
            for k, v in args[0].get_all():
                self.add(k, v)
        else:
            # Dict-style initialization
            self.update(*args, **kwargs)

    # new public methods

    def add(self, name: str, value: str, *, _chars_are_bytes: bool = True) -> None:
        """Adds a new value for the given key.

        Raises `HTTPInputError` if the name or value fails validation.
        See `HTTPHeaders.parse` for the meaning of ``_chars_are_bytes``.
        """
        if not _ABNF.field_name.fullmatch(name):
            raise HTTPInputError("Invalid header name %r" % name)
        if _chars_are_bytes:
            if not _ABNF.field_value.fullmatch(to_unicode(value)):
                # TODO: the fact we still support bytes here (contrary to type annotations)
                # and still test for it should probably be changed.
                raise HTTPInputError("Invalid header value %r" % value)
        else:
            if _FORBIDDEN_HEADER_CHARS_RE.search(value):
                raise HTTPInputError("Invalid header value %r" % value)
        norm_name = _normalize_header(name)
        self._last_key = norm_name
        if norm_name in self:
            # Appending invalidates the cached comma-joined value.
            self._combined_cache.pop(norm_name, None)
            self._as_list[norm_name].append(value)
        else:
            self[norm_name] = value

    def get_list(self, name: str) -> List[str]:
        """Returns all values for the given header as a list."""
        norm_name = _normalize_header(name)
        return self._as_list.get(norm_name, [])

    def get_all(self) -> Iterable[Tuple[str, str]]:
        """Returns an iterable of all (name, value) pairs.

        If a header has multiple values, multiple pairs will be
        returned with the same name.
        """
        for name, values in self._as_list.items():
            for value in values:
                yield (name, value)

    def parse_line(self, line: str, *, _chars_are_bytes: bool = True) -> None:
        r"""Updates the dictionary with a single header line.

        >>> h = HTTPHeaders()
        >>> h.parse_line("Content-Type: text/html")
        >>> h.get('content-type')
        'text/html'
        >>> h.parse_line("Content-Length: 42\r\n")
        >>> h.get('content-type')
        'text/html'

        .. versionchanged:: 6.5
           Now supports lines with or without the trailing CRLF, making it possible
           to pass lines from AsyncHTTPClient's header_callback directly to this method.

        .. deprecated:: 6.5
           In Tornado 7.0, certain deprecated features of HTTP will become errors.
           Specifically, line folding and the use of LF (with CR) as a line separator
           will be removed.
        """
        if m := re.search(r"\r?\n$", line):
            # RFC 9112 section 2.2: a recipient MAY recognize a single LF as a line
            # terminator and ignore any preceding CR.
            # TODO(7.0): Remove this support for LF-only line endings.
            line = line[: m.start()]
        if not line:
            # Empty line, or the final CRLF of a header block.
            return
        if line[0] in HTTP_WHITESPACE:
            # continuation of a multi-line header
            # TODO(7.0): Remove support for line folding.
            if self._last_key is None:
                raise HTTPInputError("first header line cannot start with whitespace")
            # Folded continuations are joined onto the previous value with a
            # single space, per the obsolete line-folding rules.
            new_part = " " + line.strip(HTTP_WHITESPACE)
            if _chars_are_bytes:
                if not _ABNF.field_value.fullmatch(new_part[1:]):
                    raise HTTPInputError("Invalid header continuation %r" % new_part)
            else:
                if _FORBIDDEN_HEADER_CHARS_RE.search(new_part):
                    raise HTTPInputError("Invalid header value %r" % new_part)
            self._as_list[self._last_key][-1] += new_part
            self._combined_cache.pop(self._last_key, None)
        else:
            try:
                name, value = line.split(":", 1)
            except ValueError:
                raise HTTPInputError("no colon in header line")
            self.add(
                name, value.strip(HTTP_WHITESPACE), _chars_are_bytes=_chars_are_bytes
            )

    @classmethod
    def parse(cls, headers: str, *, _chars_are_bytes: bool = True) -> "HTTPHeaders":
        """Returns a dictionary from HTTP header text.

        >>> h = HTTPHeaders.parse("Content-Type: text/html\\r\\nContent-Length: 42\\r\\n")
        >>> sorted(h.items())
        [('Content-Length', '42'), ('Content-Type', 'text/html')]

        .. versionchanged:: 5.1

           Raises `HTTPInputError` on malformed headers instead of a
           mix of `KeyError`, and `ValueError`.

        """
        # _chars_are_bytes is a hack. This method is used in two places, HTTP headers (in which
        # non-ascii characters are to be interpreted as latin-1) and multipart/form-data (in which
        # they are to be interpreted as utf-8). For historical reasons, this method handled this by
        # expecting both callers to decode the headers to strings before parsing them. This wasn't a
        # problem until we started doing stricter validation of the characters allowed in HTTP
        # headers (using ABNF rules defined in terms of byte values), which inadvertently started
        # disallowing non-latin1 characters in multipart/form-data filenames.
        #
        # This method should have accepted bytes and a desired encoding, but this change is being
        # introduced in a patch release that shouldn't change the API. Instead, the _chars_are_bytes
        # flag decides whether to use HTTP-style ABNF validation (treating the string as bytes
        # smuggled through the latin1 encoding) or to accept any non-control unicode characters
        # as required by multipart/form-data. This method will change to accept bytes in a future
        # release.
        h = cls()

        start = 0
        while True:
            lf = headers.find("\n", start)
            if lf == -1:
                h.parse_line(headers[start:], _chars_are_bytes=_chars_are_bytes)
                break
            line = headers[start : lf + 1]
            start = lf + 1
            h.parse_line(line, _chars_are_bytes=_chars_are_bytes)
        return h

    # MutableMapping abstract method implementations.

    def __setitem__(self, name: str, value: str) -> None:
        norm_name = _normalize_header(name)
        self._combined_cache[norm_name] = value
        self._as_list[norm_name] = [value]

    def __contains__(self, name: object) -> bool:
        # This is an important optimization to avoid the expensive concatenation
        # in __getitem__ when it's not needed.
        if not isinstance(name, str):
            return False
        norm_name = _normalize_header(name)
        return norm_name in self._as_list

    def __getitem__(self, name: str) -> str:
        header = _normalize_header(name)
        if header not in self._combined_cache:
            # Lazily build (and cache) the comma-joined combined value.
            self._combined_cache[header] = ",".join(self._as_list[header])
        return self._combined_cache[header]

    def __delitem__(self, name: str) -> None:
        norm_name = _normalize_header(name)
        del self._combined_cache[norm_name]
        del self._as_list[norm_name]

    def __len__(self) -> int:
        return len(self._as_list)

    def __iter__(self) -> Iterator[typing.Any]:
        return iter(self._as_list)

    def copy(self) -> "HTTPHeaders":
        # defined in dict but not in MutableMapping.
        return HTTPHeaders(self)

    # Use our overridden copy method for the copy.copy module.
    # This makes shallow copies one level deeper, but preserves
    # the appearance that HTTPHeaders is a single container.
    __copy__ = copy

    def __str__(self) -> str:
        lines = []
        for name, value in self.get_all():
            lines.append(f"{name}: {value}\n")
        return "".join(lines)

    __unicode__ = __str__

385 

386 

class HTTPServerRequest:
    """A single HTTP request.

    All attributes are type `str` unless otherwise noted.

    .. attribute:: method

       HTTP request method, e.g. "GET" or "POST"

    .. attribute:: uri

       The requested uri.

    .. attribute:: path

       The path portion of `uri`

    .. attribute:: query

       The query portion of `uri`

    .. attribute:: version

       HTTP version specified in request, e.g. "HTTP/1.1"

    .. attribute:: headers

       `.HTTPHeaders` dictionary-like object for request headers.  Acts like
       a case-insensitive dictionary with additional methods for repeated
       headers.

    .. attribute:: body

       Request body, if present, as a byte string.

    .. attribute:: remote_ip

       Client's IP address as a string.  If ``HTTPServer.xheaders`` is set,
       will pass along the real IP address provided by a load balancer
       in the ``X-Real-Ip`` or ``X-Forwarded-For`` header.

    .. versionchanged:: 3.1
       The list format of ``X-Forwarded-For`` is now supported.

    .. attribute:: protocol

       The protocol used, either "http" or "https".  If ``HTTPServer.xheaders``
       is set, will pass along the protocol used by a load balancer if
       reported via an ``X-Scheme`` header.

    .. attribute:: host

       The requested hostname, usually taken from the ``Host`` header.

    .. attribute:: arguments

       GET/POST arguments are available in the arguments property, which
       maps arguments names to lists of values (to support multiple values
       for individual names). Names are of type `str`, while arguments
       are byte strings.  Note that this is different from
       `.RequestHandler.get_argument`, which returns argument values as
       unicode strings.

    .. attribute:: query_arguments

       Same format as ``arguments``, but contains only arguments extracted
       from the query string.

       .. versionadded:: 3.2

    .. attribute:: body_arguments

       Same format as ``arguments``, but contains only arguments extracted
       from the request body.

       .. versionadded:: 3.2

    .. attribute:: files

       File uploads are available in the files property, which maps file
       names to lists of `.HTTPFile`.

    .. attribute:: connection

       An HTTP request is attached to a single HTTP connection, which can
       be accessed through the "connection" attribute. Since connections
       are typically kept open in HTTP/1.1, multiple requests can be handled
       sequentially on a single connection.

    .. versionchanged:: 4.0
       Moved from ``tornado.httpserver.HTTPRequest``.

    .. deprecated:: 6.5.2
       The ``host`` argument to the ``HTTPServerRequest`` constructor is deprecated. Use
       ``headers["Host"]`` instead. This argument was mistakenly removed in Tornado 6.5.0 and
       temporarily restored in 6.5.2.
    """

    path = None  # type: str
    query = None  # type: str

    # HACK: Used for stream_request_body
    _body_future = None  # type: Future[None]

    def __init__(
        self,
        method: Optional[str] = None,
        uri: Optional[str] = None,
        version: str = "HTTP/1.0",
        headers: Optional[HTTPHeaders] = None,
        body: Optional[bytes] = None,
        host: Optional[str] = None,
        files: Optional[Dict[str, List["HTTPFile"]]] = None,
        connection: Optional["HTTPConnection"] = None,
        start_line: Optional["RequestStartLine"] = None,
        server_connection: Optional[object] = None,
    ) -> None:
        if start_line is not None:
            # A parsed start line takes precedence over the separate
            # method/uri/version arguments.
            method, uri, version = start_line
        self.method = method
        self.uri = uri
        self.version = version
        self.headers = headers or HTTPHeaders()
        self.body = body or b""

        # set remote IP and protocol
        context = getattr(connection, "context", None)
        self.remote_ip = getattr(context, "remote_ip", None)
        self.protocol = getattr(context, "protocol", "http")

        try:
            # The explicit host argument is deprecated; normally the Host
            # header is the source of truth.
            self.host = host or self.headers["Host"]
        except KeyError:
            if version == "HTTP/1.0":
                # HTTP/1.0 does not require the Host header.
                self.host = "127.0.0.1"
            else:
                raise HTTPInputError("Missing Host header")
        if not _ABNF.host.fullmatch(self.host):
            raise HTTPInputError("Invalid Host header: %r" % self.host)
        if "," in self.host:
            # https://www.rfc-editor.org/rfc/rfc9112.html#name-request-target
            # Server MUST respond with 400 Bad Request if multiple
            # Host headers are present.
            #
            # We test for the presence of a comma instead of the number of
            # headers received because a proxy may have converted
            # multiple headers into a single comma-separated value
            # (per RFC 9110 section 5.3).
            #
            # This is technically a departure from the RFC since the ABNF
            # does not forbid commas in the host header. However, since
            # commas are not allowed in DNS names, it is appropriate to
            # disallow them. (The same argument could be made for other special
            # characters, but commas are the most problematic since they could
            # be used to exploit differences between proxies when multiple headers
            # are supplied).
            raise HTTPInputError("Multiple host headers not allowed: %r" % self.host)
        self.host_name = split_host_and_port(self.host.lower())[0]
        self.files = files or {}
        self.connection = connection
        self.server_connection = server_connection
        self._start_time = time.time()
        self._finish_time = None

        if uri is not None:
            # Split the uri into path and query at the first "?".
            self.path, sep, self.query = uri.partition("?")
        self.arguments = parse_qs_bytes(self.query, keep_blank_values=True)
        # Deep-copied so later merging of body arguments into self.arguments
        # does not mutate the query-only view.
        self.query_arguments = copy.deepcopy(self.arguments)
        self.body_arguments = {}  # type: Dict[str, List[bytes]]

    @property
    def cookies(self) -> Dict[str, http.cookies.Morsel]:
        """A dictionary of ``http.cookies.Morsel`` objects."""
        if not hasattr(self, "_cookies"):
            # Parsed lazily on first access and cached on the instance.
            self._cookies = (
                http.cookies.SimpleCookie()
            )  # type: http.cookies.SimpleCookie
            if "Cookie" in self.headers:
                try:
                    parsed = parse_cookie(self.headers["Cookie"])
                except Exception:
                    # Best-effort: a malformed Cookie header yields no
                    # cookies rather than failing the request.
                    pass
                else:
                    for k, v in parsed.items():
                        try:
                            self._cookies[k] = v
                        except Exception:
                            # SimpleCookie imposes some restrictions on keys;
                            # parse_cookie does not. Discard any cookies
                            # with disallowed keys.
                            pass
        return self._cookies

    def full_url(self) -> str:
        """Reconstructs the full URL for this request."""
        return self.protocol + "://" + self.host + self.uri  # type: ignore[operator]

    def request_time(self) -> float:
        """Returns the amount of time it took for this request to execute."""
        if self._finish_time is None:
            # Request still in flight: measure against the current time.
            return time.time() - self._start_time
        else:
            return self._finish_time - self._start_time

    def get_ssl_certificate(
        self, binary_form: bool = False
    ) -> Union[None, Dict, bytes]:
        """Returns the client's SSL certificate, if any.

        To use client certificates, the HTTPServer's
        `ssl.SSLContext.verify_mode` field must be set, e.g.::

            ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            ssl_ctx.load_cert_chain("foo.crt", "foo.key")
            ssl_ctx.load_verify_locations("cacerts.pem")
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED
            server = HTTPServer(app, ssl_options=ssl_ctx)

        By default, the return value is a dictionary (or None, if no
        client certificate is present).  If ``binary_form`` is true, a
        DER-encoded form of the certificate is returned instead.  See
        SSLSocket.getpeercert() in the standard library for more
        details.
        http://docs.python.org/library/ssl.html#sslsocket-objects
        """
        try:
            if self.connection is None:
                return None
            # TODO: add a method to HTTPConnection for this so it can work with HTTP/2
            return self.connection.stream.socket.getpeercert(  # type: ignore
                binary_form=binary_form
            )
        except SSLError:
            return None

    def _parse_body(self) -> None:
        # Parses the request body (form-encoded or multipart) into
        # self.body_arguments and self.files, then merges body arguments
        # into the combined self.arguments mapping.
        parse_body_arguments(
            self.headers.get("Content-Type", ""),
            self.body,
            self.body_arguments,
            self.files,
            self.headers,
        )

        for k, v in self.body_arguments.items():
            self.arguments.setdefault(k, []).extend(v)

    def __repr__(self) -> str:
        attrs = ("protocol", "host", "method", "uri", "version", "remote_ip")
        args = ", ".join([f"{n}={getattr(self, n)!r}" for n in attrs])
        return f"{self.__class__.__name__}({args})"

639 

640 

class HTTPInputError(Exception):
    """Raised when a remote source sends a malformed HTTP request or response.

    .. versionadded:: 4.0
    """

649 

650 

class HTTPOutputError(Exception):
    """Raised for errors encountered while producing HTTP output.

    .. versionadded:: 4.0
    """

658 

659 

class HTTPServerConnectionDelegate:
    """Interface that `.HTTPServer` uses to hand off incoming requests.

    .. versionadded:: 4.0
    """

    def start_request(
        self, server_conn: object, request_conn: "HTTPConnection"
    ) -> "HTTPMessageDelegate":
        """Invoked by the server each time a new request begins.

        :arg server_conn: an opaque object representing the long-lived
            (e.g. tcp-level) connection.
        :arg request_conn: a `.HTTPConnection` object scoped to a single
            request/response exchange.

        Implementations must return a `.HTTPMessageDelegate`.
        """
        raise NotImplementedError()

    def on_close(self, server_conn: object) -> None:
        """Invoked once a connection has been closed.

        :arg server_conn: a server connection previously passed to
            ``start_request``.
        """
        return None

687 

688 

class HTTPMessageDelegate:
    """Interface for consuming a single HTTP request or response.

    .. versionadded:: 4.0
    """

    # TODO: genericize this class to avoid exposing the Union.
    def headers_received(
        self,
        start_line: Union["RequestStartLine", "ResponseStartLine"],
        headers: HTTPHeaders,
    ) -> Optional[Awaitable[None]]:
        """Invoked after the HTTP headers have been received and parsed.

        :arg start_line: a `.RequestStartLine` or `.ResponseStartLine`
            depending on whether this is a client or server message.
        :arg headers: a `.HTTPHeaders` instance.

        Some `.HTTPConnection` methods may only be invoked during
        ``headers_received``.

        May return a `.Future`; if so, the body will not be read until
        that future completes.
        """
        pass

    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
        """Invoked for each chunk of body data received.

        May return a `.Future` for flow control.
        """
        pass

    def finish(self) -> None:
        """Invoked once the final chunk of data has been received."""
        pass

    def on_connection_close(self) -> None:
        """Invoked when the connection closes before the message finishes.

        If ``headers_received`` is called, exactly one of ``finish`` or
        ``on_connection_close`` will follow, never both.
        """
        pass

733 

734 

class HTTPConnection:
    """Interface through which applications write their responses.

    .. versionadded:: 4.0
    """

    def write_headers(
        self,
        start_line: Union["RequestStartLine", "ResponseStartLine"],
        headers: HTTPHeaders,
        chunk: Optional[bytes] = None,
    ) -> "Future[None]":
        """Writes an HTTP header block.

        :arg start_line: a `.RequestStartLine` or `.ResponseStartLine`.
        :arg headers: a `.HTTPHeaders` instance.
        :arg chunk: the first (optional) chunk of data.  Passing it here is
            an optimization that lets small responses be written in the
            same call as their headers.

        The ``version`` field of ``start_line`` is ignored.

        Returns a future for flow control.

        .. versionchanged:: 6.0

           The ``callback`` argument was removed.
        """
        raise NotImplementedError()

    def write(self, chunk: bytes) -> "Future[None]":
        """Writes a chunk of body data.

        Returns a future for flow control.

        .. versionchanged:: 6.0

           The ``callback`` argument was removed.
        """
        raise NotImplementedError()

    def finish(self) -> None:
        """Signals that the final chunk of body data has been written."""
        raise NotImplementedError()

779 

780 

def url_concat(
    url: str,
    args: Union[
        None, Dict[str, str], List[Tuple[str, str]], Tuple[Tuple[str, str], ...]
    ],
) -> str:
    """Concatenate url and arguments regardless of whether
    url has existing query parameters.

    ``args`` may be either a dictionary or a list of key-value pairs
    (the latter allows for multiple values with the same key.

    >>> url_concat("http://example.com/foo", dict(c="d"))
    'http://example.com/foo?c=d'
    >>> url_concat("http://example.com/foo?a=b", dict(c="d"))
    'http://example.com/foo?a=b&c=d'
    >>> url_concat("http://example.com/foo?a=b", [("c", "d"), ("c", "d2")])
    'http://example.com/foo?a=b&c=d&c=d2'
    """
    if args is None:
        return url
    if not isinstance(args, (dict, list, tuple)):
        raise TypeError(
            "'args' parameter should be dict, list or tuple. Not {0}".format(type(args))
        )
    parsed_url = urlparse(url)
    # Preserve any existing query parameters, then append the new ones.
    pairs = parse_qsl(parsed_url.query, keep_blank_values=True)
    pairs.extend(args.items() if isinstance(args, dict) else args)
    final_query = urlencode(pairs)
    # Rebuild the URL with only the query component replaced.
    return urlunparse(parsed_url._replace(query=final_query))

826 

827 

class HTTPFile(ObjectDict):
    """A single file uploaded via an HTML form.

    For backwards compatibility, the instance attributes below are also
    readable as dictionary keys.

    * ``filename``
    * ``body``
    * ``content_type``
    """

    filename: str
    body: bytes
    content_type: str

842 

843 

844def _parse_request_range( 

845 range_header: str, 

846) -> Optional[Tuple[Optional[int], Optional[int]]]: 

847 """Parses a Range header. 

848 

849 Returns either ``None`` or tuple ``(start, end)``. 

850 Note that while the HTTP headers use inclusive byte positions, 

851 this method returns indexes suitable for use in slices. 

852 

853 >>> start, end = _parse_request_range("bytes=1-2") 

854 >>> start, end 

855 (1, 3) 

856 >>> [0, 1, 2, 3, 4][start:end] 

857 [1, 2] 

858 >>> _parse_request_range("bytes=6-") 

859 (6, None) 

860 >>> _parse_request_range("bytes=-6") 

861 (-6, None) 

862 >>> _parse_request_range("bytes=-0") 

863 (None, 0) 

864 >>> _parse_request_range("bytes=") 

865 (None, None) 

866 >>> _parse_request_range("foo=42") 

867 >>> _parse_request_range("bytes=1-2,6-10") 

868 

869 Note: only supports one range (ex, ``bytes=1-2,6-10`` is not allowed). 

870 

871 See [0] for the details of the range header. 

872 

873 [0]: http://greenbytes.de/tech/webdav/draft-ietf-httpbis-p5-range-latest.html#byte.ranges 

874 """ 

875 unit, _, value = range_header.partition("=") 

876 unit, value = unit.strip(), value.strip() 

877 if unit != "bytes": 

878 return None 

879 start_b, _, end_b = value.partition("-") 

880 try: 

881 start = _int_or_none(start_b) 

882 end = _int_or_none(end_b) 

883 except ValueError: 

884 return None 

885 if end is not None: 

886 if start is None: 

887 if end != 0: 

888 start = -end 

889 end = None 

890 else: 

891 end += 1 

892 return (start, end) 

893 

894 

895def _get_content_range(start: Optional[int], end: Optional[int], total: int) -> str: 

896 """Returns a suitable Content-Range header: 

897 

898 >>> print(_get_content_range(None, 1, 4)) 

899 bytes 0-0/4 

900 >>> print(_get_content_range(1, 3, 4)) 

901 bytes 1-2/4 

902 >>> print(_get_content_range(None, None, 4)) 

903 bytes 0-3/4 

904 """ 

905 start = start or 0 

906 end = (end or total) - 1 

907 return f"bytes {start}-{end}/{total}" 

908 

909 

910def _int_or_none(val: str) -> Optional[int]: 

911 val = val.strip() 

912 if val == "": 

913 return None 

914 return int(val) 

915 

916 

@dataclasses.dataclass
class ParseMultipartConfig:
    """Limits applied while parsing ``multipart/form-data`` request bodies.

    These limits bound the size and complexity of request messages in order
    to reduce exposure to denial-of-service attacks.

    .. versionadded:: 6.5.5
    """

    enabled: bool = True
    """Whether ``multipart/form-data`` request bodies are parsed at all.

    Disabling this may be desirable for applications that never handle the
    format, since multipart requests have a history of DoS vulnerabilities
    in Tornado. Multipart is used mainly by ``<input type="file">`` in HTML
    forms and by APIs mimicking that format; uploads via the HTTP ``PUT``
    method generally do not use it.
    """

    max_parts: int = 100
    """Upper bound on the number of parts in a single multipart request.

    Each ``<input>`` element in an HTML form corresponds to at least one part.
    """

    max_part_header_size: int = 10 * 1024
    """Upper bound, in bytes, on the header section of each part.

    A part's header carries the form-field name and, optionally, the
    filename and content type of an uploaded file.
    """

949 

950 

@dataclasses.dataclass
class ParseBodyConfig:
    """This class configures the parsing of request bodies.

    Currently this only carries the ``multipart`` sub-configuration; see
    `ParseMultipartConfig` for the individual limits. The module-level
    default instance can be replaced with `set_parse_body_config`.

    .. versionadded:: 6.5.5
    """

    multipart: ParseMultipartConfig = dataclasses.field(
        default_factory=ParseMultipartConfig
    )
    """Configuration for ``multipart/form-data`` request bodies."""

962 

963 

# Module-wide default used by parse_body_arguments/parse_multipart_form_data
# when no explicit ``config`` argument is passed; rebound by
# set_parse_body_config().
_DEFAULT_PARSE_BODY_CONFIG = ParseBodyConfig()

965 

966 

def set_parse_body_config(config: ParseBodyConfig) -> None:
    r"""Sets the **global** default configuration for parsing request bodies.

    This global setting is provided as a stopgap for applications that need to raise the limits
    introduced in Tornado 6.5.5, or who wish to disable the parsing of multipart/form-data bodies
    entirely. Non-global configuration for this functionality will be introduced in a future
    release.

    >>> content_type = "multipart/form-data; boundary=foo"
    >>> multipart_body = b"--foo--\r\n"
    >>> parse_body_arguments(content_type, multipart_body, {}, {})
    >>> multipart_config = ParseMultipartConfig(enabled=False)
    >>> config = ParseBodyConfig(multipart=multipart_config)
    >>> set_parse_body_config(config)
    >>> parse_body_arguments(content_type, multipart_body, {}, {})
    Traceback (most recent call last):
        ...
    tornado.httputil.HTTPInputError: ...: multipart/form-data parsing is disabled
    >>> set_parse_body_config(ParseBodyConfig())  # reset to defaults
    ​
    .. versionadded:: 6.5.5
    """
    # Rebind the module-level default; all subsequent parse calls that do not
    # pass an explicit ``config`` pick up the new object.
    global _DEFAULT_PARSE_BODY_CONFIG
    _DEFAULT_PARSE_BODY_CONFIG = config

991 

992 

def parse_body_arguments(
    content_type: str,
    body: bytes,
    arguments: Dict[str, List[bytes]],
    files: Dict[str, List[HTTPFile]],
    headers: Optional[HTTPHeaders] = None,
    *,
    config: Optional[ParseBodyConfig] = None,
) -> None:
    """Parses a form request body.

    Supports ``application/x-www-form-urlencoded`` and
    ``multipart/form-data``. The ``content_type`` parameter should be
    a string and ``body`` should be a byte string. The ``arguments``
    and ``files`` parameters are dictionaries that will be updated
    with the parsed contents.
    """
    if config is None:
        config = _DEFAULT_PARSE_BODY_CONFIG
    if content_type.startswith("application/x-www-form-urlencoded"):
        # Compressed/encoded bodies are not supported for form parsing.
        if headers is not None and "Content-Encoding" in headers:
            raise HTTPInputError(
                "Unsupported Content-Encoding: %s" % headers["Content-Encoding"]
            )
        try:
            # real charset decoding will happen in RequestHandler.decode_argument()
            parsed = parse_qs_bytes(body, keep_blank_values=True)
        except Exception as e:
            raise HTTPInputError("Invalid x-www-form-urlencoded body: %s" % e) from e
        for name, values in parsed.items():
            if values:
                arguments.setdefault(name, []).extend(values)
    elif content_type.startswith("multipart/form-data"):
        if headers is not None and "Content-Encoding" in headers:
            raise HTTPInputError(
                "Unsupported Content-Encoding: %s" % headers["Content-Encoding"]
            )
        try:
            fields = content_type.split(";")
            if fields[0].strip() != "multipart/form-data":
                # This catches "Content-Type: multipart/form-dataxyz"
                raise HTTPInputError("Invalid content type")
            # Locate the first "boundary=..." parameter of the Content-Type.
            boundary = None
            for field in fields:
                k, _, v = field.strip().partition("=")
                if k == "boundary" and v:
                    boundary = v
                    break
            if boundary is None:
                raise HTTPInputError("multipart boundary not found")
            parse_multipart_form_data(
                utf8(boundary), body, arguments, files, config=config.multipart
            )
        except Exception as e:
            # Any failure above (including the HTTPInputErrors we raised
            # ourselves) is surfaced uniformly as an invalid-multipart error.
            raise HTTPInputError("Invalid multipart/form-data: %s" % e) from e

1046 

1047 

def parse_multipart_form_data(
    boundary: bytes,
    data: bytes,
    arguments: Dict[str, List[bytes]],
    files: Dict[str, List[HTTPFile]],
    *,
    config: Optional[ParseMultipartConfig] = None,
) -> None:
    """Parses a ``multipart/form-data`` body.

    The ``boundary`` and ``data`` parameters are both byte strings.
    The dictionaries given in the arguments and files parameters
    will be updated with the contents of the body.

    Raises `HTTPInputError` on malformed input or when a limit in
    ``config`` is exceeded (parsing disabled, too many parts, or an
    oversized part header).

    .. versionchanged:: 5.1

       Now recognizes non-ASCII filenames in RFC 2231/5987
       (``filename*=``) format.
    """
    if config is None:
        config = _DEFAULT_PARSE_BODY_CONFIG.multipart
    if not config.enabled:
        raise HTTPInputError("multipart/form-data parsing is disabled")
    # The standard allows for the boundary to be quoted in the header,
    # although it's rare (it happens at least for google app engine
    # xmpp). I think we're also supposed to handle backslash-escapes
    # here but I'll save that until we see a client that uses them
    # in the wild.
    if boundary.startswith(b'"') and boundary.endswith(b'"'):
        boundary = boundary[1:-1]
    # Anything after the closing "--boundary--" delimiter (trailing CRLF,
    # epilogue) is discarded; its absence means a truncated body.
    final_boundary_index = data.rfind(b"--" + boundary + b"--")
    if final_boundary_index == -1:
        raise HTTPInputError("Invalid multipart/form-data: no final boundary found")
    parts = data[:final_boundary_index].split(b"--" + boundary + b"\r\n")
    # Enforce the part-count limit before doing any per-part work.
    if len(parts) > config.max_parts:
        raise HTTPInputError("multipart/form-data has too many parts")
    for part in parts:
        if not part:
            # split() yields an empty chunk before the first boundary.
            continue
        # The part's header section ends at the first blank line (CRLF CRLF).
        eoh = part.find(b"\r\n\r\n")
        if eoh == -1:
            raise HTTPInputError("multipart/form-data missing headers")
        if eoh > config.max_part_header_size:
            raise HTTPInputError("multipart/form-data part header too large")
        headers = HTTPHeaders.parse(part[:eoh].decode("utf-8"), _chars_are_bytes=False)
        disp_header = headers.get("Content-Disposition", "")
        disposition, disp_params = _parse_header(disp_header)
        if disposition != "form-data" or not part.endswith(b"\r\n"):
            raise HTTPInputError("Invalid multipart/form-data")
        # Skip the CRLF CRLF header terminator and drop the CRLF that
        # precedes the next boundary.
        value = part[eoh + 4 : -2]
        if not disp_params.get("name"):
            raise HTTPInputError("multipart/form-data missing name")
        name = disp_params["name"]
        if disp_params.get("filename"):
            # A filename parameter marks the part as a file upload rather
            # than a plain form field.
            ctype = headers.get("Content-Type", "application/unknown")
            files.setdefault(name, []).append(
                HTTPFile(
                    filename=disp_params["filename"], body=value, content_type=ctype
                )
            )
        else:
            arguments.setdefault(name, []).append(value)

1110 

1111 

def format_timestamp(
    ts: Union[int, float, tuple, time.struct_time, datetime.datetime],
) -> str:
    """Formats a timestamp in the format used by HTTP.

    The argument may be a numeric timestamp as returned by `time.time`,
    a time tuple as returned by `time.gmtime`, or a `datetime.datetime`
    object. Naive `datetime.datetime` objects are assumed to represent
    UTC; aware objects are converted to UTC before formatting.

    >>> format_timestamp(1359312200)
    'Sun, 27 Jan 2013 18:43:20 GMT'
    """
    # Normalize every accepted input type to a POSIX timestamp, then let
    # email.utils render the HTTP-date string.
    seconds: Union[int, float]
    if isinstance(ts, datetime.datetime):
        # utctimetuple() converts aware datetimes to UTC and treats naive
        # ones as already-UTC.
        seconds = calendar.timegm(ts.utctimetuple())
    elif isinstance(ts, (tuple, time.struct_time)):
        seconds = calendar.timegm(ts)
    elif isinstance(ts, (int, float)):
        seconds = ts
    else:
        raise TypeError("unknown timestamp type: %r" % ts)
    return email.utils.formatdate(seconds, usegmt=True)

1134 

1135 

class RequestStartLine(typing.NamedTuple):
    """The parsed components of an HTTP/1.x request line.

    Produced by `parse_request_start_line`.
    """

    method: str  # HTTP method, e.g. "GET"
    path: str  # request target, e.g. "/foo"
    version: str  # protocol version, e.g. "HTTP/1.1"

1140 

1141 

def parse_request_start_line(line: str) -> RequestStartLine:
    """Returns a (method, path, version) tuple for an HTTP 1.x request line.

    The response is a `typing.NamedTuple`.

    >>> parse_request_start_line("GET /foo HTTP/1.1")
    RequestStartLine(method='GET', path='/foo', version='HTTP/1.1')
    """
    match = _ABNF.request_line.fullmatch(line)
    if match is None:
        # https://tools.ietf.org/html/rfc7230#section-3.1.1
        # invalid request-line SHOULD respond with a 400 (Bad Request)
        raise HTTPInputError("Malformed HTTP request line")
    method, path, version = match.group(1, 2, 3)
    if not version.startswith("HTTP/1"):
        # HTTP/2 and above doesn't use parse_request_start_line.
        # This could be folded into the regex but we don't want to deviate
        # from the ABNF in the RFCs.
        raise HTTPInputError("Unexpected HTTP version %r" % version)
    return RequestStartLine(method, path, version)

1162 

1163 

class ResponseStartLine(typing.NamedTuple):
    """The parsed components of an HTTP/1.x response status line.

    Produced by `parse_response_start_line`.
    """

    version: str  # protocol version, e.g. "HTTP/1.1"
    code: int  # numeric status code, e.g. 200
    reason: str  # reason phrase, e.g. "OK"

1168 

1169 

def parse_response_start_line(line: str) -> ResponseStartLine:
    """Returns a (version, code, reason) tuple for an HTTP 1.x response line.

    The response is a `typing.NamedTuple`.

    >>> parse_response_start_line("HTTP/1.1 200 OK")
    ResponseStartLine(version='HTTP/1.1', code=200, reason='OK')
    """
    match = _ABNF.status_line.fullmatch(line)
    if match is None:
        raise HTTPInputError("Error parsing response start line")
    version, code, reason = match.group(1, 2, 3)
    if not version.startswith("HTTP/1"):
        # HTTP/2 and above doesn't use parse_response_start_line.
        raise HTTPInputError("Unexpected HTTP version %r" % version)
    return ResponseStartLine(version, int(code), reason)

1186 

1187 

1188# _parseparam and _parse_header are copied and modified from python2.7's cgi.py 

1189# The original 2.7 version of this code did not correctly support some 

1190# combinations of semicolons and double quotes. 

1191# It has also been modified to support valueless parameters as seen in 

1192# websocket extension negotiations, and to support non-ascii values in 

1193# RFC 2231/5987 format. 

1194# 

1195# _parseparam has been further modified with the logic from 

1196# https://github.com/python/cpython/pull/136072/files 

1197# to avoid quadratic behavior when parsing semicolons in quoted strings. 

1198# 

1199# TODO: See if we can switch to email.message.Message for this functionality. 

1200# This is the suggested replacement for the cgi.py module now that cgi has 

1201# been removed from recent versions of Python. We need to verify that 

1202# the email module is consistent with our existing behavior (and all relevant 

1203# RFCs for multipart/form-data) before making this change. 

1204 

1205 

def _parseparam(s: str) -> Generator[str, None, None]:
    """Split ``s`` on semicolons that are not inside double-quoted strings.

    Yields each ";"-delimited field with surrounding whitespace stripped.
    ``s`` is expected to begin with ";" (callers prepend one; see
    _parse_header).
    """
    start = 0
    # One iteration per field: the condition both consumes the leading ";"
    # of each field and terminates when no ";" remains at ``start``. A run
    # of consecutive semicolons yields empty fields.
    while s.find(";", start) == start:
        start += 1
        end = s.find(";", start)
        ind, diff = start, 0
        # ``diff`` accumulates the number of unescaped double quotes seen
        # (escaped quotes, counted by '\\"', are subtracted back out). Even
        # parity means the candidate separator at ``end`` lies outside any
        # quoted string and is accepted. On odd parity, the rotation
        # ``end, ind = ind, s.find(";", end + 1)`` cycles (end, ind) through
        # successive semicolon candidates so that the counted segments
        # eventually cover the span up to a separator with even parity --
        # this avoids the quadratic rescanning of the older implementation
        # (see the note above this function).
        while end > 0:
            diff += s.count('"', ind, end) - s.count('\\"', ind, end)
            if diff % 2 == 0:
                break
            end, ind = ind, s.find(";", end + 1)
        if end < 0:
            # No further semicolon: the remainder is the final field.
            end = len(s)
        f = s[start:end]
        yield f.strip()
        start = end

1222 

1223 

def _parse_header(line: str) -> Tuple[str, Dict[str, str]]:
    r"""Parse a Content-type like header.

    Return the main content-type and a dictionary of options.

    >>> d = "form-data; foo=\"b\\\\a\\\"r\"; file*=utf-8''T%C3%A4st"
    >>> ct, d = _parse_header(d)
    >>> ct
    'form-data'
    >>> d['file'] == r'T\u00e4st'.encode('ascii').decode('unicode_escape')
    True
    >>> d['foo']
    'b\\a"r'
    """
    # Prepending ";" lets _parseparam treat the main value (e.g. "form-data")
    # like any other field; it becomes the first item yielded.
    parts = _parseparam(";" + line)
    key = next(parts)
    # decode_params treats first argument special, but we already stripped key
    params = [("Dummy", "value")]
    for p in parts:
        i = p.find("=")
        # Fields without "=" (valueless parameters) are ignored here.
        if i >= 0:
            name = p[:i].strip().lower()
            value = p[i + 1 :].strip()
            params.append((name, native_str(value)))
    # decode_params recognizes RFC 2231/5987 extended parameters such as
    # "filename*=utf-8''..." and splits them into (charset, language, value).
    decoded_params = email.utils.decode_params(params)
    decoded_params.pop(0)  # get rid of the dummy again
    pdict = {}
    for name, decoded_value in decoded_params:
        # collapse_rfc2231_value turns the decoded triple (or plain string)
        # back into a single str value.
        value = email.utils.collapse_rfc2231_value(decoded_value)
        # Strip one level of surrounding double quotes, if present.
        if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
            value = value[1:-1]
        pdict[name] = value
    return key, pdict

1257 

1258 

1259def _encode_header(key: str, pdict: Dict[str, str]) -> str: 

1260 """Inverse of _parse_header. 

1261 

1262 >>> _encode_header('permessage-deflate', 

1263 ... {'client_max_window_bits': 15, 'client_no_context_takeover': None}) 

1264 'permessage-deflate; client_max_window_bits=15; client_no_context_takeover' 

1265 """ 

1266 if not pdict: 

1267 return key 

1268 out = [key] 

1269 # Sort the parameters just to make it easy to test. 

1270 for k, v in sorted(pdict.items()): 

1271 if v is None: 

1272 out.append(k) 

1273 else: 

1274 # TODO: quote if necessary. 

1275 out.append(f"{k}={v}") 

1276 return "; ".join(out) 

1277 

1278 

def encode_username_password(
    username: Union[str, bytes], password: Union[str, bytes]
) -> bytes:
    """Encodes a username/password pair in the format used by HTTP auth.

    The return value is a byte string in the form ``username:password``.

    .. versionadded:: 5.1
    """

    def _prep(value: Union[str, bytes]) -> bytes:
        # NFC-normalize text input so visually identical credentials compare
        # equal, then encode via utf8().
        if isinstance(value, unicode_type):
            value = unicodedata.normalize("NFC", value)
        return utf8(value)

    return _prep(username) + b":" + _prep(password)

1293 

1294 

def doctests():
    # type: () -> unittest.TestSuite
    """Return a `unittest.TestSuite` running this module's doctest examples.

    ``ELLIPSIS`` is enabled because some examples elide variable output
    with ``...``.
    """
    import doctest

    return doctest.DocTestSuite(optionflags=doctest.ELLIPSIS)

1300 

1301 

# Matches "host:port" where the port is all digits; the host group is greedy,
# so the split happens at the *last* colon (see split_host_and_port).
_netloc_re = re.compile(r"^(.+):(\d+)$")

1303 

1304 

def split_host_and_port(netloc: str) -> Tuple[str, Optional[int]]:
    """Returns ``(host, port)`` tuple from ``netloc``.

    Returned ``port`` will be ``None`` if not present.

    .. versionadded:: 4.1
    """
    # The greedy host group splits at the *last* colon, and the port must be
    # all digits; anything else is treated as a bare host with no port.
    match = re.match(r"^(.+):(\d+)$", netloc)
    if match is None:
        return (netloc, None)
    return (match.group(1), int(match.group(2)))

1320 

1321 

def qs_to_qsl(qs: Dict[str, List[AnyStr]]) -> Iterable[Tuple[str, AnyStr]]:
    """Generator converting a result of ``parse_qs`` back to name-value pairs.

    .. versionadded:: 5.0
    """
    # Flatten each key's value list into (key, value) pairs, preserving
    # insertion order.
    for name, values in qs.items():
        yield from ((name, value) for value in values)

1330 

1331 

# Substitution helper for _unquote_cookie: matches either a three-digit octal
# escape (e.g. \012) or any single backslash-escaped character (e.g. \").
_unquote_sub = re.compile(r"\\(?:([0-3][0-7][0-7])|(.))").sub

1333 

1334 

1335def _unquote_replace(m: re.Match) -> str: 

1336 if m[1]: 

1337 return chr(int(m[1], 8)) 

1338 else: 

1339 return m[2] 

1340 

1341 

def _unquote_cookie(s: str) -> str:
    """Handle double quotes and escaping in cookie values.

    This method is copied verbatim from the Python 3.13 standard
    library (http.cookies._unquote) so we don't have to depend on
    non-public interfaces.
    """
    # NOTE(review): despite the ``str`` annotation, a ``None`` argument is
    # tolerated and returned unchanged (see the check below).
    # If there aren't any doublequotes,
    # then there can't be any special characters. See RFC 2109.
    if s is None or len(s) < 2:
        return s
    if s[0] != '"' or s[-1] != '"':
        return s

    # We have to assume that we must decode this string.
    # Down to work.

    # Remove the "s
    s = s[1:-1]

    # Check for special sequences. Examples:
    # \012 --> \n
    # \" --> "
    #
    return _unquote_sub(_unquote_replace, s)

1367 

1368 

def parse_cookie(cookie: str) -> Dict[str, str]:
    """Parse a ``Cookie`` HTTP header into a dict of name/value pairs.

    This function attempts to mimic browser cookie parsing behavior;
    it specifically does not follow any of the cookie-related RFCs
    (because browsers don't either).

    The algorithm used is identical to that used by Django version 1.9.10.

    .. versionadded:: 4.4.2
    """
    parsed: Dict[str, str] = {}
    for chunk in cookie.split(";"):
        name, eq, value = chunk.partition("=")
        if not eq:
            # No "=": assume an empty name per
            # https://bugzilla.mozilla.org/show_bug.cgi?id=169091
            name, value = "", chunk
        name, value = name.strip(), value.strip()
        if name or value:
            # unquote using Python's algorithm.
            parsed[name] = _unquote_cookie(value)
    return parsed