#
# Copyright 2014 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Client and server implementations of HTTP/1.x.

.. versionadded:: 4.0
"""

import asyncio
import logging
import re
import types

from tornado.concurrent import (
    Future,
    future_add_done_callback,
    future_set_result_unless_cancelled,
)
from tornado.escape import native_str, utf8
from tornado import gen
from tornado import httputil
from tornado import iostream
from tornado.log import gen_log, app_log
from tornado.util import GzipDecompressor


from typing import cast, Optional, Type, Awaitable, Callable, Union, Tuple

CR_OR_LF_RE = re.compile(b"\r|\n")


class _QuietException(Exception):
    def __init__(self) -> None:
        pass


class _ExceptionLoggingContext:
    """Used with the ``with`` statement when calling delegate methods to
    log any exceptions with the given logger. Any exceptions caught are
    converted to _QuietException.
    """

    def __init__(self, logger: logging.Logger) -> None:
        self.logger = logger

    def __enter__(self) -> None:
        pass

    def __exit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: types.TracebackType,
    ) -> None:
        if value is not None:
            assert typ is not None
            # Let HTTPInputError pass through to higher-level handler
            if isinstance(value, httputil.HTTPInputError):
                return None
            self.logger.error("Uncaught exception", exc_info=(typ, value, tb))
            raise _QuietException


class HTTP1ConnectionParameters:
    """Parameters for `.HTTP1Connection` and `.HTTP1ServerConnection`."""

    def __init__(
        self,
        no_keep_alive: bool = False,
        chunk_size: Optional[int] = None,
        max_header_size: Optional[int] = None,
        header_timeout: Optional[float] = None,
        max_body_size: Optional[int] = None,
        body_timeout: Optional[float] = None,
        decompress: bool = False,
    ) -> None:
        """
        :arg bool no_keep_alive: If true, always close the connection after
            one request.
        :arg int chunk_size: how much data to read into memory at once
        :arg int max_header_size: maximum amount of data for HTTP headers
        :arg float header_timeout: how long to wait for all headers (seconds)
        :arg int max_body_size: maximum amount of data for body
        :arg float body_timeout: how long to wait while reading body (seconds)
        :arg bool decompress: if true, decode incoming
            ``Content-Encoding: gzip``
        """
        self.no_keep_alive = no_keep_alive
        self.chunk_size = chunk_size or 65536
        self.max_header_size = max_header_size or 65536
        self.header_timeout = header_timeout
        self.max_body_size = max_body_size
        self.body_timeout = body_timeout
        self.decompress = decompress
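

# An illustrative sketch, not part of the upstream module: one way to
# construct parameters for a connection. The specific values below are
# arbitrary examples chosen for this sketch.
def _example_connection_parameters() -> HTTP1ConnectionParameters:
    # Sizes left as None fall back to the 64KB defaults in __init__;
    # timeouts left as None mean "wait indefinitely".
    return HTTP1ConnectionParameters(
        chunk_size=16 * 1024,  # read bodies into memory 16KB at a time
        max_header_size=32 * 1024,  # reject header blocks larger than 32KB
        header_timeout=10.0,  # seconds to wait for the full header block
        body_timeout=60.0,  # seconds to wait while reading the body
        decompress=True,  # transparently decode gzipped bodies
    )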


class HTTP1Connection(httputil.HTTPConnection):
    """Implements the HTTP/1.x protocol.

    This class can be used on its own for clients, or via
    `HTTP1ServerConnection` for servers.
    """

    def __init__(
        self,
        stream: iostream.IOStream,
        is_client: bool,
        params: Optional[HTTP1ConnectionParameters] = None,
        context: Optional[object] = None,
    ) -> None:
        """
        :arg stream: an `.IOStream`
        :arg bool is_client: client or server
        :arg params: a `.HTTP1ConnectionParameters` instance or ``None``
        :arg context: an opaque application-defined object that can be accessed
            as ``connection.context``.
        """
        self.is_client = is_client
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        self.no_keep_alive = params.no_keep_alive
        # The body limits can be altered by the delegate, so save them
        # here instead of just referencing self.params later.
        self._max_body_size = (
            self.params.max_body_size
            if self.params.max_body_size is not None
            else self.stream.max_buffer_size
        )
        self._body_timeout = self.params.body_timeout
        # _write_finished is set to True when finish() has been called,
        # i.e. there will be no more data sent. Data may still be in the
        # stream's write buffer.
        self._write_finished = False
        # True when we have read the entire incoming body.
        self._read_finished = False
        # _finish_future resolves when all data has been written and flushed
        # to the IOStream.
        self._finish_future = Future()  # type: Future[None]
        # If true, the connection should be closed after this request
        # (after the response has been written in the server side,
        # and after it has been read in the client)
        self._disconnect_on_finish = False
        self._clear_callbacks()
        # Save the start lines after we read or write them; they
        # affect later processing (e.g. 304 responses and HEAD methods
        # have content-length but no bodies)
        self._request_start_line = None  # type: Optional[httputil.RequestStartLine]
        self._response_start_line = None  # type: Optional[httputil.ResponseStartLine]
        self._request_headers = None  # type: Optional[httputil.HTTPHeaders]
        # True if we are writing output with chunked encoding.
        self._chunking_output = False
        # While reading a body with a content-length, this is the
        # amount left to read.
        self._expected_content_remaining = None  # type: Optional[int]
        # A Future for our outgoing writes, returned by IOStream.write.
        self._pending_write = None  # type: Optional[Future[None]]

    def read_response(self, delegate: httputil.HTTPMessageDelegate) -> Awaitable[bool]:
        """Read a single HTTP response.

        Typical client-mode usage is to write a request using `write_headers`,
        `write`, and `finish`, and then call ``read_response``. (An
        illustrative client-side sketch appears after this class.)

        :arg delegate: a `.HTTPMessageDelegate`

        Returns a `.Future` that resolves to a bool after the full response has
        been read. The result is true if the stream is still open.
        """
        if self.params.decompress:
            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
        return self._read_message(delegate)

    async def _read_message(self, delegate: httputil.HTTPMessageDelegate) -> bool:
        need_delegate_close = False
        try:
            header_future = self.stream.read_until_regex(
                b"\r?\n\r?\n", max_bytes=self.params.max_header_size
            )
            if self.params.header_timeout is None:
                header_data = await header_future
            else:
                try:
                    header_data = await gen.with_timeout(
                        self.stream.io_loop.time() + self.params.header_timeout,
                        header_future,
                        quiet_exceptions=iostream.StreamClosedError,
                    )
                except gen.TimeoutError:
                    self.close()
                    return False
            start_line_str, headers = self._parse_headers(header_data)
            if self.is_client:
                resp_start_line = httputil.parse_response_start_line(start_line_str)
                self._response_start_line = resp_start_line
                start_line = (
                    resp_start_line
                )  # type: Union[httputil.RequestStartLine, httputil.ResponseStartLine]
                # TODO: this will need to change to support client-side keepalive
                self._disconnect_on_finish = False
            else:
                req_start_line = httputil.parse_request_start_line(start_line_str)
                self._request_start_line = req_start_line
                self._request_headers = headers
                start_line = req_start_line
                self._disconnect_on_finish = not self._can_keep_alive(
                    req_start_line, headers
                )
            need_delegate_close = True
            with _ExceptionLoggingContext(app_log):
                header_recv_future = delegate.headers_received(start_line, headers)
                if header_recv_future is not None:
                    await header_recv_future
            if self.stream is None:
                # We've been detached.
                need_delegate_close = False
                return False
            skip_body = False
            if self.is_client:
                assert isinstance(start_line, httputil.ResponseStartLine)
                if (
                    self._request_start_line is not None
                    and self._request_start_line.method == "HEAD"
                ):
                    skip_body = True
                code = start_line.code
                if code == 304:
                    # 304 responses may include the content-length header
                    # but do not actually have a body.
                    # http://tools.ietf.org/html/rfc7230#section-3.3
                    skip_body = True
                if 100 <= code < 200:
                    # 1xx responses should never indicate the presence of
                    # a body.
                    if "Content-Length" in headers or "Transfer-Encoding" in headers:
                        raise httputil.HTTPInputError(
                            "Response code %d cannot have body" % code
                        )
                    # TODO: client delegates will get headers_received twice
                    # in the case of a 100-continue. Document or change?
                    await self._read_message(delegate)
            else:
                if headers.get("Expect") == "100-continue" and not self._write_finished:
                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
            if not skip_body:
                body_future = self._read_body(
                    resp_start_line.code if self.is_client else 0, headers, delegate
                )
                if body_future is not None:
                    if self._body_timeout is None:
                        await body_future
                    else:
                        try:
                            await gen.with_timeout(
                                self.stream.io_loop.time() + self._body_timeout,
                                body_future,
                                quiet_exceptions=iostream.StreamClosedError,
                            )
                        except gen.TimeoutError:
                            gen_log.info("Timeout reading body from %s", self.context)
                            self.stream.close()
                            return False
            self._read_finished = True
            if not self._write_finished or self.is_client:
                need_delegate_close = False
                with _ExceptionLoggingContext(app_log):
                    delegate.finish()
            # If we're waiting for the application to produce an asynchronous
            # response, and we're not detached, register a close callback
            # on the stream (we didn't need one while we were reading)
            if (
                not self._finish_future.done()
                and self.stream is not None
                and not self.stream.closed()
            ):
                self.stream.set_close_callback(self._on_connection_close)
                await self._finish_future
            if self.is_client and self._disconnect_on_finish:
                self.close()
            if self.stream is None:
                return False
        except httputil.HTTPInputError as e:
            gen_log.info("Malformed HTTP message from %s: %s", self.context, e)
            if not self.is_client:
                await self.stream.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
            self.close()
            return False
        finally:
            if need_delegate_close:
                with _ExceptionLoggingContext(app_log):
                    delegate.on_connection_close()
            header_future = None  # type: ignore
            self._clear_callbacks()
        return True

    def _clear_callbacks(self) -> None:
        """Clears the callback attributes.

        This allows the request handler to be garbage collected more
        quickly in CPython by breaking up reference cycles.
        """
        self._write_callback = None
        self._write_future = None  # type: Optional[Future[None]]
        self._close_callback = None  # type: Optional[Callable[[], None]]
        if self.stream is not None:
            self.stream.set_close_callback(None)

    def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
        """Sets a callback that will be run when the connection is closed.

        Note that this callback is slightly different from
        `.HTTPMessageDelegate.on_connection_close`: The
        `.HTTPMessageDelegate` method is called when the connection is
        closed while receiving a message. This callback is used when
        there is not an active delegate (for example, on the server
        side this callback is used if the client closes the connection
        after sending its request but before receiving all of the
        response).
        """
        self._close_callback = callback

    def _on_connection_close(self) -> None:
        # Note that this callback is only registered on the IOStream
        # when we have finished reading the request and are waiting for
        # the application to produce its response.
        if self._close_callback is not None:
            callback = self._close_callback
            self._close_callback = None
            callback()
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)
        self._clear_callbacks()

    def close(self) -> None:
        if self.stream is not None:
            self.stream.close()
        self._clear_callbacks()
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)

    def detach(self) -> iostream.IOStream:
        """Take control of the underlying stream.

        Returns the underlying `.IOStream` object and stops all further
        HTTP processing. May only be called during
        `.HTTPMessageDelegate.headers_received`. Intended for implementing
        protocols like websockets that tunnel over an HTTP handshake.
        """
        self._clear_callbacks()
        stream = self.stream
        self.stream = None  # type: ignore
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)
        return stream

    def set_body_timeout(self, timeout: float) -> None:
        """Sets the body timeout for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._body_timeout = timeout

    def set_max_body_size(self, max_body_size: int) -> None:
        """Sets the body size limit for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._max_body_size = max_body_size

    def write_headers(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
        chunk: Optional[bytes] = None,
    ) -> "Future[None]":
        """Implements `.HTTPConnection.write_headers`."""
        lines = []
        if self.is_client:
            assert isinstance(start_line, httputil.RequestStartLine)
            self._request_start_line = start_line
            lines.append(utf8(f"{start_line[0]} {start_line[1]} HTTP/1.1"))
            # Client requests with a non-empty body must have either a
            # Content-Length or a Transfer-Encoding. If Content-Length is not
            # present we'll add our Transfer-Encoding below.
            self._chunking_output = (
                start_line.method in ("POST", "PUT", "PATCH")
                and "Content-Length" not in headers
            )
        else:
            assert isinstance(start_line, httputil.ResponseStartLine)
            assert self._request_start_line is not None
            assert self._request_headers is not None
            self._response_start_line = start_line
            lines.append(utf8("HTTP/1.1 %d %s" % (start_line[1], start_line[2])))
            self._chunking_output = (
                # TODO: should this use
                # self._request_start_line.version or
                # start_line.version?
                self._request_start_line.version == "HTTP/1.1"
                # Omit payload header field for HEAD request.
                and self._request_start_line.method != "HEAD"
                # 1xx, 204 and 304 responses have no body (not even a zero-length
                # body), and so should not have either Content-Length or
                # Transfer-Encoding headers.
                and start_line.code not in (204, 304)
                and (start_line.code < 100 or start_line.code >= 200)
                # No need to chunk the output if a Content-Length is specified.
                and "Content-Length" not in headers
            )
            # If connection to a 1.1 client will be closed, inform client
            if (
                self._request_start_line.version == "HTTP/1.1"
                and self._disconnect_on_finish
            ):
                headers["Connection"] = "close"
            # If a 1.0 client asked for keep-alive, add the header.
            if (
                self._request_start_line.version == "HTTP/1.0"
                and self._request_headers.get("Connection", "").lower() == "keep-alive"
            ):
                headers["Connection"] = "Keep-Alive"
        if self._chunking_output:
            headers["Transfer-Encoding"] = "chunked"
        if not self.is_client and (
            self._request_start_line.method == "HEAD"
            or cast(httputil.ResponseStartLine, start_line).code == 304
        ):
            self._expected_content_remaining = 0
        elif "Content-Length" in headers:
            self._expected_content_remaining = parse_int(headers["Content-Length"])
        else:
            self._expected_content_remaining = None
        # TODO: headers are supposed to be of type str, but we still have some
        # cases that let bytes slip through. Remove these native_str calls when those
        # are fixed.
        header_lines = (
            native_str(n) + ": " + native_str(v) for n, v in headers.get_all()
        )
        lines.extend(line.encode("latin1") for line in header_lines)
        for line in lines:
            if CR_OR_LF_RE.search(line):
                raise ValueError("Illegal characters (CR or LF) in header: %r" % line)
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            future.set_exception(iostream.StreamClosedError())
            future.exception()
        else:
            future = self._write_future = Future()
            data = b"\r\n".join(lines) + b"\r\n\r\n"
            if chunk:
                data += self._format_chunk(chunk)
            self._pending_write = self.stream.write(data)
            future_add_done_callback(self._pending_write, self._on_write_complete)
        return future

    def _format_chunk(self, chunk: bytes) -> bytes:
        if self._expected_content_remaining is not None:
            self._expected_content_remaining -= len(chunk)
            if self._expected_content_remaining < 0:
                # Close the stream now to stop further framing errors.
                self.stream.close()
                raise httputil.HTTPOutputError(
                    "Tried to write more data than Content-Length"
                )
        if self._chunking_output and chunk:
            # Don't write out empty chunks because that means END-OF-STREAM
            # with chunked encoding
            return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
        else:
            return chunk

    def write(self, chunk: bytes) -> "Future[None]":
        """Implements `.HTTPConnection.write`.

        For backwards compatibility it is allowed but deprecated to
        skip `write_headers` and instead call `write()` with a
        pre-encoded header block.
        """
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            self._write_future.set_exception(iostream.StreamClosedError())
            self._write_future.exception()
        else:
            future = self._write_future = Future()
            self._pending_write = self.stream.write(self._format_chunk(chunk))
            future_add_done_callback(self._pending_write, self._on_write_complete)
        return future

    def finish(self) -> None:
        """Implements `.HTTPConnection.finish`."""
        if (
            self._expected_content_remaining is not None
            and self._expected_content_remaining != 0
            and not self.stream.closed()
        ):
            self.stream.close()
            raise httputil.HTTPOutputError(
                "Tried to write %d bytes less than Content-Length"
                % self._expected_content_remaining
            )
        if self._chunking_output:
            if not self.stream.closed():
                self._pending_write = self.stream.write(b"0\r\n\r\n")
                self._pending_write.add_done_callback(self._on_write_complete)
        self._write_finished = True
        # If the app finished the request while we're still reading,
        # divert any remaining data away from the delegate and
        # close the connection when we're done sending our response.
        # Closing the connection is the only way to avoid reading the
        # whole input body.
        if not self._read_finished:
            self._disconnect_on_finish = True
        # No more data is coming, so instruct TCP to send any remaining
        # data immediately instead of waiting for a full packet or ack.
        self.stream.set_nodelay(True)
        if self._pending_write is None:
            self._finish_request(None)
        else:
            future_add_done_callback(self._pending_write, self._finish_request)

    def _on_write_complete(self, future: "Future[None]") -> None:
        exc = future.exception()
        if exc is not None and not isinstance(exc, iostream.StreamClosedError):
            future.result()
        if self._write_callback is not None:
            callback = self._write_callback
            self._write_callback = None
            self.stream.io_loop.add_callback(callback)
        if self._write_future is not None:
            future = self._write_future
            self._write_future = None
            future_set_result_unless_cancelled(future, None)

    def _can_keep_alive(
        self, start_line: httputil.RequestStartLine, headers: httputil.HTTPHeaders
    ) -> bool:
        if self.params.no_keep_alive:
            return False
        connection_header = headers.get("Connection")
        if connection_header is not None:
            connection_header = connection_header.lower()
        if start_line.version == "HTTP/1.1":
            return connection_header != "close"
        elif (
            "Content-Length" in headers
            or is_transfer_encoding_chunked(headers)
            or getattr(start_line, "method", None) in ("HEAD", "GET")
        ):
            # start_line may be a request or response start line; only
            # the former has a method attribute.
            return connection_header == "keep-alive"
        return False

    def _finish_request(self, future: "Optional[Future[None]]") -> None:
        self._clear_callbacks()
        if not self.is_client and self._disconnect_on_finish:
            self.close()
            return
        # Turn Nagle's algorithm back on, leaving the stream in its
        # default state for the next request.
        self.stream.set_nodelay(False)
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)

    def _parse_headers(self, data: bytes) -> Tuple[str, httputil.HTTPHeaders]:
        # The lstrip removes newlines that some implementations sometimes
        # insert between messages of a reused connection. Per RFC 7230,
        # we SHOULD ignore at least one empty line before the request.
        # http://tools.ietf.org/html/rfc7230#section-3.5
        data_str = native_str(data.decode("latin1")).lstrip("\r\n")
        # RFC 7230 section 3.5 allows for both CRLF and bare LF.
        eol = data_str.find("\n")
        start_line = data_str[:eol].rstrip("\r")
        headers = httputil.HTTPHeaders.parse(data_str[eol:])
        return start_line, headers

    def _read_body(
        self,
        code: int,
        headers: httputil.HTTPHeaders,
        delegate: httputil.HTTPMessageDelegate,
    ) -> Optional[Awaitable[None]]:
        if "Content-Length" in headers:
            if "," in headers["Content-Length"]:
                # Proxies sometimes cause Content-Length headers to get
                # duplicated. If all the values are identical then we can
                # use them but if they differ it's an error.
                pieces = re.split(r",\s*", headers["Content-Length"])
                if any(i != pieces[0] for i in pieces):
                    raise httputil.HTTPInputError(
                        "Multiple unequal Content-Lengths: %r"
                        % headers["Content-Length"]
                    )
                headers["Content-Length"] = pieces[0]

            try:
                content_length: Optional[int] = parse_int(headers["Content-Length"])
            except ValueError:
                # Handles non-integer Content-Length value.
                raise httputil.HTTPInputError(
                    "Only integer Content-Length is allowed: %s"
                    % headers["Content-Length"]
                )

            if cast(int, content_length) > self._max_body_size:
                raise httputil.HTTPInputError("Content-Length too long")
        else:
            content_length = None

        is_chunked = is_transfer_encoding_chunked(headers)

        if code == 204:
            # This response code is not allowed to have a non-empty body,
            # and has an implicit length of zero instead of read-until-close.
            # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
            if is_chunked or content_length not in (None, 0):
                raise httputil.HTTPInputError(
                    "Response with code %d should not have body" % code
                )
            content_length = 0

        if is_chunked:
            return self._read_chunked_body(delegate)
        if content_length is not None:
            return self._read_fixed_body(content_length, delegate)
        if self.is_client:
            return self._read_body_until_close(delegate)
        return None

    async def _read_fixed_body(
        self, content_length: int, delegate: httputil.HTTPMessageDelegate
    ) -> None:
        while content_length > 0:
            body = await self.stream.read_bytes(
                min(self.params.chunk_size, content_length), partial=True
            )
            content_length -= len(body)
            if not self._write_finished or self.is_client:
                with _ExceptionLoggingContext(app_log):
                    ret = delegate.data_received(body)
                    if ret is not None:
                        await ret

    async def _read_chunked_body(self, delegate: httputil.HTTPMessageDelegate) -> None:
        # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
        total_size = 0
        while True:
            chunk_len_str = await self.stream.read_until(b"\r\n", max_bytes=64)
            try:
                chunk_len = parse_hex_int(native_str(chunk_len_str[:-2]))
            except ValueError:
                raise httputil.HTTPInputError("invalid chunk size")
            if chunk_len == 0:
                crlf = await self.stream.read_bytes(2)
                if crlf != b"\r\n":
                    raise httputil.HTTPInputError(
                        "improperly terminated chunked request"
                    )
                return
            total_size += chunk_len
            if total_size > self._max_body_size:
                raise httputil.HTTPInputError("chunked body too large")
            bytes_to_read = chunk_len
            while bytes_to_read:
                chunk = await self.stream.read_bytes(
                    min(bytes_to_read, self.params.chunk_size), partial=True
                )
                bytes_to_read -= len(chunk)
                if not self._write_finished or self.is_client:
                    with _ExceptionLoggingContext(app_log):
                        ret = delegate.data_received(chunk)
                        if ret is not None:
                            await ret
            # chunk ends with \r\n
            crlf = await self.stream.read_bytes(2)
            assert crlf == b"\r\n"

    async def _read_body_until_close(
        self, delegate: httputil.HTTPMessageDelegate
    ) -> None:
        body = await self.stream.read_until_close()
        if not self._write_finished or self.is_client:
            with _ExceptionLoggingContext(app_log):
                ret = delegate.data_received(body)
                if ret is not None:
                    await ret
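

# An illustrative client-side sketch, not part of the upstream module: the
# write_headers/finish/read_response sequence described in read_response's
# docstring. The host, port, and minimal delegate are assumptions made for
# this example only.
async def _example_client_fetch() -> None:
    from tornado.tcpclient import TCPClient

    class _PrintDelegate(httputil.HTTPMessageDelegate):
        def headers_received(self, start_line, headers):
            print("status:", start_line)

        def data_received(self, chunk):
            print("received %d body bytes" % len(chunk))

        def finish(self):
            print("response complete")

    stream = await TCPClient().connect("example.com", 80)
    conn = HTTP1Connection(stream, is_client=True)
    await conn.write_headers(
        httputil.RequestStartLine("GET", "/", "HTTP/1.1"),
        httputil.HTTPHeaders({"Host": "example.com", "Connection": "close"}),
    )
    conn.finish()  # no request body to send; flushes the request
    await conn.read_response(_PrintDelegate())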


class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
    """Wraps an `HTTPMessageDelegate` to decode ``Content-Encoding: gzip``."""

    def __init__(self, delegate: httputil.HTTPMessageDelegate, chunk_size: int) -> None:
        self._delegate = delegate
        self._chunk_size = chunk_size
        self._decompressor = None  # type: Optional[GzipDecompressor]

    def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> Optional[Awaitable[None]]:
        if headers.get("Content-Encoding", "").lower() == "gzip":
            self._decompressor = GzipDecompressor()
            # Downstream delegates will only see uncompressed data,
            # so rename the content-encoding header.
            # (but note that curl_httpclient doesn't do this).
            headers.add("X-Consumed-Content-Encoding", headers["Content-Encoding"])
            del headers["Content-Encoding"]
        return self._delegate.headers_received(start_line, headers)

    async def data_received(self, chunk: bytes) -> None:
        if self._decompressor:
            compressed_data = chunk
            while compressed_data:
                decompressed = self._decompressor.decompress(
                    compressed_data, self._chunk_size
                )
                if decompressed:
                    ret = self._delegate.data_received(decompressed)
                    if ret is not None:
                        await ret
                compressed_data = self._decompressor.unconsumed_tail
                if compressed_data and not decompressed:
                    raise httputil.HTTPInputError(
                        "encountered unconsumed gzip data without making progress"
                    )
        else:
            ret = self._delegate.data_received(chunk)
            if ret is not None:
                await ret

    def finish(self) -> None:
        if self._decompressor is not None:
            tail = self._decompressor.flush()
            if tail:
                # The tail should always be empty: decompress returned
                # all that it can in data_received and the only
                # purpose of the flush call is to detect errors such
                # as truncated input. If we did legitimately get a new
                # chunk at this point we'd need to change the
                # interface to make finish() a coroutine.
                raise ValueError(
                    "decompressor.flush returned data; possible truncated input"
                )
        return self._delegate.finish()

    def on_connection_close(self) -> None:
        return self._delegate.on_connection_close()
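

# A small sketch, not part of the upstream module, of the streaming pattern
# _GzipMessageDelegate.data_received relies on: decompress in bounded slices
# and drain unconsumed_tail, failing if no forward progress is made.
def _example_streaming_gunzip(compressed: bytes, chunk_size: int = 65536) -> bytes:
    decompressor = GzipDecompressor()
    output = []
    data = compressed
    while data:
        piece = decompressor.decompress(data, chunk_size)
        if piece:
            output.append(piece)
        data = decompressor.unconsumed_tail
        if data and not piece:
            # Mirrors the error raised in _GzipMessageDelegate.data_received.
            raise httputil.HTTPInputError(
                "encountered unconsumed gzip data without making progress"
            )
    return b"".join(output)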


class HTTP1ServerConnection:
    """An HTTP/1.x server."""

    def __init__(
        self,
        stream: iostream.IOStream,
        params: Optional[HTTP1ConnectionParameters] = None,
        context: Optional[object] = None,
    ) -> None:
        """
        :arg stream: an `.IOStream`
        :arg params: a `.HTTP1ConnectionParameters` or None
        :arg context: an opaque application-defined object that is accessible
            as ``connection.context``
        """
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        self._serving_future = None  # type: Optional[Future[None]]

    async def close(self) -> None:
        """Closes the connection.

        Returns a `.Future` that resolves after the serving loop has exited.
        """
        self.stream.close()
        # Block until the serving loop is done, but ignore any exceptions
        # (start_serving is already responsible for logging them).
        assert self._serving_future is not None
        try:
            await self._serving_future
        except Exception:
            pass

    def start_serving(self, delegate: httputil.HTTPServerConnectionDelegate) -> None:
        """Starts serving requests on this connection.

        :arg delegate: a `.HTTPServerConnectionDelegate`
        """
        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
        fut = gen.convert_yielded(self._server_request_loop(delegate))
        self._serving_future = fut
        # Register the future on the IOLoop so its errors get logged.
        self.stream.io_loop.add_future(fut, lambda f: f.result())

    async def _server_request_loop(
        self, delegate: httputil.HTTPServerConnectionDelegate
    ) -> None:
        try:
            while True:
                conn = HTTP1Connection(self.stream, False, self.params, self.context)
                request_delegate = delegate.start_request(self, conn)
                try:
                    ret = await conn.read_response(request_delegate)
                except (
                    iostream.StreamClosedError,
                    iostream.UnsatisfiableReadError,
                    asyncio.CancelledError,
                ):
                    return
                except _QuietException:
                    # This exception was already logged.
                    conn.close()
                    return
                except Exception:
                    gen_log.error("Uncaught exception", exc_info=True)
                    conn.close()
                    return
                if not ret:
                    return
                await asyncio.sleep(0)
        finally:
            delegate.on_close(self)
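

# An illustrative server-side sketch, not part of the upstream module: how a
# stream accepted by some listener would be handed to this class. Tornado's
# real HTTPServer performs an equivalent handoff.
def _example_serve_stream(
    stream: iostream.IOStream, delegate: httputil.HTTPServerConnectionDelegate
) -> HTTP1ServerConnection:
    # One HTTP1ServerConnection per accepted stream; start_serving drives
    # the request loop until the client disconnects or keep-alive ends.
    conn = HTTP1ServerConnection(stream, HTTP1ConnectionParameters(decompress=True))
    conn.start_serving(delegate)
    return conn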


DIGITS = re.compile(r"[0-9]+")
HEXDIGITS = re.compile(r"[0-9a-fA-F]+")


def parse_int(s: str) -> int:
    """Parse a non-negative integer from a string."""
    if DIGITS.fullmatch(s) is None:
        raise ValueError("not an integer: %r" % s)
    return int(s)


def parse_hex_int(s: str) -> int:
    """Parse a non-negative hexadecimal integer from a string."""
    if HEXDIGITS.fullmatch(s) is None:
        raise ValueError("not a hexadecimal integer: %r" % s)
    return int(s, 16)
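

# Worked examples, not part of the upstream module: unlike the built-in
# int(), these parsers reject signs, whitespace, underscores, and base
# prefixes, which matters for consistent HTTP framing.
def _example_strict_parsing() -> None:
    assert parse_int("42") == 42
    assert parse_hex_int("1a") == 26
    for bad in ("+42", " 42", "4_2"):
        try:
            parse_int(bad)
        except ValueError:
            pass  # expected: DIGITS.fullmatch rejects the whole string
        else:
            raise AssertionError("parse_int accepted %r" % bad)
    try:
        parse_hex_int("0x1a")  # "x" is not a hex digit
    except ValueError:
        pass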


def is_transfer_encoding_chunked(headers: httputil.HTTPHeaders) -> bool:
    """Returns true if the headers specify Transfer-Encoding: chunked.

    Raises httputil.HTTPInputError if any other transfer encoding is used.
    """
    # Note that transfer-encoding is an area in which Postel's law can lead
    # us astray. If a proxy and a backend server are liberal in what they accept,
    # but accept slightly different things, this can lead to mismatched framing
    # and request smuggling issues. Therefore we are as strict as possible here
    # (even technically going beyond the requirements of the RFCs: a value of
    # ",chunked" is legal but doesn't appear in practice for legitimate traffic)
    if "Transfer-Encoding" not in headers:
        return False
    if "Content-Length" in headers:
        # Message cannot contain both Content-Length and
        # Transfer-Encoding headers.
        # http://tools.ietf.org/html/rfc7230#section-3.3.3
        raise httputil.HTTPInputError(
            "Message with both Transfer-Encoding and Content-Length"
        )
    if headers["Transfer-Encoding"].lower() == "chunked":
        return True
    # We do not support any transfer-encodings other than chunked, and we do not
    # expect to add any support because the concept of transfer-encoding has
    # been removed in HTTP/2.
    raise httputil.HTTPInputError(
        "Unsupported Transfer-Encoding %s" % headers["Transfer-Encoding"]
    )
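

# A final sketch, not part of the upstream module, demonstrating the
# anti-smuggling checks above.
def _example_transfer_encoding_checks() -> None:
    # "chunked" is the only accepted transfer coding.
    assert is_transfer_encoding_chunked(
        httputil.HTTPHeaders({"Transfer-Encoding": "chunked"})
    )
    assert not is_transfer_encoding_chunked(httputil.HTTPHeaders())
    # Transfer-Encoding plus Content-Length is rejected outright
    # (RFC 7230 section 3.3.3).
    try:
        is_transfer_encoding_chunked(
            httputil.HTTPHeaders(
                {"Transfer-Encoding": "chunked", "Content-Length": "5"}
            )
        )
    except httputil.HTTPInputError:
        pass
    else:
        raise AssertionError("expected HTTPInputError")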