
#
# Copyright 2014 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Client and server implementations of HTTP/1.x.

.. versionadded:: 4.0
"""

import asyncio
import logging
import re
import types

from tornado.concurrent import (
    Future,
    future_add_done_callback,
    future_set_result_unless_cancelled,
)
from tornado.escape import native_str, utf8
from tornado import gen
from tornado import httputil
from tornado import iostream
from tornado.log import gen_log, app_log
from tornado.util import GzipDecompressor


from typing import cast, Optional, Type, Awaitable, Callable, Union, Tuple

CR_OR_LF_RE = re.compile(b"\r|\n")


class _QuietException(Exception):
    def __init__(self) -> None:
        pass


class _ExceptionLoggingContext(object):
    """Used with the ``with`` statement when calling delegate methods to
    log any exceptions with the given logger. Any exceptions caught are
    converted to _QuietException.
    """

    def __init__(self, logger: logging.Logger) -> None:
        self.logger = logger

    def __enter__(self) -> None:
        pass

    def __exit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: types.TracebackType,
    ) -> None:
        if value is not None:
            assert typ is not None
            self.logger.error("Uncaught exception", exc_info=(typ, value, tb))
            raise _QuietException


class HTTP1ConnectionParameters(object):
    """Parameters for `.HTTP1Connection` and `.HTTP1ServerConnection`."""

    def __init__(
        self,
        no_keep_alive: bool = False,
        chunk_size: Optional[int] = None,
        max_header_size: Optional[int] = None,
        header_timeout: Optional[float] = None,
        max_body_size: Optional[int] = None,
        body_timeout: Optional[float] = None,
        decompress: bool = False,
    ) -> None:
        """
        :arg bool no_keep_alive: If true, always close the connection after
            one request.
        :arg int chunk_size: how much data to read into memory at once
        :arg int max_header_size: maximum amount of data for HTTP headers
        :arg float header_timeout: how long to wait for all headers (seconds)
        :arg int max_body_size: maximum amount of data for body
        :arg float body_timeout: how long to wait while reading body (seconds)
        :arg bool decompress: if true, decode incoming
            ``Content-Encoding: gzip``
        """
        self.no_keep_alive = no_keep_alive
        self.chunk_size = chunk_size or 65536
        self.max_header_size = max_header_size or 65536
        self.header_timeout = header_timeout
        self.max_body_size = max_body_size
        self.body_timeout = body_timeout
        self.decompress = decompress
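
# An illustrative construction sketch (comment only; an editor's addition,
# not part of the original module): tighten limits and enable transparent
# gzip handling for a connection.
#
#     params = HTTP1ConnectionParameters(
#         max_header_size=16 * 1024,  # bytes allowed for the header block
#         header_timeout=10.0,        # seconds to wait for complete headers
#         decompress=True,            # decode Content-Encoding: gzip bodies
#     )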


class HTTP1Connection(httputil.HTTPConnection):
    """Implements the HTTP/1.x protocol.

    This class can be used on its own for clients, or via
    `HTTP1ServerConnection` for servers.
    """

    def __init__(
        self,
        stream: iostream.IOStream,
        is_client: bool,
        params: Optional[HTTP1ConnectionParameters] = None,
        context: Optional[object] = None,
    ) -> None:
        """
        :arg stream: an `.IOStream`
        :arg bool is_client: client or server
        :arg params: a `.HTTP1ConnectionParameters` instance or ``None``
        :arg context: an opaque application-defined object that can be accessed
            as ``connection.context``.
        """
        self.is_client = is_client
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        self.no_keep_alive = params.no_keep_alive
        # The body limits can be altered by the delegate, so save them
        # here instead of just referencing self.params later.
        self._max_body_size = (
            self.params.max_body_size
            if self.params.max_body_size is not None
            else self.stream.max_buffer_size
        )
        self._body_timeout = self.params.body_timeout
        # _write_finished is set to True when finish() has been called,
        # i.e. there will be no more data sent. Data may still be in the
        # stream's write buffer.
        self._write_finished = False
        # True when we have read the entire incoming body.
        self._read_finished = False
        # _finish_future resolves when all data has been written and flushed
        # to the IOStream.
        self._finish_future = Future()  # type: Future[None]
        # If true, the connection should be closed after this request
        # (after the response has been written in the server side,
        # and after it has been read in the client)
        self._disconnect_on_finish = False
        self._clear_callbacks()
        # Save the start lines after we read or write them; they
        # affect later processing (e.g. 304 responses and HEAD methods
        # have content-length but no bodies)
        self._request_start_line = None  # type: Optional[httputil.RequestStartLine]
        self._response_start_line = None  # type: Optional[httputil.ResponseStartLine]
        self._request_headers = None  # type: Optional[httputil.HTTPHeaders]
        # True if we are writing output with chunked encoding.
        self._chunking_output = False
        # While reading a body with a content-length, this is the
        # amount left to read.
        self._expected_content_remaining = None  # type: Optional[int]
        # A Future for our outgoing writes, returned by IOStream.write.
        self._pending_write = None  # type: Optional[Future[None]]

    def read_response(self, delegate: httputil.HTTPMessageDelegate) -> Awaitable[bool]:
        """Read a single HTTP response.

        Typical client-mode usage is to write a request using `write_headers`,
        `write`, and `finish`, and then call ``read_response`` (a runnable
        sketch of this flow appears after this class).

        :arg delegate: a `.HTTPMessageDelegate`

        Returns a `.Future` that resolves to a bool after the full response has
        been read. The result is true if the stream is still open.
        """
        if self.params.decompress:
            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
        return self._read_message(delegate)

    async def _read_message(self, delegate: httputil.HTTPMessageDelegate) -> bool:
        need_delegate_close = False
        try:
            header_future = self.stream.read_until_regex(
                b"\r?\n\r?\n", max_bytes=self.params.max_header_size
            )
            if self.params.header_timeout is None:
                header_data = await header_future
            else:
                try:
                    header_data = await gen.with_timeout(
                        self.stream.io_loop.time() + self.params.header_timeout,
                        header_future,
                        quiet_exceptions=iostream.StreamClosedError,
                    )
                except gen.TimeoutError:
                    self.close()
                    return False
            start_line_str, headers = self._parse_headers(header_data)
            if self.is_client:
                resp_start_line = httputil.parse_response_start_line(start_line_str)
                self._response_start_line = resp_start_line
                start_line = (
                    resp_start_line
                )  # type: Union[httputil.RequestStartLine, httputil.ResponseStartLine]
                # TODO: this will need to change to support client-side keepalive
                self._disconnect_on_finish = False
            else:
                req_start_line = httputil.parse_request_start_line(start_line_str)
                self._request_start_line = req_start_line
                self._request_headers = headers
                start_line = req_start_line
                self._disconnect_on_finish = not self._can_keep_alive(
                    req_start_line, headers
                )
            need_delegate_close = True
            with _ExceptionLoggingContext(app_log):
                header_recv_future = delegate.headers_received(start_line, headers)
                if header_recv_future is not None:
                    await header_recv_future
            if self.stream is None:
                # We've been detached.
                need_delegate_close = False
                return False
            skip_body = False
            if self.is_client:
                assert isinstance(start_line, httputil.ResponseStartLine)
                if (
                    self._request_start_line is not None
                    and self._request_start_line.method == "HEAD"
                ):
                    skip_body = True
                code = start_line.code
                if code == 304:
                    # 304 responses may include the content-length header
                    # but do not actually have a body.
                    # http://tools.ietf.org/html/rfc7230#section-3.3
                    skip_body = True
                if 100 <= code < 200:
                    # 1xx responses should never indicate the presence of
                    # a body.
                    if "Content-Length" in headers or "Transfer-Encoding" in headers:
                        raise httputil.HTTPInputError(
                            "Response code %d cannot have body" % code
                        )
                    # TODO: client delegates will get headers_received twice
                    # in the case of a 100-continue. Document or change?
                    await self._read_message(delegate)
            else:
                if headers.get("Expect") == "100-continue" and not self._write_finished:
                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
            if not skip_body:
                body_future = self._read_body(
                    resp_start_line.code if self.is_client else 0, headers, delegate
                )
                if body_future is not None:
                    if self._body_timeout is None:
                        await body_future
                    else:
                        try:
                            await gen.with_timeout(
                                self.stream.io_loop.time() + self._body_timeout,
                                body_future,
                                quiet_exceptions=iostream.StreamClosedError,
                            )
                        except gen.TimeoutError:
                            gen_log.info("Timeout reading body from %s", self.context)
                            self.stream.close()
                            return False
            self._read_finished = True
            if not self._write_finished or self.is_client:
                need_delegate_close = False
                with _ExceptionLoggingContext(app_log):
                    delegate.finish()
            # If we're waiting for the application to produce an asynchronous
            # response, and we're not detached, register a close callback
            # on the stream (we didn't need one while we were reading)
            if (
                not self._finish_future.done()
                and self.stream is not None
                and not self.stream.closed()
            ):
                self.stream.set_close_callback(self._on_connection_close)
                await self._finish_future
            if self.is_client and self._disconnect_on_finish:
                self.close()
            if self.stream is None:
                return False
        except httputil.HTTPInputError as e:
            gen_log.info("Malformed HTTP message from %s: %s", self.context, e)
            if not self.is_client:
                await self.stream.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
            self.close()
            return False
        finally:
            if need_delegate_close:
                with _ExceptionLoggingContext(app_log):
                    delegate.on_connection_close()
            header_future = None  # type: ignore
            self._clear_callbacks()
        return True

    def _clear_callbacks(self) -> None:
        """Clears the callback attributes.

        This allows the request handler to be garbage collected more
        quickly in CPython by breaking up reference cycles.
        """
        self._write_callback = None
        self._write_future = None  # type: Optional[Future[None]]
        self._close_callback = None  # type: Optional[Callable[[], None]]
        if self.stream is not None:
            self.stream.set_close_callback(None)

    def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
        """Sets a callback that will be run when the connection is closed.

        Note that this callback is slightly different from
        `.HTTPMessageDelegate.on_connection_close`: The
        `.HTTPMessageDelegate` method is called when the connection is
        closed while receiving a message. This callback is used when
        there is not an active delegate (for example, on the server
        side this callback is used if the client closes the connection
        after sending its request but before receiving all the
        response).
        """
        self._close_callback = callback

    def _on_connection_close(self) -> None:
        # Note that this callback is only registered on the IOStream
        # when we have finished reading the request and are waiting for
        # the application to produce its response.
        if self._close_callback is not None:
            callback = self._close_callback
            self._close_callback = None
            callback()
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)
        self._clear_callbacks()

    def close(self) -> None:
        if self.stream is not None:
            self.stream.close()
        self._clear_callbacks()
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)

    def detach(self) -> iostream.IOStream:
        """Take control of the underlying stream.

        Returns the underlying `.IOStream` object and stops all further
        HTTP processing. May only be called during
        `.HTTPMessageDelegate.headers_received`. Intended for implementing
        protocols like websockets that tunnel over an HTTP handshake.
        """
        self._clear_callbacks()
        stream = self.stream
        self.stream = None  # type: ignore
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)
        return stream

    def set_body_timeout(self, timeout: float) -> None:
        """Sets the body timeout for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._body_timeout = timeout

    def set_max_body_size(self, max_body_size: int) -> None:
        """Sets the body size limit for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._max_body_size = max_body_size

    def write_headers(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
        chunk: Optional[bytes] = None,
    ) -> "Future[None]":
        """Implements `.HTTPConnection.write_headers`."""
        lines = []
        if self.is_client:
            assert isinstance(start_line, httputil.RequestStartLine)
            self._request_start_line = start_line
            lines.append(utf8("%s %s HTTP/1.1" % (start_line[0], start_line[1])))
            # Client requests with a non-empty body must have either a
            # Content-Length or a Transfer-Encoding. If Content-Length is not
            # present we'll add our Transfer-Encoding below.
            self._chunking_output = (
                start_line.method in ("POST", "PUT", "PATCH")
                and "Content-Length" not in headers
            )
        else:
            assert isinstance(start_line, httputil.ResponseStartLine)
            assert self._request_start_line is not None
            assert self._request_headers is not None
            self._response_start_line = start_line
            lines.append(utf8("HTTP/1.1 %d %s" % (start_line[1], start_line[2])))
            self._chunking_output = (
                # TODO: should this use
                # self._request_start_line.version or
                # start_line.version?
                self._request_start_line.version == "HTTP/1.1"
                # Omit payload header field for HEAD request.
                and self._request_start_line.method != "HEAD"
                # 1xx, 204 and 304 responses have no body (not even a
                # zero-length body), and so should not have either
                # Content-Length or Transfer-Encoding headers.
                and start_line.code not in (204, 304)
                and (start_line.code < 100 or start_line.code >= 200)
                # No need to chunk the output if a Content-Length is specified.
                and "Content-Length" not in headers
            )
            # If connection to a 1.1 client will be closed, inform client
            if (
                self._request_start_line.version == "HTTP/1.1"
                and self._disconnect_on_finish
            ):
                headers["Connection"] = "close"
            # If a 1.0 client asked for keep-alive, add the header.
            if (
                self._request_start_line.version == "HTTP/1.0"
                and self._request_headers.get("Connection", "").lower() == "keep-alive"
            ):
                headers["Connection"] = "Keep-Alive"
        if self._chunking_output:
            headers["Transfer-Encoding"] = "chunked"
        if not self.is_client and (
            self._request_start_line.method == "HEAD"
            or cast(httputil.ResponseStartLine, start_line).code == 304
        ):
            self._expected_content_remaining = 0
        elif "Content-Length" in headers:
            self._expected_content_remaining = parse_int(headers["Content-Length"])
        else:
            self._expected_content_remaining = None
        # TODO: headers are supposed to be of type str, but we still have some
        # cases that let bytes slip through. Remove these native_str calls when
        # those are fixed.
        header_lines = (
            native_str(n) + ": " + native_str(v) for n, v in headers.get_all()
        )
        lines.extend(line.encode("latin1") for line in header_lines)
        for line in lines:
            if CR_OR_LF_RE.search(line):
                raise ValueError("Illegal characters (CR or LF) in header: %r" % line)
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            future.set_exception(iostream.StreamClosedError())
            future.exception()
        else:
            future = self._write_future = Future()
            data = b"\r\n".join(lines) + b"\r\n\r\n"
            if chunk:
                data += self._format_chunk(chunk)
            self._pending_write = self.stream.write(data)
            future_add_done_callback(self._pending_write, self._on_write_complete)
        return future

    def _format_chunk(self, chunk: bytes) -> bytes:
        if self._expected_content_remaining is not None:
            self._expected_content_remaining -= len(chunk)
            if self._expected_content_remaining < 0:
                # Close the stream now to stop further framing errors.
                self.stream.close()
                raise httputil.HTTPOutputError(
                    "Tried to write more data than Content-Length"
                )
        if self._chunking_output and chunk:
            # Don't write out empty chunks because that means END-OF-STREAM
            # with chunked encoding
            return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
        else:
            return chunk
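
    # Wire-format illustration (comment only; an editor's sketch, not part of
    # the original module): with chunked encoding enabled,
    # ``_format_chunk(b"hello")`` frames the payload as
    #
    #     b"5\r\nhello\r\n"
    #
    # (length in hex, CRLF, payload, CRLF), and ``finish()`` later emits the
    # terminating ``b"0\r\n\r\n"``.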

    def write(self, chunk: bytes) -> "Future[None]":
        """Implements `.HTTPConnection.write`.

        For backwards compatibility it is allowed but deprecated to
        skip `write_headers` and instead call `write()` with a
        pre-encoded header block.
        """
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            self._write_future.set_exception(iostream.StreamClosedError())
            self._write_future.exception()
        else:
            future = self._write_future = Future()
            self._pending_write = self.stream.write(self._format_chunk(chunk))
            future_add_done_callback(self._pending_write, self._on_write_complete)
        return future

    def finish(self) -> None:
        """Implements `.HTTPConnection.finish`."""
        if (
            self._expected_content_remaining is not None
            and self._expected_content_remaining != 0
            and not self.stream.closed()
        ):
            self.stream.close()
            raise httputil.HTTPOutputError(
                "Tried to write %d bytes less than Content-Length"
                % self._expected_content_remaining
            )
        if self._chunking_output:
            if not self.stream.closed():
                self._pending_write = self.stream.write(b"0\r\n\r\n")
                self._pending_write.add_done_callback(self._on_write_complete)
        self._write_finished = True
        # If the app finished the request while we're still reading,
        # divert any remaining data away from the delegate and
        # close the connection when we're done sending our response.
        # Closing the connection is the only way to avoid reading the
        # whole input body.
        if not self._read_finished:
            self._disconnect_on_finish = True
        # No more data is coming, so instruct TCP to send any remaining
        # data immediately instead of waiting for a full packet or ack.
        self.stream.set_nodelay(True)
        if self._pending_write is None:
            self._finish_request(None)
        else:
            future_add_done_callback(self._pending_write, self._finish_request)

    def _on_write_complete(self, future: "Future[None]") -> None:
        exc = future.exception()
        if exc is not None and not isinstance(exc, iostream.StreamClosedError):
            future.result()
        if self._write_callback is not None:
            callback = self._write_callback
            self._write_callback = None
            self.stream.io_loop.add_callback(callback)
        if self._write_future is not None:
            future = self._write_future
            self._write_future = None
            future_set_result_unless_cancelled(future, None)

    def _can_keep_alive(
        self, start_line: httputil.RequestStartLine, headers: httputil.HTTPHeaders
    ) -> bool:
        if self.params.no_keep_alive:
            return False
        connection_header = headers.get("Connection")
        if connection_header is not None:
            connection_header = connection_header.lower()
        if start_line.version == "HTTP/1.1":
            return connection_header != "close"
        elif (
            "Content-Length" in headers
            or is_transfer_encoding_chunked(headers)
            or getattr(start_line, "method", None) in ("HEAD", "GET")
        ):
            # start_line may be a request or response start line; only
            # the former has a method attribute.
            return connection_header == "keep-alive"
        return False
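
    # Decision illustration (comment only; an editor's sketch, not part of
    # the original module): an HTTP/1.0 request of
    #
    #     GET / HTTP/1.0
    #     Connection: Keep-Alive
    #
    # is kept alive because GET is self-delimiting, while an HTTP/1.0 message
    # with neither Content-Length nor chunked framing falls through to False:
    # closing the connection is the only way to mark where it ends.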

    def _finish_request(self, future: "Optional[Future[None]]") -> None:
        self._clear_callbacks()
        if not self.is_client and self._disconnect_on_finish:
            self.close()
            return
        # Turn Nagle's algorithm back on, leaving the stream in its
        # default state for the next request.
        self.stream.set_nodelay(False)
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)

    def _parse_headers(self, data: bytes) -> Tuple[str, httputil.HTTPHeaders]:
        # The lstrip removes newlines that some implementations sometimes
        # insert between messages of a reused connection. Per RFC 7230,
        # we SHOULD ignore at least one empty line before the request.
        # http://tools.ietf.org/html/rfc7230#section-3.5
        data_str = native_str(data.decode("latin1")).lstrip("\r\n")
        # RFC 7230 allows for both CRLF and bare LF.
        eol = data_str.find("\n")
        start_line = data_str[:eol].rstrip("\r")
        headers = httputil.HTTPHeaders.parse(data_str[eol:])
        return start_line, headers
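
    # Parsing illustration (comment only; an editor's sketch, not part of the
    # original module): given the raw header block
    #
    #     b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
    #
    # this returns ("GET / HTTP/1.1", <HTTPHeaders Host: example.com>); the
    # start line is split off at the first newline and the remainder is
    # handed to HTTPHeaders.parse.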

    def _read_body(
        self,
        code: int,
        headers: httputil.HTTPHeaders,
        delegate: httputil.HTTPMessageDelegate,
    ) -> Optional[Awaitable[None]]:
        if "Content-Length" in headers:
            if "," in headers["Content-Length"]:
                # Proxies sometimes cause Content-Length headers to get
                # duplicated. If all the values are identical then we can
                # use them but if they differ it's an error.
                pieces = re.split(r",\s*", headers["Content-Length"])
                if any(i != pieces[0] for i in pieces):
                    raise httputil.HTTPInputError(
                        "Multiple unequal Content-Lengths: %r"
                        % headers["Content-Length"]
                    )
                headers["Content-Length"] = pieces[0]

            try:
                content_length: Optional[int] = parse_int(headers["Content-Length"])
            except ValueError:
                # Handles non-integer Content-Length value.
                raise httputil.HTTPInputError(
                    "Only integer Content-Length is allowed: %s"
                    % headers["Content-Length"]
                )

            if cast(int, content_length) > self._max_body_size:
                raise httputil.HTTPInputError("Content-Length too long")
        else:
            content_length = None

        is_chunked = is_transfer_encoding_chunked(headers)

        if code == 204:
            # This response code is not allowed to have a non-empty body,
            # and has an implicit length of zero instead of read-until-close.
            # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
            if is_chunked or content_length not in (None, 0):
                raise httputil.HTTPInputError(
                    "Response with code %d should not have body" % code
                )
            content_length = 0

        if is_chunked:
            return self._read_chunked_body(delegate)
        if content_length is not None:
            return self._read_fixed_body(content_length, delegate)
        if self.is_client:
            return self._read_body_until_close(delegate)
        return None

    async def _read_fixed_body(
        self, content_length: int, delegate: httputil.HTTPMessageDelegate
    ) -> None:
        while content_length > 0:
            body = await self.stream.read_bytes(
                min(self.params.chunk_size, content_length), partial=True
            )
            content_length -= len(body)
            if not self._write_finished or self.is_client:
                with _ExceptionLoggingContext(app_log):
                    ret = delegate.data_received(body)
                    if ret is not None:
                        await ret

    async def _read_chunked_body(self, delegate: httputil.HTTPMessageDelegate) -> None:
        # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
        total_size = 0
        while True:
            chunk_len_str = await self.stream.read_until(b"\r\n", max_bytes=64)
            try:
                chunk_len = parse_hex_int(native_str(chunk_len_str[:-2]))
            except ValueError:
                raise httputil.HTTPInputError("invalid chunk size")
            if chunk_len == 0:
                crlf = await self.stream.read_bytes(2)
                if crlf != b"\r\n":
                    raise httputil.HTTPInputError(
                        "improperly terminated chunked request"
                    )
                return
            total_size += chunk_len
            if total_size > self._max_body_size:
                raise httputil.HTTPInputError("chunked body too large")
            bytes_to_read = chunk_len
            while bytes_to_read:
                chunk = await self.stream.read_bytes(
                    min(bytes_to_read, self.params.chunk_size), partial=True
                )
                bytes_to_read -= len(chunk)
                if not self._write_finished or self.is_client:
                    with _ExceptionLoggingContext(app_log):
                        ret = delegate.data_received(chunk)
                        if ret is not None:
                            await ret
            # chunk ends with \r\n
            crlf = await self.stream.read_bytes(2)
            assert crlf == b"\r\n"

    async def _read_body_until_close(
        self, delegate: httputil.HTTPMessageDelegate
    ) -> None:
        body = await self.stream.read_until_close()
        if not self._write_finished or self.is_client:
            with _ExceptionLoggingContext(app_log):
                ret = delegate.data_received(body)
                if ret is not None:
                    await ret
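

# Illustrative client-mode sketch (an editor's addition, not part of the
# original module). ``_DemoDelegate`` and ``_demo_fetch`` are hypothetical
# names; the flow mirrors the read_response docstring above: write a request
# with write_headers()/finish(), then read the response through a delegate.
class _DemoDelegate(httputil.HTTPMessageDelegate):
    def __init__(self) -> None:
        self.body = b""

    def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> None:
        # Called once with the parsed status line and headers.
        self.start_line, self.headers = start_line, headers

    def data_received(self, chunk: bytes) -> None:
        # Called zero or more times with decoded body chunks.
        self.body += chunk


async def _demo_fetch(stream: iostream.IOStream, host: str) -> bytes:
    # ``stream`` is assumed to be an already-connected IOStream.
    conn = HTTP1Connection(stream, is_client=True)
    conn.write_headers(
        httputil.RequestStartLine("GET", "/", "HTTP/1.1"),
        httputil.HTTPHeaders({"Host": host}),
    )
    conn.finish()
    delegate = _DemoDelegate()
    await conn.read_response(delegate)
    return delegate.body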


class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
    """Wraps an `HTTPMessageDelegate` to decode ``Content-Encoding: gzip``."""

    def __init__(self, delegate: httputil.HTTPMessageDelegate, chunk_size: int) -> None:
        self._delegate = delegate
        self._chunk_size = chunk_size
        self._decompressor = None  # type: Optional[GzipDecompressor]

    def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> Optional[Awaitable[None]]:
        if headers.get("Content-Encoding", "").lower() == "gzip":
            self._decompressor = GzipDecompressor()
            # Downstream delegates will only see uncompressed data,
            # so rename the content-encoding header.
            # (but note that curl_httpclient doesn't do this).
            headers.add("X-Consumed-Content-Encoding", headers["Content-Encoding"])
            del headers["Content-Encoding"]
        return self._delegate.headers_received(start_line, headers)

    async def data_received(self, chunk: bytes) -> None:
        if self._decompressor:
            compressed_data = chunk
            while compressed_data:
                decompressed = self._decompressor.decompress(
                    compressed_data, self._chunk_size
                )
                if decompressed:
                    ret = self._delegate.data_received(decompressed)
                    if ret is not None:
                        await ret
                compressed_data = self._decompressor.unconsumed_tail
                if compressed_data and not decompressed:
                    raise httputil.HTTPInputError(
                        "encountered unconsumed gzip data without making progress"
                    )
        else:
            ret = self._delegate.data_received(chunk)
            if ret is not None:
                await ret

    def finish(self) -> None:
        if self._decompressor is not None:
            tail = self._decompressor.flush()
            if tail:
                # The tail should always be empty: decompress returned
                # all that it can in data_received and the only
                # purpose of the flush call is to detect errors such
                # as truncated input. If we did legitimately get a new
                # chunk at this point we'd need to change the
                # interface to make finish() a coroutine.
                raise ValueError(
                    "decompressor.flush returned data; possible truncated input"
                )
        return self._delegate.finish()

    def on_connection_close(self) -> None:
        return self._delegate.on_connection_close()
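

# An illustrative sketch of the streaming-decompression pattern used by
# _GzipMessageDelegate (an editor's addition; ``_demo_gunzip`` is a
# hypothetical helper, not part of the original module). Output is produced
# in bounded pieces so a large body never has to fit in memory at once.
def _demo_gunzip(compressed: bytes, chunk_size: int = 1024) -> bytes:
    decompressor = GzipDecompressor()
    out = b""
    data = compressed
    while data:
        # Ask for at most chunk_size bytes of output; leftover input is
        # retained in unconsumed_tail for the next iteration.
        piece = decompressor.decompress(data, chunk_size)
        out += piece
        data = decompressor.unconsumed_tail
        if data and not piece:
            raise ValueError("no progress decompressing gzip data")
    out += decompressor.flush()  # a nonempty tail here would mean bad input
    return out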


class HTTP1ServerConnection(object):
    """An HTTP/1.x server."""

    def __init__(
        self,
        stream: iostream.IOStream,
        params: Optional[HTTP1ConnectionParameters] = None,
        context: Optional[object] = None,
    ) -> None:
        """
        :arg stream: an `.IOStream`
        :arg params: a `.HTTP1ConnectionParameters` or None
        :arg context: an opaque application-defined object that is accessible
            as ``connection.context``
        """
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        self._serving_future = None  # type: Optional[Future[None]]

    async def close(self) -> None:
        """Closes the connection.

        Returns a `.Future` that resolves after the serving loop has exited.
        """
        self.stream.close()
        # Block until the serving loop is done, but ignore any exceptions
        # (start_serving is already responsible for logging them).
        assert self._serving_future is not None
        try:
            await self._serving_future
        except Exception:
            pass

    def start_serving(self, delegate: httputil.HTTPServerConnectionDelegate) -> None:
        """Starts serving requests on this connection.

        :arg delegate: a `.HTTPServerConnectionDelegate`
        """
        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
        fut = gen.convert_yielded(self._server_request_loop(delegate))
        self._serving_future = fut
        # Register the future on the IOLoop so its errors get logged.
        self.stream.io_loop.add_future(fut, lambda f: f.result())

    async def _server_request_loop(
        self, delegate: httputil.HTTPServerConnectionDelegate
    ) -> None:
        try:
            while True:
                conn = HTTP1Connection(self.stream, False, self.params, self.context)
                request_delegate = delegate.start_request(self, conn)
                try:
                    ret = await conn.read_response(request_delegate)
                except (
                    iostream.StreamClosedError,
                    iostream.UnsatisfiableReadError,
                    asyncio.CancelledError,
                ):
                    return
                except _QuietException:
                    # This exception was already logged.
                    conn.close()
                    return
                except Exception:
                    gen_log.error("Uncaught exception", exc_info=True)
                    conn.close()
                    return
                if not ret:
                    return
                await asyncio.sleep(0)
        finally:
            delegate.on_close(self)
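

# An illustrative server-side wiring sketch (an editor's addition;
# ``_demo_serve_stream`` is a hypothetical helper, not part of the original
# module). Tornado's HTTPServer normally performs this setup for each
# accepted connection: wrap the stream, then hand requests to the delegate.
def _demo_serve_stream(
    stream: iostream.IOStream,
    delegate: httputil.HTTPServerConnectionDelegate,
) -> HTTP1ServerConnection:
    params = HTTP1ConnectionParameters(decompress=True)  # gunzip request bodies
    conn = HTTP1ServerConnection(stream, params)
    conn.start_serving(delegate)  # runs _server_request_loop on the IOLoop
    return conn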


DIGITS = re.compile(r"[0-9]+")
HEXDIGITS = re.compile(r"[0-9a-fA-F]+")


def parse_int(s: str) -> int:
    """Parse a non-negative integer from a string."""
    if DIGITS.fullmatch(s) is None:
        raise ValueError("not an integer: %r" % s)
    return int(s)


def parse_hex_int(s: str) -> int:
    """Parse a non-negative hexadecimal integer from a string."""
    if HEXDIGITS.fullmatch(s) is None:
        raise ValueError("not a hexadecimal integer: %r" % s)
    return int(s, 16)
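

# A runnable strictness demo (an editor's addition; ``_demo_strict_parsing``
# is a hypothetical helper, not part of the original module). The anchored
# regexes reject every form that bare int() would accept, such as signs,
# surrounding whitespace, and underscores, which matters for message framing.
def _demo_strict_parsing() -> None:
    assert parse_int("42") == 42
    assert parse_hex_int("1a") == 26
    for bad in ("+42", " 42", "4_2", ""):
        try:
            parse_int(bad)  # int() would accept the first three
        except ValueError:
            pass
        else:
            raise AssertionError("parse_int accepted %r" % bad)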


def is_transfer_encoding_chunked(headers: httputil.HTTPHeaders) -> bool:
    """Returns true if the headers specify Transfer-Encoding: chunked.

    Raises httputil.HTTPInputError if any other transfer encoding is used.
    """
    # Note that transfer-encoding is an area in which Postel's law can lead
    # us astray. If a proxy and a backend server are liberal in what they
    # accept, but accept slightly different things, this can lead to
    # mismatched framing and request-smuggling issues. Therefore we are as
    # strict as possible here (even technically going beyond the requirements
    # of the RFCs: a value of ",chunked" is legal but doesn't appear in
    # practice for legitimate traffic).
    if "Transfer-Encoding" not in headers:
        return False
    if "Content-Length" in headers:
        # Message cannot contain both Content-Length and
        # Transfer-Encoding headers.
        # http://tools.ietf.org/html/rfc7230#section-3.3.3
        raise httputil.HTTPInputError(
            "Message with both Transfer-Encoding and Content-Length"
        )
    if headers["Transfer-Encoding"].lower() == "chunked":
        return True
    # We do not support any transfer-encodings other than chunked, and we do
    # not expect to add any support because the concept of transfer-encoding
    # has been removed in HTTP/2.
    raise httputil.HTTPInputError(
        "Unsupported Transfer-Encoding %s" % headers["Transfer-Encoding"]
    )
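

# A runnable behavior demo (an editor's addition; ``_demo_transfer_encoding``
# is a hypothetical helper, not part of the original module), covering the
# three outcomes: absent, chunked, and conflicting framing headers.
def _demo_transfer_encoding() -> None:
    assert not is_transfer_encoding_chunked(httputil.HTTPHeaders())
    assert is_transfer_encoding_chunked(
        httputil.HTTPHeaders({"Transfer-Encoding": "chunked"})
    )
    try:
        is_transfer_encoding_chunked(
            httputil.HTTPHeaders(
                {"Transfer-Encoding": "chunked", "Content-Length": "0"}
            )
        )
    except httputil.HTTPInputError:
        pass  # both framing headers present is rejected outright
    else:
        raise AssertionError("conflicting framing headers were not rejected")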