Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/http1connection.py: 13%

424 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-03 06:10 +0000

1# 

2# Copyright 2014 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""Client and server implementations of HTTP/1.x. 

17 

18.. versionadded:: 4.0 

19""" 

20 

21import asyncio 

22import logging 

23import re 

24import types 

25 

26from tornado.concurrent import ( 

27 Future, 

28 future_add_done_callback, 

29 future_set_result_unless_cancelled, 

30) 

31from tornado.escape import native_str, utf8 

32from tornado import gen 

33from tornado import httputil 

34from tornado import iostream 

35from tornado.log import gen_log, app_log 

36from tornado.util import GzipDecompressor 

37 

38 

39from typing import cast, Optional, Type, Awaitable, Callable, Union, Tuple 

40 

41 

42class _QuietException(Exception): 

43 def __init__(self) -> None: 

44 pass 

45 

46 

47class _ExceptionLoggingContext(object): 

48 """Used with the ``with`` statement when calling delegate methods to 

49 log any exceptions with the given logger. Any exceptions caught are 

50 converted to _QuietException 

51 """ 

52 

53 def __init__(self, logger: logging.Logger) -> None: 

54 self.logger = logger 

55 

56 def __enter__(self) -> None: 

57 pass 

58 

59 def __exit__( 

60 self, 

61 typ: "Optional[Type[BaseException]]", 

62 value: Optional[BaseException], 

63 tb: types.TracebackType, 

64 ) -> None: 

65 if value is not None: 

66 assert typ is not None 

67 self.logger.error("Uncaught exception", exc_info=(typ, value, tb)) 

68 raise _QuietException 

69 

70 

class HTTP1ConnectionParameters(object):
    """Parameters for `.HTTP1Connection` and `.HTTP1ServerConnection`."""

    def __init__(
        self,
        no_keep_alive: bool = False,
        chunk_size: Optional[int] = None,
        max_header_size: Optional[int] = None,
        header_timeout: Optional[float] = None,
        max_body_size: Optional[int] = None,
        body_timeout: Optional[float] = None,
        decompress: bool = False,
    ) -> None:
        """
        :arg bool no_keep_alive: If true, always close the connection after
            one request.
        :arg int chunk_size: how much data to read into memory at once
            (a falsy value selects 65536)
        :arg int max_header_size: maximum amount of data for HTTP headers
            (a falsy value selects 65536)
        :arg float header_timeout: how long to wait for all headers (seconds)
        :arg int max_body_size: maximum amount of data for body
        :arg float body_timeout: how long to wait while reading body (seconds)
        :arg bool decompress: if true, decode incoming
            ``Content-Encoding: gzip``
        """
        # Sizes fall back to 64 KiB whenever the caller passes None or 0.
        self.chunk_size = chunk_size if chunk_size else 65536
        self.max_header_size = max_header_size if max_header_size else 65536
        # The remaining options are stored exactly as given.
        self.no_keep_alive = no_keep_alive
        self.header_timeout = header_timeout
        self.max_body_size = max_body_size
        self.body_timeout = body_timeout
        self.decompress = decompress

102 

103 

104class HTTP1Connection(httputil.HTTPConnection): 

105 """Implements the HTTP/1.x protocol. 

106 

107 This class can be used on its own for clients, or via `HTTP1ServerConnection` 

108 for servers. 

109 """ 

110 

def __init__(
    self,
    stream: iostream.IOStream,
    is_client: bool,
    params: Optional[HTTP1ConnectionParameters] = None,
    context: Optional[object] = None,
) -> None:
    """
    :arg stream: an `.IOStream`
    :arg bool is_client: client or server
    :arg params: a `.HTTP1ConnectionParameters` instance or ``None``
        (``None`` selects a default-constructed instance)
    :arg context: an opaque application-defined object that can be accessed
        as ``connection.context``.
    """
    self.is_client = is_client
    self.stream = stream
    self.params = params if params is not None else HTTP1ConnectionParameters()
    self.context = context
    self.no_keep_alive = self.params.no_keep_alive
    # The delegate may change the body limits per request, so keep
    # per-connection copies rather than reading self.params later.
    body_limit = self.params.max_body_size
    if body_limit is None:
        # No explicit limit configured: fall back to the stream's buffer cap.
        body_limit = self.stream.max_buffer_size
    self._max_body_size = body_limit
    self._body_timeout = self.params.body_timeout
    # Becomes True once finish() has been called, i.e. no more data
    # will be sent. Data may still sit in the stream's write buffer.
    self._write_finished = False
    # Becomes True once the entire incoming body has been read.
    self._read_finished = False
    # Resolves when all data has been written and flushed to the IOStream.
    self._finish_future = Future()  # type: Future[None]
    # When true, the connection is closed after this request (after the
    # response is written on the server side, after it is read on the
    # client side).
    self._disconnect_on_finish = False
    self._clear_callbacks()
    # Start lines are remembered after being read or written because
    # they affect later processing (e.g. 304 responses and HEAD methods
    # have content-length but no bodies).
    self._request_start_line = None  # type: Optional[httputil.RequestStartLine]
    self._response_start_line = None  # type: Optional[httputil.ResponseStartLine]
    self._request_headers = None  # type: Optional[httputil.HTTPHeaders]
    # True while output is being written with chunked encoding.
    self._chunking_output = False
    # Bytes still owed when writing a body that declared a Content-Length.
    self._expected_content_remaining = None  # type: Optional[int]
    # The Future returned by the most recent IOStream.write call.
    self._pending_write = None  # type: Optional[Future[None]]

167 

def read_response(self, delegate: httputil.HTTPMessageDelegate) -> Awaitable[bool]:
    """Read a single HTTP response.

    Typical client-mode usage is to write a request using `write_headers`,
    `write`, and `finish`, and then call ``read_response``.

    :arg delegate: a `.HTTPMessageDelegate`; when ``params.decompress`` is
        set it is wrapped so that gzip-encoded bodies are decoded first.

    Returns a `.Future` that resolves to a bool after the full response has
    been read. The result is true if the stream is still open.
    """
    target = delegate
    if self.params.decompress:
        target = _GzipMessageDelegate(delegate, self.params.chunk_size)
    return self._read_message(target)

182 

async def _read_message(self, delegate: httputil.HTTPMessageDelegate) -> bool:
    """Read one complete message (headers plus optional body) from the stream.

    The delegate receives ``headers_received``, zero or more
    ``data_received`` calls and finally ``finish``. If the message is
    abandoned after the headers were delivered, ``on_connection_close``
    is invoked instead.

    Returns ``True`` when the message was processed and the connection is
    still attached; ``False`` after a header or body timeout, a malformed
    message, or when the stream has been detached.
    """
    need_delegate_close = False
    try:
        header_future = self.stream.read_until_regex(
            b"\r?\n\r?\n", max_bytes=self.params.max_header_size
        )
        header_timeout = self.params.header_timeout
        if header_timeout is not None:
            try:
                header_data = await gen.with_timeout(
                    self.stream.io_loop.time() + header_timeout,
                    header_future,
                    quiet_exceptions=iostream.StreamClosedError,
                )
            except gen.TimeoutError:
                self.close()
                return False
        else:
            header_data = await header_future
        start_line_str, headers = self._parse_headers(header_data)
        if self.is_client:
            resp_start_line = httputil.parse_response_start_line(start_line_str)
            self._response_start_line = resp_start_line
            start_line = (
                resp_start_line
            )  # type: Union[httputil.RequestStartLine, httputil.ResponseStartLine]
            # TODO: this will need to change to support client-side keepalive
            self._disconnect_on_finish = False
        else:
            req_start_line = httputil.parse_request_start_line(start_line_str)
            self._request_start_line = req_start_line
            self._request_headers = headers
            start_line = req_start_line
            self._disconnect_on_finish = not self._can_keep_alive(
                req_start_line, headers
            )
        # From here on the delegate knows about the message, so it must be
        # told if we bail out before calling finish().
        need_delegate_close = True
        with _ExceptionLoggingContext(app_log):
            header_recv_future = delegate.headers_received(start_line, headers)
            if header_recv_future is not None:
                await header_recv_future
        if self.stream is None:
            # We've been detached.
            need_delegate_close = False
            return False
        skip_body = False
        if self.is_client:
            assert isinstance(start_line, httputil.ResponseStartLine)
            answered_head = (
                self._request_start_line is not None
                and self._request_start_line.method == "HEAD"
            )
            code = start_line.code
            # HEAD responses and 304 responses may include the
            # content-length header but do not actually have a body.
            # http://tools.ietf.org/html/rfc7230#section-3.3
            skip_body = answered_head or code == 304
            if 100 <= code < 200:
                # 1xx responses should never indicate the presence of
                # a body.
                if "Content-Length" in headers or "Transfer-Encoding" in headers:
                    raise httputil.HTTPInputError(
                        "Response code %d cannot have body" % code
                    )
                # TODO: client delegates will get headers_received twice
                # in the case of a 100-continue. Document or change?
                # NOTE(review): after this recursive read returns, control
                # still falls through to the body handling below using the
                # 1xx start line and headers -- confirm this is intended.
                await self._read_message(delegate)
        elif headers.get("Expect") == "100-continue" and not self._write_finished:
            self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
        if not skip_body:
            body_future = self._read_body(
                resp_start_line.code if self.is_client else 0, headers, delegate
            )
            if body_future is not None:
                body_timeout = self._body_timeout
                if body_timeout is not None:
                    try:
                        await gen.with_timeout(
                            self.stream.io_loop.time() + body_timeout,
                            body_future,
                            quiet_exceptions=iostream.StreamClosedError,
                        )
                    except gen.TimeoutError:
                        gen_log.info("Timeout reading body from %s", self.context)
                        self.stream.close()
                        return False
                else:
                    await body_future
        self._read_finished = True
        if not self._write_finished or self.is_client:
            need_delegate_close = False
            with _ExceptionLoggingContext(app_log):
                delegate.finish()
        # If we're waiting for the application to produce an asynchronous
        # response, and we're not detached, register a close callback
        # on the stream (we didn't need one while we were reading)
        if (
            not self._finish_future.done()
            and self.stream is not None
            and not self.stream.closed()
        ):
            self.stream.set_close_callback(self._on_connection_close)
            await self._finish_future
        if self.is_client and self._disconnect_on_finish:
            self.close()
        if self.stream is None:
            return False
    except httputil.HTTPInputError as e:
        gen_log.info("Malformed HTTP message from %s: %s", self.context, e)
        if not self.is_client:
            await self.stream.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
        self.close()
        return False
    finally:
        if need_delegate_close:
            with _ExceptionLoggingContext(app_log):
                delegate.on_connection_close()
        # Drop our references so the futures/callbacks can be collected.
        header_future = None  # type: ignore
        self._clear_callbacks()
    return True

304 

305 def _clear_callbacks(self) -> None: 

306 """Clears the callback attributes. 

307 

308 This allows the request handler to be garbage collected more 

309 quickly in CPython by breaking up reference cycles. 

310 """ 

311 self._write_callback = None 

312 self._write_future = None # type: Optional[Future[None]] 

313 self._close_callback = None # type: Optional[Callable[[], None]] 

314 if self.stream is not None: 

315 self.stream.set_close_callback(None) 

316 

def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
    """Sets a callback that will be run when the connection is closed.

    This differs slightly from
    `.HTTPMessageDelegate.on_connection_close`: that delegate method is
    called when the connection is closed while a message is being
    received, whereas this callback is used when there is no active
    delegate (for example, on the server side, when the client closes
    the connection after sending its request but before receiving the
    whole response).

    :arg callback: a zero-argument callable, or ``None`` to clear it.
    """
    self._close_callback = callback

330 

331 def _on_connection_close(self) -> None: 

332 # Note that this callback is only registered on the IOStream 

333 # when we have finished reading the request and are waiting for 

334 # the application to produce its response. 

335 if self._close_callback is not None: 

336 callback = self._close_callback 

337 self._close_callback = None 

338 callback() 

339 if not self._finish_future.done(): 

340 future_set_result_unless_cancelled(self._finish_future, None) 

341 self._clear_callbacks() 

342 

def close(self) -> None:
    """Close the underlying stream (if still attached) and settle state.

    Callbacks are cleared and ``_finish_future`` is resolved if it has
    not completed yet.
    """
    stream = self.stream
    if stream is not None:
        stream.close()
    self._clear_callbacks()
    if not self._finish_future.done():
        future_set_result_unless_cancelled(self._finish_future, None)

349 

def detach(self) -> iostream.IOStream:
    """Take control of the underlying stream.

    Hands the underlying `.IOStream` object back to the caller and stops
    all further HTTP processing on this connection. May only be called
    during `.HTTPMessageDelegate.headers_received`. Intended for
    implementing protocols like websockets that tunnel over an HTTP
    handshake.
    """
    self._clear_callbacks()
    released = self.stream
    # A ``None`` stream is how the rest of the class recognises detachment.
    self.stream = None  # type: ignore
    if not self._finish_future.done():
        future_set_result_unless_cancelled(self._finish_future, None)
    return released

364 

def set_body_timeout(self, timeout: float) -> None:
    """Sets the body timeout for a single request.

    Replaces the value taken from `.HTTP1ConnectionParameters` when this
    connection was created.

    :arg float timeout: how long to wait while reading the body (seconds)
    """
    self._body_timeout = timeout

371 

def set_max_body_size(self, max_body_size: int) -> None:
    """Sets the body size limit for a single request.

    Replaces the value taken from `.HTTP1ConnectionParameters` when this
    connection was created.

    :arg int max_body_size: maximum amount of data accepted for the body
    """
    self._max_body_size = max_body_size

378 

def write_headers(
    self,
    start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
    headers: httputil.HTTPHeaders,
    chunk: Optional[bytes] = None,
) -> "Future[None]":
    """Implements `.HTTPConnection.write_headers`.

    Serializes the start line and headers, decides whether the body will
    use chunked transfer encoding, and queues the header block (plus the
    optional first ``chunk``) on the stream.

    :arg start_line: a `.RequestStartLine` in client mode, a
        `.ResponseStartLine` in server mode.
    :arg headers: the headers to send; ``Connection`` and
        ``Transfer-Encoding`` may be added to this object.
    :arg chunk: optional first piece of the body.

    Returns a `.Future` that resolves when the write completes; if the
    stream is already closed it carries `.StreamClosedError`.

    Raises ``ValueError`` if the start line or any header contains a CR
    or LF character, since either could be used to inject extra header
    lines or split the message.
    """
    lines = []
    if self.is_client:
        assert isinstance(start_line, httputil.RequestStartLine)
        self._request_start_line = start_line
        # Fields 0 and 1 of the request start line are sent ahead of the
        # fixed "HTTP/1.1" version token.
        lines.append(utf8("%s %s HTTP/1.1" % (start_line[0], start_line[1])))
        # Client requests with a non-empty body must have either a
        # Content-Length or a Transfer-Encoding.
        self._chunking_output = (
            start_line.method in ("POST", "PUT", "PATCH")
            and "Content-Length" not in headers
            and (
                "Transfer-Encoding" not in headers
                or headers["Transfer-Encoding"] == "chunked"
            )
        )
    else:
        assert isinstance(start_line, httputil.ResponseStartLine)
        assert self._request_start_line is not None
        assert self._request_headers is not None
        self._response_start_line = start_line
        # Fields 1 and 2 of the response start line follow the fixed
        # "HTTP/1.1" version token.
        lines.append(utf8("HTTP/1.1 %d %s" % (start_line[1], start_line[2])))
        self._chunking_output = (
            # TODO: should this use
            # self._request_start_line.version or
            # start_line.version?
            self._request_start_line.version == "HTTP/1.1"
            # Omit payload header field for HEAD request.
            and self._request_start_line.method != "HEAD"
            # 1xx, 204 and 304 responses have no body (not even a zero-length
            # body), and so should not have either Content-Length or
            # Transfer-Encoding headers.
            and start_line.code not in (204, 304)
            and (start_line.code < 100 or start_line.code >= 200)
            # No need to chunk the output if a Content-Length is specified.
            and "Content-Length" not in headers
            # Applications are discouraged from touching Transfer-Encoding,
            # but if they do, leave it alone.
            and "Transfer-Encoding" not in headers
        )
        # If connection to a 1.1 client will be closed, inform client
        if (
            self._request_start_line.version == "HTTP/1.1"
            and self._disconnect_on_finish
        ):
            headers["Connection"] = "close"
        # If a 1.0 client asked for keep-alive, add the header.
        if (
            self._request_start_line.version == "HTTP/1.0"
            and self._request_headers.get("Connection", "").lower() == "keep-alive"
        ):
            headers["Connection"] = "Keep-Alive"
    if self._chunking_output:
        headers["Transfer-Encoding"] = "chunked"
    if not self.is_client and (
        self._request_start_line.method == "HEAD"
        or cast(httputil.ResponseStartLine, start_line).code == 304
    ):
        # Neither kind of response carries a body, whatever the headers say.
        self._expected_content_remaining = 0
    elif "Content-Length" in headers:
        self._expected_content_remaining = int(headers["Content-Length"])
    else:
        self._expected_content_remaining = None
    # TODO: headers are supposed to be of type str, but we still have some
    # cases that let bytes slip through. Remove these native_str calls when those
    # are fixed.
    header_lines = (
        native_str(n) + ": " + native_str(v) for n, v in headers.get_all()
    )
    lines.extend(line.encode("latin1") for line in header_lines)
    for line in lines:
        # Reject bare CR as well as LF: lines are joined with CRLF below,
        # so either character could forge a header boundary for a peer
        # that treats it as a line terminator.
        if b"\n" in line or b"\r" in line:
            raise ValueError("Newline in header: " + repr(line))
    future = None
    if self.stream.closed():
        future = self._write_future = Future()
        future.set_exception(iostream.StreamClosedError())
        # Retrieve the exception immediately so it is marked as observed.
        future.exception()
    else:
        future = self._write_future = Future()
        data = b"\r\n".join(lines) + b"\r\n\r\n"
        if chunk:
            data += self._format_chunk(chunk)
        self._pending_write = self.stream.write(data)
        future_add_done_callback(self._pending_write, self._on_write_complete)
    return future

471 

472 def _format_chunk(self, chunk: bytes) -> bytes: 

473 if self._expected_content_remaining is not None: 

474 self._expected_content_remaining -= len(chunk) 

475 if self._expected_content_remaining < 0: 

476 # Close the stream now to stop further framing errors. 

477 self.stream.close() 

478 raise httputil.HTTPOutputError( 

479 "Tried to write more data than Content-Length" 

480 ) 

481 if self._chunking_output and chunk: 

482 # Don't write out empty chunks because that means END-OF-STREAM 

483 # with chunked encoding 

484 return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n" 

485 else: 

486 return chunk 

487 

def write(self, chunk: bytes) -> "Future[None]":
    """Implements `.HTTPConnection.write`.

    For backwards compatibility it is allowed but deprecated to
    skip `write_headers` and instead call `write()` with a
    pre-encoded header block.

    Returns a `.Future` for the write; on an already-closed stream the
    future carries `.StreamClosedError` instead.
    """
    result = self._write_future = Future()
    if self.stream.closed():
        result.set_exception(iostream.StreamClosedError())
        # Retrieve the exception immediately so it is marked as observed.
        result.exception()
    else:
        self._pending_write = self.stream.write(self._format_chunk(chunk))
        future_add_done_callback(self._pending_write, self._on_write_complete)
    return result

505 

def finish(self) -> None:
    """Implements `.HTTPConnection.finish`.

    Raises `.HTTPOutputError` (after closing the stream) when fewer bytes
    than the declared Content-Length were written to a still-open stream.
    """
    remaining = self._expected_content_remaining
    if remaining is not None and remaining != 0 and not self.stream.closed():
        self.stream.close()
        raise httputil.HTTPOutputError(
            "Tried to write %d bytes less than Content-Length" % remaining
        )
    if self._chunking_output and not self.stream.closed():
        # Terminate the chunked body with the zero-length final chunk.
        self._pending_write = self.stream.write(b"0\r\n\r\n")
        self._pending_write.add_done_callback(self._on_write_complete)
    self._write_finished = True
    # If the app finished the request while we're still reading,
    # divert any remaining data away from the delegate and
    # close the connection when we're done sending our response.
    # Closing the connection is the only way to avoid reading the
    # whole input body.
    if not self._read_finished:
        self._disconnect_on_finish = True
    # No more data is coming, so instruct TCP to send any remaining
    # data immediately instead of waiting for a full packet or ack.
    self.stream.set_nodelay(True)
    outstanding = self._pending_write
    if outstanding is None:
        self._finish_request(None)
    else:
        future_add_done_callback(outstanding, self._finish_request)

537 

538 def _on_write_complete(self, future: "Future[None]") -> None: 

539 exc = future.exception() 

540 if exc is not None and not isinstance(exc, iostream.StreamClosedError): 

541 future.result() 

542 if self._write_callback is not None: 

543 callback = self._write_callback 

544 self._write_callback = None 

545 self.stream.io_loop.add_callback(callback) 

546 if self._write_future is not None: 

547 future = self._write_future 

548 self._write_future = None 

549 future_set_result_unless_cancelled(future, None) 

550 

def _can_keep_alive(
    self, start_line: httputil.RequestStartLine, headers: httputil.HTTPHeaders
) -> bool:
    """Decide whether the connection may be reused after this message.

    HTTP/1.1 keeps the connection unless ``Connection: close`` was sent.
    Other versions require an explicit ``Connection: keep-alive`` and a
    message whose end can be determined (Content-Length, chunked
    encoding, or a HEAD/GET method). ``params.no_keep_alive`` overrides
    everything.
    """
    if self.params.no_keep_alive:
        return False
    token = headers.get("Connection")
    token = token.lower() if token is not None else None
    if start_line.version == "HTTP/1.1":
        return token != "close"
    # start_line may be a request or response start line; only
    # the former has a method attribute.
    self_delimiting = (
        "Content-Length" in headers
        or headers.get("Transfer-Encoding", "").lower() == "chunked"
        or getattr(start_line, "method", None) in ("HEAD", "GET")
    )
    return self_delimiting and token == "keep-alive"

570 

571 def _finish_request(self, future: "Optional[Future[None]]") -> None: 

572 self._clear_callbacks() 

573 if not self.is_client and self._disconnect_on_finish: 

574 self.close() 

575 return 

576 # Turn Nagle's algorithm back on, leaving the stream in its 

577 # default state for the next request. 

578 self.stream.set_nodelay(False) 

579 if not self._finish_future.done(): 

580 future_set_result_unless_cancelled(self._finish_future, None) 

581 

def _parse_headers(self, data: bytes) -> Tuple[str, httputil.HTTPHeaders]:
    """Split a raw header block into its start line and parsed headers.

    :arg bytes data: the bytes read from the stream up to and including
        the blank line that ends the headers.

    Returns ``(start_line, headers)`` where ``start_line`` is the first
    line without its line terminator.
    """
    # The lstrip removes newlines that some implementations sometimes
    # insert between messages of a reused connection. Per RFC 7230,
    # we SHOULD ignore at least one empty line before the request.
    # http://tools.ietf.org/html/rfc7230#section-3.5
    text = native_str(data.decode("latin1")).lstrip("\r\n")
    # RFC 7230 section allows for both CRLF and bare LF.
    first_newline = text.find("\n")
    first_line = text[:first_newline].rstrip("\r")
    remainder = text[first_newline:]
    return first_line, httputil.HTTPHeaders.parse(remainder)

593 

def _read_body(
    self,
    code: int,
    headers: httputil.HTTPHeaders,
    delegate: httputil.HTTPMessageDelegate,
) -> Optional[Awaitable[None]]:
    """Choose how the message body is framed and start reading it.

    :arg int code: the response status code (callers pass 0 when
        reading a request).
    :arg headers: the parsed message headers; a duplicated but
        consistent ``Content-Length`` is collapsed in place.
    :arg delegate: receives the body through ``data_received``.

    Returns an awaitable that completes when the body has been read, or
    ``None`` when the message has no body (server side, with neither
    Content-Length nor chunked encoding).

    Raises `.HTTPInputError` for conflicting or malformed framing
    headers, for a Content-Length above the body size limit, and for a
    204 response that declares a body.
    """
    if "Content-Length" in headers:
        if "Transfer-Encoding" in headers:
            # Response cannot contain both Content-Length and
            # Transfer-Encoding headers.
            # http://tools.ietf.org/html/rfc7230#section-3.3.3
            raise httputil.HTTPInputError(
                "Response with both Transfer-Encoding and Content-Length"
            )
        if "," in headers["Content-Length"]:
            # Proxies sometimes cause Content-Length headers to get
            # duplicated. If all the values are identical then we can
            # use them but if they differ it's an error.
            pieces = re.split(r",\s*", headers["Content-Length"])
            if any(i != pieces[0] for i in pieces):
                raise httputil.HTTPInputError(
                    "Multiple unequal Content-Lengths: %r"
                    % headers["Content-Length"]
                )
            headers["Content-Length"] = pieces[0]

        raw_length = headers["Content-Length"]
        # Content-Length is defined as 1*DIGIT. Plain int() is too lenient:
        # it also accepts a sign, underscores, surrounding whitespace and
        # non-ASCII digits, which lets two parties disagree about where
        # the body ends (request smuggling), and a negative value would
        # be silently treated as an empty body.
        if re.fullmatch(r"[0-9]+", raw_length) is None:
            raise httputil.HTTPInputError(
                "Only integer Content-Length is allowed: %s" % raw_length
            )
        content_length = int(raw_length)  # type: Optional[int]

        if cast(int, content_length) > self._max_body_size:
            raise httputil.HTTPInputError("Content-Length too long")
    else:
        content_length = None

    if code == 204:
        # This response code is not allowed to have a non-empty body,
        # and has an implicit length of zero instead of read-until-close.
        # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
        if "Transfer-Encoding" in headers or content_length not in (None, 0):
            raise httputil.HTTPInputError(
                "Response with code %d should not have body" % code
            )
        content_length = 0

    if content_length is not None:
        return self._read_fixed_body(content_length, delegate)
    if headers.get("Transfer-Encoding", "").lower() == "chunked":
        return self._read_chunked_body(delegate)
    if self.is_client:
        # A response without explicit framing runs until the peer closes.
        return self._read_body_until_close(delegate)
    return None

651 

async def _read_fixed_body(
    self, content_length: int, delegate: httputil.HTTPMessageDelegate
) -> None:
    """Read exactly ``content_length`` body bytes and feed them to ``delegate``.

    Data is read in pieces of at most ``params.chunk_size`` bytes. Once
    the server side has already finished writing its response, remaining
    data is still consumed but no longer delivered to the delegate.
    """
    remaining = content_length
    while remaining > 0:
        piece = await self.stream.read_bytes(
            min(self.params.chunk_size, remaining), partial=True
        )
        remaining -= len(piece)
        if self.is_client or not self._write_finished:
            with _ExceptionLoggingContext(app_log):
                pending = delegate.data_received(piece)
                if pending is not None:
                    await pending

665 

async def _read_chunked_body(self, delegate: httputil.HTTPMessageDelegate) -> None:
    """Read a body sent with ``Transfer-Encoding: chunked``.

    Each chunk is delivered to ``delegate.data_received`` in pieces of at
    most ``params.chunk_size`` bytes. Once the server side has already
    finished writing its response, remaining data is still consumed but
    no longer delivered.

    Raises `.HTTPInputError` if a chunk size is not a plain hexadecimal
    number, if the accumulated size exceeds the body size limit, or if a
    chunk (including the final zero-length one) is not terminated by
    CRLF.
    """
    # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
    total_size = 0
    while True:
        chunk_len_str = await self.stream.read_until(b"\r\n", max_bytes=64)
        # Drop the CRLF that read_until includes, then insist on 1*HEXDIG.
        # Plain int(x, 16) also accepts "0x" prefixes, signs, underscores
        # and padding, which other HTTP implementations may read
        # differently (request smuggling), and it raises a bare
        # ValueError that would not be reported as malformed input.
        size_field = chunk_len_str[:-2]
        if re.fullmatch(rb"[0-9a-fA-F]+", size_field) is None:
            raise httputil.HTTPInputError("invalid chunk size")
        chunk_len = int(size_field, 16)
        if chunk_len == 0:
            crlf = await self.stream.read_bytes(2)
            if crlf != b"\r\n":
                raise httputil.HTTPInputError(
                    "improperly terminated chunked request"
                )
            return
        total_size += chunk_len
        if total_size > self._max_body_size:
            raise httputil.HTTPInputError("chunked body too large")
        bytes_to_read = chunk_len
        while bytes_to_read:
            chunk = await self.stream.read_bytes(
                min(bytes_to_read, self.params.chunk_size), partial=True
            )
            bytes_to_read -= len(chunk)
            if not self._write_finished or self.is_client:
                with _ExceptionLoggingContext(app_log):
                    ret = delegate.data_received(chunk)
                    if ret is not None:
                        await ret
        # chunk ends with \r\n
        crlf = await self.stream.read_bytes(2)
        if crlf != b"\r\n":
            # Validate peer input with a real exception: an assert would
            # be stripped under ``python -O`` and would otherwise surface
            # as an unexpected AssertionError.
            raise httputil.HTTPInputError("improperly terminated chunked request")

696 

async def _read_body_until_close(
    self, delegate: httputil.HTTPMessageDelegate
) -> None:
    """Treat everything up to the close of the stream as the body.

    The data is delivered to ``delegate.data_received`` in one call,
    unless the server side has already finished writing its response.
    """
    everything = await self.stream.read_until_close()
    if self.is_client or not self._write_finished:
        with _ExceptionLoggingContext(app_log):
            pending = delegate.data_received(everything)
            if pending is not None:
                await pending

706 

707 

class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
    """Wraps an `HTTPMessageDelegate` to decode ``Content-Encoding: gzip``."""

    def __init__(self, delegate: httputil.HTTPMessageDelegate, chunk_size: int) -> None:
        """
        :arg delegate: the delegate that receives the decoded message
        :arg int chunk_size: size limit passed to each ``decompress`` call
        """
        self._delegate = delegate
        self._chunk_size = chunk_size
        # Created in headers_received only when the body is gzip-encoded.
        self._decompressor = None  # type: Optional[GzipDecompressor]

    def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> Optional[Awaitable[None]]:
        """Enable decompression for gzip bodies, then forward the headers."""
        is_gzip = headers.get("Content-Encoding", "").lower() == "gzip"
        if is_gzip:
            self._decompressor = GzipDecompressor()
            # Downstream delegates will only see uncompressed data,
            # so rename the content-encoding header.
            # (but note that curl_httpclient doesn't do this).
            headers.add("X-Consumed-Content-Encoding", headers["Content-Encoding"])
            del headers["Content-Encoding"]
        return self._delegate.headers_received(start_line, headers)

    async def data_received(self, chunk: bytes) -> None:
        """Decode ``chunk`` when gzip is active and pass the data downstream."""
        if not self._decompressor:
            # Not a gzip body: hand the data through untouched.
            passthrough = self._delegate.data_received(chunk)
            if passthrough is not None:
                await passthrough
            return
        pending_input = chunk
        while pending_input:
            produced = self._decompressor.decompress(pending_input, self._chunk_size)
            if produced:
                downstream = self._delegate.data_received(produced)
                if downstream is not None:
                    await downstream
            pending_input = self._decompressor.unconsumed_tail
            if pending_input and not produced:
                # Input remains but nothing came out: stop rather than
                # loop forever.
                raise httputil.HTTPInputError(
                    "encountered unconsumed gzip data without making progress"
                )

    def finish(self) -> None:
        """Flush the decompressor (if any) and finish the wrapped delegate."""
        if self._decompressor is not None:
            leftover = self._decompressor.flush()
            if leftover:
                # The tail should always be empty: decompress returned
                # all that it can in data_received and the only
                # purpose of the flush call is to detect errors such
                # as truncated input. If we did legitimately get a new
                # chunk at this point we'd need to change the
                # interface to make finish() a coroutine.
                raise ValueError(
                    "decompressor.flush returned data; possible truncated input"
                )
        return self._delegate.finish()

    def on_connection_close(self) -> None:
        """Forward the close notification to the wrapped delegate."""
        return self._delegate.on_connection_close()

768 

769 

class HTTP1ServerConnection(object):
    """An HTTP/1.x server."""

    def __init__(
        self,
        stream: iostream.IOStream,
        params: Optional[HTTP1ConnectionParameters] = None,
        context: Optional[object] = None,
    ) -> None:
        """
        :arg stream: an `.IOStream`
        :arg params: a `.HTTP1ConnectionParameters` or None
            (``None`` selects a default-constructed instance)
        :arg context: an opaque application-defined object that is accessible
            as ``connection.context``
        """
        self.stream = stream
        self.params = params if params is not None else HTTP1ConnectionParameters()
        self.context = context
        # Set by start_serving; tracks the request loop for close().
        self._serving_future = None  # type: Optional[Future[None]]

    async def close(self) -> None:
        """Closes the connection.

        Awaiting this completes after the serving loop has exited.
        `start_serving` must have been called first.
        """
        self.stream.close()
        assert self._serving_future is not None
        try:
            await self._serving_future
        except Exception:
            # Deliberately ignored: start_serving is already responsible
            # for logging errors from the serving loop.
            pass

    def start_serving(self, delegate: httputil.HTTPServerConnectionDelegate) -> None:
        """Starts serving requests on this connection.

        :arg delegate: a `.HTTPServerConnectionDelegate`
        """
        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
        serving = gen.convert_yielded(self._server_request_loop(delegate))
        self._serving_future = serving
        # Register the future on the IOLoop so its errors get logged.
        self.stream.io_loop.add_future(serving, lambda f: f.result())

    async def _server_request_loop(
        self, delegate: httputil.HTTPServerConnectionDelegate
    ) -> None:
        """Serve requests one after another until the connection ends.

        A fresh `HTTP1Connection` is created per request. The loop stops
        when a read reports that the connection should not continue, when
        the stream closes, or when an error occurs;
        ``delegate.on_close`` is always called on the way out.
        """
        try:
            while True:
                conn = HTTP1Connection(self.stream, False, self.params, self.context)
                request_delegate = delegate.start_request(self, conn)
                try:
                    keep_serving = await conn.read_response(request_delegate)
                except (
                    iostream.StreamClosedError,
                    iostream.UnsatisfiableReadError,
                    asyncio.CancelledError,
                ):
                    return
                except _QuietException:
                    # This exception was already logged.
                    conn.close()
                    return
                except Exception:
                    gen_log.error("Uncaught exception", exc_info=True)
                    conn.close()
                    return
                if not keep_serving:
                    return
                # Yield to the event loop between requests.
                await asyncio.sleep(0)
        finally:
            delegate.on_close(self)