Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/websocket.py: 21%

735 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-04-10 06:20 +0000

1"""Implementation of the WebSocket protocol. 

2 

3`WebSockets <http://dev.w3.org/html5/websockets/>`_ allow for bidirectional 

4communication between the browser and server. 

5 

6WebSockets are supported in the current versions of all major browsers, 

7although older versions that do not support WebSockets are still in use 

8(refer to http://caniuse.com/websockets for details). 

9 

10This module implements the final version of the WebSocket protocol as 

11defined in `RFC 6455 <http://tools.ietf.org/html/rfc6455>`_. Certain 

12browser versions (notably Safari 5.x) implemented an earlier draft of 

13the protocol (known as "draft 76") and are not compatible with this module. 

14 

15.. versionchanged:: 4.0 

16 Removed support for the draft 76 protocol version. 

17""" 

18 

19import abc 

20import asyncio 

21import base64 

22import hashlib 

23import os 

24import sys 

25import struct 

26import tornado.escape 

27import tornado.web 

28from urllib.parse import urlparse 

29import zlib 

30 

31from tornado.concurrent import Future, future_set_result_unless_cancelled 

32from tornado.escape import utf8, native_str, to_unicode 

33from tornado import gen, httpclient, httputil 

34from tornado.ioloop import IOLoop, PeriodicCallback 

35from tornado.iostream import StreamClosedError, IOStream 

36from tornado.log import gen_log, app_log 

37from tornado import simple_httpclient 

38from tornado.queues import Queue 

39from tornado.tcpclient import TCPClient 

40from tornado.util import _websocket_mask 

41 

42from typing import ( 

43 TYPE_CHECKING, 

44 cast, 

45 Any, 

46 Optional, 

47 Dict, 

48 Union, 

49 List, 

50 Awaitable, 

51 Callable, 

52 Tuple, 

53 Type, 

54) 

55from types import TracebackType 

56 

if TYPE_CHECKING:
    # Everything in this branch is only evaluated when TYPE_CHECKING is
    # true; the names declared here are referenced elsewhere in the file
    # from string annotations and "# type:" comments.
    from typing_extensions import Protocol

    # The zlib compressor types aren't actually exposed anywhere
    # publicly, so declare protocols for the portions we use.
    class _Compressor(Protocol):
        """Structural type for the object returned by ``zlib.compressobj``."""

        def compress(self, data: bytes) -> bytes:
            pass

        def flush(self, mode: int) -> bytes:
            pass

    class _Decompressor(Protocol):
        """Structural type for the object returned by ``zlib.decompressobj``."""

        # Input left over when ``decompress`` stopped because of max_length.
        unconsumed_tail = b""  # type: bytes

        def decompress(self, data: bytes, max_length: int) -> bytes:
            pass

    class _WebSocketDelegate(Protocol):
        """Callbacks a protocol object invokes on its owner."""

        # The common base interface implemented by WebSocketHandler on
        # the server side and WebSocketClientConnection on the client
        # side.
        def on_ws_connection_close(
            self, close_code: Optional[int] = None, close_reason: Optional[str] = None
        ) -> None:
            pass

        def on_message(self, message: Union[str, bytes]) -> Optional["Awaitable[None]"]:
            pass

        def on_ping(self, data: bytes) -> None:
            pass

        def on_pong(self, data: bytes) -> None:
            pass

        # Takes the three values of ``sys.exc_info()``.
        def log_exception(
            self,
            typ: Optional[Type[BaseException]],
            value: Optional[BaseException],
            tb: Optional[TracebackType],
        ) -> None:
            pass

100 

101 

# Default limit on the size of a single message: 10 MiB.
_default_max_message_size = 10 * 1024 ** 2

103 

104 

class WebSocketError(Exception):
    """Base class for the public WebSocket exceptions in this module."""

107 

108 

class WebSocketClosedError(WebSocketError):
    """Raised by operations on a closed connection.

    A subclass of `WebSocketError`, so handlers for the base class
    also catch it.

    .. versionadded:: 3.2
    """

116 

117 

class _DecompressTooLargeError(Exception):
    """Raised when decompressed data does not fit in the allowed size."""

120 

121 

class _WebSocketParams:
    """Plain holder for the per-connection WebSocket settings.

    The four constructor arguments are stored unchanged as attributes
    of the same name; no validation is performed.
    """

    def __init__(
        self,
        ping_interval: Optional[float] = None,
        ping_timeout: Optional[float] = None,
        max_message_size: int = _default_max_message_size,
        compression_options: Optional[Dict[str, Any]] = None,
    ) -> None:
        # Keep-alive configuration.
        self.ping_interval, self.ping_timeout = ping_interval, ping_timeout
        # Size limit and compression configuration.
        self.max_message_size = max_message_size
        self.compression_options = compression_options

134 

135 

class WebSocketHandler(tornado.web.RequestHandler):
    """Subclass this class to create a basic WebSocket handler.

    Override `on_message` to handle incoming messages, and use
    `write_message` to send messages to the client. You can also
    override `open` and `on_close` to handle opened and closed
    connections.

    Custom upgrade response headers can be sent by overriding
    `~tornado.web.RequestHandler.set_default_headers` or
    `~tornado.web.RequestHandler.prepare`.

    See http://dev.w3.org/html5/websockets/ for details on the
    JavaScript interface.  The protocol is specified at
    http://tools.ietf.org/html/rfc6455.

    Here is an example WebSocket handler that echoes all received messages
    back to the client:

    .. testcode::

      class EchoWebSocket(tornado.websocket.WebSocketHandler):
          def open(self):
              print("WebSocket opened")

          def on_message(self, message):
              self.write_message(u"You said: " + message)

          def on_close(self):
              print("WebSocket closed")

    .. testoutput::
       :hide:

    WebSockets are not standard HTTP connections. The "handshake" is
    HTTP, but after the handshake, the protocol is
    message-based. Consequently, most of the Tornado HTTP facilities
    are not available in handlers of this type. The only communication
    methods available to you are `write_message()`, `ping()`, and
    `close()`. Likewise, your request handler class should implement
    `open()` method rather than ``get()`` or ``post()``.

    If you map the handler above to ``/websocket`` in your application, you can
    invoke it in JavaScript with::

      var ws = new WebSocket("ws://localhost:8888/websocket");
      ws.onopen = function() {
         ws.send("Hello, world");
      };
      ws.onmessage = function (evt) {
         alert(evt.data);
      };

    This script pops up an alert box that says "You said: Hello, world".

    Web browsers allow any site to open a websocket connection to any other,
    instead of using the same-origin policy that governs other network
    access from JavaScript. This can be surprising and is a potential
    security hole, so since Tornado 4.0 `WebSocketHandler` requires
    applications that wish to receive cross-origin websockets to opt in
    by overriding the `~WebSocketHandler.check_origin` method (see that
    method's docs for details). Failure to do so is the most likely
    cause of 403 errors when making a websocket connection.

    When using a secure websocket connection (``wss://``) with a self-signed
    certificate, the connection from a browser may fail because it wants
    to show the "accept this certificate" dialog but has nowhere to show it.
    You must first visit a regular HTML page using the same certificate
    to accept it before the websocket connection will succeed.

    If the application setting ``websocket_ping_interval`` has a non-zero
    value, a ping will be sent periodically, and the connection will be
    closed if a response is not received before the ``websocket_ping_timeout``.

    Messages larger than the ``websocket_max_message_size`` application setting
    (default 10MiB) will not be accepted.

    .. versionchanged:: 4.5
       Added ``websocket_ping_interval``, ``websocket_ping_timeout``, and
       ``websocket_max_message_size``.
    """

    def __init__(
        self,
        application: tornado.web.Application,
        request: httputil.HTTPServerRequest,
        **kwargs: Any
    ) -> None:
        """Initialise the handler with no WebSocket connection attached yet."""
        super().__init__(application, request, **kwargs)
        self.ws_connection = None  # type: Optional[WebSocketProtocol]
        self.close_code = None  # type: Optional[int]
        self.close_reason = None  # type: Optional[str]
        # Guards against invoking on_close() more than once.
        self._on_close_called = False

    async def get(self, *args: Any, **kwargs: Any) -> None:
        """Validate the upgrade request and hand it to the protocol object.

        The positional and keyword arguments are saved as ``open_args``
        and ``open_kwargs``. The request is answered with 400 when the
        ``Upgrade`` or ``Connection`` header is unsuitable, with 403 when
        `check_origin` rejects the origin, and with 426 when
        `get_websocket_protocol` returns ``None``.
        """
        self.open_args = args
        self.open_kwargs = kwargs

        # Upgrade header should be present and should be equal to WebSocket
        if self.request.headers.get("Upgrade", "").lower() != "websocket":
            self.set_status(400)
            log_msg = 'Can "Upgrade" only to "WebSocket".'
            self.finish(log_msg)
            gen_log.debug(log_msg)
            return

        # Connection header should be upgrade.
        # Some proxy servers/load balancers
        # might mess with it.
        connection_tokens = [
            token.strip().lower()
            for token in self.request.headers.get("Connection", "").split(",")
        ]
        if "upgrade" not in connection_tokens:
            self.set_status(400)
            log_msg = '"Connection" must be "Upgrade".'
            self.finish(log_msg)
            gen_log.debug(log_msg)
            return

        # Handle WebSocket Origin naming convention differences
        # The difference between version 8 and 13 is that in 8 the
        # client sends a "Sec-Websocket-Origin" header and in 13 it's
        # simply "Origin".
        if "Origin" in self.request.headers:
            origin = self.request.headers.get("Origin")
        else:
            origin = self.request.headers.get("Sec-Websocket-Origin", None)

        # If there was an origin header, check to make sure it matches
        # according to check_origin. When the origin is None, we assume it
        # did not come from a browser and that it can be passed on.
        if origin is not None and not self.check_origin(origin):
            self.set_status(403)
            log_msg = "Cross origin websockets not allowed"
            self.finish(log_msg)
            gen_log.debug(log_msg)
            return

        self.ws_connection = self.get_websocket_protocol()
        if self.ws_connection:
            await self.ws_connection.accept_connection(self)
        else:
            # Unsupported protocol version: advertise the ones we accept.
            self.set_status(426, "Upgrade Required")
            self.set_header("Sec-WebSocket-Version", "7, 8, 13")

    @property
    def ping_interval(self) -> Optional[float]:
        """The interval for websocket keep-alive pings.

        Set websocket_ping_interval = 0 to disable pings.
        """
        return self.settings.get("websocket_ping_interval", None)

    @property
    def ping_timeout(self) -> Optional[float]:
        """If no response to a ping (a pong) is received in this many seconds,
        close the websocket connection (VPNs, etc. can fail to cleanly close
        ws connections).

        Default is max of 3 pings or 30 seconds.
        """
        return self.settings.get("websocket_ping_timeout", None)

    @property
    def max_message_size(self) -> int:
        """Maximum allowed message size.

        If the remote peer sends a message larger than this, the connection
        will be closed.

        Default is 10MiB.
        """
        return self.settings.get(
            "websocket_max_message_size", _default_max_message_size
        )

    def write_message(
        self, message: Union[bytes, str, Dict[str, Any]], binary: bool = False
    ) -> "Future[None]":
        """Sends the given message to the client of this Web Socket.

        The message may be either a string or a dict (which will be
        encoded as json).  If the ``binary`` argument is false, the
        message will be sent as utf8; in binary mode any byte string
        is allowed.

        If the connection is already closed, raises `WebSocketClosedError`.
        Returns a `.Future` which can be used for flow control.

        .. versionchanged:: 3.2
           `WebSocketClosedError` was added (previously a closed connection
           would raise an `AttributeError`)

        .. versionchanged:: 4.3
           Returns a `.Future` which can be used for flow control.

        .. versionchanged:: 5.0
           Consistently raises `WebSocketClosedError`. Previously could
           sometimes raise `.StreamClosedError`.
        """
        if self.ws_connection is None or self.ws_connection.is_closing():
            raise WebSocketClosedError()
        if isinstance(message, dict):
            message = tornado.escape.json_encode(message)
        return self.ws_connection.write_message(message, binary=binary)

    def select_subprotocol(self, subprotocols: List[str]) -> Optional[str]:
        """Override to implement subprotocol negotiation.

        ``subprotocols`` is a list of strings identifying the
        subprotocols proposed by the client.  This method may be
        overridden to return one of those strings to select it, or
        ``None`` to not select a subprotocol.

        Failure to select a subprotocol does not automatically abort
        the connection, although clients may close the connection if
        none of their proposed subprotocols was selected.

        The list may be empty, in which case this method must return
        None. This method is always called exactly once even if no
        subprotocols were proposed so that the handler can be advised
        of this fact.

        .. versionchanged:: 5.1

           Previously, this method was called with a list containing
           an empty string instead of an empty list if no subprotocols
           were proposed by the client.
        """
        return None

    @property
    def selected_subprotocol(self) -> Optional[str]:
        """The subprotocol returned by `select_subprotocol`.

        .. versionadded:: 5.1
        """
        assert self.ws_connection is not None
        return self.ws_connection.selected_subprotocol

    def get_compression_options(self) -> Optional[Dict[str, Any]]:
        """Override to return compression options for the connection.

        If this method returns None (the default), compression will
        be disabled.  If it returns a dict (even an empty one), it
        will be enabled.  The contents of the dict may be used to
        control the following compression options:

        ``compression_level`` specifies the compression level.

        ``mem_level`` specifies the amount of memory used for the internal compression state.

         These parameters are documented in details here:
         https://docs.python.org/3.6/library/zlib.html#zlib.compressobj

        .. versionadded:: 4.1

        .. versionchanged:: 4.5

           Added ``compression_level`` and ``mem_level``.
        """
        # TODO: Add wbits option.
        return None

    def open(self, *args: str, **kwargs: str) -> Optional[Awaitable[None]]:
        """Invoked when a new WebSocket is opened.

        The arguments to `open` are extracted from the `tornado.web.URLSpec`
        regular expression, just like the arguments to
        `tornado.web.RequestHandler.get`.

        `open` may be a coroutine. `on_message` will not be called until
        `open` has returned.

        .. versionchanged:: 5.1

           ``open`` may be a coroutine.
        """
        pass

    def on_message(self, message: Union[str, bytes]) -> Optional[Awaitable[None]]:
        """Handle incoming messages on the WebSocket

        This method must be overridden.

        .. versionchanged:: 4.5

           ``on_message`` can be a coroutine.
        """
        raise NotImplementedError

    def ping(self, data: Union[str, bytes] = b"") -> None:
        """Send ping frame to the remote end.

        The data argument allows a small amount of data (up to 125
        bytes) to be sent as a part of the ping message. Note that not
        all websocket implementations expose this data to
        applications.

        Consider using the ``websocket_ping_interval`` application
        setting instead of sending pings manually.

        Raises `WebSocketClosedError` if the connection is closed or closing.

        .. versionchanged:: 5.1

           The data argument is now optional.

        """
        data = utf8(data)
        if self.ws_connection is None or self.ws_connection.is_closing():
            raise WebSocketClosedError()
        self.ws_connection.write_ping(data)

    def on_pong(self, data: bytes) -> None:
        """Invoked when the response to a ping frame is received."""
        pass

    def on_ping(self, data: bytes) -> None:
        """Invoked when a ping frame is received."""
        pass

    def on_close(self) -> None:
        """Invoked when the WebSocket is closed.

        If the connection was closed cleanly and a status code or reason
        phrase was supplied, these values will be available as the attributes
        ``self.close_code`` and ``self.close_reason``.

        .. versionchanged:: 4.0

           Added ``close_code`` and ``close_reason`` attributes.
        """
        pass

    def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
        """Closes this Web Socket.

        Once the close handshake is successful the socket will be closed.

        ``code`` may be a numeric status code, taken from the values
        defined in `RFC 6455 section 7.4.1
        <https://tools.ietf.org/html/rfc6455#section-7.4.1>`_.
        ``reason`` may be a textual message about why the connection is
        closing.  These values are made available to the client, but are
        not otherwise interpreted by the websocket protocol.

        .. versionchanged:: 4.0

           Added the ``code`` and ``reason`` arguments.
        """
        if self.ws_connection:
            self.ws_connection.close(code, reason)
            self.ws_connection = None

    def check_origin(self, origin: str) -> bool:
        """Override to enable support for allowing alternate origins.

        The ``origin`` argument is the value of the ``Origin`` HTTP
        header, the url responsible for initiating this request.  This
        method is not called for clients that do not send this header;
        such requests are always allowed (because all browsers that
        implement WebSockets support this header, and non-browser
        clients do not have the same cross-site security concerns).

        Should return ``True`` to accept the request or ``False`` to
        reject it. By default, rejects all requests with an origin on
        a host other than this one. The comparison against the ``Host``
        header ignores letter case; a request without a ``Host`` header
        is rejected.

        This is a security protection against cross site scripting attacks on
        browsers, since WebSockets are allowed to bypass the usual same-origin
        policies and don't use CORS headers.

        .. warning::

           This is an important security measure; don't disable it
           without understanding the security implications. In
           particular, if your authentication is cookie-based, you
           must either restrict the origins allowed by
           ``check_origin()`` or implement your own XSRF-like
           protection for websocket connections. See `these
           <https://www.christian-schneider.net/CrossSiteWebSocketHijacking.html>`_
           `articles
           <https://devcenter.heroku.com/articles/websocket-security>`_
           for more.

        To accept all cross-origin traffic (which was the default prior to
        Tornado 4.0), simply override this method to always return ``True``::

            def check_origin(self, origin):
                return True

        To allow connections from any subdomain of your site, you might
        do something like::

            def check_origin(self, origin):
                parsed_origin = urllib.parse.urlparse(origin)
                return parsed_origin.netloc.endswith(".mydomain.com")

        .. versionadded:: 4.0

        """
        origin_netloc = urlparse(origin).netloc.lower()

        host = self.request.headers.get("Host")
        if host is None:
            return False

        # Check to see that origin matches host directly, including ports.
        # Host names are case-insensitive, so both sides are lowercased;
        # previously only the origin was, which wrongly rejected same-origin
        # requests whose Host header contained upper-case letters.
        return origin_netloc == host.lower()

    def set_nodelay(self, value: bool) -> None:
        """Set the no-delay flag for this stream.

        By default, small messages may be delayed and/or combined to minimize
        the number of packets sent.  This can sometimes cause 200-500ms delays
        due to the interaction between Nagle's algorithm and TCP delayed
        ACKs.  To reduce this delay (at the expense of possibly increasing
        bandwidth usage), call ``self.set_nodelay(True)`` once the websocket
        connection is established.

        See `.BaseIOStream.set_nodelay` for additional details.

        .. versionadded:: 3.1
        """
        assert self.ws_connection is not None
        self.ws_connection.set_nodelay(value)

    def on_connection_close(self) -> None:
        """Tear down the protocol object and fire `on_close` at most once."""
        if self.ws_connection:
            self.ws_connection.on_connection_close()
            self.ws_connection = None
        if not self._on_close_called:
            self._on_close_called = True
            self.on_close()
            self._break_cycles()

    def on_ws_connection_close(
        self, close_code: Optional[int] = None, close_reason: Optional[str] = None
    ) -> None:
        """Record the close code and reason, then run `on_connection_close`."""
        self.close_code = close_code
        self.close_reason = close_reason
        self.on_connection_close()

    def _break_cycles(self) -> None:
        # WebSocketHandlers call finish() early, but we don't want to
        # break up reference cycles (which makes it impossible to call
        # self.render_string) until after we've really closed the
        # connection (if it was established in the first place,
        # indicated by status code 101).
        if self.get_status() != 101 or self._on_close_called:
            super()._break_cycles()

    def get_websocket_protocol(self) -> Optional["WebSocketProtocol"]:
        """Build the protocol object for the requested protocol version.

        Returns a `WebSocketProtocol13` configured from this handler's
        settings when ``Sec-WebSocket-Version`` is 7, 8 or 13, and
        ``None`` for any other (or missing) version.
        """
        websocket_version = self.request.headers.get("Sec-WebSocket-Version")
        if websocket_version in ("7", "8", "13"):
            params = _WebSocketParams(
                ping_interval=self.ping_interval,
                ping_timeout=self.ping_timeout,
                max_message_size=self.max_message_size,
                compression_options=self.get_compression_options(),
            )
            return WebSocketProtocol13(self, False, params)
        return None

    def _detach_stream(self) -> IOStream:
        """Disable the HTTP-only methods and return the detached stream."""
        # disable non-WS methods
        for method in (
            "write",
            "redirect",
            "set_header",
            "set_cookie",
            "set_status",
            "flush",
            "finish",
        ):
            setattr(self, method, _raise_not_supported_for_websockets)
        return self.detach()

611 

612 

def _raise_not_supported_for_websockets(*args: Any, **kwargs: Any) -> None:
    """Accept any arguments, ignore them, and always raise `RuntimeError`."""
    error = RuntimeError("Method not supported for Web Sockets")
    raise error

615 

616 

class WebSocketProtocol(abc.ABC):
    """Base class for WebSocket protocol versions."""

    def __init__(self, handler: "_WebSocketDelegate") -> None:
        """Remember the delegate; no stream is attached yet."""
        self.handler = handler
        self.stream = None  # type: Optional[IOStream]
        self.client_terminated = False
        self.server_terminated = False

    def _run_callback(
        self, callback: Callable, *args: Any, **kwargs: Any
    ) -> "Optional[Future[Any]]":
        """Invoke ``callback`` and shield the connection from its errors.

        An exception raised by the call itself is logged through the
        handler, the connection is aborted, and ``None`` is returned.
        A non-``None`` return value is converted to a Future, registered
        with the stream's IOLoop, and returned; otherwise ``None`` is
        returned.
        """
        try:
            outcome = callback(*args, **kwargs)
        except Exception:
            self.handler.log_exception(*sys.exc_info())
            self._abort()
            return None

        if outcome is None:
            return None

        future = gen.convert_yielded(outcome)
        assert self.stream is not None
        # Calling result() in the done-callback re-raises any exception
        # the Future finished with.
        self.stream.io_loop.add_future(future, lambda f: f.result())
        return future

    def on_connection_close(self) -> None:
        """React to the underlying connection closing by aborting."""
        self._abort()

    def _abort(self) -> None:
        """Instantly aborts the WebSocket connection by closing the socket"""
        self.client_terminated = self.server_terminated = True
        if self.stream is not None:
            # forcibly tear down the connection
            self.stream.close()
        # let the subclass cleanup
        self.close()

    @abc.abstractmethod
    def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
        """Abstract: implemented by the concrete protocol version."""
        raise NotImplementedError()

    @abc.abstractmethod
    def is_closing(self) -> bool:
        """Abstract: implemented by the concrete protocol version."""
        raise NotImplementedError()

    @abc.abstractmethod
    async def accept_connection(self, handler: WebSocketHandler) -> None:
        """Abstract: implemented by the concrete protocol version."""
        raise NotImplementedError()

    @abc.abstractmethod
    def write_message(
        self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False
    ) -> "Future[None]":
        """Abstract: implemented by the concrete protocol version."""
        raise NotImplementedError()

    @property
    @abc.abstractmethod
    def selected_subprotocol(self) -> Optional[str]:
        """Abstract: implemented by the concrete protocol version."""
        raise NotImplementedError()

    @abc.abstractmethod
    def write_ping(self, data: bytes) -> None:
        """Abstract: implemented by the concrete protocol version."""
        raise NotImplementedError()

    # The entry points below are used by WebSocketClientConnection,
    # which was introduced after we only supported a single version of
    # WebSocketProtocol. The WebSocketProtocol/WebSocketProtocol13
    # boundary is currently pretty ad-hoc.
    @abc.abstractmethod
    def _process_server_headers(
        self, key: Union[str, bytes], headers: httputil.HTTPHeaders
    ) -> None:
        """Abstract: implemented by the concrete protocol version."""
        raise NotImplementedError()

    @abc.abstractmethod
    def start_pinging(self) -> None:
        """Abstract: implemented by the concrete protocol version."""
        raise NotImplementedError()

    @abc.abstractmethod
    async def _receive_frame_loop(self) -> None:
        """Abstract: implemented by the concrete protocol version."""
        raise NotImplementedError()

    @abc.abstractmethod
    def set_nodelay(self, x: bool) -> None:
        """Abstract: implemented by the concrete protocol version."""
        raise NotImplementedError()

706 

707 

class _PerMessageDeflateCompressor(object):
    """Raw-deflate compressor that drops the trailing sync-flush marker.

    When ``persistent`` is true one zlib compressor is created up front
    and reused for every `compress` call; otherwise a fresh compressor is
    created for each call.
    """

    def __init__(
        self,
        persistent: bool,
        max_wbits: Optional[int],
        compression_options: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Validate the window size and read the optional tuning knobs.

        ``max_wbits`` defaults to ``zlib.MAX_WBITS`` when ``None`` and must
        lie in the range 8..``zlib.MAX_WBITS``, otherwise `ValueError` is
        raised. ``compression_options`` may supply ``compression_level``
        and ``mem_level``.
        """
        if max_wbits is None:
            max_wbits = zlib.MAX_WBITS
        # There is no symbolic constant for the minimum wbits value.
        if not (8 <= max_wbits <= zlib.MAX_WBITS):
            # Interpolate explicitly: ValueError does not format its
            # arguments the way logging calls do.
            raise ValueError(
                "Invalid max_wbits value %r; allowed range 8-%d"
                % (max_wbits, zlib.MAX_WBITS)
            )
        self._max_wbits = max_wbits

        if (
            compression_options is None
            or "compression_level" not in compression_options
        ):
            self._compression_level = tornado.web.GZipContentEncoding.GZIP_LEVEL
        else:
            self._compression_level = compression_options["compression_level"]

        if compression_options is None or "mem_level" not in compression_options:
            self._mem_level = 8
        else:
            self._mem_level = compression_options["mem_level"]

        if persistent:
            self._compressor = self._create_compressor()  # type: Optional[_Compressor]
        else:
            self._compressor = None

    def _create_compressor(self) -> "_Compressor":
        """Create a zlib compressor; the negative wbits selects raw deflate."""
        return zlib.compressobj(
            self._compression_level, zlib.DEFLATED, -self._max_wbits, self._mem_level
        )

    def compress(self, data: bytes) -> bytes:
        """Compress ``data`` and return it without the 4-byte flush trailer."""
        compressor = self._compressor or self._create_compressor()
        data = compressor.compress(data) + compressor.flush(zlib.Z_SYNC_FLUSH)
        # A sync flush ends with an empty stored block; strip it.
        assert data.endswith(b"\x00\x00\xff\xff")
        return data[:-4]

754 

755 

class _PerMessageDeflateDecompressor(object):
    """Raw-deflate decompressor with a cap on the decompressed size.

    When ``persistent`` is true one zlib decompressor is created up front
    and reused for every `decompress` call; otherwise a fresh one is
    created for each call.
    """

    def __init__(
        self,
        persistent: bool,
        max_wbits: Optional[int],
        max_message_size: int,
        compression_options: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Validate the window size and store the size limit.

        ``max_wbits`` defaults to ``zlib.MAX_WBITS`` when ``None`` and must
        lie in the range 8..``zlib.MAX_WBITS``, otherwise `ValueError` is
        raised. ``compression_options`` is accepted but not used here.
        """
        self._max_message_size = max_message_size
        if max_wbits is None:
            max_wbits = zlib.MAX_WBITS
        if not (8 <= max_wbits <= zlib.MAX_WBITS):
            # Interpolate explicitly: ValueError does not format its
            # arguments the way logging calls do.
            raise ValueError(
                "Invalid max_wbits value %r; allowed range 8-%d"
                % (max_wbits, zlib.MAX_WBITS)
            )
        self._max_wbits = max_wbits
        if persistent:
            self._decompressor = (
                self._create_decompressor()
            )  # type: Optional[_Decompressor]
        else:
            self._decompressor = None

    def _create_decompressor(self) -> "_Decompressor":
        """Create a zlib decompressor; the negative wbits selects raw deflate."""
        return zlib.decompressobj(-self._max_wbits)

    def decompress(self, data: bytes) -> bytes:
        """Decompress ``data`` after re-appending the 4-byte flush trailer.

        Raises `_DecompressTooLargeError` when input is left unconsumed
        because the output reached the configured maximum size.
        """
        decompressor = self._decompressor or self._create_decompressor()
        result = decompressor.decompress(
            data + b"\x00\x00\xff\xff", self._max_message_size
        )
        if decompressor.unconsumed_tail:
            raise _DecompressTooLargeError()
        return result

792 

793 

794class WebSocketProtocol13(WebSocketProtocol): 

795 """Implementation of the WebSocket protocol from RFC 6455. 

796 

797 This class supports versions 7 and 8 of the protocol in addition to the 

798 final version 13. 

799 """ 

800 

801 # Bit masks for the first byte of a frame. 

802 FIN = 0x80 

803 RSV1 = 0x40 

804 RSV2 = 0x20 

805 RSV3 = 0x10 

806 RSV_MASK = RSV1 | RSV2 | RSV3 

807 OPCODE_MASK = 0x0F 

808 

809 stream = None # type: IOStream 

810 

def __init__(
    self,
    handler: "_WebSocketDelegate",
    mask_outgoing: bool,
    params: _WebSocketParams,
) -> None:
    """Set up the per-connection state with nothing negotiated yet.

    ``handler`` is passed on to `WebSocketProtocol.__init__`;
    ``mask_outgoing`` and ``params`` are stored unchanged. All frame,
    compression, byte-counter, ping and close fields start out empty or
    zero.
    """
    WebSocketProtocol.__init__(self, handler)
    self.mask_outgoing = mask_outgoing
    self.params = params
    # State of the frame currently being processed.
    self._final_frame = False
    self._frame_opcode = None
    self._masked_frame = None
    self._frame_mask = None  # type: Optional[bytes]
    self._frame_length = None
    self._fragmented_message_buffer = None  # type: Optional[bytes]
    self._fragmented_message_opcode = None
    self._waiting = None  # type: object
    self._compression_options = params.compression_options
    self._decompressor = None  # type: Optional[_PerMessageDeflateDecompressor]
    self._compressor = None  # type: Optional[_PerMessageDeflateCompressor]
    self._frame_compressed = None  # type: Optional[bool]
    # Backing field of the ``selected_subprotocol`` property. Initialised
    # here so that reading the property before negotiation yields None
    # instead of raising AttributeError.
    self._selected_subprotocol = None  # type: Optional[str]
    # The total uncompressed size of all messages received or sent.
    # Unicode messages are encoded to utf8.
    # Only for testing; subject to change.
    self._message_bytes_in = 0
    self._message_bytes_out = 0
    # The total size of all packets received or sent.  Includes
    # the effect of compression, frame overhead, and control frames.
    self._wire_bytes_in = 0
    self._wire_bytes_out = 0
    self.ping_callback = None  # type: Optional[PeriodicCallback]
    self.last_ping = 0.0
    self.last_pong = 0.0
    self.close_code = None  # type: Optional[int]
    self.close_reason = None  # type: Optional[str]

846 

# Use a property for this to satisfy the abc.
@property
def selected_subprotocol(self) -> Optional[str]:
    """Return the value most recently stored through the setter."""
    return self._selected_subprotocol

@selected_subprotocol.setter
def selected_subprotocol(self, value: Optional[str]) -> None:
    # Stored on a private attribute that the getter reads back.
    self._selected_subprotocol = value

855 

async def accept_connection(self, handler: WebSocketHandler) -> None:
    """Check the required headers, then run the opening handshake.

    A failed header check is answered with a 400 response. A
    `ValueError` or cancellation during the handshake itself aborts the
    connection instead.
    """
    try:
        self._handle_websocket_headers(handler)
    except ValueError:
        log_msg = "Missing/Invalid WebSocket headers"
        handler.set_status(400)
        handler.finish(log_msg)
        gen_log.debug(log_msg)
        return

    try:
        await self._accept_connection(handler)
    except asyncio.CancelledError:
        # Cancelled mid-handshake: tear the connection down quietly.
        self._abort()
    except ValueError:
        gen_log.debug("Malformed WebSocket request received", exc_info=True)
        self._abort()

875 

def _handle_websocket_headers(self, handler: WebSocketHandler) -> None:
    """Ensure every mandatory handshake header is present and non-empty.

    Raises `ValueError` as soon as one of ``Host``,
    ``Sec-Websocket-Key`` or ``Sec-Websocket-Version`` is missing or
    has an empty value.
    """
    for required in ("Host", "Sec-Websocket-Key", "Sec-Websocket-Version"):
        if not handler.request.headers.get(required):
            raise ValueError("Missing/Invalid WebSocket headers")

885 

@staticmethod
def compute_accept_value(key: Union[str, bytes]) -> str:
    """Derive the ``Sec-WebSocket-Accept`` value for a ``Sec-WebSocket-Key``.

    The key is concatenated with the fixed GUID below, hashed with SHA-1
    and the digest is returned base64-encoded as a native string.
    """
    magic = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"  # Magic value
    digest = hashlib.sha1(utf8(key) + magic).digest()
    return native_str(base64.b64encode(digest))

895 

def _challenge_response(self, handler: WebSocketHandler) -> str:
    """Return the accept value for the request's ``Sec-Websocket-Key``."""
    client_key = cast(str, handler.request.headers.get("Sec-Websocket-Key"))
    return WebSocketProtocol13.compute_accept_value(client_key)

900 

async def _accept_connection(self, handler: WebSocketHandler) -> None:
    """Complete the server side of the handshake and start reading frames.

    Selects a subprotocol, sets up ``permessage-deflate`` when the client
    offered it and compression options are configured, sends the 101
    response, detaches the stream from ``handler``, starts pinging, calls
    ``handler.open`` and finally runs the frame-receiving loop.
    """
    request_headers = handler.request.headers

    # Subprotocol selection: the handler picks from what the client offered.
    subprotocol_header = request_headers.get("Sec-WebSocket-Protocol")
    subprotocols = (
        [s.strip() for s in subprotocol_header.split(",")]
        if subprotocol_header
        else []
    )
    self.selected_subprotocol = handler.select_subprotocol(subprotocols)
    if self.selected_subprotocol:
        assert self.selected_subprotocol in subprotocols
        handler.set_header("Sec-WebSocket-Protocol", self.selected_subprotocol)

    # Only the first acceptable permessage-deflate offer is used.
    for ext in self._parse_extensions_header(request_headers):
        if ext[0] != "permessage-deflate" or self._compression_options is None:
            continue
        # TODO: negotiate parameters if compression_options
        # specifies limits.
        ext_params = ext[1]
        self._create_compressors("server", ext_params, self._compression_options)
        if (
            "client_max_window_bits" in ext_params
            and ext_params["client_max_window_bits"] is None
        ):
            # Don't echo an offered client_max_window_bits
            # parameter with no value.
            del ext_params["client_max_window_bits"]
        handler.set_header(
            "Sec-WebSocket-Extensions",
            httputil._encode_header("permessage-deflate", ext_params),
        )
        break

    # Send the "101 Switching Protocols" response.
    handler.clear_header("Content-Type")
    handler.set_status(101)
    handler.set_header("Upgrade", "websocket")
    handler.set_header("Connection", "Upgrade")
    handler.set_header("Sec-WebSocket-Accept", self._challenge_response(handler))
    handler.finish()

    self.stream = handler._detach_stream()

    self.start_pinging()
    try:
        open_result = handler.open(*handler.open_args, **handler.open_kwargs)
        if open_result is not None:
            await open_result
    except Exception:
        handler.log_exception(*sys.exc_info())
        self._abort()
    else:
        await self._receive_frame_loop()

951 

def _parse_extensions_header(
    self, headers: httputil.HTTPHeaders
) -> List[Tuple[str, Dict[str, str]]]:
    """Parse ``Sec-WebSocket-Extensions`` into a list of parsed entries.

    Each comma-separated entry is stripped and passed to
    ``httputil._parse_header``.  A missing or empty header yields ``[]``.
    """
    raw_value = headers.get("Sec-WebSocket-Extensions", "")
    if not raw_value:
        return []
    return [httputil._parse_header(part.strip()) for part in raw_value.split(",")]

959 

def _process_server_headers(
    self, key: Union[str, bytes], headers: httputil.HTTPHeaders
) -> None:
    """Process the headers sent by the server to this client connection.

    'key' is the websocket handshake challenge/response key.

    The ``Upgrade``, ``Connection`` and ``Sec-Websocket-Accept`` headers are
    checked with ``assert`` statements (so an ``AssertionError`` signals a
    bad handshake, and the checks are skipped when Python runs with ``-O``).
    Every extension listed by the server must be ``permessage-deflate`` with
    compression enabled on this side; anything else raises ``ValueError``.
    Finally the server's ``Sec-WebSocket-Protocol`` value (or ``None``) is
    stored as the selected subprotocol.
    """
    assert headers["Upgrade"].lower() == "websocket"
    assert headers["Connection"].lower() == "upgrade"
    accept = self.compute_accept_value(key)
    assert headers["Sec-Websocket-Accept"] == accept

    extensions = self._parse_extensions_header(headers)
    for ext in extensions:
        if ext[0] == "permessage-deflate" and self._compression_options is not None:
            self._create_compressors("client", ext[1])
        else:
            # ``ext`` is itself a tuple, so it must be wrapped in a 1-tuple
            # for ``%`` formatting to render it as a single value.
            raise ValueError("unsupported extension %r" % (ext,))

    self.selected_subprotocol = headers.get("Sec-WebSocket-Protocol", None)

980 

981 def _get_compressor_options( 

982 self, 

983 side: str, 

984 agreed_parameters: Dict[str, Any], 

985 compression_options: Optional[Dict[str, Any]] = None, 

986 ) -> Dict[str, Any]: 

987 """Converts a websocket agreed_parameters set to keyword arguments 

988 for our compressor objects. 

989 """ 

990 options = dict( 

991 persistent=(side + "_no_context_takeover") not in agreed_parameters 

992 ) # type: Dict[str, Any] 

993 wbits_header = agreed_parameters.get(side + "_max_window_bits", None) 

994 if wbits_header is None: 

995 options["max_wbits"] = zlib.MAX_WBITS 

996 else: 

997 options["max_wbits"] = int(wbits_header) 

998 options["compression_options"] = compression_options 

999 return options 

1000 

1001 def _create_compressors( 

1002 self, 

1003 side: str, 

1004 agreed_parameters: Dict[str, Any], 

1005 compression_options: Optional[Dict[str, Any]] = None, 

1006 ) -> None: 

1007 # TODO: handle invalid parameters gracefully 

1008 allowed_keys = set( 

1009 [ 

1010 "server_no_context_takeover", 

1011 "client_no_context_takeover", 

1012 "server_max_window_bits", 

1013 "client_max_window_bits", 

1014 ] 

1015 ) 

1016 for key in agreed_parameters: 

1017 if key not in allowed_keys: 

1018 raise ValueError("unsupported compression parameter %r" % key) 

1019 other_side = "client" if (side == "server") else "server" 

1020 self._compressor = _PerMessageDeflateCompressor( 

1021 **self._get_compressor_options(side, agreed_parameters, compression_options) 

1022 ) 

1023 self._decompressor = _PerMessageDeflateDecompressor( 

1024 max_message_size=self.params.max_message_size, 

1025 **self._get_compressor_options( 

1026 other_side, agreed_parameters, compression_options 

1027 ) 

1028 ) 

1029 

1030 def _write_frame( 

1031 self, fin: bool, opcode: int, data: bytes, flags: int = 0 

1032 ) -> "Future[None]": 

1033 data_len = len(data) 

1034 if opcode & 0x8: 

1035 # All control frames MUST have a payload length of 125 

1036 # bytes or less and MUST NOT be fragmented. 

1037 if not fin: 

1038 raise ValueError("control frames may not be fragmented") 

1039 if data_len > 125: 

1040 raise ValueError("control frame payloads may not exceed 125 bytes") 

1041 if fin: 

1042 finbit = self.FIN 

1043 else: 

1044 finbit = 0 

1045 frame = struct.pack("B", finbit | opcode | flags) 

1046 if self.mask_outgoing: 

1047 mask_bit = 0x80 

1048 else: 

1049 mask_bit = 0 

1050 if data_len < 126: 

1051 frame += struct.pack("B", data_len | mask_bit) 

1052 elif data_len <= 0xFFFF: 

1053 frame += struct.pack("!BH", 126 | mask_bit, data_len) 

1054 else: 

1055 frame += struct.pack("!BQ", 127 | mask_bit, data_len) 

1056 if self.mask_outgoing: 

1057 mask = os.urandom(4) 

1058 data = mask + _websocket_mask(mask, data) 

1059 frame += data 

1060 self._wire_bytes_out += len(frame) 

1061 return self.stream.write(frame) 

1062 

def write_message(
    self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False
) -> "Future[None]":
    """Sends the given message to the client of this Web Socket.

    A ``dict`` is JSON-encoded first.  The message is converted to bytes,
    compressed when a compressor is configured, and written as a single
    final frame (binary opcode when ``binary`` is true, text otherwise).
    ``StreamClosedError`` from the write is re-raised as
    ``WebSocketClosedError``, both synchronously and through the returned
    future.
    """
    opcode = 0x2 if binary else 0x1
    if isinstance(message, dict):
        message = tornado.escape.json_encode(message)
    payload = tornado.escape.utf8(message)
    assert isinstance(payload, bytes)
    self._message_bytes_out += len(payload)

    flags = 0
    if self._compressor:
        payload = self._compressor.compress(payload)
        flags |= self.RSV1

    # For historical reasons, write methods in Tornado operate in a semi-synchronous
    # mode in which awaiting the Future they return is optional (But errors can
    # still be raised). This requires us to go through an awkward dance here
    # to transform the errors that may be returned while presenting the same
    # semi-synchronous interface.
    try:
        write_future = self._write_frame(True, opcode, payload, flags=flags)
    except StreamClosedError:
        raise WebSocketClosedError()

    async def wrapper() -> None:
        try:
            await write_future
        except StreamClosedError:
            raise WebSocketClosedError()

    return asyncio.ensure_future(wrapper())

1097 

def write_ping(self, data: bytes) -> None:
    """Send a ping frame (opcode 0x9) whose payload is ``data``."""
    assert isinstance(data, bytes)
    ping_opcode = 0x9
    self._write_frame(True, ping_opcode, data)

1102 

1103 async def _receive_frame_loop(self) -> None: 

1104 try: 

1105 while not self.client_terminated: 

1106 await self._receive_frame() 

1107 except StreamClosedError: 

1108 self._abort() 

1109 self.handler.on_ws_connection_close(self.close_code, self.close_reason) 

1110 

1111 async def _read_bytes(self, n: int) -> bytes: 

1112 data = await self.stream.read_bytes(n) 

1113 self._wire_bytes_in += n 

1114 return data 

1115 

async def _receive_frame(self) -> None:
    """Read one frame from the stream and act on it.

    Parses the two-byte header, validates the reserved bits and payload
    length, reads (and unmasks, if needed) the payload, then either buffers
    it as part of a fragmented message or hands the completed message to
    `_handle_message`.  Protocol violations are handled by calling
    ``self._abort()`` and returning.
    """
    # Read the frame header.
    data = await self._read_bytes(2)
    header, mask_payloadlen = struct.unpack("BB", data)
    # These are masked integers, used below for their truthiness.
    is_final_frame = header & self.FIN
    reserved_bits = header & self.RSV_MASK
    opcode = header & self.OPCODE_MASK
    opcode_is_control = opcode & 0x8
    if self._decompressor is not None and opcode != 0:
        # Compression flag is present in the first frame's header,
        # but we can't decompress until we have all the frames of
        # the message.
        # NOTE(review): this branch also runs for control frames (any
        # opcode != 0), so a control frame arriving between fragments
        # appears to overwrite the flag recorded for the message in
        # progress -- confirm whether that is intended.
        self._frame_compressed = bool(reserved_bits & self.RSV1)
        reserved_bits &= ~self.RSV1
    if reserved_bits:
        # client is using as-yet-undefined extensions; abort
        self._abort()
        return
    is_masked = bool(mask_payloadlen & 0x80)
    payloadlen = mask_payloadlen & 0x7F

    # Parse and validate the length.
    if opcode_is_control and payloadlen >= 126:
        # control frames must have payload < 126
        self._abort()
        return
    if payloadlen < 126:
        # NOTE(review): _frame_length is only assigned on this branch and
        # is not read anywhere in this method.
        self._frame_length = payloadlen
    elif payloadlen == 126:
        # 126 means the real length follows as a 16-bit big-endian integer.
        data = await self._read_bytes(2)
        payloadlen = struct.unpack("!H", data)[0]
    elif payloadlen == 127:
        # 127 means the real length follows as a 64-bit big-endian integer.
        data = await self._read_bytes(8)
        payloadlen = struct.unpack("!Q", data)[0]
    # The size limit applies to this frame plus anything already buffered
    # for a fragmented message.
    new_len = payloadlen
    if self._fragmented_message_buffer is not None:
        new_len += len(self._fragmented_message_buffer)
    if new_len > self.params.max_message_size:
        self.close(1009, "message too big")
        self._abort()
        return

    # Read the payload, unmasking if necessary.
    if is_masked:
        self._frame_mask = await self._read_bytes(4)
    data = await self._read_bytes(payloadlen)
    if is_masked:
        assert self._frame_mask is not None
        data = _websocket_mask(self._frame_mask, data)

    # Decide what to do with this frame.
    if opcode_is_control:
        # control frames may be interleaved with a series of fragmented
        # data frames, so control frames must not interact with
        # self._fragmented_*
        if not is_final_frame:
            # control frames must not be fragmented
            self._abort()
            return
    elif opcode == 0:  # continuation frame
        if self._fragmented_message_buffer is None:
            # nothing to continue
            self._abort()
            return
        self._fragmented_message_buffer += data
        if is_final_frame:
            # The message is complete: dispatch it under the opcode saved
            # from its first frame and reset the buffer.
            opcode = self._fragmented_message_opcode
            data = self._fragmented_message_buffer
            self._fragmented_message_buffer = None
    else:  # start of new data message
        if self._fragmented_message_buffer is not None:
            # can't start new message until the old one is finished
            self._abort()
            return
        if not is_final_frame:
            # First fragment: remember the opcode and start buffering.
            self._fragmented_message_opcode = opcode
            self._fragmented_message_buffer = data

    if is_final_frame:
        handled_future = self._handle_message(opcode, data)
        if handled_future is not None:
            await handled_future

1198 

1199 def _handle_message(self, opcode: int, data: bytes) -> "Optional[Future[None]]": 

1200 """Execute on_message, returning its Future if it is a coroutine.""" 

1201 if self.client_terminated: 

1202 return None 

1203 

1204 if self._frame_compressed: 

1205 assert self._decompressor is not None 

1206 try: 

1207 data = self._decompressor.decompress(data) 

1208 except _DecompressTooLargeError: 

1209 self.close(1009, "message too big after decompression") 

1210 self._abort() 

1211 return None 

1212 

1213 if opcode == 0x1: 

1214 # UTF-8 data 

1215 self._message_bytes_in += len(data) 

1216 try: 

1217 decoded = data.decode("utf-8") 

1218 except UnicodeDecodeError: 

1219 self._abort() 

1220 return None 

1221 return self._run_callback(self.handler.on_message, decoded) 

1222 elif opcode == 0x2: 

1223 # Binary data 

1224 self._message_bytes_in += len(data) 

1225 return self._run_callback(self.handler.on_message, data) 

1226 elif opcode == 0x8: 

1227 # Close 

1228 self.client_terminated = True 

1229 if len(data) >= 2: 

1230 self.close_code = struct.unpack(">H", data[:2])[0] 

1231 if len(data) > 2: 

1232 self.close_reason = to_unicode(data[2:]) 

1233 # Echo the received close code, if any (RFC 6455 section 5.5.1). 

1234 self.close(self.close_code) 

1235 elif opcode == 0x9: 

1236 # Ping 

1237 try: 

1238 self._write_frame(True, 0xA, data) 

1239 except StreamClosedError: 

1240 self._abort() 

1241 self._run_callback(self.handler.on_ping, data) 

1242 elif opcode == 0xA: 

1243 # Pong 

1244 self.last_pong = IOLoop.current().time() 

1245 return self._run_callback(self.handler.on_pong, data) 

1246 else: 

1247 self._abort() 

1248 return None 

1249 

def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
    """Closes the WebSocket connection.

    Sends a close frame (unless one was already sent or the stream is
    closed), then either closes the stream right away if the client has
    already terminated, or schedules ``self._abort`` five seconds later.
    Any periodic ping callback is stopped.
    """
    if not self.server_terminated:
        if not self.stream.closed():
            if reason is not None and code is None:
                code = 1000  # "normal closure" status code
            close_data = b"" if code is None else struct.pack(">H", code)
            if reason is not None:
                close_data += utf8(reason)
            try:
                self._write_frame(True, 0x8, close_data)
            except StreamClosedError:
                self._abort()
        self.server_terminated = True

    if self.client_terminated:
        # Both sides are done: drop any pending abort timer and close.
        if self._waiting is not None:
            self.stream.io_loop.remove_timeout(self._waiting)
            self._waiting = None
        self.stream.close()
    elif self._waiting is None:
        # Give the client a few seconds to complete a clean shutdown,
        # otherwise just close the connection.
        deadline = self.stream.io_loop.time() + 5
        self._waiting = self.stream.io_loop.add_timeout(deadline, self._abort)

    if self.ping_callback:
        self.ping_callback.stop()
        self.ping_callback = None

1281 

def is_closing(self) -> bool:
    """Return ``True`` if this connection is closing.

    That is the case once either side has started its closing handshake
    or the stream has been shut down uncleanly.
    """
    stream_closed = self.stream.closed()
    return stream_closed or self.client_terminated or self.server_terminated

1290 

@property
def ping_interval(self) -> Optional[float]:
    """The configured ping interval, or ``0`` when none is configured."""
    configured = self.params.ping_interval
    return 0 if configured is None else configured

1297 

@property
def ping_timeout(self) -> Optional[float]:
    """The configured ping timeout.

    When none is configured, defaults to three times the ping interval,
    but never less than 30.
    """
    configured = self.params.ping_timeout
    if configured is None:
        assert self.ping_interval is not None
        return max(3 * self.ping_interval, 30)
    return configured

1305 

def start_pinging(self) -> None:
    """Start sending periodic pings to keep the connection alive.

    Does nothing unless the ping interval is greater than zero.
    """
    interval = self.ping_interval
    assert interval is not None
    if not (interval > 0):
        return
    now = IOLoop.current().time()
    self.last_ping = now
    self.last_pong = now
    # PeriodicCallback is given the interval multiplied by 1000.
    self.ping_callback = PeriodicCallback(self.periodic_ping, interval * 1000)
    self.ping_callback.start()

1315 

def periodic_ping(self) -> None:
    """Send a ping to keep the websocket alive

    Called periodically if the websocket_ping_interval is set and non-zero.
    Stops the periodic callback once the connection is closing, and closes
    the connection when a pong is overdue.
    """
    if self.is_closing():
        callback = self.ping_callback
        if callback is not None:
            callback.stop()
            return

    # Check for timeout on pong. Make sure that we really have
    # sent a recent ping in case the machine with both server and
    # client has been suspended since the last ping.
    now = IOLoop.current().time()
    since_last_pong = now - self.last_pong
    since_last_ping = now - self.last_ping
    assert self.ping_interval is not None
    assert self.ping_timeout is not None
    pinged_recently = since_last_ping < 2 * self.ping_interval
    if pinged_recently and since_last_pong > self.ping_timeout:
        self.close()
        return

    self.write_ping(b"")
    self.last_ping = now

1342 

def set_nodelay(self, x: bool) -> None:
    """Forward the no-delay flag ``x`` to the underlying stream."""
    stream = self.stream
    stream.set_nodelay(x)

1345 

1346 

class WebSocketClientConnection(simple_httpclient._HTTPConnection):
    """WebSocket client connection.

    This class should not be instantiated directly; use the
    `websocket_connect` function instead.
    """

    # Set in `headers_received` once the handshake succeeds and reset to
    # ``None`` by `close`.
    protocol = None  # type: WebSocketProtocol

    def __init__(
        self,
        request: httpclient.HTTPRequest,
        on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None,
        compression_options: Optional[Dict[str, Any]] = None,
        ping_interval: Optional[float] = None,
        ping_timeout: Optional[float] = None,
        max_message_size: int = _default_max_message_size,
        subprotocols: Optional[List[str]] = None,
    ) -> None:
        """Prepare the handshake request and start the HTTP connection.

        ``request`` is modified in place: its ``ws``/``wss`` scheme is
        rewritten to ``http``/``https``, the WebSocket upgrade headers are
        added and redirects are disabled.  ``subprotocols``, when it is a
        non-empty list, is offered through ``Sec-WebSocket-Protocol``;
        ``None`` or an empty list sends no such header.  A non-``None``
        ``compression_options`` makes the client offer
        ``permessage-deflate``.

        Raises ``KeyError`` if the URL scheme is neither ``ws`` nor ``wss``.
        """
        self.connect_future = Future()  # type: Future[WebSocketClientConnection]
        self.read_queue = Queue(1)  # type: Queue[Union[None, str, bytes]]
        self.key = base64.b64encode(os.urandom(16))
        self._on_message_callback = on_message_callback
        self.close_code = None  # type: Optional[int]
        self.close_reason = None  # type: Optional[str]
        self.params = _WebSocketParams(
            ping_interval=ping_interval,
            ping_timeout=ping_timeout,
            max_message_size=max_message_size,
            compression_options=compression_options,
        )

        scheme, sep, rest = request.url.partition(":")
        scheme = {"ws": "http", "wss": "https"}[scheme]
        request.url = scheme + sep + rest
        request.headers.update(
            {
                "Upgrade": "websocket",
                "Connection": "Upgrade",
                "Sec-WebSocket-Key": self.key,
                "Sec-WebSocket-Version": "13",
            }
        )
        if subprotocols:
            # Only send the header when there is something to offer: an
            # empty Sec-WebSocket-Protocol value is not a valid header.
            request.headers["Sec-WebSocket-Protocol"] = ",".join(subprotocols)
        if compression_options is not None:
            # Always offer to let the server set our max_wbits (and even though
            # we don't offer it, we will accept a client_no_context_takeover
            # from the server).
            # TODO: set server parameters for deflate extension
            # if requested in self.compression_options.
            request.headers[
                "Sec-WebSocket-Extensions"
            ] = "permessage-deflate; client_max_window_bits"

        # Websocket connection is currently unable to follow redirects
        request.follow_redirects = False

        self.tcp_client = TCPClient()
        # NOTE(review): the numeric arguments (104857600 == 100 * 1024 * 1024,
        # and 65536) are presumably size limits -- confirm against
        # simple_httpclient._HTTPConnection.__init__.
        super().__init__(
            None,
            request,
            lambda: None,
            self._on_http_response,
            104857600,
            self.tcp_client,
            65536,
            104857600,
        )

    def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
        """Closes the websocket connection.

        ``code`` and ``reason`` are documented under
        `WebSocketHandler.close`.

        .. versionadded:: 3.2

        .. versionchanged:: 4.0

           Added the ``code`` and ``reason`` arguments.
        """
        if self.protocol is not None:
            self.protocol.close(code, reason)
            self.protocol = None  # type: ignore

    def on_connection_close(self) -> None:
        """Handle the connection closing.

        Fails the connect future if it is still pending, delivers a
        ``None`` message to signal the close, and closes the TCP client.
        """
        if not self.connect_future.done():
            self.connect_future.set_exception(StreamClosedError())
        self._on_message(None)
        self.tcp_client.close()
        super().on_connection_close()

    def on_ws_connection_close(
        self, close_code: Optional[int] = None, close_reason: Optional[str] = None
    ) -> None:
        """Record the close code and reason, then run `on_connection_close`."""
        self.close_code = close_code
        self.close_reason = close_reason
        self.on_connection_close()

    def _on_http_response(self, response: httpclient.HTTPResponse) -> None:
        """Fail the connect future when an ordinary HTTP response arrives.

        Does nothing if the connect future is already done.
        """
        if not self.connect_future.done():
            if response.error:
                self.connect_future.set_exception(response.error)
            else:
                self.connect_future.set_exception(
                    WebSocketError("Non-websocket response")
                )

    async def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> None:
        """Take over the connection when the server answers with 101.

        Any other status code is passed on to the base class.
        """
        assert isinstance(start_line, httputil.ResponseStartLine)
        if start_line.code != 101:
            await super().headers_received(start_line, headers)
            return

        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

        self.headers = headers
        self.protocol = self.get_websocket_protocol()
        self.protocol._process_server_headers(self.key, self.headers)
        self.protocol.stream = self.connection.detach()

        IOLoop.current().add_callback(self.protocol._receive_frame_loop)
        self.protocol.start_pinging()

        # Once we've taken over the connection, clear the final callback
        # we set on the http request. This deactivates the error handling
        # in simple_httpclient that would otherwise interfere with our
        # ability to see exceptions.
        self.final_callback = None  # type: ignore

        future_set_result_unless_cancelled(self.connect_future, self)

    def write_message(
        self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False
    ) -> "Future[None]":
        """Sends a message to the WebSocket server.

        If the stream is closed, raises `WebSocketClosedError`.
        Returns a `.Future` which can be used for flow control.

        .. versionchanged:: 5.0
           Exception raised on a closed stream changed from `.StreamClosedError`
           to `WebSocketClosedError`.
        """
        if self.protocol is None:
            raise WebSocketClosedError("Client connection has been closed")
        return self.protocol.write_message(message, binary=binary)

    def read_message(
        self,
        callback: Optional[Callable[["Future[Union[None, str, bytes]]"], None]] = None,
    ) -> Awaitable[Union[None, str, bytes]]:
        """Reads a message from the WebSocket server.

        If on_message_callback was specified at WebSocket
        initialization, this function will never return messages.

        Returns a future whose result is the message, or None
        if the connection is closed.  If a callback argument
        is given it will be called with the future when it is
        ready.
        """

        awaitable = self.read_queue.get()
        if callback is not None:
            self.io_loop.add_future(asyncio.ensure_future(awaitable), callback)
        return awaitable

    def on_message(self, message: Union[str, bytes]) -> Optional[Awaitable[None]]:
        """Deliver an incoming message; see `_on_message`."""
        return self._on_message(message)

    def _on_message(
        self, message: Union[None, str, bytes]
    ) -> Optional[Awaitable[None]]:
        """Pass ``message`` to the message callback or onto the read queue.

        Returns ``None`` when a callback is configured, otherwise the
        result of the queue's ``put``.
        """
        if self._on_message_callback:
            self._on_message_callback(message)
            return None
        else:
            return self.read_queue.put(message)

    def ping(self, data: bytes = b"") -> None:
        """Send ping frame to the remote end.

        The data argument allows a small amount of data (up to 125
        bytes) to be sent as a part of the ping message. Note that not
        all websocket implementations expose this data to
        applications.

        Consider using the ``ping_interval`` argument to
        `websocket_connect` instead of sending pings manually.

        Raises `WebSocketClosedError` if the connection has been closed.

        .. versionadded:: 5.1

        """
        data = utf8(data)
        if self.protocol is None:
            raise WebSocketClosedError()
        self.protocol.write_ping(data)

    def on_pong(self, data: bytes) -> None:
        """Hook called for pong frames; does nothing here."""
        pass

    def on_ping(self, data: bytes) -> None:
        """Hook called for ping frames; does nothing here."""
        pass

    def get_websocket_protocol(self) -> WebSocketProtocol:
        """Create the protocol object; clients mask their outgoing frames."""
        return WebSocketProtocol13(self, mask_outgoing=True, params=self.params)

    @property
    def selected_subprotocol(self) -> Optional[str]:
        """The subprotocol selected by the server.

        .. versionadded:: 5.1
        """
        return self.protocol.selected_subprotocol

    def log_exception(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        """Log an uncaught exception to the application log."""
        assert typ is not None
        assert value is not None
        app_log.error("Uncaught exception %s", value, exc_info=(typ, value, tb))

1579 

1580 

def websocket_connect(
    url: Union[str, httpclient.HTTPRequest],
    callback: Optional[Callable[["Future[WebSocketClientConnection]"], None]] = None,
    connect_timeout: Optional[float] = None,
    on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None,
    compression_options: Optional[Dict[str, Any]] = None,
    ping_interval: Optional[float] = None,
    ping_timeout: Optional[float] = None,
    max_message_size: int = _default_max_message_size,
    subprotocols: Optional[List[str]] = None,
) -> "Awaitable[WebSocketClientConnection]":
    """Client-side websocket support.

    Takes a url (or an ``HTTPRequest``) and returns a Future whose result
    is a `WebSocketClientConnection`.  If ``callback`` is given it is
    invoked with that Future once it completes.  ``connect_timeout`` must
    be left as ``None`` when an ``HTTPRequest`` is passed.

    ``compression_options`` is interpreted in the same way as the
    return value of `.WebSocketHandler.get_compression_options`.

    The connection supports two styles of operation. In the coroutine
    style, the application typically calls
    `~.WebSocketClientConnection.read_message` in a loop::

        conn = yield websocket_connect(url)
        while True:
            msg = yield conn.read_message()
            if msg is None: break
            # Do something with msg

    In the callback style, pass an ``on_message_callback`` to
    ``websocket_connect``. In both styles, a message of ``None``
    indicates that the connection has been closed.

    ``subprotocols`` may be a list of strings specifying proposed
    subprotocols. The selected protocol may be found on the
    ``selected_subprotocol`` attribute of the connection object
    when the connection is complete.

    .. versionchanged:: 3.2
       Also accepts ``HTTPRequest`` objects in place of urls.

    .. versionchanged:: 4.1
       Added ``compression_options`` and ``on_message_callback``.

    .. versionchanged:: 4.5
       Added the ``ping_interval``, ``ping_timeout``, and ``max_message_size``
       arguments, which have the same meaning as in `WebSocketHandler`.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been removed.

    .. versionchanged:: 5.1
       Added the ``subprotocols`` argument.
    """
    if not isinstance(url, httpclient.HTTPRequest):
        raw_request = httpclient.HTTPRequest(url, connect_timeout=connect_timeout)
    else:
        assert connect_timeout is None
        raw_request = url
        # Copy and convert the headers dict/object (see comments in
        # AsyncHTTPClient.fetch)
        raw_request.headers = httputil.HTTPHeaders(raw_request.headers)

    proxied = httpclient._RequestProxy(raw_request, httpclient.HTTPRequest._DEFAULTS)
    request = cast(httpclient.HTTPRequest, proxied)

    conn = WebSocketClientConnection(
        request,
        on_message_callback=on_message_callback,
        compression_options=compression_options,
        ping_interval=ping_interval,
        ping_timeout=ping_timeout,
        max_message_size=max_message_size,
        subprotocols=subprotocols,
    )
    connect_future = conn.connect_future
    if callback is not None:
        IOLoop.current().add_future(connect_future, callback)
    return connect_future