Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tornado/websocket.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

741 statements  

1"""Implementation of the WebSocket protocol. 

2 

3`WebSockets <http://dev.w3.org/html5/websockets/>`_ allow for bidirectional 

4communication between the browser and server. WebSockets are supported in the 

5current versions of all major browsers. 

6 

7This module implements the final version of the WebSocket protocol as 

8defined in `RFC 6455 <http://tools.ietf.org/html/rfc6455>`_. 

9 

10.. versionchanged:: 4.0 

11 Removed support for the draft 76 protocol version. 

12""" 

13 

14import abc 

15import asyncio 

16import base64 

17import functools 

18import hashlib 

19import logging 

20import os 

21import sys 

22import struct 

23import tornado 

24from urllib.parse import urlparse 

25import warnings 

26import zlib 

27 

28from tornado.concurrent import Future, future_set_result_unless_cancelled 

29from tornado.escape import utf8, native_str, to_unicode 

30from tornado import gen, httpclient, httputil 

31from tornado.ioloop import IOLoop 

32from tornado.iostream import StreamClosedError, IOStream 

33from tornado.log import gen_log, app_log 

34from tornado.netutil import Resolver 

35from tornado import simple_httpclient 

36from tornado.queues import Queue 

37from tornado.tcpclient import TCPClient 

38from tornado.util import _websocket_mask 

39 

40from typing import ( 

41 TYPE_CHECKING, 

42 cast, 

43 Any, 

44 Optional, 

45 Dict, 

46 Union, 

47 List, 

48 Awaitable, 

49 Callable, 

50 Tuple, 

51 Type, 

52) 

53from types import TracebackType 

54 

55if TYPE_CHECKING: 

56 from typing_extensions import Protocol 

57 

58 # The zlib compressor types aren't actually exposed anywhere 

59 # publicly, so declare protocols for the portions we use. 

60 class _Compressor(Protocol): 

61 def compress(self, data: bytes) -> bytes: 

62 pass 

63 

64 def flush(self, mode: int) -> bytes: 

65 pass 

66 

67 class _Decompressor(Protocol): 

68 unconsumed_tail = b"" # type: bytes 

69 

70 def decompress(self, data: bytes, max_length: int) -> bytes: 

71 pass 

72 

73 class _WebSocketDelegate(Protocol): 

74 # The common base interface implemented by WebSocketHandler on 

75 # the server side and WebSocketClientConnection on the client 

76 # side. 

77 def on_ws_connection_close( 

78 self, close_code: Optional[int] = None, close_reason: Optional[str] = None 

79 ) -> None: 

80 pass 

81 

82 def on_message(self, message: Union[str, bytes]) -> Optional["Awaitable[None]"]: 

83 pass 

84 

85 def on_ping(self, data: bytes) -> None: 

86 pass 

87 

88 def on_pong(self, data: bytes) -> None: 

89 pass 

90 

91 def log_exception( 

92 self, 

93 typ: Optional[Type[BaseException]], 

94 value: Optional[BaseException], 

95 tb: Optional[TracebackType], 

96 ) -> None: 

97 pass 

98 

99 

100_default_max_message_size = 10 * 1024 * 1024 

101 

102# log to "gen_log" but suppress duplicate log messages 

103de_dupe_gen_log = functools.lru_cache(gen_log.log) 

104 

105 

106class WebSocketError(Exception): 

107 pass 

108 

109 

110class WebSocketClosedError(WebSocketError): 

111 """Raised by operations on a closed connection. 

112 

113 .. versionadded:: 3.2 

114 """ 

115 

116 pass 

117 

118 

119class _DecompressTooLargeError(Exception): 

120 pass 

121 

122 

123class _WebSocketParams: 

124 def __init__( 

125 self, 

126 ping_interval: Optional[float] = None, 

127 ping_timeout: Optional[float] = None, 

128 max_message_size: int = _default_max_message_size, 

129 compression_options: Optional[Dict[str, Any]] = None, 

130 ) -> None: 

131 self.ping_interval = ping_interval 

132 self.ping_timeout = ping_timeout 

133 self.max_message_size = max_message_size 

134 self.compression_options = compression_options 

135 

136 

137class WebSocketHandler(tornado.web.RequestHandler): 

138 """Subclass this class to create a basic WebSocket handler. 

139 

140 Override `on_message` to handle incoming messages, and use 

141 `write_message` to send messages to the client. You can also 

142 override `open` and `on_close` to handle opened and closed 

143 connections. 

144 

145 Custom upgrade response headers can be sent by overriding 

146 `~tornado.web.RequestHandler.set_default_headers` or 

147 `~tornado.web.RequestHandler.prepare`. 

148 

149 See http://dev.w3.org/html5/websockets/ for details on the 

150 JavaScript interface. The protocol is specified at 

151 http://tools.ietf.org/html/rfc6455. 

152 

153 Here is an example WebSocket handler that echos back all received messages 

154 back to the client: 

155 

156 .. testcode:: 

157 

158 class EchoWebSocket(tornado.websocket.WebSocketHandler): 

159 def open(self): 

160 print("WebSocket opened") 

161 

162 def on_message(self, message): 

163 self.write_message(u"You said: " + message) 

164 

165 def on_close(self): 

166 print("WebSocket closed") 

167 

168 WebSockets are not standard HTTP connections. The "handshake" is 

169 HTTP, but after the handshake, the protocol is 

170 message-based. Consequently, most of the Tornado HTTP facilities 

171 are not available in handlers of this type. The only communication 

172 methods available to you are `write_message()`, `ping()`, and 

173 `close()`. Likewise, your request handler class should implement 

174 `open()` method rather than ``get()`` or ``post()``. 

175 

176 If you map the handler above to ``/websocket`` in your application, you can 

177 invoke it in JavaScript with:: 

178 

179 var ws = new WebSocket("ws://localhost:8888/websocket"); 

180 ws.onopen = function() { 

181 ws.send("Hello, world"); 

182 }; 

183 ws.onmessage = function (evt) { 

184 alert(evt.data); 

185 }; 

186 

187 This script pops up an alert box that says "You said: Hello, world". 

188 

189 Web browsers allow any site to open a websocket connection to any other, 

190 instead of using the same-origin policy that governs other network 

191 access from JavaScript. This can be surprising and is a potential 

192 security hole, so since Tornado 4.0 `WebSocketHandler` requires 

193 applications that wish to receive cross-origin websockets to opt in 

194 by overriding the `~WebSocketHandler.check_origin` method (see that 

195 method's docs for details). Failure to do so is the most likely 

196 cause of 403 errors when making a websocket connection. 

197 

198 When using a secure websocket connection (``wss://``) with a self-signed 

199 certificate, the connection from a browser may fail because it wants 

200 to show the "accept this certificate" dialog but has nowhere to show it. 

201 You must first visit a regular HTML page using the same certificate 

202 to accept it before the websocket connection will succeed. 

203 

204 If the application setting ``websocket_ping_interval`` has a non-zero 

205 value, a ping will be sent periodically, and the connection will be 

206 closed if a response is not received before the ``websocket_ping_timeout``. 

207 Both settings are in seconds; floating point values are allowed. 

208 The default timeout is equal to the interval. 

209 

210 Messages larger than the ``websocket_max_message_size`` application setting 

211 (default 10MiB) will not be accepted. 

212 

213 .. versionchanged:: 4.5 

214 Added ``websocket_ping_interval``, ``websocket_ping_timeout``, and 

215 ``websocket_max_message_size``. 

216 """ 

217 

218 def __init__( 

219 self, 

220 application: tornado.web.Application, 

221 request: httputil.HTTPServerRequest, 

222 **kwargs: Any, 

223 ) -> None: 

224 super().__init__(application, request, **kwargs) 

225 self.ws_connection = None # type: Optional[WebSocketProtocol] 

226 self.close_code = None # type: Optional[int] 

227 self.close_reason = None # type: Optional[str] 

228 self._on_close_called = False 

229 

230 async def get(self, *args: Any, **kwargs: Any) -> None: 

231 self.open_args = args 

232 self.open_kwargs = kwargs 

233 

234 # Upgrade header should be present and should be equal to WebSocket 

235 if self.request.headers.get("Upgrade", "").lower() != "websocket": 

236 self.set_status(400) 

237 log_msg = 'Can "Upgrade" only to "WebSocket".' 

238 self.finish(log_msg) 

239 gen_log.debug(log_msg) 

240 return 

241 

242 # Connection header should be upgrade. 

243 # Some proxy servers/load balancers 

244 # might mess with it. 

245 headers = self.request.headers 

246 connection = map( 

247 lambda s: s.strip().lower(), headers.get("Connection", "").split(",") 

248 ) 

249 if "upgrade" not in connection: 

250 self.set_status(400) 

251 log_msg = '"Connection" must be "Upgrade".' 

252 self.finish(log_msg) 

253 gen_log.debug(log_msg) 

254 return 

255 

256 # Handle WebSocket Origin naming convention differences 

257 # The difference between version 8 and 13 is that in 8 the 

258 # client sends a "Sec-Websocket-Origin" header and in 13 it's 

259 # simply "Origin". 

260 if "Origin" in self.request.headers: 

261 origin = self.request.headers.get("Origin") 

262 else: 

263 origin = self.request.headers.get("Sec-Websocket-Origin", None) 

264 

265 # If there was an origin header, check to make sure it matches 

266 # according to check_origin. When the origin is None, we assume it 

267 # did not come from a browser and that it can be passed on. 

268 if origin is not None and not self.check_origin(origin): 

269 self.set_status(403) 

270 log_msg = "Cross origin websockets not allowed" 

271 self.finish(log_msg) 

272 gen_log.debug(log_msg) 

273 return 

274 

275 self.ws_connection = self.get_websocket_protocol() 

276 if self.ws_connection: 

277 await self.ws_connection.accept_connection(self) 

278 else: 

279 self.set_status(426, "Upgrade Required") 

280 self.set_header("Sec-WebSocket-Version", "7, 8, 13") 

281 

282 @property 

283 def ping_interval(self) -> Optional[float]: 

284 """The interval for sending websocket pings. 

285 

286 If this is non-zero, the websocket will send a ping every 

287 ping_interval seconds. 

288 The client will respond with a "pong". The connection can be configured 

289 to timeout on late pong delivery using ``websocket_ping_timeout``. 

290 

291 Set ``websocket_ping_interval = 0`` to disable pings. 

292 

293 Default: ``0`` 

294 """ 

295 return self.settings.get("websocket_ping_interval", None) 

296 

297 @property 

298 def ping_timeout(self) -> Optional[float]: 

299 """Timeout if no pong is received in this many seconds. 

300 

301 To be used in combination with ``websocket_ping_interval > 0``. 

302 If a ping response (a "pong") is not received within 

303 ``websocket_ping_timeout`` seconds, then the websocket connection 

304 will be closed. 

305 

306 This can help to clean up clients which have disconnected without 

307 cleanly closing the websocket connection. 

308 

309 Note, the ping timeout cannot be longer than the ping interval. 

310 

311 Set ``websocket_ping_timeout = 0`` to disable the ping timeout. 

312 

313 Default: equal to the ``ping_interval``. 

314 

315 .. versionchanged:: 6.5.0 

316 Default changed from the max of 3 pings or 30 seconds. 

317 The ping timeout can no longer be configured longer than the 

318 ping interval. 

319 """ 

320 return self.settings.get("websocket_ping_timeout", None) 

321 

322 @property 

323 def max_message_size(self) -> int: 

324 """Maximum allowed message size. 

325 

326 If the remote peer sends a message larger than this, the connection 

327 will be closed. 

328 

329 Default is 10MiB. 

330 """ 

331 return self.settings.get( 

332 "websocket_max_message_size", _default_max_message_size 

333 ) 

334 

335 def write_message( 

336 self, message: Union[bytes, str, Dict[str, Any]], binary: bool = False 

337 ) -> "Future[None]": 

338 """Sends the given message to the client of this Web Socket. 

339 

340 The message may be either a string or a dict (which will be 

341 encoded as json). If the ``binary`` argument is false, the 

342 message will be sent as utf8; in binary mode any byte string 

343 is allowed. 

344 

345 If the connection is already closed, raises `WebSocketClosedError`. 

346 Returns a `.Future` which can be used for flow control. 

347 

348 .. versionchanged:: 3.2 

349 `WebSocketClosedError` was added (previously a closed connection 

350 would raise an `AttributeError`) 

351 

352 .. versionchanged:: 4.3 

353 Returns a `.Future` which can be used for flow control. 

354 

355 .. versionchanged:: 5.0 

356 Consistently raises `WebSocketClosedError`. Previously could 

357 sometimes raise `.StreamClosedError`. 

358 """ 

359 if self.ws_connection is None or self.ws_connection.is_closing(): 

360 raise WebSocketClosedError() 

361 if isinstance(message, dict): 

362 message = tornado.escape.json_encode(message) 

363 return self.ws_connection.write_message(message, binary=binary) 

364 

365 def select_subprotocol(self, subprotocols: List[str]) -> Optional[str]: 

366 """Override to implement subprotocol negotiation. 

367 

368 ``subprotocols`` is a list of strings identifying the 

369 subprotocols proposed by the client. This method may be 

370 overridden to return one of those strings to select it, or 

371 ``None`` to not select a subprotocol. 

372 

373 Failure to select a subprotocol does not automatically abort 

374 the connection, although clients may close the connection if 

375 none of their proposed subprotocols was selected. 

376 

377 The list may be empty, in which case this method must return 

378 None. This method is always called exactly once even if no 

379 subprotocols were proposed so that the handler can be advised 

380 of this fact. 

381 

382 .. versionchanged:: 5.1 

383 

384 Previously, this method was called with a list containing 

385 an empty string instead of an empty list if no subprotocols 

386 were proposed by the client. 

387 """ 

388 return None 

389 

390 @property 

391 def selected_subprotocol(self) -> Optional[str]: 

392 """The subprotocol returned by `select_subprotocol`. 

393 

394 .. versionadded:: 5.1 

395 """ 

396 assert self.ws_connection is not None 

397 return self.ws_connection.selected_subprotocol 

398 

399 def get_compression_options(self) -> Optional[Dict[str, Any]]: 

400 """Override to return compression options for the connection. 

401 

402 If this method returns None (the default), compression will 

403 be disabled. If it returns a dict (even an empty one), it 

404 will be enabled. The contents of the dict may be used to 

405 control the following compression options: 

406 

407 ``compression_level`` specifies the compression level. 

408 

409 ``mem_level`` specifies the amount of memory used for the internal compression state. 

410 

411 These parameters are documented in detail here: 

412 https://docs.python.org/3.13/library/zlib.html#zlib.compressobj 

413 

414 .. versionadded:: 4.1 

415 

416 .. versionchanged:: 4.5 

417 

418 Added ``compression_level`` and ``mem_level``. 

419 """ 

420 # TODO: Add wbits option. 

421 return None 

422 

423 def open(self, *args: str, **kwargs: str) -> Optional[Awaitable[None]]: 

424 """Invoked when a new WebSocket is opened. 

425 

426 The arguments to `open` are extracted from the `tornado.web.URLSpec` 

427 regular expression, just like the arguments to 

428 `tornado.web.RequestHandler.get`. 

429 

430 `open` may be a coroutine. `on_message` will not be called until 

431 `open` has returned. 

432 

433 .. versionchanged:: 5.1 

434 

435 ``open`` may be a coroutine. 

436 """ 

437 pass 

438 

439 def on_message(self, message: Union[str, bytes]) -> Optional[Awaitable[None]]: 

440 """Handle incoming messages on the WebSocket 

441 

442 This method must be overridden. 

443 

444 .. versionchanged:: 4.5 

445 

446 ``on_message`` can be a coroutine. 

447 """ 

448 raise NotImplementedError 

449 

450 def ping(self, data: Union[str, bytes] = b"") -> None: 

451 """Send ping frame to the remote end. 

452 

453 The data argument allows a small amount of data (up to 125 

454 bytes) to be sent as a part of the ping message. Note that not 

455 all websocket implementations expose this data to 

456 applications. 

457 

458 Consider using the ``websocket_ping_interval`` application 

459 setting instead of sending pings manually. 

460 

461 .. versionchanged:: 5.1 

462 

463 The data argument is now optional. 

464 

465 """ 

466 data = utf8(data) 

467 if self.ws_connection is None or self.ws_connection.is_closing(): 

468 raise WebSocketClosedError() 

469 self.ws_connection.write_ping(data) 

470 

471 def on_pong(self, data: bytes) -> None: 

472 """Invoked when the response to a ping frame is received.""" 

473 pass 

474 

475 def on_ping(self, data: bytes) -> None: 

476 """Invoked when the a ping frame is received.""" 

477 pass 

478 

479 def on_close(self) -> None: 

480 """Invoked when the WebSocket is closed. 

481 

482 If the connection was closed cleanly and a status code or reason 

483 phrase was supplied, these values will be available as the attributes 

484 ``self.close_code`` and ``self.close_reason``. 

485 

486 .. versionchanged:: 4.0 

487 

488 Added ``close_code`` and ``close_reason`` attributes. 

489 """ 

490 pass 

491 

492 def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None: 

493 """Closes this Web Socket. 

494 

495 Once the close handshake is successful the socket will be closed. 

496 

497 ``code`` may be a numeric status code, taken from the values 

498 defined in `RFC 6455 section 7.4.1 

499 <https://tools.ietf.org/html/rfc6455#section-7.4.1>`_. 

500 ``reason`` may be a textual message about why the connection is 

501 closing. These values are made available to the client, but are 

502 not otherwise interpreted by the websocket protocol. 

503 

504 .. versionchanged:: 4.0 

505 

506 Added the ``code`` and ``reason`` arguments. 

507 """ 

508 if self.ws_connection: 

509 self.ws_connection.close(code, reason) 

510 self.ws_connection = None 

511 

512 def check_origin(self, origin: str) -> bool: 

513 """Override to enable support for allowing alternate origins. 

514 

515 The ``origin`` argument is the value of the ``Origin`` HTTP 

516 header, the url responsible for initiating this request. This 

517 method is not called for clients that do not send this header; 

518 such requests are always allowed (because all browsers that 

519 implement WebSockets support this header, and non-browser 

520 clients do not have the same cross-site security concerns). 

521 

522 Should return ``True`` to accept the request or ``False`` to 

523 reject it. By default, rejects all requests with an origin on 

524 a host other than this one. 

525 

526 This is a security protection against cross site scripting attacks on 

527 browsers, since WebSockets are allowed to bypass the usual same-origin 

528 policies and don't use CORS headers. 

529 

530 .. warning:: 

531 

532 This is an important security measure; don't disable it 

533 without understanding the security implications. In 

534 particular, if your authentication is cookie-based, you 

535 must either restrict the origins allowed by 

536 ``check_origin()`` or implement your own XSRF-like 

537 protection for websocket connections. See `these 

538 <https://www.christian-schneider.net/CrossSiteWebSocketHijacking.html>`_ 

539 `articles 

540 <https://devcenter.heroku.com/articles/websocket-security>`_ 

541 for more. 

542 

543 To accept all cross-origin traffic (which was the default prior to 

544 Tornado 4.0), simply override this method to always return ``True``:: 

545 

546 def check_origin(self, origin): 

547 return True 

548 

549 To allow connections from any subdomain of your site, you might 

550 do something like:: 

551 

552 def check_origin(self, origin): 

553 parsed_origin = urllib.parse.urlparse(origin) 

554 return parsed_origin.netloc.endswith(".mydomain.com") 

555 

556 .. versionadded:: 4.0 

557 

558 """ 

559 parsed_origin = urlparse(origin) 

560 origin = parsed_origin.netloc 

561 origin = origin.lower() 

562 

563 host = self.request.headers.get("Host") 

564 

565 # Check to see that origin matches host directly, including ports 

566 return origin == host 

567 

568 def set_nodelay(self, value: bool) -> None: 

569 """Set the no-delay flag for this stream. 

570 

571 By default, small messages may be delayed and/or combined to minimize 

572 the number of packets sent. This can sometimes cause 200-500ms delays 

573 due to the interaction between Nagle's algorithm and TCP delayed 

574 ACKs. To reduce this delay (at the expense of possibly increasing 

575 bandwidth usage), call ``self.set_nodelay(True)`` once the websocket 

576 connection is established. 

577 

578 See `.BaseIOStream.set_nodelay` for additional details. 

579 

580 .. versionadded:: 3.1 

581 """ 

582 assert self.ws_connection is not None 

583 self.ws_connection.set_nodelay(value) 

584 

585 def on_connection_close(self) -> None: 

586 if self.ws_connection: 

587 self.ws_connection.on_connection_close() 

588 self.ws_connection = None 

589 if not self._on_close_called: 

590 self._on_close_called = True 

591 self.on_close() 

592 self._break_cycles() 

593 

594 def on_ws_connection_close( 

595 self, close_code: Optional[int] = None, close_reason: Optional[str] = None 

596 ) -> None: 

597 self.close_code = close_code 

598 self.close_reason = close_reason 

599 self.on_connection_close() 

600 

601 def _break_cycles(self) -> None: 

602 # WebSocketHandlers call finish() early, but we don't want to 

603 # break up reference cycles (which makes it impossible to call 

604 # self.render_string) until after we've really closed the 

605 # connection (if it was established in the first place, 

606 # indicated by status code 101). 

607 if self.get_status() != 101 or self._on_close_called: 

608 super()._break_cycles() 

609 

610 def get_websocket_protocol(self) -> Optional["WebSocketProtocol"]: 

611 websocket_version = self.request.headers.get("Sec-WebSocket-Version") 

612 if websocket_version in ("7", "8", "13"): 

613 params = _WebSocketParams( 

614 ping_interval=self.ping_interval, 

615 ping_timeout=self.ping_timeout, 

616 max_message_size=self.max_message_size, 

617 compression_options=self.get_compression_options(), 

618 ) 

619 return WebSocketProtocol13(self, False, params) 

620 return None 

621 

622 def _detach_stream(self) -> IOStream: 

623 # disable non-WS methods 

624 for method in [ 

625 "write", 

626 "redirect", 

627 "set_header", 

628 "set_cookie", 

629 "set_status", 

630 "flush", 

631 "finish", 

632 ]: 

633 setattr(self, method, _raise_not_supported_for_websockets) 

634 return self.detach() 

635 

636 

637def _raise_not_supported_for_websockets(*args: Any, **kwargs: Any) -> None: 

638 raise RuntimeError("Method not supported for Web Sockets") 

639 

640 

641class WebSocketProtocol(abc.ABC): 

642 """Base class for WebSocket protocol versions.""" 

643 

644 def __init__(self, handler: "_WebSocketDelegate") -> None: 

645 self.handler = handler 

646 self.stream = None # type: Optional[IOStream] 

647 self.client_terminated = False 

648 self.server_terminated = False 

649 

650 def _run_callback( 

651 self, callback: Callable, *args: Any, **kwargs: Any 

652 ) -> "Optional[Future[Any]]": 

653 """Runs the given callback with exception handling. 

654 

655 If the callback is a coroutine, returns its Future. On error, aborts the 

656 websocket connection and returns None. 

657 """ 

658 try: 

659 result = callback(*args, **kwargs) 

660 except Exception: 

661 self.handler.log_exception(*sys.exc_info()) 

662 self._abort() 

663 return None 

664 else: 

665 if result is not None: 

666 result = gen.convert_yielded(result) 

667 assert self.stream is not None 

668 self.stream.io_loop.add_future(result, lambda f: f.result()) 

669 return result 

670 

671 def on_connection_close(self) -> None: 

672 self._abort() 

673 

674 def _abort(self) -> None: 

675 """Instantly aborts the WebSocket connection by closing the socket""" 

676 self.client_terminated = True 

677 self.server_terminated = True 

678 if self.stream is not None: 

679 self.stream.close() # forcibly tear down the connection 

680 self.close() # let the subclass cleanup 

681 

682 @abc.abstractmethod 

683 def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None: 

684 raise NotImplementedError() 

685 

686 @abc.abstractmethod 

687 def is_closing(self) -> bool: 

688 raise NotImplementedError() 

689 

690 @abc.abstractmethod 

691 async def accept_connection(self, handler: WebSocketHandler) -> None: 

692 raise NotImplementedError() 

693 

694 @abc.abstractmethod 

695 def write_message( 

696 self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False 

697 ) -> "Future[None]": 

698 raise NotImplementedError() 

699 

700 @property 

701 @abc.abstractmethod 

702 def selected_subprotocol(self) -> Optional[str]: 

703 raise NotImplementedError() 

704 

705 @abc.abstractmethod 

706 def write_ping(self, data: bytes) -> None: 

707 raise NotImplementedError() 

708 

709 # The entry points below are used by WebSocketClientConnection, 

710 # which was introduced after we only supported a single version of 

711 # WebSocketProtocol. The WebSocketProtocol/WebSocketProtocol13 

712 # boundary is currently pretty ad-hoc. 

713 @abc.abstractmethod 

714 def _process_server_headers( 

715 self, key: Union[str, bytes], headers: httputil.HTTPHeaders 

716 ) -> None: 

717 raise NotImplementedError() 

718 

719 @abc.abstractmethod 

720 def start_pinging(self) -> None: 

721 raise NotImplementedError() 

722 

723 @abc.abstractmethod 

724 async def _receive_frame_loop(self) -> None: 

725 raise NotImplementedError() 

726 

727 @abc.abstractmethod 

728 def set_nodelay(self, x: bool) -> None: 

729 raise NotImplementedError() 

730 

731 

732class _PerMessageDeflateCompressor: 

733 def __init__( 

734 self, 

735 persistent: bool, 

736 max_wbits: Optional[int], 

737 compression_options: Optional[Dict[str, Any]] = None, 

738 ) -> None: 

739 if max_wbits is None: 

740 max_wbits = zlib.MAX_WBITS 

741 # There is no symbolic constant for the minimum wbits value. 

742 if not (8 <= max_wbits <= zlib.MAX_WBITS): 

743 raise ValueError( 

744 "Invalid max_wbits value %r; allowed range 8-%d", 

745 max_wbits, 

746 zlib.MAX_WBITS, 

747 ) 

748 self._max_wbits = max_wbits 

749 

750 if ( 

751 compression_options is None 

752 or "compression_level" not in compression_options 

753 ): 

754 self._compression_level = tornado.web.GZipContentEncoding.GZIP_LEVEL 

755 else: 

756 self._compression_level = compression_options["compression_level"] 

757 

758 if compression_options is None or "mem_level" not in compression_options: 

759 self._mem_level = 8 

760 else: 

761 self._mem_level = compression_options["mem_level"] 

762 

763 if persistent: 

764 self._compressor = self._create_compressor() # type: Optional[_Compressor] 

765 else: 

766 self._compressor = None 

767 

768 def _create_compressor(self) -> "_Compressor": 

769 return zlib.compressobj( 

770 self._compression_level, zlib.DEFLATED, -self._max_wbits, self._mem_level 

771 ) 

772 

773 def compress(self, data: bytes) -> bytes: 

774 compressor = self._compressor or self._create_compressor() 

775 data = compressor.compress(data) + compressor.flush(zlib.Z_SYNC_FLUSH) 

776 assert data.endswith(b"\x00\x00\xff\xff") 

777 return data[:-4] 

778 

779 

780class _PerMessageDeflateDecompressor: 

781 def __init__( 

782 self, 

783 persistent: bool, 

784 max_wbits: Optional[int], 

785 max_message_size: int, 

786 compression_options: Optional[Dict[str, Any]] = None, 

787 ) -> None: 

788 self._max_message_size = max_message_size 

789 if max_wbits is None: 

790 max_wbits = zlib.MAX_WBITS 

791 if not (8 <= max_wbits <= zlib.MAX_WBITS): 

792 raise ValueError( 

793 "Invalid max_wbits value %r; allowed range 8-%d", 

794 max_wbits, 

795 zlib.MAX_WBITS, 

796 ) 

797 self._max_wbits = max_wbits 

798 if persistent: 

799 self._decompressor = ( 

800 self._create_decompressor() 

801 ) # type: Optional[_Decompressor] 

802 else: 

803 self._decompressor = None 

804 

805 def _create_decompressor(self) -> "_Decompressor": 

806 return zlib.decompressobj(-self._max_wbits) 

807 

808 def decompress(self, data: bytes) -> bytes: 

809 decompressor = self._decompressor or self._create_decompressor() 

810 result = decompressor.decompress( 

811 data + b"\x00\x00\xff\xff", self._max_message_size 

812 ) 

813 if decompressor.unconsumed_tail: 

814 raise _DecompressTooLargeError() 

815 return result 

816 

817 

818class WebSocketProtocol13(WebSocketProtocol): 

819 """Implementation of the WebSocket protocol from RFC 6455. 

820 

821 This class supports versions 7 and 8 of the protocol in addition to the 

822 final version 13. 

823 """ 

824 

825 # Bit masks for the first byte of a frame. 

826 FIN = 0x80 

827 RSV1 = 0x40 

828 RSV2 = 0x20 

829 RSV3 = 0x10 

830 RSV_MASK = RSV1 | RSV2 | RSV3 

831 OPCODE_MASK = 0x0F 

832 

833 stream = None # type: IOStream 

834 

835 def __init__( 

836 self, 

837 handler: "_WebSocketDelegate", 

838 mask_outgoing: bool, 

839 params: _WebSocketParams, 

840 ) -> None: 

841 WebSocketProtocol.__init__(self, handler) 

842 self.mask_outgoing = mask_outgoing 

843 self.params = params 

844 self._final_frame = False 

845 self._frame_opcode = None 

846 self._masked_frame = None 

847 self._frame_mask = None # type: Optional[bytes] 

848 self._frame_length = None 

849 self._fragmented_message_buffer = None # type: Optional[bytearray] 

850 self._fragmented_message_opcode = None 

851 self._waiting = None # type: object 

852 self._compression_options = params.compression_options 

853 self._decompressor = None # type: Optional[_PerMessageDeflateDecompressor] 

854 self._compressor = None # type: Optional[_PerMessageDeflateCompressor] 

855 self._frame_compressed = None # type: Optional[bool] 

856 # The total uncompressed size of all messages received or sent. 

857 # Unicode messages are encoded to utf8. 

858 # Only for testing; subject to change. 

859 self._message_bytes_in = 0 

860 self._message_bytes_out = 0 

861 # The total size of all packets received or sent. Includes 

862 # the effect of compression, frame overhead, and control frames. 

863 self._wire_bytes_in = 0 

864 self._wire_bytes_out = 0 

865 self._received_pong = False # type: bool 

866 self.close_code = None # type: Optional[int] 

867 self.close_reason = None # type: Optional[str] 

868 self._ping_coroutine = None # type: Optional[asyncio.Task] 

869 

870 # Use a property for this to satisfy the abc. 

871 @property 

872 def selected_subprotocol(self) -> Optional[str]: 

873 return self._selected_subprotocol 

874 

875 @selected_subprotocol.setter 

876 def selected_subprotocol(self, value: Optional[str]) -> None: 

877 self._selected_subprotocol = value 

878 

879 async def accept_connection(self, handler: WebSocketHandler) -> None: 

880 try: 

881 self._handle_websocket_headers(handler) 

882 except ValueError: 

883 handler.set_status(400) 

884 log_msg = "Missing/Invalid WebSocket headers" 

885 handler.finish(log_msg) 

886 gen_log.debug(log_msg) 

887 return 

888 

889 try: 

890 await self._accept_connection(handler) 

891 except asyncio.CancelledError: 

892 self._abort() 

893 return 

894 except ValueError: 

895 gen_log.debug("Malformed WebSocket request received", exc_info=True) 

896 self._abort() 

897 return 

898 

899 def _handle_websocket_headers(self, handler: WebSocketHandler) -> None: 

900 """Verifies all invariant- and required headers 

901 

902 If a header is missing or have an incorrect value ValueError will be 

903 raised 

904 """ 

905 fields = ("Host", "Sec-Websocket-Key", "Sec-Websocket-Version") 

906 if not all(map(lambda f: handler.request.headers.get(f), fields)): 

907 raise ValueError("Missing/Invalid WebSocket headers") 

908 

909 @staticmethod 

910 def compute_accept_value(key: Union[str, bytes]) -> str: 

911 """Computes the value for the Sec-WebSocket-Accept header, 

912 given the value for Sec-WebSocket-Key. 

913 """ 

914 sha1 = hashlib.sha1() 

915 sha1.update(utf8(key)) 

916 sha1.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11") # Magic value 

917 return native_str(base64.b64encode(sha1.digest())) 

918 

919 def _challenge_response(self, handler: WebSocketHandler) -> str: 

920 return WebSocketProtocol13.compute_accept_value( 

921 cast(str, handler.request.headers.get("Sec-Websocket-Key")) 

922 ) 

923 

924 async def _accept_connection(self, handler: WebSocketHandler) -> None: 

925 subprotocol_header = handler.request.headers.get("Sec-WebSocket-Protocol") 

926 if subprotocol_header: 

927 subprotocols = [s.strip() for s in subprotocol_header.split(",")] 

928 else: 

929 subprotocols = [] 

930 self.selected_subprotocol = handler.select_subprotocol(subprotocols) 

931 if self.selected_subprotocol: 

932 assert self.selected_subprotocol in subprotocols 

933 handler.set_header("Sec-WebSocket-Protocol", self.selected_subprotocol) 

934 

935 extensions = self._parse_extensions_header(handler.request.headers) 

936 for ext in extensions: 

937 if ext[0] == "permessage-deflate" and self._compression_options is not None: 

938 # TODO: negotiate parameters if compression_options 

939 # specifies limits. 

940 self._create_compressors("server", ext[1], self._compression_options) 

941 if ( 

942 "client_max_window_bits" in ext[1] 

943 and ext[1]["client_max_window_bits"] is None 

944 ): 

945 # Don't echo an offered client_max_window_bits 

946 # parameter with no value. 

947 del ext[1]["client_max_window_bits"] 

948 handler.set_header( 

949 "Sec-WebSocket-Extensions", 

950 httputil._encode_header("permessage-deflate", ext[1]), 

951 ) 

952 break 

953 

954 handler.clear_header("Content-Type") 

955 handler.set_status(101) 

956 handler.set_header("Upgrade", "websocket") 

957 handler.set_header("Connection", "Upgrade") 

958 handler.set_header("Sec-WebSocket-Accept", self._challenge_response(handler)) 

959 handler.finish() 

960 

961 self.stream = handler._detach_stream() 

962 

963 self.start_pinging() 

964 try: 

965 open_result = handler.open(*handler.open_args, **handler.open_kwargs) 

966 if open_result is not None: 

967 await open_result 

968 except Exception: 

969 handler.log_exception(*sys.exc_info()) 

970 self._abort() 

971 return 

972 

973 await self._receive_frame_loop() 

974 

975 def _parse_extensions_header( 

976 self, headers: httputil.HTTPHeaders 

977 ) -> List[Tuple[str, Dict[str, str]]]: 

978 extensions = headers.get("Sec-WebSocket-Extensions", "") 

979 if extensions: 

980 return [httputil._parse_header(e.strip()) for e in extensions.split(",")] 

981 return [] 

982 

983 def _process_server_headers( 

984 self, key: Union[str, bytes], headers: httputil.HTTPHeaders 

985 ) -> None: 

986 """Process the headers sent by the server to this client connection. 

987 

988 'key' is the websocket handshake challenge/response key. 

989 """ 

990 assert headers["Upgrade"].lower() == "websocket" 

991 assert headers["Connection"].lower() == "upgrade" 

992 accept = self.compute_accept_value(key) 

993 assert headers["Sec-Websocket-Accept"] == accept 

994 

995 extensions = self._parse_extensions_header(headers) 

996 for ext in extensions: 

997 if ext[0] == "permessage-deflate" and self._compression_options is not None: 

998 self._create_compressors("client", ext[1]) 

999 else: 

1000 raise ValueError("unsupported extension %r", ext) 

1001 

1002 self.selected_subprotocol = headers.get("Sec-WebSocket-Protocol", None) 

1003 

1004 def _get_compressor_options( 

1005 self, 

1006 side: str, 

1007 agreed_parameters: Dict[str, Any], 

1008 compression_options: Optional[Dict[str, Any]] = None, 

1009 ) -> Dict[str, Any]: 

1010 """Converts a websocket agreed_parameters set to keyword arguments 

1011 for our compressor objects. 

1012 """ 

1013 options = dict( 

1014 persistent=(side + "_no_context_takeover") not in agreed_parameters 

1015 ) # type: Dict[str, Any] 

1016 wbits_header = agreed_parameters.get(side + "_max_window_bits", None) 

1017 if wbits_header is None: 

1018 options["max_wbits"] = zlib.MAX_WBITS 

1019 else: 

1020 options["max_wbits"] = int(wbits_header) 

1021 options["compression_options"] = compression_options 

1022 return options 

1023 

1024 def _create_compressors( 

1025 self, 

1026 side: str, 

1027 agreed_parameters: Dict[str, Any], 

1028 compression_options: Optional[Dict[str, Any]] = None, 

1029 ) -> None: 

1030 # TODO: handle invalid parameters gracefully 

1031 allowed_keys = { 

1032 "server_no_context_takeover", 

1033 "client_no_context_takeover", 

1034 "server_max_window_bits", 

1035 "client_max_window_bits", 

1036 } 

1037 for key in agreed_parameters: 

1038 if key not in allowed_keys: 

1039 raise ValueError("unsupported compression parameter %r" % key) 

1040 other_side = "client" if (side == "server") else "server" 

1041 self._compressor = _PerMessageDeflateCompressor( 

1042 **self._get_compressor_options(side, agreed_parameters, compression_options) 

1043 ) 

1044 self._decompressor = _PerMessageDeflateDecompressor( 

1045 max_message_size=self.params.max_message_size, 

1046 **self._get_compressor_options( 

1047 other_side, agreed_parameters, compression_options 

1048 ), 

1049 ) 

1050 

1051 def _write_frame( 

1052 self, fin: bool, opcode: int, data: bytes, flags: int = 0 

1053 ) -> "Future[None]": 

1054 data_len = len(data) 

1055 if opcode & 0x8: 

1056 # All control frames MUST have a payload length of 125 

1057 # bytes or less and MUST NOT be fragmented. 

1058 if not fin: 

1059 raise ValueError("control frames may not be fragmented") 

1060 if data_len > 125: 

1061 raise ValueError("control frame payloads may not exceed 125 bytes") 

1062 if fin: 

1063 finbit = self.FIN 

1064 else: 

1065 finbit = 0 

1066 frame = struct.pack("B", finbit | opcode | flags) 

1067 if self.mask_outgoing: 

1068 mask_bit = 0x80 

1069 else: 

1070 mask_bit = 0 

1071 if data_len < 126: 

1072 frame += struct.pack("B", data_len | mask_bit) 

1073 elif data_len <= 0xFFFF: 

1074 frame += struct.pack("!BH", 126 | mask_bit, data_len) 

1075 else: 

1076 frame += struct.pack("!BQ", 127 | mask_bit, data_len) 

1077 if self.mask_outgoing: 

1078 mask = os.urandom(4) 

1079 data = mask + _websocket_mask(mask, data) 

1080 frame += data 

1081 self._wire_bytes_out += len(frame) 

1082 return self.stream.write(frame) 

1083 

1084 def write_message( 

1085 self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False 

1086 ) -> "Future[None]": 

1087 """Sends the given message to the client of this Web Socket.""" 

1088 if binary: 

1089 opcode = 0x2 

1090 else: 

1091 opcode = 0x1 

1092 if isinstance(message, dict): 

1093 message = tornado.escape.json_encode(message) 

1094 message = tornado.escape.utf8(message) 

1095 assert isinstance(message, bytes) 

1096 self._message_bytes_out += len(message) 

1097 flags = 0 

1098 if self._compressor: 

1099 message = self._compressor.compress(message) 

1100 flags |= self.RSV1 

1101 # For historical reasons, write methods in Tornado operate in a semi-synchronous 

1102 # mode in which awaiting the Future they return is optional (But errors can 

1103 # still be raised). This requires us to go through an awkward dance here 

1104 # to transform the errors that may be returned while presenting the same 

1105 # semi-synchronous interface. 

1106 try: 

1107 fut = self._write_frame(True, opcode, message, flags=flags) 

1108 except StreamClosedError: 

1109 raise WebSocketClosedError() 

1110 

1111 async def wrapper() -> None: 

1112 try: 

1113 await fut 

1114 except StreamClosedError: 

1115 raise WebSocketClosedError() 

1116 

1117 return asyncio.ensure_future(wrapper()) 

1118 

1119 def write_ping(self, data: bytes) -> None: 

1120 """Send ping frame.""" 

1121 assert isinstance(data, bytes) 

1122 self._write_frame(True, 0x9, data) 

1123 

1124 async def _receive_frame_loop(self) -> None: 

1125 try: 

1126 while not self.client_terminated: 

1127 await self._receive_frame() 

1128 except StreamClosedError: 

1129 self._abort() 

1130 self.handler.on_ws_connection_close(self.close_code, self.close_reason) 

1131 

1132 async def _read_bytes(self, n: int) -> bytes: 

1133 data = await self.stream.read_bytes(n) 

1134 self._wire_bytes_in += n 

1135 return data 

1136 

1137 async def _receive_frame(self) -> None: 

1138 # Read the frame header. 

1139 data = await self._read_bytes(2) 

1140 header, mask_payloadlen = struct.unpack("BB", data) 

1141 is_final_frame = header & self.FIN 

1142 reserved_bits = header & self.RSV_MASK 

1143 opcode = header & self.OPCODE_MASK 

1144 opcode_is_control = opcode & 0x8 

1145 if self._decompressor is not None and opcode != 0: 

1146 # Compression flag is present in the first frame's header, 

1147 # but we can't decompress until we have all the frames of 

1148 # the message. 

1149 self._frame_compressed = bool(reserved_bits & self.RSV1) 

1150 reserved_bits &= ~self.RSV1 

1151 if reserved_bits: 

1152 # client is using as-yet-undefined extensions; abort 

1153 self._abort() 

1154 return 

1155 is_masked = bool(mask_payloadlen & 0x80) 

1156 payloadlen = mask_payloadlen & 0x7F 

1157 

1158 # Parse and validate the length. 

1159 if opcode_is_control and payloadlen >= 126: 

1160 # control frames must have payload < 126 

1161 self._abort() 

1162 return 

1163 if payloadlen < 126: 

1164 self._frame_length = payloadlen 

1165 elif payloadlen == 126: 

1166 data = await self._read_bytes(2) 

1167 payloadlen = struct.unpack("!H", data)[0] 

1168 elif payloadlen == 127: 

1169 data = await self._read_bytes(8) 

1170 payloadlen = struct.unpack("!Q", data)[0] 

1171 new_len = payloadlen 

1172 if self._fragmented_message_buffer is not None: 

1173 new_len += len(self._fragmented_message_buffer) 

1174 if new_len > self.params.max_message_size: 

1175 self.close(1009, "message too big") 

1176 self._abort() 

1177 return 

1178 

1179 # Read the payload, unmasking if necessary. 

1180 if is_masked: 

1181 self._frame_mask = await self._read_bytes(4) 

1182 data = await self._read_bytes(payloadlen) 

1183 if is_masked: 

1184 assert self._frame_mask is not None 

1185 data = _websocket_mask(self._frame_mask, data) 

1186 

1187 # Decide what to do with this frame. 

1188 if opcode_is_control: 

1189 # control frames may be interleaved with a series of fragmented 

1190 # data frames, so control frames must not interact with 

1191 # self._fragmented_* 

1192 if not is_final_frame: 

1193 # control frames must not be fragmented 

1194 self._abort() 

1195 return 

1196 elif opcode == 0: # continuation frame 

1197 if self._fragmented_message_buffer is None: 

1198 # nothing to continue 

1199 self._abort() 

1200 return 

1201 self._fragmented_message_buffer.extend(data) 

1202 if is_final_frame: 

1203 opcode = self._fragmented_message_opcode 

1204 data = bytes(self._fragmented_message_buffer) 

1205 self._fragmented_message_buffer = None 

1206 else: # start of new data message 

1207 if self._fragmented_message_buffer is not None: 

1208 # can't start new message until the old one is finished 

1209 self._abort() 

1210 return 

1211 if not is_final_frame: 

1212 self._fragmented_message_opcode = opcode 

1213 self._fragmented_message_buffer = bytearray(data) 

1214 

1215 if is_final_frame: 

1216 handled_future = self._handle_message(opcode, data) 

1217 if handled_future is not None: 

1218 await handled_future 

1219 

1220 def _handle_message(self, opcode: int, data: bytes) -> "Optional[Future[None]]": 

1221 """Execute on_message, returning its Future if it is a coroutine.""" 

1222 if self.client_terminated: 

1223 return None 

1224 

1225 if self._frame_compressed: 

1226 assert self._decompressor is not None 

1227 try: 

1228 data = self._decompressor.decompress(data) 

1229 except _DecompressTooLargeError: 

1230 self.close(1009, "message too big after decompression") 

1231 self._abort() 

1232 return None 

1233 

1234 if opcode == 0x1: 

1235 # UTF-8 data 

1236 self._message_bytes_in += len(data) 

1237 try: 

1238 decoded = data.decode("utf-8") 

1239 except UnicodeDecodeError: 

1240 self._abort() 

1241 return None 

1242 return self._run_callback(self.handler.on_message, decoded) 

1243 elif opcode == 0x2: 

1244 # Binary data 

1245 self._message_bytes_in += len(data) 

1246 return self._run_callback(self.handler.on_message, data) 

1247 elif opcode == 0x8: 

1248 # Close 

1249 self.client_terminated = True 

1250 if len(data) >= 2: 

1251 self.close_code = struct.unpack(">H", data[:2])[0] 

1252 if len(data) > 2: 

1253 self.close_reason = to_unicode(data[2:]) 

1254 # Echo the received close code, if any (RFC 6455 section 5.5.1). 

1255 self.close(self.close_code) 

1256 elif opcode == 0x9: 

1257 # Ping 

1258 try: 

1259 self._write_frame(True, 0xA, data) 

1260 except StreamClosedError: 

1261 self._abort() 

1262 self._run_callback(self.handler.on_ping, data) 

1263 elif opcode == 0xA: 

1264 # Pong 

1265 self._received_pong = True 

1266 return self._run_callback(self.handler.on_pong, data) 

1267 else: 

1268 self._abort() 

1269 return None 

1270 

1271 def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None: 

1272 """Closes the WebSocket connection.""" 

1273 if not self.server_terminated: 

1274 if not self.stream.closed(): 

1275 if code is None and reason is not None: 

1276 code = 1000 # "normal closure" status code 

1277 if code is None: 

1278 close_data = b"" 

1279 else: 

1280 close_data = struct.pack(">H", code) 

1281 if reason is not None: 

1282 close_data += utf8(reason) 

1283 try: 

1284 self._write_frame(True, 0x8, close_data) 

1285 except StreamClosedError: 

1286 self._abort() 

1287 self.server_terminated = True 

1288 if self.client_terminated: 

1289 if self._waiting is not None: 

1290 self.stream.io_loop.remove_timeout(self._waiting) 

1291 self._waiting = None 

1292 self.stream.close() 

1293 elif self._waiting is None: 

1294 # Give the client a few seconds to complete a clean shutdown, 

1295 # otherwise just close the connection. 

1296 self._waiting = self.stream.io_loop.add_timeout( 

1297 self.stream.io_loop.time() + 5, self._abort 

1298 ) 

1299 if self._ping_coroutine: 

1300 self._ping_coroutine.cancel() 

1301 self._ping_coroutine = None 

1302 

1303 def is_closing(self) -> bool: 

1304 """Return ``True`` if this connection is closing. 

1305 

1306 The connection is considered closing if either side has 

1307 initiated its closing handshake or if the stream has been 

1308 shut down uncleanly. 

1309 """ 

1310 return self.stream.closed() or self.client_terminated or self.server_terminated 

1311 

1312 def set_nodelay(self, x: bool) -> None: 

1313 self.stream.set_nodelay(x) 

1314 

1315 @property 

1316 def ping_interval(self) -> float: 

1317 interval = self.params.ping_interval 

1318 if interval is not None: 

1319 return interval 

1320 return 0 

1321 

1322 @property 

1323 def ping_timeout(self) -> float: 

1324 timeout = self.params.ping_timeout 

1325 if timeout is not None: 

1326 if self.ping_interval and timeout > self.ping_interval: 

1327 de_dupe_gen_log( 

1328 # Note: using de_dupe_gen_log to prevent this message from 

1329 # being duplicated for each connection 

1330 logging.WARNING, 

1331 f"The websocket_ping_timeout ({timeout}) cannot be longer" 

1332 f" than the websocket_ping_interval ({self.ping_interval})." 

1333 f"\nSetting websocket_ping_timeout={self.ping_interval}", 

1334 ) 

1335 return self.ping_interval 

1336 return timeout 

1337 return self.ping_interval 

1338 

1339 def start_pinging(self) -> None: 

1340 """Start sending periodic pings to keep the connection alive""" 

1341 if ( 

1342 # prevent multiple ping coroutines being run in parallel 

1343 not self._ping_coroutine 

1344 # only run the ping coroutine if a ping interval is configured 

1345 and self.ping_interval > 0 

1346 ): 

1347 self._ping_coroutine = asyncio.create_task(self.periodic_ping()) 

1348 

1349 async def periodic_ping(self) -> None: 

1350 """Send a ping and wait for a pong if ping_timeout is configured. 

1351 

1352 Called periodically if the websocket_ping_interval is set and non-zero. 

1353 """ 

1354 interval = self.ping_interval 

1355 timeout = self.ping_timeout 

1356 

1357 await asyncio.sleep(interval) 

1358 

1359 while True: 

1360 # send a ping 

1361 self._received_pong = False 

1362 ping_time = IOLoop.current().time() 

1363 self.write_ping(b"") 

1364 

1365 # wait until the ping timeout 

1366 await asyncio.sleep(timeout) 

1367 

1368 # make sure we received a pong within the timeout 

1369 if timeout > 0 and not self._received_pong: 

1370 self.close(reason="ping timed out") 

1371 return 

1372 

1373 # wait until the next scheduled ping 

1374 await asyncio.sleep(IOLoop.current().time() - ping_time + interval) 

1375 

1376 

1377class WebSocketClientConnection(simple_httpclient._HTTPConnection): 

1378 """WebSocket client connection. 

1379 

1380 This class should not be instantiated directly; use the 

1381 `websocket_connect` function instead. 

1382 """ 

1383 

1384 protocol = None # type: WebSocketProtocol 

1385 

1386 def __init__( 

1387 self, 

1388 request: httpclient.HTTPRequest, 

1389 on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None, 

1390 compression_options: Optional[Dict[str, Any]] = None, 

1391 ping_interval: Optional[float] = None, 

1392 ping_timeout: Optional[float] = None, 

1393 max_message_size: int = _default_max_message_size, 

1394 subprotocols: Optional[List[str]] = None, 

1395 resolver: Optional[Resolver] = None, 

1396 ) -> None: 

1397 self.connect_future = Future() # type: Future[WebSocketClientConnection] 

1398 self.read_queue = Queue(1) # type: Queue[Union[None, str, bytes]] 

1399 self.key = base64.b64encode(os.urandom(16)) 

1400 self._on_message_callback = on_message_callback 

1401 self.close_code = None # type: Optional[int] 

1402 self.close_reason = None # type: Optional[str] 

1403 self.params = _WebSocketParams( 

1404 ping_interval=ping_interval, 

1405 ping_timeout=ping_timeout, 

1406 max_message_size=max_message_size, 

1407 compression_options=compression_options, 

1408 ) 

1409 

1410 scheme, sep, rest = request.url.partition(":") 

1411 scheme = {"ws": "http", "wss": "https"}[scheme] 

1412 request.url = scheme + sep + rest 

1413 request.headers.update( 

1414 { 

1415 "Upgrade": "websocket", 

1416 "Connection": "Upgrade", 

1417 "Sec-WebSocket-Key": to_unicode(self.key), 

1418 "Sec-WebSocket-Version": "13", 

1419 } 

1420 ) 

1421 if subprotocols is not None: 

1422 request.headers["Sec-WebSocket-Protocol"] = ",".join(subprotocols) 

1423 if compression_options is not None: 

1424 # Always offer to let the server set our max_wbits (and even though 

1425 # we don't offer it, we will accept a client_no_context_takeover 

1426 # from the server). 

1427 # TODO: set server parameters for deflate extension 

1428 # if requested in self.compression_options. 

1429 request.headers["Sec-WebSocket-Extensions"] = ( 

1430 "permessage-deflate; client_max_window_bits" 

1431 ) 

1432 

1433 # Websocket connection is currently unable to follow redirects 

1434 request.follow_redirects = False 

1435 

1436 self.tcp_client = TCPClient(resolver=resolver) 

1437 super().__init__( 

1438 None, 

1439 request, 

1440 lambda: None, 

1441 self._on_http_response, 

1442 104857600, 

1443 self.tcp_client, 

1444 65536, 

1445 104857600, 

1446 ) 

1447 

1448 def __del__(self) -> None: 

1449 if self.protocol is not None: 

1450 # Unclosed client connections can sometimes log "task was destroyed but 

1451 # was pending" warnings if shutdown strikes at the wrong time (such as 

1452 # while a ping is being processed due to ping_interval). Log our own 

1453 # warning to make it a little more deterministic (although it's still 

1454 # dependent on GC timing). 

1455 warnings.warn("Unclosed WebSocketClientConnection", ResourceWarning) 

1456 

1457 def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None: 

1458 """Closes the websocket connection. 

1459 

1460 ``code`` and ``reason`` are documented under 

1461 `WebSocketHandler.close`. 

1462 

1463 .. versionadded:: 3.2 

1464 

1465 .. versionchanged:: 4.0 

1466 

1467 Added the ``code`` and ``reason`` arguments. 

1468 """ 

1469 if self.protocol is not None: 

1470 self.protocol.close(code, reason) 

1471 self.protocol = None # type: ignore 

1472 

1473 def on_connection_close(self) -> None: 

1474 if not self.connect_future.done(): 

1475 self.connect_future.set_exception(StreamClosedError()) 

1476 self._on_message(None) 

1477 self.tcp_client.close() 

1478 super().on_connection_close() 

1479 

1480 def on_ws_connection_close( 

1481 self, close_code: Optional[int] = None, close_reason: Optional[str] = None 

1482 ) -> None: 

1483 self.close_code = close_code 

1484 self.close_reason = close_reason 

1485 self.on_connection_close() 

1486 

1487 def _on_http_response(self, response: httpclient.HTTPResponse) -> None: 

1488 if not self.connect_future.done(): 

1489 if response.error: 

1490 self.connect_future.set_exception(response.error) 

1491 else: 

1492 self.connect_future.set_exception( 

1493 WebSocketError("Non-websocket response") 

1494 ) 

1495 

1496 async def headers_received( 

1497 self, 

1498 start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine], 

1499 headers: httputil.HTTPHeaders, 

1500 ) -> None: 

1501 assert isinstance(start_line, httputil.ResponseStartLine) 

1502 if start_line.code != 101: 

1503 await super().headers_received(start_line, headers) 

1504 return 

1505 

1506 if self._timeout is not None: 

1507 self.io_loop.remove_timeout(self._timeout) 

1508 self._timeout = None 

1509 

1510 self.headers = headers 

1511 self.protocol = self.get_websocket_protocol() 

1512 self.protocol._process_server_headers(self.key, self.headers) 

1513 self.protocol.stream = self.connection.detach() 

1514 

1515 IOLoop.current().add_callback(self.protocol._receive_frame_loop) 

1516 self.protocol.start_pinging() 

1517 

1518 # Once we've taken over the connection, clear the final callback 

1519 # we set on the http request. This deactivates the error handling 

1520 # in simple_httpclient that would otherwise interfere with our 

1521 # ability to see exceptions. 

1522 self.final_callback = None # type: ignore 

1523 

1524 future_set_result_unless_cancelled(self.connect_future, self) 

1525 

1526 def write_message( 

1527 self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False 

1528 ) -> "Future[None]": 

1529 """Sends a message to the WebSocket server. 

1530 

1531 If the stream is closed, raises `WebSocketClosedError`. 

1532 Returns a `.Future` which can be used for flow control. 

1533 

1534 .. versionchanged:: 5.0 

1535 Exception raised on a closed stream changed from `.StreamClosedError` 

1536 to `WebSocketClosedError`. 

1537 """ 

1538 if self.protocol is None: 

1539 raise WebSocketClosedError("Client connection has been closed") 

1540 return self.protocol.write_message(message, binary=binary) 

1541 

1542 def read_message( 

1543 self, 

1544 callback: Optional[Callable[["Future[Union[None, str, bytes]]"], None]] = None, 

1545 ) -> Awaitable[Union[None, str, bytes]]: 

1546 """Reads a message from the WebSocket server. 

1547 

1548 If on_message_callback was specified at WebSocket 

1549 initialization, this function will never return messages 

1550 

1551 Returns a future whose result is the message, or None 

1552 if the connection is closed. If a callback argument 

1553 is given it will be called with the future when it is 

1554 ready. 

1555 """ 

1556 

1557 awaitable = self.read_queue.get() 

1558 if callback is not None: 

1559 self.io_loop.add_future(asyncio.ensure_future(awaitable), callback) 

1560 return awaitable 

1561 

1562 def on_message(self, message: Union[str, bytes]) -> Optional[Awaitable[None]]: 

1563 return self._on_message(message) 

1564 

1565 def _on_message( 

1566 self, message: Union[None, str, bytes] 

1567 ) -> Optional[Awaitable[None]]: 

1568 if self._on_message_callback: 

1569 self._on_message_callback(message) 

1570 return None 

1571 else: 

1572 return self.read_queue.put(message) 

1573 

1574 def ping(self, data: bytes = b"") -> None: 

1575 """Send ping frame to the remote end. 

1576 

1577 The data argument allows a small amount of data (up to 125 

1578 bytes) to be sent as a part of the ping message. Note that not 

1579 all websocket implementations expose this data to 

1580 applications. 

1581 

1582 Consider using the ``ping_interval`` argument to 

1583 `websocket_connect` instead of sending pings manually. 

1584 

1585 .. versionadded:: 5.1 

1586 

1587 """ 

1588 data = utf8(data) 

1589 if self.protocol is None: 

1590 raise WebSocketClosedError() 

1591 self.protocol.write_ping(data) 

1592 

1593 def on_pong(self, data: bytes) -> None: 

1594 pass 

1595 

1596 def on_ping(self, data: bytes) -> None: 

1597 pass 

1598 

1599 def get_websocket_protocol(self) -> WebSocketProtocol: 

1600 return WebSocketProtocol13(self, mask_outgoing=True, params=self.params) 

1601 

1602 @property 

1603 def selected_subprotocol(self) -> Optional[str]: 

1604 """The subprotocol selected by the server. 

1605 

1606 .. versionadded:: 5.1 

1607 """ 

1608 return self.protocol.selected_subprotocol 

1609 

1610 def log_exception( 

1611 self, 

1612 typ: "Optional[Type[BaseException]]", 

1613 value: Optional[BaseException], 

1614 tb: Optional[TracebackType], 

1615 ) -> None: 

1616 assert typ is not None 

1617 assert value is not None 

1618 app_log.error("Uncaught exception %s", value, exc_info=(typ, value, tb)) 

1619 

1620 

1621def websocket_connect( 

1622 url: Union[str, httpclient.HTTPRequest], 

1623 callback: Optional[Callable[["Future[WebSocketClientConnection]"], None]] = None, 

1624 connect_timeout: Optional[float] = None, 

1625 on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None, 

1626 compression_options: Optional[Dict[str, Any]] = None, 

1627 ping_interval: Optional[float] = None, 

1628 ping_timeout: Optional[float] = None, 

1629 max_message_size: int = _default_max_message_size, 

1630 subprotocols: Optional[List[str]] = None, 

1631 resolver: Optional[Resolver] = None, 

1632) -> "Awaitable[WebSocketClientConnection]": 

1633 """Client-side websocket support. 

1634 

1635 Takes a url and returns a Future whose result is a 

1636 `WebSocketClientConnection`. 

1637 

1638 ``compression_options`` is interpreted in the same way as the 

1639 return value of `.WebSocketHandler.get_compression_options`. 

1640 

1641 The connection supports two styles of operation. In the coroutine 

1642 style, the application typically calls 

1643 `~.WebSocketClientConnection.read_message` in a loop:: 

1644 

1645 conn = yield websocket_connect(url) 

1646 while True: 

1647 msg = yield conn.read_message() 

1648 if msg is None: break 

1649 # Do something with msg 

1650 

1651 In the callback style, pass an ``on_message_callback`` to 

1652 ``websocket_connect``. In both styles, a message of ``None`` 

1653 indicates that the connection has been closed. 

1654 

1655 ``subprotocols`` may be a list of strings specifying proposed 

1656 subprotocols. The selected protocol may be found on the 

1657 ``selected_subprotocol`` attribute of the connection object 

1658 when the connection is complete. 

1659 

1660 .. versionchanged:: 3.2 

1661 Also accepts ``HTTPRequest`` objects in place of urls. 

1662 

1663 .. versionchanged:: 4.1 

1664 Added ``compression_options`` and ``on_message_callback``. 

1665 

1666 .. versionchanged:: 4.5 

1667 Added the ``ping_interval``, ``ping_timeout``, and ``max_message_size`` 

1668 arguments, which have the same meaning as in `WebSocketHandler`. 

1669 

1670 .. versionchanged:: 5.0 

1671 The ``io_loop`` argument (deprecated since version 4.1) has been removed. 

1672 

1673 .. versionchanged:: 5.1 

1674 Added the ``subprotocols`` argument. 

1675 

1676 .. versionchanged:: 6.3 

1677 Added the ``resolver`` argument. 

1678 

1679 .. deprecated:: 6.5 

1680 The ``callback`` argument is deprecated and will be removed in Tornado 7.0. 

1681 Use the returned Future instead. Note that ``on_message_callback`` is not 

1682 deprecated and may still be used. 

1683 """ 

1684 if isinstance(url, httpclient.HTTPRequest): 

1685 assert connect_timeout is None 

1686 request = url 

1687 # Copy and convert the headers dict/object (see comments in 

1688 # AsyncHTTPClient.fetch) 

1689 request.headers = httputil.HTTPHeaders(request.headers) 

1690 else: 

1691 request = httpclient.HTTPRequest(url, connect_timeout=connect_timeout) 

1692 request = cast( 

1693 httpclient.HTTPRequest, 

1694 httpclient._RequestProxy(request, httpclient.HTTPRequest._DEFAULTS), 

1695 ) 

1696 conn = WebSocketClientConnection( 

1697 request, 

1698 on_message_callback=on_message_callback, 

1699 compression_options=compression_options, 

1700 ping_interval=ping_interval, 

1701 ping_timeout=ping_timeout, 

1702 max_message_size=max_message_size, 

1703 subprotocols=subprotocols, 

1704 resolver=resolver, 

1705 ) 

1706 if callback is not None: 

1707 warnings.warn( 

1708 "The callback argument to websocket_connect is deprecated. " 

1709 "Use the returned Future instead.", 

1710 DeprecationWarning, 

1711 stacklevel=2, 

1712 ) 

1713 IOLoop.current().add_future(conn.connect_future, callback) 

1714 return conn.connect_future