Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tornado/http1connection.py: 13%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

455 statements  

1# 

2# Copyright 2014 Facebook 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""Client and server implementations of HTTP/1.x. 

17 

18.. versionadded:: 4.0 

19""" 

20 

21import asyncio 

22import logging 

23import re 

24import types 

25 

26from tornado.concurrent import ( 

27 Future, 

28 future_add_done_callback, 

29 future_set_result_unless_cancelled, 

30) 

31from tornado.escape import native_str, utf8 

32from tornado import gen 

33from tornado import httputil 

34from tornado import iostream 

35from tornado.log import gen_log, app_log 

36from tornado.util import GzipDecompressor 

37 

38 

39from typing import cast, Optional, Type, Awaitable, Callable, Union, Tuple 

40 

# Matches a single CR or LF byte. write_headers() searches every encoded
# start/header line with this pattern and raises ValueError on a hit.
CR_OR_LF_RE = re.compile(b"\r|\n")

42 

43 

class _QuietException(Exception):
    """Exception raised in place of an error that has already been logged.

    The constructor deliberately accepts no arguments.
    """

    def __init__(self) -> None:
        return None

47 

48 

class _ExceptionLoggingContext:
    """Context manager that logs exceptions escaping a delegate call.

    Wrap calls to delegate methods in ``with _ExceptionLoggingContext(log):``.
    Any exception leaving the block is reported through the supplied logger
    and replaced by `_QuietException`; `.HTTPInputError` is the one exception
    type allowed to propagate unchanged.
    """

    def __init__(self, logger: logging.Logger) -> None:
        self.logger = logger

    def __enter__(self) -> None:
        return None

    def __exit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: types.TracebackType,
    ) -> None:
        if value is None:
            # The block completed normally; nothing to report.
            return None
        assert typ is not None
        if isinstance(value, httputil.HTTPInputError):
            # Let HTTPInputError pass through to higher-level handler
            return None
        self.logger.error("Uncaught exception", exc_info=(typ, value, tb))
        raise _QuietException

74 

75 

class HTTP1ConnectionParameters:
    """Parameters for `.HTTP1Connection` and `.HTTP1ServerConnection`."""

    def __init__(
        self,
        no_keep_alive: bool = False,
        chunk_size: Optional[int] = None,
        max_header_size: Optional[int] = None,
        header_timeout: Optional[float] = None,
        max_body_size: Optional[int] = None,
        body_timeout: Optional[float] = None,
        decompress: bool = False,
    ) -> None:
        """
        :arg bool no_keep_alive: If true, always close the connection after
            one request.
        :arg int chunk_size: how much data to read into memory at once;
            a falsy value selects 65536
        :arg int max_header_size: maximum amount of data for HTTP headers;
            a falsy value selects 65536
        :arg float header_timeout: how long to wait for all headers (seconds)
        :arg int max_body_size: maximum amount of data for body
        :arg float body_timeout: how long to wait while reading body (seconds)
        :arg bool decompress: if true, decode incoming
            ``Content-Encoding: gzip``
        """
        fallback_size = 65536

        # Size limits: any falsy value (None or 0) falls back to the default.
        self.chunk_size = chunk_size if chunk_size else fallback_size
        self.max_header_size = max_header_size if max_header_size else fallback_size
        self.max_body_size = max_body_size

        # Timeouts are stored exactly as given; None means "no timeout".
        self.header_timeout = header_timeout
        self.body_timeout = body_timeout

        # Behaviour switches.
        self.no_keep_alive = no_keep_alive
        self.decompress = decompress

107 

108 

class HTTP1Connection(httputil.HTTPConnection):
    """Implements the HTTP/1.x protocol.

    This class can be used on its own for clients, or via
    `HTTP1ServerConnection` for servers.
    """

    def __init__(
        self,
        stream: iostream.IOStream,
        is_client: bool,
        params: Optional[HTTP1ConnectionParameters] = None,
        context: Optional[object] = None,
    ) -> None:
        """
        :arg stream: an `.IOStream`
        :arg bool is_client: client or server
        :arg params: a `.HTTP1ConnectionParameters` instance or ``None``
        :arg context: an opaque application-defined object that can be accessed
            as ``connection.context``.
        """
        self.is_client = is_client
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        self.no_keep_alive = params.no_keep_alive
        # The body limits can be altered by the delegate, so save them
        # here instead of just referencing self.params later.
        self._max_body_size = (
            self.params.max_body_size
            if self.params.max_body_size is not None
            else self.stream.max_buffer_size
        )
        self._body_timeout = self.params.body_timeout
        # _write_finished is set to True when finish() has been called,
        # i.e. there will be no more data sent. Data may still be in the
        # stream's write buffer.
        self._write_finished = False
        # True when we have read the entire incoming body.
        self._read_finished = False
        # _finish_future resolves when all data has been written and flushed
        # to the IOStream.
        self._finish_future = Future()  # type: Future[None]
        # If true, the connection should be closed after this request
        # (after the response has been written in the server side,
        # and after it has been read in the client)
        self._disconnect_on_finish = False
        self._clear_callbacks()
        # Save the start lines after we read or write them; they
        # affect later processing (e.g. 304 responses and HEAD methods
        # have content-length but no bodies)
        self._request_start_line = None  # type: Optional[httputil.RequestStartLine]
        self._response_start_line = None  # type: Optional[httputil.ResponseStartLine]
        self._request_headers = None  # type: Optional[httputil.HTTPHeaders]
        # True if we are writing output with chunked encoding.
        self._chunking_output = False
        # While reading a body with a content-length, this is the
        # amount left to read.
        self._expected_content_remaining = None  # type: Optional[int]
        # A Future for our outgoing writes, returned by IOStream.write.
        self._pending_write = None  # type: Optional[Future[None]]

    def read_response(self, delegate: httputil.HTTPMessageDelegate) -> Awaitable[bool]:
        """Read a single HTTP response.

        Typical client-mode usage is to write a request using `write_headers`,
        `write`, and `finish`, and then call ``read_response``.

        :arg delegate: a `.HTTPMessageDelegate`

        Returns a `.Future` that resolves to a bool after the full response has
        been read. The result is true if the stream is still open.
        """
        if self.params.decompress:
            delegate = _GzipMessageDelegate(
                delegate, self.params.chunk_size, self._max_body_size
            )
        return self._read_message(delegate)

    async def _read_message(self, delegate: httputil.HTTPMessageDelegate) -> bool:
        """Read one message (headers, then any body) and feed it to ``delegate``.

        Returns ``True`` when the message was processed to completion.
        Returns ``False`` after a header or body timeout, after the
        connection was detached, when the stream is gone, or when the peer
        sent a malformed message (`.HTTPInputError`); in the malformed case
        a server additionally writes a ``400 Bad Request`` before closing.
        """
        # Tracks whether delegate.on_connection_close() still has to be
        # called from the ``finally`` clause below.
        need_delegate_close = False
        try:
            header_future = self.stream.read_until_regex(
                b"\r?\n\r?\n", max_bytes=self.params.max_header_size
            )
            if self.params.header_timeout is None:
                header_data = await header_future
            else:
                try:
                    header_data = await gen.with_timeout(
                        self.stream.io_loop.time() + self.params.header_timeout,
                        header_future,
                        quiet_exceptions=iostream.StreamClosedError,
                    )
                except gen.TimeoutError:
                    self.close()
                    return False
            start_line_str, headers = self._parse_headers(header_data)
            if self.is_client:
                resp_start_line = httputil.parse_response_start_line(start_line_str)
                self._response_start_line = resp_start_line
                start_line = (
                    resp_start_line
                )  # type: Union[httputil.RequestStartLine, httputil.ResponseStartLine]
                # TODO: this will need to change to support client-side keepalive
                self._disconnect_on_finish = False
            else:
                req_start_line = httputil.parse_request_start_line(start_line_str)
                self._request_start_line = req_start_line
                self._request_headers = headers
                start_line = req_start_line
                self._disconnect_on_finish = not self._can_keep_alive(
                    req_start_line, headers
                )
            need_delegate_close = True
            with _ExceptionLoggingContext(app_log):
                header_recv_future = delegate.headers_received(start_line, headers)
                if header_recv_future is not None:
                    await header_recv_future
            if self.stream is None:
                # We've been detached.
                need_delegate_close = False
                return False
            skip_body = False
            if self.is_client:
                assert isinstance(start_line, httputil.ResponseStartLine)
                if (
                    self._request_start_line is not None
                    and self._request_start_line.method == "HEAD"
                ):
                    skip_body = True
                code = start_line.code
                if code == 304:
                    # 304 responses may include the content-length header
                    # but do not actually have a body.
                    # http://tools.ietf.org/html/rfc7230#section-3.3
                    skip_body = True
                if 100 <= code < 200:
                    # 1xx responses should never indicate the presence of
                    # a body.
                    if "Content-Length" in headers or "Transfer-Encoding" in headers:
                        raise httputil.HTTPInputError(
                            "Response code %d cannot have body" % code
                        )
                    # TODO: client delegates will get headers_received twice
                    # in the case of a 100-continue. Document or change?
                    await self._read_message(delegate)
            else:
                if headers.get("Expect") == "100-continue" and not self._write_finished:
                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
            if not skip_body:
                body_future = self._read_body(
                    resp_start_line.code if self.is_client else 0, headers, delegate
                )
                if body_future is not None:
                    if self._body_timeout is None:
                        await body_future
                    else:
                        try:
                            await gen.with_timeout(
                                self.stream.io_loop.time() + self._body_timeout,
                                body_future,
                                quiet_exceptions=iostream.StreamClosedError,
                            )
                        except gen.TimeoutError:
                            gen_log.info("Timeout reading body from %s", self.context)
                            self.stream.close()
                            return False
            self._read_finished = True
            if not self._write_finished or self.is_client:
                need_delegate_close = False
                with _ExceptionLoggingContext(app_log):
                    delegate.finish()
            # If we're waiting for the application to produce an asynchronous
            # response, and we're not detached, register a close callback
            # on the stream (we didn't need one while we were reading)
            if (
                not self._finish_future.done()
                and self.stream is not None
                and not self.stream.closed()
            ):
                self.stream.set_close_callback(self._on_connection_close)
                await self._finish_future
            if self.is_client and self._disconnect_on_finish:
                self.close()
            if self.stream is None:
                return False
        except httputil.HTTPInputError as e:
            gen_log.info("Malformed HTTP message from %s: %s", self.context, e)
            if not self.is_client:
                await self.stream.write(b"HTTP/1.1 400 Bad Request\r\n\r\n")
            self.close()
            return False
        finally:
            if need_delegate_close:
                with _ExceptionLoggingContext(app_log):
                    delegate.on_connection_close()
            header_future = None  # type: ignore
            self._clear_callbacks()
        return True

    def _clear_callbacks(self) -> None:
        """Clears the callback attributes.

        This allows the request handler to be garbage collected more
        quickly in CPython by breaking up reference cycles.
        """
        self._write_callback = None
        self._write_future = None  # type: Optional[Future[None]]
        self._close_callback = None  # type: Optional[Callable[[], None]]
        if self.stream is not None:
            self.stream.set_close_callback(None)

    def set_close_callback(self, callback: Optional[Callable[[], None]]) -> None:
        """Sets a callback that will be run when the connection is closed.

        Note that this callback is slightly different from
        `.HTTPMessageDelegate.on_connection_close`: The
        `.HTTPMessageDelegate` method is called when the connection is
        closed while receiving a message. This callback is used when
        there is not an active delegate (for example, on the server
        side this callback is used if the client closes the connection
        after sending its request but before receiving all the
        response).
        """
        self._close_callback = callback

    def _on_connection_close(self) -> None:
        """Stream close callback: run the user callback and release waiters."""
        # Note that this callback is only registered on the IOStream
        # when we have finished reading the request and are waiting for
        # the application to produce its response.
        if self._close_callback is not None:
            callback = self._close_callback
            self._close_callback = None
            callback()
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)
        self._clear_callbacks()

    def close(self) -> None:
        """Close the underlying stream (if still attached) and resolve
        ``_finish_future`` so anything awaiting it is released.
        """
        if self.stream is not None:
            self.stream.close()
        self._clear_callbacks()
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)

    def detach(self) -> iostream.IOStream:
        """Take control of the underlying stream.

        Returns the underlying `.IOStream` object and stops all further
        HTTP processing. May only be called during
        `.HTTPMessageDelegate.headers_received`. Intended for implementing
        protocols like websockets that tunnel over an HTTP handshake.
        """
        self._clear_callbacks()
        stream = self.stream
        self.stream = None  # type: ignore
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)
        return stream

    def set_body_timeout(self, timeout: float) -> None:
        """Sets the body timeout for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._body_timeout = timeout

    def set_max_body_size(self, max_body_size: int) -> None:
        """Sets the body size limit for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._max_body_size = max_body_size

    def write_headers(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
        chunk: Optional[bytes] = None,
    ) -> "Future[None]":
        """Implements `.HTTPConnection.write_headers`.

        Raises `ValueError` if the start line or any header line contains a
        CR or LF character.
        """
        lines = []
        if self.is_client:
            assert isinstance(start_line, httputil.RequestStartLine)
            self._request_start_line = start_line
            lines.append(utf8(f"{start_line[0]} {start_line[1]} HTTP/1.1"))
            # Client requests with a non-empty body must have either a
            # Content-Length or a Transfer-Encoding. If Content-Length is not
            # present we'll add our Transfer-Encoding below.
            self._chunking_output = (
                start_line.method in ("POST", "PUT", "PATCH")
                and "Content-Length" not in headers
            )
        else:
            assert isinstance(start_line, httputil.ResponseStartLine)
            assert self._request_start_line is not None
            assert self._request_headers is not None
            self._response_start_line = start_line
            lines.append(utf8("HTTP/1.1 %d %s" % (start_line[1], start_line[2])))
            self._chunking_output = (
                # TODO: should this use
                # self._request_start_line.version or
                # start_line.version?
                self._request_start_line.version == "HTTP/1.1"
                # Omit payload header field for HEAD request.
                and self._request_start_line.method != "HEAD"
                # 1xx, 204 and 304 responses have no body (not even a zero-length
                # body), and so should not have either Content-Length or
                # Transfer-Encoding headers.
                and start_line.code not in (204, 304)
                and (start_line.code < 100 or start_line.code >= 200)
                # No need to chunk the output if a Content-Length is specified.
                and "Content-Length" not in headers
            )
            # If connection to a 1.1 client will be closed, inform client
            if (
                self._request_start_line.version == "HTTP/1.1"
                and self._disconnect_on_finish
            ):
                headers["Connection"] = "close"
            # If a 1.0 client asked for keep-alive, add the header.
            if (
                self._request_start_line.version == "HTTP/1.0"
                and self._request_headers.get("Connection", "").lower() == "keep-alive"
            ):
                headers["Connection"] = "Keep-Alive"
        if self._chunking_output:
            headers["Transfer-Encoding"] = "chunked"
        if not self.is_client and (
            self._request_start_line.method == "HEAD"
            or cast(httputil.ResponseStartLine, start_line).code == 304
        ):
            self._expected_content_remaining = 0
        elif "Content-Length" in headers:
            self._expected_content_remaining = parse_int(headers["Content-Length"])
        else:
            self._expected_content_remaining = None
        # TODO: headers are supposed to be of type str, but we still have some
        # cases that let bytes slip through. Remove these native_str calls when those
        # are fixed.
        header_lines = (
            native_str(n) + ": " + native_str(v) for n, v in headers.get_all()
        )
        lines.extend(line.encode("latin1") for line in header_lines)
        for line in lines:
            if CR_OR_LF_RE.search(line):
                raise ValueError("Illegal characters (CR or LF) in header: %r" % line)
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            future.set_exception(iostream.StreamClosedError())
            # Retrieve the exception immediately so it counts as observed.
            future.exception()
        else:
            future = self._write_future = Future()
            data = b"\r\n".join(lines) + b"\r\n\r\n"
            if chunk:
                data += self._format_chunk(chunk)
            self._pending_write = self.stream.write(data)
            future_add_done_callback(self._pending_write, self._on_write_complete)
        return future

    def _format_chunk(self, chunk: bytes) -> bytes:
        """Return ``chunk`` framed for the wire.

        Deducts the chunk from the remaining Content-Length budget (when one
        is being tracked) and wraps it in chunked-encoding framing when
        chunked output is active.

        Raises `.HTTPOutputError` (after closing the stream) if the chunk
        would exceed the declared Content-Length.
        """
        if self._expected_content_remaining is not None:
            self._expected_content_remaining -= len(chunk)
            if self._expected_content_remaining < 0:
                # Close the stream now to stop further framing errors.
                self.stream.close()
                raise httputil.HTTPOutputError(
                    "Tried to write more data than Content-Length"
                )
        if self._chunking_output and chunk:
            # Don't write out empty chunks because that means END-OF-STREAM
            # with chunked encoding
            return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
        else:
            return chunk

    def write(self, chunk: bytes) -> "Future[None]":
        """Implements `.HTTPConnection.write`.

        For backwards compatibility it is allowed but deprecated to
        skip `write_headers` and instead call `write()` with a
        pre-encoded header block.
        """
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            self._write_future.set_exception(iostream.StreamClosedError())
            # Retrieve the exception immediately so it counts as observed.
            self._write_future.exception()
        else:
            future = self._write_future = Future()
            self._pending_write = self.stream.write(self._format_chunk(chunk))
            future_add_done_callback(self._pending_write, self._on_write_complete)
        return future

    def finish(self) -> None:
        """Implements `.HTTPConnection.finish`.

        Raises `.HTTPOutputError` (after closing the stream) if fewer bytes
        were written than the declared Content-Length.
        """
        if (
            self._expected_content_remaining is not None
            and self._expected_content_remaining != 0
            and not self.stream.closed()
        ):
            self.stream.close()
            raise httputil.HTTPOutputError(
                "Tried to write %d bytes less than Content-Length"
                % self._expected_content_remaining
            )
        if self._chunking_output:
            if not self.stream.closed():
                # Terminating zero-length chunk.
                self._pending_write = self.stream.write(b"0\r\n\r\n")
                self._pending_write.add_done_callback(self._on_write_complete)
        self._write_finished = True
        # If the app finished the request while we're still reading,
        # divert any remaining data away from the delegate and
        # close the connection when we're done sending our response.
        # Closing the connection is the only way to avoid reading the
        # whole input body.
        if not self._read_finished:
            self._disconnect_on_finish = True
        # No more data is coming, so instruct TCP to send any remaining
        # data immediately instead of waiting for a full packet or ack.
        self.stream.set_nodelay(True)
        if self._pending_write is None:
            self._finish_request(None)
        else:
            future_add_done_callback(self._pending_write, self._finish_request)

    def _on_write_complete(self, future: "Future[None]") -> None:
        """Done-callback for stream writes.

        Re-raises any write error other than `.StreamClosedError`, then
        schedules the pending write callback (if any) and resolves the
        pending write future (if any).
        """
        exc = future.exception()
        if exc is not None and not isinstance(exc, iostream.StreamClosedError):
            future.result()
        if self._write_callback is not None:
            callback = self._write_callback
            self._write_callback = None
            self.stream.io_loop.add_callback(callback)
        if self._write_future is not None:
            future = self._write_future
            self._write_future = None
            future_set_result_unless_cancelled(future, None)

    def _can_keep_alive(
        self, start_line: httputil.RequestStartLine, headers: httputil.HTTPHeaders
    ) -> bool:
        """Return whether the connection may stay open after this message."""
        if self.params.no_keep_alive:
            return False
        connection_header = headers.get("Connection")
        if connection_header is not None:
            connection_header = connection_header.lower()
        if start_line.version == "HTTP/1.1":
            # HTTP/1.1 is persistent unless the peer says "close".
            return connection_header != "close"
        elif (
            "Content-Length" in headers
            or is_transfer_encoding_chunked(headers)
            or getattr(start_line, "method", None) in ("HEAD", "GET")
        ):
            # start_line may be a request or response start line; only
            # the former has a method attribute.
            return connection_header == "keep-alive"
        return False

    def _finish_request(self, future: "Optional[Future[None]]") -> None:
        """Complete the outgoing message once its last write is done.

        Called by `finish` either directly (``future`` is ``None``) or as
        the done-callback of the pending write.
        """
        self._clear_callbacks()
        if not self.is_client and self._disconnect_on_finish:
            self.close()
            return
        # Turn Nagle's algorithm back on, leaving the stream in its
        # default state for the next request.
        self.stream.set_nodelay(False)
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)

    def _parse_headers(self, data: bytes) -> Tuple[str, httputil.HTTPHeaders]:
        """Split a raw header block into its start line and parsed headers."""
        # The lstrip removes newlines that some implementations sometimes
        # insert between messages of a reused connection. Per RFC 7230,
        # we SHOULD ignore at least one empty line before the request.
        # http://tools.ietf.org/html/rfc7230#section-3.5
        data_str = native_str(data.decode("latin1")).lstrip("\r\n")
        # RFC 7230 section allows for both CRLF and bare LF.
        eol = data_str.find("\n")
        start_line = data_str[:eol].rstrip("\r")
        headers = httputil.HTTPHeaders.parse(data_str[eol:])
        return start_line, headers

    def _read_body(
        self,
        code: int,
        headers: httputil.HTTPHeaders,
        delegate: httputil.HTTPMessageDelegate,
    ) -> Optional[Awaitable[None]]:
        """Choose how the message body must be read, based on ``headers``.

        :arg int code: the response status code (callers pass 0 when reading
            a request)

        Returns an awaitable that reads the body, or ``None`` when there is
        no body to read. Raises `.HTTPInputError` for conflicting, malformed
        or oversized Content-Length values and for a 204 response that
        declares a body.
        """
        if "Content-Length" in headers:
            if "," in headers["Content-Length"]:
                # Proxies sometimes cause Content-Length headers to get
                # duplicated. If all the values are identical then we can
                # use them but if they differ it's an error.
                pieces = re.split(r",\s*", headers["Content-Length"])
                if any(i != pieces[0] for i in pieces):
                    raise httputil.HTTPInputError(
                        "Multiple unequal Content-Lengths: %r"
                        % headers["Content-Length"]
                    )
                headers["Content-Length"] = pieces[0]

            try:
                content_length: Optional[int] = parse_int(headers["Content-Length"])
            except ValueError:
                # Handles non-integer Content-Length value.
                raise httputil.HTTPInputError(
                    "Only integer Content-Length is allowed: %s"
                    % headers["Content-Length"]
                )

            if cast(int, content_length) > self._max_body_size:
                raise httputil.HTTPInputError("Content-Length too long")
        else:
            content_length = None

        is_chunked = is_transfer_encoding_chunked(headers)

        if code == 204:
            # This response code is not allowed to have a non-empty body,
            # and has an implicit length of zero instead of read-until-close.
            # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
            if is_chunked or content_length not in (None, 0):
                raise httputil.HTTPInputError(
                    "Response with code %d should not have body" % code
                )
            content_length = 0

        if is_chunked:
            return self._read_chunked_body(delegate)
        if content_length is not None:
            return self._read_fixed_body(content_length, delegate)
        if self.is_client:
            return self._read_body_until_close(delegate)
        return None

    async def _read_fixed_body(
        self, content_length: int, delegate: httputil.HTTPMessageDelegate
    ) -> None:
        """Read exactly ``content_length`` body bytes, passing them to the
        delegate in pieces of at most ``params.chunk_size`` bytes.
        """
        while content_length > 0:
            body = await self.stream.read_bytes(
                min(self.params.chunk_size, content_length), partial=True
            )
            content_length -= len(body)
            # Once our side has finished writing (server only), the
            # remaining input is drained without being delivered.
            if not self._write_finished or self.is_client:
                with _ExceptionLoggingContext(app_log):
                    ret = delegate.data_received(body)
                    if ret is not None:
                        await ret

    async def _read_chunked_body(self, delegate: httputil.HTTPMessageDelegate) -> None:
        """Read a ``Transfer-Encoding: chunked`` body and feed it to the delegate.

        Raises `.HTTPInputError` if a chunk size line is not a hexadecimal
        integer, if the accumulated size exceeds the body size limit, or if
        any chunk (including the final zero-length one) is not followed by
        CRLF.
        """
        # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
        total_size = 0
        while True:
            chunk_len_str = await self.stream.read_until(b"\r\n", max_bytes=64)
            try:
                chunk_len = parse_hex_int(native_str(chunk_len_str[:-2]))
            except ValueError:
                raise httputil.HTTPInputError("invalid chunk size")
            if chunk_len == 0:
                crlf = await self.stream.read_bytes(2)
                if crlf != b"\r\n":
                    raise httputil.HTTPInputError(
                        "improperly terminated chunked request"
                    )
                return
            total_size += chunk_len
            if total_size > self._max_body_size:
                raise httputil.HTTPInputError("chunked body too large")
            bytes_to_read = chunk_len
            while bytes_to_read:
                chunk = await self.stream.read_bytes(
                    min(bytes_to_read, self.params.chunk_size), partial=True
                )
                bytes_to_read -= len(chunk)
                if not self._write_finished or self.is_client:
                    with _ExceptionLoggingContext(app_log):
                        ret = delegate.data_received(chunk)
                        if ret is not None:
                            await ret
            # chunk ends with \r\n
            crlf = await self.stream.read_bytes(2)
            if crlf != b"\r\n":
                # This is peer-supplied data, so it must be rejected with a
                # real exception: an ``assert`` is stripped under ``python -O``
                # and would let mis-framed input through unnoticed.
                raise httputil.HTTPInputError("improperly terminated chunk")

    async def _read_body_until_close(
        self, delegate: httputil.HTTPMessageDelegate
    ) -> None:
        """Read the body by consuming the stream until the peer closes it."""
        body = await self.stream.read_until_close()
        if not self._write_finished or self.is_client:
            with _ExceptionLoggingContext(app_log):
                ret = delegate.data_received(body)
                if ret is not None:
                    await ret

705 

706 

class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
    """Wraps an `HTTPMessageDelegate` to decode ``Content-Encoding: gzip``."""

    def __init__(
        self,
        delegate: httputil.HTTPMessageDelegate,
        chunk_size: int,
        max_body_size: int,
    ) -> None:
        """
        :arg delegate: the wrapped delegate that receives decoded data
        :arg int chunk_size: output size cap passed to each decompress call
        :arg int max_body_size: limit on the total decompressed body size
        """
        self._delegate = delegate
        self._chunk_size = chunk_size
        self._max_body_size = max_body_size
        # Running total of decompressed bytes, checked against the limit.
        self._decompressed_body_size = 0
        # Created in headers_received only when the message is gzip-encoded.
        self._decompressor = None  # type: Optional[GzipDecompressor]

    def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> Optional[Awaitable[None]]:
        """Enable decompression for gzip messages, then forward the headers."""
        is_gzip = headers.get("Content-Encoding", "").lower() == "gzip"
        if is_gzip:
            self._decompressor = GzipDecompressor()
            # Downstream delegates will only see uncompressed data,
            # so rename the content-encoding header.
            # (but note that curl_httpclient doesn't do this).
            headers.add("X-Consumed-Content-Encoding", headers["Content-Encoding"])
            del headers["Content-Encoding"]
        return self._delegate.headers_received(start_line, headers)

    async def data_received(self, chunk: bytes) -> None:
        """Forward ``chunk`` to the wrapped delegate, decompressing if needed.

        Raises `.HTTPInputError` if the decompressed body exceeds the size
        limit or if the decompressor leaves input unconsumed without
        producing any output.
        """
        if not self._decompressor:
            # Not a gzip message: hand the bytes through untouched.
            passthrough = self._delegate.data_received(chunk)
            if passthrough is not None:
                await passthrough
            return

        pending = chunk
        while pending:
            inflated = self._decompressor.decompress(pending, self._chunk_size)
            if inflated:
                self._decompressed_body_size += len(inflated)
                if self._decompressed_body_size > self._max_body_size:
                    raise httputil.HTTPInputError("decompressed body too large")
                downstream = self._delegate.data_received(inflated)
                if downstream is not None:
                    await downstream
            # Whatever the size cap prevented us from consuming this round.
            pending = self._decompressor.unconsumed_tail
            if pending and not inflated:
                raise httputil.HTTPInputError(
                    "encountered unconsumed gzip data without making progress"
                )

    def finish(self) -> None:
        """Flush the decompressor (if any) and finish the wrapped delegate.

        Raises `ValueError` if the flush unexpectedly yields data.
        """
        # The tail should always be empty: decompress returned
        # all that it can in data_received and the only
        # purpose of the flush call is to detect errors such
        # as truncated input. If we did legitimately get a new
        # chunk at this point we'd need to change the
        # interface to make finish() a coroutine.
        if self._decompressor is not None and self._decompressor.flush():
            raise ValueError(
                "decompressor.flush returned data; possible truncated input"
            )
        return self._delegate.finish()

    def on_connection_close(self) -> None:
        """Forward the connection-close notification to the wrapped delegate."""
        return self._delegate.on_connection_close()

777 

778 

class HTTP1ServerConnection:
    """An HTTP/1.x server."""

    def __init__(
        self,
        stream: iostream.IOStream,
        params: Optional[HTTP1ConnectionParameters] = None,
        context: Optional[object] = None,
    ) -> None:
        """
        :arg stream: an `.IOStream`
        :arg params: a `.HTTP1ConnectionParameters` or None
        :arg context: an opaque application-defined object that is accessible
            as ``connection.context``
        """
        self.stream = stream
        self.params = HTTP1ConnectionParameters() if params is None else params
        self.context = context
        # Set by start_serving(); stays None until then.
        self._serving_future = None  # type: Optional[Future[None]]

    async def close(self) -> None:
        """Closes the connection.

        Returns a `.Future` that resolves after the serving loop has exited.
        """
        self.stream.close()
        serving = self._serving_future
        assert serving is not None
        # Block until the serving loop is done, but ignore any exceptions
        # (start_serving is already responsible for logging them).
        try:
            await serving
        except Exception:
            pass

    def start_serving(self, delegate: httputil.HTTPServerConnectionDelegate) -> None:
        """Starts serving requests on this connection.

        :arg delegate: a `.HTTPServerConnectionDelegate`
        """
        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
        self._serving_future = serving = gen.convert_yielded(
            self._server_request_loop(delegate)
        )
        # Register the future on the IOLoop so its errors get logged.
        self.stream.io_loop.add_future(serving, lambda done: done.result())

    async def _server_request_loop(
        self, delegate: httputil.HTTPServerConnectionDelegate
    ) -> None:
        """Serve requests on the stream one at a time until it can't continue.

        ``delegate.on_close`` is always invoked when the loop exits.
        """
        try:
            keep_going = True
            while keep_going:
                conn = HTTP1Connection(self.stream, False, self.params, self.context)
                request_delegate = delegate.start_request(self, conn)
                try:
                    keep_going = await conn.read_response(request_delegate)
                except (
                    iostream.StreamClosedError,
                    iostream.UnsatisfiableReadError,
                    asyncio.CancelledError,
                ):
                    return
                except Exception as exc:
                    # A _QuietException was already logged where it was
                    # raised; anything else is reported here.
                    if not isinstance(exc, _QuietException):
                        gen_log.error("Uncaught exception", exc_info=True)
                    conn.close()
                    return
                if keep_going:
                    # Yield to the event loop between requests.
                    await asyncio.sleep(0)
        finally:
            delegate.on_close(self)

854 

855 

# Patterns accepting only ASCII digits / ASCII hex digits; the parsers below
# require the entire input string to match.
DIGITS = re.compile(r"[0-9]+")
HEXDIGITS = re.compile(r"[0-9a-fA-F]+")


def parse_int(s: str) -> int:
    """Parse a non-negative integer from a string.

    Only a non-empty run of the characters ``0``-``9`` is accepted (no sign,
    whitespace, underscores or prefix); anything else raises `ValueError`.
    """
    if DIGITS.fullmatch(s):
        return int(s)
    raise ValueError("not an integer: %r" % s)


def parse_hex_int(s: str) -> int:
    """Parse a non-negative hexadecimal integer from a string.

    Only a non-empty run of hexadecimal digits is accepted (no sign,
    whitespace, underscores or ``0x`` prefix); anything else raises
    `ValueError`.
    """
    if HEXDIGITS.fullmatch(s):
        return int(s, 16)
    raise ValueError("not a hexadecimal integer: %r" % s)

872 

873 

def is_transfer_encoding_chunked(headers: httputil.HTTPHeaders) -> bool:
    """Returns true if the headers specify Transfer-Encoding: chunked.

    Returns false when no Transfer-Encoding header is present.

    Raise httputil.HTTPInputError if any other transfer encoding is used,
    or if Transfer-Encoding appears together with Content-Length.
    """
    # Note that transfer-encoding is an area in which postel's law can lead
    # us astray. If a proxy and a backend server are liberal in what they accept,
    # but accept slightly different things, this can lead to mismatched framing
    # and request smuggling issues. Therefore we are as strict as possible here
    # (even technically going beyond the requirements of the RFCs: a value of
    # ",chunked" is legal but doesn't appear in practice for legitimate traffic)
    if "Transfer-Encoding" in headers:
        if "Content-Length" in headers:
            # Message cannot contain both Content-Length and
            # Transfer-Encoding headers.
            # http://tools.ietf.org/html/rfc7230#section-3.3.3
            raise httputil.HTTPInputError(
                "Message with both Transfer-Encoding and Content-Length"
            )
        encoding = headers["Transfer-Encoding"]
        if encoding.lower() != "chunked":
            # We do not support any transfer-encodings other than chunked, and
            # we do not expect to add any support because the concept of
            # transfer-encoding has been removed in HTTP/2.
            raise httputil.HTTPInputError(
                "Unsupported Transfer-Encoding %s" % encoding
            )
        return True
    return False