Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/websocket.py: 21%
735 statements
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-10 06:20 +0000
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-10 06:20 +0000
1"""Implementation of the WebSocket protocol.
3`WebSockets <http://dev.w3.org/html5/websockets/>`_ allow for bidirectional
4communication between the browser and server.
6WebSockets are supported in the current versions of all major browsers,
7although older versions that do not support WebSockets are still in use
8(refer to http://caniuse.com/websockets for details).
10This module implements the final version of the WebSocket protocol as
11defined in `RFC 6455 <http://tools.ietf.org/html/rfc6455>`_. Certain
12browser versions (notably Safari 5.x) implemented an earlier draft of
13the protocol (known as "draft 76") and are not compatible with this module.
15.. versionchanged:: 4.0
16 Removed support for the draft 76 protocol version.
17"""
19import abc
20import asyncio
21import base64
22import hashlib
23import os
24import sys
25import struct
26import tornado.escape
27import tornado.web
28from urllib.parse import urlparse
29import zlib
31from tornado.concurrent import Future, future_set_result_unless_cancelled
32from tornado.escape import utf8, native_str, to_unicode
33from tornado import gen, httpclient, httputil
34from tornado.ioloop import IOLoop, PeriodicCallback
35from tornado.iostream import StreamClosedError, IOStream
36from tornado.log import gen_log, app_log
37from tornado import simple_httpclient
38from tornado.queues import Queue
39from tornado.tcpclient import TCPClient
40from tornado.util import _websocket_mask
42from typing import (
43 TYPE_CHECKING,
44 cast,
45 Any,
46 Optional,
47 Dict,
48 Union,
49 List,
50 Awaitable,
51 Callable,
52 Tuple,
53 Type,
54)
55from types import TracebackType
if TYPE_CHECKING:
    from typing_extensions import Protocol

    # zlib does not publicly expose the types of its compressor and
    # decompressor objects, so structural protocols describe only the
    # methods and attributes this module actually uses.
    class _Compressor(Protocol):
        def compress(self, data: bytes) -> bytes:
            ...

        def flush(self, mode: int) -> bytes:
            ...

    class _Decompressor(Protocol):
        unconsumed_tail = b""  # type: bytes

        def decompress(self, data: bytes, max_length: int) -> bytes:
            ...

    class _WebSocketDelegate(Protocol):
        # Interface common to WebSocketHandler (server side) and
        # WebSocketClientConnection (client side).
        def on_ws_connection_close(
            self, close_code: Optional[int] = None, close_reason: Optional[str] = None
        ) -> None:
            ...

        def on_message(self, message: Union[str, bytes]) -> Optional["Awaitable[None]"]:
            ...

        def on_ping(self, data: bytes) -> None:
            ...

        def on_pong(self, data: bytes) -> None:
            ...

        def log_exception(
            self,
            typ: Optional[Type[BaseException]],
            value: Optional[BaseException],
            tb: Optional[TracebackType],
        ) -> None:
            ...
# Default cap on the size of a received message: 10 MiB.
_default_max_message_size = 10 * 1024 ** 2
class WebSocketError(Exception):
    """Base exception type for WebSocket errors in this module."""
class WebSocketClosedError(WebSocketError):
    """Signals that an operation was attempted on a closed connection.

    .. versionadded:: 3.2
    """
class _DecompressTooLargeError(Exception):
    """Internal signal that decompressed data exceeded the size limit."""
class _WebSocketParams(object):
    """Plain holder for the settings handed to a protocol instance.

    Each constructor argument is stored unchanged on an attribute of
    the same name; no validation is performed here.
    """

    def __init__(
        self,
        ping_interval: Optional[float] = None,
        ping_timeout: Optional[float] = None,
        max_message_size: int = _default_max_message_size,
        compression_options: Optional[Dict[str, Any]] = None,
    ) -> None:
        # Keep-alive configuration.
        self.ping_interval, self.ping_timeout = ping_interval, ping_timeout
        # Message size limit.
        self.max_message_size = max_message_size
        # None leaves compression disabled in the protocol code.
        self.compression_options = compression_options
136class WebSocketHandler(tornado.web.RequestHandler):
137 """Subclass this class to create a basic WebSocket handler.
139 Override `on_message` to handle incoming messages, and use
140 `write_message` to send messages to the client. You can also
141 override `open` and `on_close` to handle opened and closed
142 connections.
144 Custom upgrade response headers can be sent by overriding
145 `~tornado.web.RequestHandler.set_default_headers` or
146 `~tornado.web.RequestHandler.prepare`.
148 See http://dev.w3.org/html5/websockets/ for details on the
149 JavaScript interface. The protocol is specified at
150 http://tools.ietf.org/html/rfc6455.
152 Here is an example WebSocket handler that echoes all received messages
153 back to the client:
155 .. testcode::
157 class EchoWebSocket(tornado.websocket.WebSocketHandler):
158 def open(self):
159 print("WebSocket opened")
161 def on_message(self, message):
162 self.write_message(u"You said: " + message)
164 def on_close(self):
165 print("WebSocket closed")
167 .. testoutput::
168 :hide:
170 WebSockets are not standard HTTP connections. The "handshake" is
171 HTTP, but after the handshake, the protocol is
172 message-based. Consequently, most of the Tornado HTTP facilities
173 are not available in handlers of this type. The only communication
174 methods available to you are `write_message()`, `ping()`, and
175 `close()`. Likewise, your request handler class should implement the
176 `open()` method rather than ``get()`` or ``post()``.
178 If you map the handler above to ``/websocket`` in your application, you can
179 invoke it in JavaScript with::
181 var ws = new WebSocket("ws://localhost:8888/websocket");
182 ws.onopen = function() {
183 ws.send("Hello, world");
184 };
185 ws.onmessage = function (evt) {
186 alert(evt.data);
187 };
189 This script pops up an alert box that says "You said: Hello, world".
191 Web browsers allow any site to open a websocket connection to any other,
192 instead of using the same-origin policy that governs other network
193 access from JavaScript. This can be surprising and is a potential
194 security hole, so since Tornado 4.0 `WebSocketHandler` requires
195 applications that wish to receive cross-origin websockets to opt in
196 by overriding the `~WebSocketHandler.check_origin` method (see that
197 method's docs for details). Failure to do so is the most likely
198 cause of 403 errors when making a websocket connection.
200 When using a secure websocket connection (``wss://``) with a self-signed
201 certificate, the connection from a browser may fail because it wants
202 to show the "accept this certificate" dialog but has nowhere to show it.
203 You must first visit a regular HTML page using the same certificate
204 to accept it before the websocket connection will succeed.
206 If the application setting ``websocket_ping_interval`` has a non-zero
207 value, a ping will be sent periodically, and the connection will be
208 closed if a response is not received before the ``websocket_ping_timeout``.
210 Messages larger than the ``websocket_max_message_size`` application setting
211 (default 10MiB) will not be accepted.
213 .. versionchanged:: 4.5
214 Added ``websocket_ping_interval``, ``websocket_ping_timeout``, and
215 ``websocket_max_message_size``.
216 """
def __init__(
    self,
    application: tornado.web.Application,
    request: httputil.HTTPServerRequest,
    **kwargs: Any
) -> None:
    """Set up the handler with no websocket connection attached yet.

    All arguments are forwarded unchanged to the parent initializer.
    """
    super().__init__(application, request, **kwargs)
    # Guards on_connection_close() so on_close() runs at most once.
    self._on_close_called = False
    # Populated by on_ws_connection_close().
    self.close_code = None  # type: Optional[int]
    self.close_reason = None  # type: Optional[str]
    # Assigned in get() from get_websocket_protocol().
    self.ws_connection = None  # type: Optional[WebSocketProtocol]
async def get(self, *args: Any, **kwargs: Any) -> None:
    """Validate the upgrade request and hand it to the protocol object.

    The positional and keyword arguments are saved as ``open_args`` and
    ``open_kwargs``. The request is answered with 400 when the
    ``Upgrade`` or ``Connection`` header is unsuitable, with 403 when
    `check_origin` refuses the origin, and with 426 when
    `get_websocket_protocol` returns no protocol. Otherwise the
    protocol's ``accept_connection`` is awaited.
    """
    self.open_args = args
    self.open_kwargs = kwargs

    def reject(status: int, log_msg: str) -> None:
        # Shared tail of every early-exit branch below.
        self.set_status(status)
        self.finish(log_msg)
        gen_log.debug(log_msg)

    # The Upgrade header must be present and name "websocket".
    if self.request.headers.get("Upgrade", "").lower() != "websocket":
        reject(400, 'Can "Upgrade" only to "WebSocket".')
        return

    # The Connection header must contain "upgrade". Proxies and load
    # balancers may add further tokens, so the header is split on commas.
    headers = self.request.headers
    connection_tokens = [
        token.strip().lower() for token in headers.get("Connection", "").split(",")
    ]
    if "upgrade" not in connection_tokens:
        reject(400, '"Connection" must be "Upgrade".')
        return

    # Protocol version 8 sends "Sec-Websocket-Origin" while version 13
    # sends plain "Origin"; prefer the latter when both could apply.
    if "Origin" in self.request.headers:
        origin = self.request.headers.get("Origin")
    else:
        origin = self.request.headers.get("Sec-Websocket-Origin", None)

    # A missing origin is taken to mean a non-browser client and is let
    # through; a present one must satisfy check_origin().
    if origin is not None and not self.check_origin(origin):
        reject(403, "Cross origin websockets not allowed")
        return

    self.ws_connection = self.get_websocket_protocol()
    if not self.ws_connection:
        self.set_status(426, "Upgrade Required")
        self.set_header("Sec-WebSocket-Version", "7, 8, 13")
        return
    await self.ws_connection.accept_connection(self)
@property
def ping_interval(self) -> Optional[float]:
    """Interval between websocket keep-alive pings.

    Read from the ``websocket_ping_interval`` application setting;
    ``None`` when the setting is absent.
    Set websocket_ping_interval = 0 to disable pings.
    """
    interval = self.settings.get("websocket_ping_interval", None)
    return interval
@property
def ping_timeout(self) -> Optional[float]:
    """Seconds to wait for the response to a ping before closing.

    If no response to a ping arrives within this many seconds, the
    websocket connection is closed (VPNs, etc. can fail to cleanly
    close ws connections). Read from the ``websocket_ping_timeout``
    application setting; ``None`` when the setting is absent.
    Default is max of 3 pings or 30 seconds.
    """
    timeout = self.settings.get("websocket_ping_timeout", None)
    return timeout
@property
def max_message_size(self) -> int:
    """Largest message size that will be accepted.

    A message from the remote peer that exceeds this size causes the
    connection to be closed. Read from the
    ``websocket_max_message_size`` application setting.

    Default is 10MiB.
    """
    settings = self.settings
    return settings.get("websocket_max_message_size", _default_max_message_size)
def write_message(
    self, message: Union[bytes, str, Dict[str, Any]], binary: bool = False
) -> "Future[None]":
    """Send ``message`` to the client of this Web Socket.

    ``message`` may be a string or a dict; a dict is encoded as JSON
    before sending. When ``binary`` is false the message is sent as
    utf8; in binary mode any byte string is allowed.

    Raises `WebSocketClosedError` if the connection is already closed.
    Returns a `.Future` which can be used for flow control.

    .. versionchanged:: 3.2
       `WebSocketClosedError` was added (previously a closed connection
       would raise an `AttributeError`)

    .. versionchanged:: 4.3
       Returns a `.Future` which can be used for flow control.

    .. versionchanged:: 5.0
       Consistently raises `WebSocketClosedError`. Previously could
       sometimes raise `.StreamClosedError`.
    """
    conn = self.ws_connection
    if conn is None or conn.is_closing():
        raise WebSocketClosedError()
    payload = (
        tornado.escape.json_encode(message) if isinstance(message, dict) else message
    )
    return conn.write_message(payload, binary=binary)
def select_subprotocol(self, subprotocols: List[str]) -> Optional[str]:
    """Hook for subprotocol negotiation; the default selects nothing.

    ``subprotocols`` lists the subprotocol names proposed by the
    client. An override may return one of those strings to select it,
    or ``None`` to select none.

    Declining to select a subprotocol does not by itself abort the
    connection, although a client may close it when none of its
    proposals was chosen.

    The list may be empty, in which case this method must return
    None. It is always called exactly once, even when the client
    proposed no subprotocols, so the handler learns of that fact.

    .. versionchanged:: 5.1

       Previously, this method was called with a list containing
       an empty string instead of an empty list if no subprotocols
       were proposed by the client.
    """
    return None
@property
def selected_subprotocol(self) -> Optional[str]:
    """The subprotocol that `select_subprotocol` returned.

    Read from the protocol object, which must already exist.

    .. versionadded:: 5.1
    """
    conn = self.ws_connection
    assert conn is not None
    return conn.selected_subprotocol
def get_compression_options(self) -> Optional[Dict[str, Any]]:
    """Hook that supplies the compression options for the connection.

    Returning None (the default) disables compression. Returning a
    dict, even an empty one, enables it. The dict may carry these
    options:

    ``compression_level`` specifies the compression level.

    ``mem_level`` specifies the amount of memory used for the internal
    compression state.

    These parameters are documented in details here:
    https://docs.python.org/3.6/library/zlib.html#zlib.compressobj

    .. versionadded:: 4.1

    .. versionchanged:: 4.5

       Added ``compression_level`` and ``mem_level``.
    """
    # TODO: Add wbits option.
    return None
def open(self, *args: str, **kwargs: str) -> Optional[Awaitable[None]]:
    """Hook called once a new WebSocket has been opened.

    The arguments come from the `tornado.web.URLSpec` regular
    expression, exactly as they do for
    `tornado.web.RequestHandler.get`.

    `open` may be a coroutine. `on_message` will not be called until
    `open` has returned.

    .. versionchanged:: 5.1

       ``open`` may be a coroutine.
    """
    return None
def on_message(self, message: Union[str, bytes]) -> Optional[Awaitable[None]]:
    """Handle a message received on the WebSocket.

    Subclasses must override this method; the base implementation
    raises `NotImplementedError`.

    .. versionchanged:: 4.5

       ``on_message`` can be a coroutine.
    """
    raise NotImplementedError()
def ping(self, data: Union[str, bytes] = b"") -> None:
    """Send a ping frame to the remote end.

    ``data`` lets a small payload (up to 125 bytes) travel with the
    ping. Note that not all websocket implementations expose this
    data to applications.

    Consider using the ``websocket_ping_interval`` application
    setting instead of sending pings manually.

    Raises `WebSocketClosedError` when there is no open connection.

    .. versionchanged:: 5.1

       The data argument is now optional.

    """
    # Encode first, before the connection state is inspected.
    payload = utf8(data)
    conn = self.ws_connection
    if conn is None or conn.is_closing():
        raise WebSocketClosedError()
    conn.write_ping(payload)
def on_pong(self, data: bytes) -> None:
    """Hook called when the response to a ping frame arrives.

    The default implementation does nothing.
    """
    return None
def on_ping(self, data: bytes) -> None:
    """Hook called when a ping frame is received.

    The default implementation does nothing.
    """
    return None
def on_close(self) -> None:
    """Hook called when the WebSocket is closed.

    When the connection closed cleanly and the peer supplied a status
    code or reason phrase, they are available as ``self.close_code``
    and ``self.close_reason``.

    .. versionchanged:: 4.0

       Added ``close_code`` and ``close_reason`` attributes.
    """
    return None
def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
    """Close this Web Socket.

    The socket is closed once the close handshake succeeds.

    ``code`` may be a numeric status code, taken from the values
    defined in `RFC 6455 section 7.4.1
    <https://tools.ietf.org/html/rfc6455#section-7.4.1>`_.
    ``reason`` may be a textual message about why the connection is
    closing. These values are made available to the client, but are
    not otherwise interpreted by the websocket protocol.

    Does nothing when no connection is attached.

    .. versionchanged:: 4.0

       Added the ``code`` and ``reason`` arguments.
    """
    conn = self.ws_connection
    if not conn:
        return
    conn.close(code, reason)
    # Drop the reference once the close has been requested.
    self.ws_connection = None
def check_origin(self, origin: str) -> bool:
    """Override to enable support for allowing alternate origins.

    The ``origin`` argument is the value of the ``Origin`` HTTP
    header, the url responsible for initiating this request. This
    method is not called for clients that do not send this header;
    such requests are always allowed (because all browsers that
    implement WebSockets support this header, and non-browser
    clients do not have the same cross-site security concerns).

    Should return ``True`` to accept the request or ``False`` to
    reject it. By default, rejects all requests with an origin on
    a host other than this one. The comparison covers host and port
    and ignores letter case on both sides; a request without a
    ``Host`` header is rejected.

    This is a security protection against cross site scripting attacks on
    browsers, since WebSockets are allowed to bypass the usual same-origin
    policies and don't use CORS headers.

    .. warning::

       This is an important security measure; don't disable it
       without understanding the security implications. In
       particular, if your authentication is cookie-based, you
       must either restrict the origins allowed by
       ``check_origin()`` or implement your own XSRF-like
       protection for websocket connections. See `these
       <https://www.christian-schneider.net/CrossSiteWebSocketHijacking.html>`_
       `articles
       <https://devcenter.heroku.com/articles/websocket-security>`_
       for more.

    To accept all cross-origin traffic (which was the default prior to
    Tornado 4.0), simply override this method to always return ``True``::

        def check_origin(self, origin):
            return True

    To allow connections from any subdomain of your site, you might
    do something like::

        def check_origin(self, origin):
            parsed_origin = urllib.parse.urlparse(origin)
            return parsed_origin.netloc.endswith(".mydomain.com")

    .. versionadded:: 4.0

    """
    origin_netloc = urlparse(origin).netloc.lower()

    host = self.request.headers.get("Host")
    if host is None:
        # Nothing to compare against; keep rejecting as before.
        return False

    # Host names are case-insensitive, so normalise the Host header the
    # same way as the origin before comparing host and port.
    return origin_netloc == host.lower()
def set_nodelay(self, value: bool) -> None:
    """Set the no-delay flag for this stream.

    Small messages may, by default, be delayed and/or combined to
    reduce the number of packets sent. This can sometimes cause
    200-500ms delays due to the interaction between Nagle's algorithm
    and TCP delayed ACKs. To reduce this delay (at the expense of
    possibly increasing bandwidth usage), call
    ``self.set_nodelay(True)`` once the websocket connection is
    established.

    See `.BaseIOStream.set_nodelay` for additional details.

    .. versionadded:: 3.1
    """
    conn = self.ws_connection
    assert conn is not None
    conn.set_nodelay(value)
def on_connection_close(self) -> None:
    """Tear down the protocol object and fire `on_close` once.

    Any attached protocol object is notified and then detached. The
    first call additionally marks the handler as closed, calls
    `on_close` and then ``_break_cycles``; later calls skip that part.
    """
    conn = self.ws_connection
    if conn:
        conn.on_connection_close()
        self.ws_connection = None
    if self._on_close_called:
        return
    # Set the flag before calling out so a re-entrant call is a no-op.
    self._on_close_called = True
    self.on_close()
    self._break_cycles()
def on_ws_connection_close(
    self, close_code: Optional[int] = None, close_reason: Optional[str] = None
) -> None:
    """Record the close code and reason, then run `on_connection_close`."""
    self.close_code, self.close_reason = close_code, close_reason
    self.on_connection_close()
def _break_cycles(self) -> None:
    """Defer the parent's cycle breaking while a websocket is still open."""
    # WebSocketHandlers call finish() early, but breaking up reference
    # cycles makes it impossible to call self.render_string. So when
    # the connection was established (status code 101), wait until it
    # has really been closed before delegating to the parent.
    if self.get_status() == 101 and not self._on_close_called:
        return
    super()._break_cycles()
def get_websocket_protocol(self) -> Optional["WebSocketProtocol"]:
    """Build the protocol object for the requested protocol version.

    Returns a `WebSocketProtocol13` configured from this handler's
    ping, size and compression settings when the
    ``Sec-WebSocket-Version`` header is "7", "8" or "13"; otherwise
    returns None.
    """
    requested_version = self.request.headers.get("Sec-WebSocket-Version")
    if requested_version not in ("7", "8", "13"):
        return None
    params = _WebSocketParams(
        ping_interval=self.ping_interval,
        ping_timeout=self.ping_timeout,
        max_message_size=self.max_message_size,
        compression_options=self.get_compression_options(),
    )
    # False is passed as the protocol's mask_outgoing argument.
    return WebSocketProtocol13(self, False, params)
def _detach_stream(self) -> IOStream:
    """Disable the plain-HTTP response methods and detach the stream.

    Each listed method is replaced on this instance by a stub that
    raises, after which the result of ``self.detach()`` is returned.
    """
    # disable non-WS methods
    disabled_methods = (
        "write",
        "redirect",
        "set_header",
        "set_cookie",
        "set_status",
        "flush",
        "finish",
    )
    for method_name in disabled_methods:
        setattr(self, method_name, _raise_not_supported_for_websockets)
    return self.detach()
def _raise_not_supported_for_websockets(*args: Any, **kwargs: Any) -> None:
    """Stand-in for HTTP-only methods; always raises `RuntimeError`."""
    message = "Method not supported for Web Sockets"
    raise RuntimeError(message)
class WebSocketProtocol(abc.ABC):
    """Abstract base shared by the WebSocket protocol versions."""

    def __init__(self, handler: "_WebSocketDelegate") -> None:
        self.handler = handler
        self.stream = None  # type: Optional[IOStream]
        # Termination flags; _abort() sets both.
        self.client_terminated = False
        self.server_terminated = False

    def _run_callback(
        self, callback: Callable, *args: Any, **kwargs: Any
    ) -> "Optional[Future[Any]]":
        """Invoke ``callback``, handling any exception it raises.

        When the callback returns a non-None result it is converted to a
        Future, registered on the stream's IOLoop, and returned. When
        the callback raises, the exception is logged through the
        handler, the connection is aborted and None is returned.
        """
        try:
            outcome = callback(*args, **kwargs)
        except Exception:
            self.handler.log_exception(*sys.exc_info())
            self._abort()
            return None
        if outcome is None:
            return None
        outcome = gen.convert_yielded(outcome)
        assert self.stream is not None
        # Calling result() in the done-callback re-raises any exception
        # the future finished with.
        self.stream.io_loop.add_future(outcome, lambda f: f.result())
        return outcome

    def on_connection_close(self) -> None:
        """React to the connection closing by aborting."""
        self._abort()

    def _abort(self) -> None:
        """Abort the WebSocket connection at once by closing the socket."""
        self.client_terminated = True
        self.server_terminated = True
        stream = self.stream
        if stream is not None:
            stream.close()  # forcibly tear down the connection
        self.close()  # let the subclass cleanup

    @abc.abstractmethod
    def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
        raise NotImplementedError()

    @abc.abstractmethod
    def is_closing(self) -> bool:
        raise NotImplementedError()

    @abc.abstractmethod
    async def accept_connection(self, handler: WebSocketHandler) -> None:
        raise NotImplementedError()

    @abc.abstractmethod
    def write_message(
        self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False
    ) -> "Future[None]":
        raise NotImplementedError()

    @property
    @abc.abstractmethod
    def selected_subprotocol(self) -> Optional[str]:
        raise NotImplementedError()

    @abc.abstractmethod
    def write_ping(self, data: bytes) -> None:
        raise NotImplementedError()

    # The entry points below are used by WebSocketClientConnection,
    # which was introduced after we only supported a single version of
    # WebSocketProtocol. The WebSocketProtocol/WebSocketProtocol13
    # boundary is currently pretty ad-hoc.
    @abc.abstractmethod
    def _process_server_headers(
        self, key: Union[str, bytes], headers: httputil.HTTPHeaders
    ) -> None:
        raise NotImplementedError()

    @abc.abstractmethod
    def start_pinging(self) -> None:
        raise NotImplementedError()

    @abc.abstractmethod
    async def _receive_frame_loop(self) -> None:
        raise NotImplementedError()

    @abc.abstractmethod
    def set_nodelay(self, x: bool) -> None:
        raise NotImplementedError()
class _PerMessageDeflateCompressor(object):
    """Compresses message payloads as raw DEFLATE data.

    When ``persistent`` is true a single zlib compressor is reused for
    every call to `compress`; otherwise a fresh one is created per call.
    """

    def __init__(
        self,
        persistent: bool,
        max_wbits: Optional[int],
        compression_options: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Validate the window size and record the zlib settings.

        ``max_wbits`` defaults to ``zlib.MAX_WBITS`` when None and must
        lie in the range 8 to ``zlib.MAX_WBITS``, else `ValueError` is
        raised. ``compression_options`` may supply ``compression_level``
        and ``mem_level``; missing keys fall back to
        ``tornado.web.GZipContentEncoding.GZIP_LEVEL`` and 8.
        """
        if max_wbits is None:
            max_wbits = zlib.MAX_WBITS
        # There is no symbolic constant for the minimum wbits value.
        if not (8 <= max_wbits <= zlib.MAX_WBITS):
            # Apply the format here: ValueError does not interpolate
            # its arguments the way logging calls do.
            raise ValueError(
                "Invalid max_wbits value %r; allowed range 8-%d"
                % (max_wbits, zlib.MAX_WBITS)
            )
        self._max_wbits = max_wbits

        if (
            compression_options is None
            or "compression_level" not in compression_options
        ):
            self._compression_level = tornado.web.GZipContentEncoding.GZIP_LEVEL
        else:
            self._compression_level = compression_options["compression_level"]

        if compression_options is None or "mem_level" not in compression_options:
            self._mem_level = 8
        else:
            self._mem_level = compression_options["mem_level"]

        if persistent:
            self._compressor = self._create_compressor()  # type: Optional[_Compressor]
        else:
            self._compressor = None

    def _create_compressor(self) -> "_Compressor":
        """Return a new zlib compressor; negative wbits selects raw DEFLATE."""
        return zlib.compressobj(
            self._compression_level, zlib.DEFLATED, -self._max_wbits, self._mem_level
        )

    def compress(self, data: bytes) -> bytes:
        """Compress ``data`` and return it without the sync-flush trailer."""
        compressor = self._compressor or self._create_compressor()
        data = compressor.compress(data) + compressor.flush(zlib.Z_SYNC_FLUSH)
        # A sync flush ends with these four bytes, which are stripped
        # before the data is returned.
        assert data.endswith(b"\x00\x00\xff\xff")
        return data[:-4]
class _PerMessageDeflateDecompressor(object):
    """Decompresses raw DEFLATE message payloads with a size limit.

    When ``persistent`` is true a single zlib decompressor is reused
    for every call to `decompress`; otherwise a fresh one is created
    per call.
    """

    def __init__(
        self,
        persistent: bool,
        max_wbits: Optional[int],
        max_message_size: int,
        compression_options: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Validate the window size and record the size limit.

        ``max_wbits`` defaults to ``zlib.MAX_WBITS`` when None and must
        lie in the range 8 to ``zlib.MAX_WBITS``, else `ValueError` is
        raised. ``compression_options`` is accepted but not used here.
        """
        self._max_message_size = max_message_size
        if max_wbits is None:
            max_wbits = zlib.MAX_WBITS
        if not (8 <= max_wbits <= zlib.MAX_WBITS):
            # Apply the format here: ValueError does not interpolate
            # its arguments the way logging calls do.
            raise ValueError(
                "Invalid max_wbits value %r; allowed range 8-%d"
                % (max_wbits, zlib.MAX_WBITS)
            )
        self._max_wbits = max_wbits
        if persistent:
            self._decompressor = (
                self._create_decompressor()
            )  # type: Optional[_Decompressor]
        else:
            self._decompressor = None

    def _create_decompressor(self) -> "_Decompressor":
        """Return a new zlib decompressor; negative wbits selects raw DEFLATE."""
        return zlib.decompressobj(-self._max_wbits)

    def decompress(self, data: bytes) -> bytes:
        """Decompress ``data``, producing at most ``max_message_size`` bytes.

        The four-byte sync-flush trailer is appended before the data is
        handed to zlib. Raises `_DecompressTooLargeError` when input is
        left unconsumed after the size-limited call.
        """
        decompressor = self._decompressor or self._create_decompressor()
        result = decompressor.decompress(
            data + b"\x00\x00\xff\xff", self._max_message_size
        )
        if decompressor.unconsumed_tail:
            raise _DecompressTooLargeError()
        return result
794class WebSocketProtocol13(WebSocketProtocol):
795 """Implementation of the WebSocket protocol from RFC 6455.
797 This class supports versions 7 and 8 of the protocol in addition to the
798 final version 13.
799 """
801 # Bit masks for the first byte of a frame.
802 FIN = 0x80
803 RSV1 = 0x40
804 RSV2 = 0x20
805 RSV3 = 0x10
806 RSV_MASK = RSV1 | RSV2 | RSV3
807 OPCODE_MASK = 0x0F
809 stream = None # type: IOStream
def __init__(
    self,
    handler: "_WebSocketDelegate",
    mask_outgoing: bool,
    params: _WebSocketParams,
) -> None:
    """Store the connection parameters and reset all per-connection state."""
    WebSocketProtocol.__init__(self, handler)
    self.mask_outgoing = mask_outgoing
    self.params = params

    # State of the frame currently being received.
    self._final_frame = False
    self._frame_opcode = None
    self._masked_frame = None
    self._frame_mask = None  # type: Optional[bytes]
    self._frame_length = None
    self._frame_compressed = None  # type: Optional[bool]

    # State of a fragmented message being received.
    self._fragmented_message_buffer = None  # type: Optional[bytes]
    self._fragmented_message_opcode = None
    self._waiting = None  # type: object

    # Compression; the two objects stay None until compressors are created.
    self._compression_options = params.compression_options
    self._decompressor = None  # type: Optional[_PerMessageDeflateDecompressor]
    self._compressor = None  # type: Optional[_PerMessageDeflateCompressor]

    # The total uncompressed size of all messages received or sent.
    # Unicode messages are encoded to utf8.
    # Only for testing; subject to change.
    self._message_bytes_in = 0
    self._message_bytes_out = 0
    # The total size of all packets received or sent.  Includes
    # the effect of compression, frame overhead, and control frames.
    self._wire_bytes_in = 0
    self._wire_bytes_out = 0

    # Keep-alive ping state.
    self.ping_callback = None  # type: Optional[PeriodicCallback]
    self.last_ping = 0.0
    self.last_pong = 0.0

    self.close_code = None  # type: Optional[int]
    self.close_reason = None  # type: Optional[str]
# Implemented as a property because the abstract base class declares
# ``selected_subprotocol`` as an abstract property.
@property
def selected_subprotocol(self) -> Optional[str]:
    """The subprotocol stored by the setter."""
    return self._selected_subprotocol

@selected_subprotocol.setter
def selected_subprotocol(self, value: Optional[str]) -> None:
    self._selected_subprotocol = value
async def accept_connection(self, handler: WebSocketHandler) -> None:
    """Check the handshake headers, then run the connection.

    A `ValueError` from the header check is answered with a 400
    response. Cancellation or a `ValueError` raised while the
    connection is being accepted aborts the connection instead.
    """
    try:
        self._handle_websocket_headers(handler)
    except ValueError:
        log_msg = "Missing/Invalid WebSocket headers"
        handler.set_status(400)
        handler.finish(log_msg)
        gen_log.debug(log_msg)
        return

    try:
        await self._accept_connection(handler)
    except asyncio.CancelledError:
        self._abort()
    except ValueError:
        gen_log.debug("Malformed WebSocket request received", exc_info=True)
        self._abort()
def _handle_websocket_headers(self, handler: WebSocketHandler) -> None:
    """Verify that every required handshake header is present.

    Raises `ValueError` when ``Host``, ``Sec-Websocket-Key`` or
    ``Sec-Websocket-Version`` is missing or has an empty value.
    """
    required = ("Host", "Sec-Websocket-Key", "Sec-Websocket-Version")
    if any(not handler.request.headers.get(name) for name in required):
        raise ValueError("Missing/Invalid WebSocket headers")
@staticmethod
def compute_accept_value(key: Union[str, bytes]) -> str:
    """Compute the Sec-WebSocket-Accept value for a Sec-WebSocket-Key.

    The result is the base64-encoded SHA-1 digest of the key followed
    by a fixed magic value.
    """
    magic = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"  # Magic value
    digest = hashlib.sha1(utf8(key) + magic).digest()
    return native_str(base64.b64encode(digest))
def _challenge_response(self, handler: WebSocketHandler) -> str:
    """Return the accept value for the request's ``Sec-Websocket-Key``."""
    key = cast(str, handler.request.headers.get("Sec-Websocket-Key"))
    return WebSocketProtocol13.compute_accept_value(key)
async def _accept_connection(self, handler: WebSocketHandler) -> None:
    """Complete the server handshake and run the frame loop.

    Steps, in order: select a subprotocol, negotiate
    ``permessage-deflate`` when compression options are configured,
    send the 101 response, detach the stream, start pinging, call the
    handler's ``open`` (awaiting it when it returns something), and
    finally await the receive loop. An exception from ``open`` is
    logged through the handler and aborts the connection.
    """
    offered = handler.request.headers.get("Sec-WebSocket-Protocol")
    subprotocols = [s.strip() for s in offered.split(",")] if offered else []
    self.selected_subprotocol = handler.select_subprotocol(subprotocols)
    if self.selected_subprotocol:
        assert self.selected_subprotocol in subprotocols
        handler.set_header("Sec-WebSocket-Protocol", self.selected_subprotocol)

    for ext in self._parse_extensions_header(handler.request.headers):
        if ext[0] != "permessage-deflate" or self._compression_options is None:
            continue
        ext_params = ext[1]
        # TODO: negotiate parameters if compression_options
        # specifies limits.
        self._create_compressors("server", ext_params, self._compression_options)
        if (
            "client_max_window_bits" in ext_params
            and ext_params["client_max_window_bits"] is None
        ):
            # Don't echo an offered client_max_window_bits
            # parameter with no value.
            del ext_params["client_max_window_bits"]
        handler.set_header(
            "Sec-WebSocket-Extensions",
            httputil._encode_header("permessage-deflate", ext_params),
        )
        # Only the first acceptable offer is used.
        break

    handler.clear_header("Content-Type")
    handler.set_status(101)
    handler.set_header("Upgrade", "websocket")
    handler.set_header("Connection", "Upgrade")
    handler.set_header("Sec-WebSocket-Accept", self._challenge_response(handler))
    handler.finish()

    self.stream = handler._detach_stream()

    self.start_pinging()
    try:
        open_result = handler.open(*handler.open_args, **handler.open_kwargs)
        if open_result is not None:
            await open_result
    except Exception:
        handler.log_exception(*sys.exc_info())
        self._abort()
        return

    await self._receive_frame_loop()
def _parse_extensions_header(
    self, headers: httputil.HTTPHeaders
) -> List[Tuple[str, Dict[str, str]]]:
    """Parse each comma-separated entry of ``Sec-WebSocket-Extensions``.

    Returns an empty list when the header is missing or empty.
    """
    raw = headers.get("Sec-WebSocket-Extensions", "")
    if not raw:
        return []
    return [httputil._parse_header(entry.strip()) for entry in raw.split(",")]
def _process_server_headers(
    self, key: Union[str, bytes], headers: httputil.HTTPHeaders
) -> None:
    """Process the headers sent by the server to this client connection.

    'key' is the websocket handshake challenge/response key.

    Asserts that the ``Upgrade``, ``Connection`` and
    ``Sec-Websocket-Accept`` headers have the expected values, creates
    the compressors for an accepted ``permessage-deflate`` extension,
    and records the ``Sec-WebSocket-Protocol`` header (or None) as the
    selected subprotocol.

    Raises `ValueError` for any extension other than
    ``permessage-deflate``, or for ``permessage-deflate`` when no
    compression options are configured.
    """
    # NOTE(review): these checks are asserts and so are skipped under
    # ``python -O``; they are kept to preserve the exception type.
    assert headers["Upgrade"].lower() == "websocket"
    assert headers["Connection"].lower() == "upgrade"
    accept = self.compute_accept_value(key)
    assert headers["Sec-Websocket-Accept"] == accept

    extensions = self._parse_extensions_header(headers)
    for ext in extensions:
        if ext[0] == "permessage-deflate" and self._compression_options is not None:
            self._create_compressors("client", ext[1])
        else:
            # ``ext`` is a tuple, so wrap it to format it as one value.
            raise ValueError("unsupported extension %r" % (ext,))

    self.selected_subprotocol = headers.get("Sec-WebSocket-Protocol", None)
981 def _get_compressor_options(
982 self,
983 side: str,
984 agreed_parameters: Dict[str, Any],
985 compression_options: Optional[Dict[str, Any]] = None,
986 ) -> Dict[str, Any]:
987 """Converts a websocket agreed_parameters set to keyword arguments
988 for our compressor objects.
989 """
990 options = dict(
991 persistent=(side + "_no_context_takeover") not in agreed_parameters
992 ) # type: Dict[str, Any]
993 wbits_header = agreed_parameters.get(side + "_max_window_bits", None)
994 if wbits_header is None:
995 options["max_wbits"] = zlib.MAX_WBITS
996 else:
997 options["max_wbits"] = int(wbits_header)
998 options["compression_options"] = compression_options
999 return options
1001 def _create_compressors(
1002 self,
1003 side: str,
1004 agreed_parameters: Dict[str, Any],
1005 compression_options: Optional[Dict[str, Any]] = None,
1006 ) -> None:
1007 # TODO: handle invalid parameters gracefully
1008 allowed_keys = set(
1009 [
1010 "server_no_context_takeover",
1011 "client_no_context_takeover",
1012 "server_max_window_bits",
1013 "client_max_window_bits",
1014 ]
1015 )
1016 for key in agreed_parameters:
1017 if key not in allowed_keys:
1018 raise ValueError("unsupported compression parameter %r" % key)
1019 other_side = "client" if (side == "server") else "server"
1020 self._compressor = _PerMessageDeflateCompressor(
1021 **self._get_compressor_options(side, agreed_parameters, compression_options)
1022 )
1023 self._decompressor = _PerMessageDeflateDecompressor(
1024 max_message_size=self.params.max_message_size,
1025 **self._get_compressor_options(
1026 other_side, agreed_parameters, compression_options
1027 )
1028 )
1030 def _write_frame(
1031 self, fin: bool, opcode: int, data: bytes, flags: int = 0
1032 ) -> "Future[None]":
1033 data_len = len(data)
1034 if opcode & 0x8:
1035 # All control frames MUST have a payload length of 125
1036 # bytes or less and MUST NOT be fragmented.
1037 if not fin:
1038 raise ValueError("control frames may not be fragmented")
1039 if data_len > 125:
1040 raise ValueError("control frame payloads may not exceed 125 bytes")
1041 if fin:
1042 finbit = self.FIN
1043 else:
1044 finbit = 0
1045 frame = struct.pack("B", finbit | opcode | flags)
1046 if self.mask_outgoing:
1047 mask_bit = 0x80
1048 else:
1049 mask_bit = 0
1050 if data_len < 126:
1051 frame += struct.pack("B", data_len | mask_bit)
1052 elif data_len <= 0xFFFF:
1053 frame += struct.pack("!BH", 126 | mask_bit, data_len)
1054 else:
1055 frame += struct.pack("!BQ", 127 | mask_bit, data_len)
1056 if self.mask_outgoing:
1057 mask = os.urandom(4)
1058 data = mask + _websocket_mask(mask, data)
1059 frame += data
1060 self._wire_bytes_out += len(frame)
1061 return self.stream.write(frame)
def write_message(
    self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False
) -> "Future[None]":
    """Sends the given message to the client of this Web Socket.

    Dictionaries are JSON-encoded first; the message is then converted to
    UTF-8 bytes, compressed if a compressor is active, and written as a
    single final frame (binary or text depending on ``binary``).
    ``StreamClosedError`` is translated into ``WebSocketClosedError``
    both for the immediate write and for the returned future.
    """
    opcode = 0x2 if binary else 0x1
    if isinstance(message, dict):
        message = tornado.escape.json_encode(message)
    payload = tornado.escape.utf8(message)
    assert isinstance(payload, bytes)
    self._message_bytes_out += len(payload)

    flags = 0
    if self._compressor:
        payload = self._compressor.compress(payload)
        flags |= self.RSV1

    # For historical reasons, write methods in Tornado operate in a
    # semi-synchronous mode in which awaiting the Future they return is
    # optional (but errors can still be raised).  Errors therefore have
    # to be translated twice: once for the synchronous call and once
    # inside the coroutine that wraps the returned future.
    try:
        fut = self._write_frame(True, opcode, payload, flags=flags)
    except StreamClosedError:
        raise WebSocketClosedError()

    async def wrapper() -> None:
        try:
            await fut
        except StreamClosedError:
            raise WebSocketClosedError()

    return asyncio.ensure_future(wrapper())
def write_ping(self, data: bytes) -> None:
    """Write a ping control frame (opcode 0x9) carrying ``data``.

    ``data`` must already be ``bytes``; the future returned by
    ``_write_frame`` is discarded.
    """
    assert isinstance(data, bytes)
    ping_opcode = 0x9
    self._write_frame(True, ping_opcode, data)
1103 async def _receive_frame_loop(self) -> None:
1104 try:
1105 while not self.client_terminated:
1106 await self._receive_frame()
1107 except StreamClosedError:
1108 self._abort()
1109 self.handler.on_ws_connection_close(self.close_code, self.close_reason)
1111 async def _read_bytes(self, n: int) -> bytes:
1112 data = await self.stream.read_bytes(n)
1113 self._wire_bytes_in += n
1114 return data
1116 async def _receive_frame(self) -> None:
1117 # Read the frame header.
1118 data = await self._read_bytes(2)
1119 header, mask_payloadlen = struct.unpack("BB", data)
1120 is_final_frame = header & self.FIN
1121 reserved_bits = header & self.RSV_MASK
1122 opcode = header & self.OPCODE_MASK
1123 opcode_is_control = opcode & 0x8
1124 if self._decompressor is not None and opcode != 0:
1125 # Compression flag is present in the first frame's header,
1126 # but we can't decompress until we have all the frames of
1127 # the message.
1128 self._frame_compressed = bool(reserved_bits & self.RSV1)
1129 reserved_bits &= ~self.RSV1
1130 if reserved_bits:
1131 # client is using as-yet-undefined extensions; abort
1132 self._abort()
1133 return
1134 is_masked = bool(mask_payloadlen & 0x80)
1135 payloadlen = mask_payloadlen & 0x7F
1137 # Parse and validate the length.
1138 if opcode_is_control and payloadlen >= 126:
1139 # control frames must have payload < 126
1140 self._abort()
1141 return
1142 if payloadlen < 126:
1143 self._frame_length = payloadlen
1144 elif payloadlen == 126:
1145 data = await self._read_bytes(2)
1146 payloadlen = struct.unpack("!H", data)[0]
1147 elif payloadlen == 127:
1148 data = await self._read_bytes(8)
1149 payloadlen = struct.unpack("!Q", data)[0]
1150 new_len = payloadlen
1151 if self._fragmented_message_buffer is not None:
1152 new_len += len(self._fragmented_message_buffer)
1153 if new_len > self.params.max_message_size:
1154 self.close(1009, "message too big")
1155 self._abort()
1156 return
1158 # Read the payload, unmasking if necessary.
1159 if is_masked:
1160 self._frame_mask = await self._read_bytes(4)
1161 data = await self._read_bytes(payloadlen)
1162 if is_masked:
1163 assert self._frame_mask is not None
1164 data = _websocket_mask(self._frame_mask, data)
1166 # Decide what to do with this frame.
1167 if opcode_is_control:
1168 # control frames may be interleaved with a series of fragmented
1169 # data frames, so control frames must not interact with
1170 # self._fragmented_*
1171 if not is_final_frame:
1172 # control frames must not be fragmented
1173 self._abort()
1174 return
1175 elif opcode == 0: # continuation frame
1176 if self._fragmented_message_buffer is None:
1177 # nothing to continue
1178 self._abort()
1179 return
1180 self._fragmented_message_buffer += data
1181 if is_final_frame:
1182 opcode = self._fragmented_message_opcode
1183 data = self._fragmented_message_buffer
1184 self._fragmented_message_buffer = None
1185 else: # start of new data message
1186 if self._fragmented_message_buffer is not None:
1187 # can't start new message until the old one is finished
1188 self._abort()
1189 return
1190 if not is_final_frame:
1191 self._fragmented_message_opcode = opcode
1192 self._fragmented_message_buffer = data
1194 if is_final_frame:
1195 handled_future = self._handle_message(opcode, data)
1196 if handled_future is not None:
1197 await handled_future
1199 def _handle_message(self, opcode: int, data: bytes) -> "Optional[Future[None]]":
1200 """Execute on_message, returning its Future if it is a coroutine."""
1201 if self.client_terminated:
1202 return None
1204 if self._frame_compressed:
1205 assert self._decompressor is not None
1206 try:
1207 data = self._decompressor.decompress(data)
1208 except _DecompressTooLargeError:
1209 self.close(1009, "message too big after decompression")
1210 self._abort()
1211 return None
1213 if opcode == 0x1:
1214 # UTF-8 data
1215 self._message_bytes_in += len(data)
1216 try:
1217 decoded = data.decode("utf-8")
1218 except UnicodeDecodeError:
1219 self._abort()
1220 return None
1221 return self._run_callback(self.handler.on_message, decoded)
1222 elif opcode == 0x2:
1223 # Binary data
1224 self._message_bytes_in += len(data)
1225 return self._run_callback(self.handler.on_message, data)
1226 elif opcode == 0x8:
1227 # Close
1228 self.client_terminated = True
1229 if len(data) >= 2:
1230 self.close_code = struct.unpack(">H", data[:2])[0]
1231 if len(data) > 2:
1232 self.close_reason = to_unicode(data[2:])
1233 # Echo the received close code, if any (RFC 6455 section 5.5.1).
1234 self.close(self.close_code)
1235 elif opcode == 0x9:
1236 # Ping
1237 try:
1238 self._write_frame(True, 0xA, data)
1239 except StreamClosedError:
1240 self._abort()
1241 self._run_callback(self.handler.on_ping, data)
1242 elif opcode == 0xA:
1243 # Pong
1244 self.last_pong = IOLoop.current().time()
1245 return self._run_callback(self.handler.on_pong, data)
1246 else:
1247 self._abort()
1248 return None
def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
    """Closes the WebSocket connection.

    Unless this side has already terminated, a close frame is written
    (when the stream is still open) carrying the optional status ``code``
    and ``reason``.  If the peer has terminated too, the stream is closed
    right away; otherwise a timeout is scheduled that aborts the
    connection.  Any periodic ping callback is stopped.
    """
    if not self.server_terminated:
        if not self.stream.closed():
            if code is None and reason is not None:
                code = 1000  # "normal closure" status code
            close_data = b"" if code is None else struct.pack(">H", code)
            if reason is not None:
                close_data += utf8(reason)
            try:
                self._write_frame(True, 0x8, close_data)
            except StreamClosedError:
                self._abort()
        self.server_terminated = True

    if not self.client_terminated:
        if self._waiting is None:
            # Give the client a few seconds to complete a clean shutdown,
            # otherwise just close the connection.
            io_loop = self.stream.io_loop
            self._waiting = io_loop.add_timeout(io_loop.time() + 5, self._abort)
    else:
        if self._waiting is not None:
            self.stream.io_loop.remove_timeout(self._waiting)
            self._waiting = None
        self.stream.close()

    if self.ping_callback:
        self.ping_callback.stop()
        self.ping_callback = None
def is_closing(self) -> bool:
    """Report whether the connection is on its way down.

    That is the case when the stream is already closed (for example
    after an unclean shutdown) or when either side has started its
    closing handshake.
    """
    stream_closed = self.stream.closed()
    return stream_closed or self.client_terminated or self.server_terminated
@property
def ping_interval(self) -> Optional[float]:
    """Configured ping interval, with an unset value reported as ``0``."""
    configured = self.params.ping_interval
    return 0 if configured is None else configured
@property
def ping_timeout(self) -> Optional[float]:
    """Configured ping timeout.

    When no timeout is configured, three ping intervals are used, with
    a floor of 30.
    """
    configured = self.params.ping_timeout
    if configured is None:
        assert self.ping_interval is not None
        return max(3 * self.ping_interval, 30)
    return configured
def start_pinging(self) -> None:
    """Start sending periodic pings to keep the connection alive

    Nothing is scheduled unless the ping interval is greater than zero.
    """
    assert self.ping_interval is not None
    if not self.ping_interval > 0:
        return
    now = IOLoop.current().time()
    self.last_ping = now
    self.last_pong = now
    # The interval is multiplied by 1000 before being handed to
    # PeriodicCallback.
    self.ping_callback = PeriodicCallback(
        self.periodic_ping, self.ping_interval * 1000
    )
    self.ping_callback.start()
def periodic_ping(self) -> None:
    """Send a ping to keep the websocket alive

    Called periodically if the websocket_ping_interval is set and non-zero.

    If the connection is closing, the periodic callback (if any) is
    stopped and no ping is sent.  If a ping was sent recently but no
    pong arrived within ``ping_timeout``, the connection is closed
    instead of sending another ping.
    """
    if self.is_closing():
        # Never ping a connection that is shutting down, even when the
        # periodic callback has already been cleared.
        if self.ping_callback is not None:
            self.ping_callback.stop()
        return

    # Check for timeout on pong. Make sure that we really have
    # sent a recent ping in case the machine with both server and
    # client has been suspended since the last ping.
    now = IOLoop.current().time()
    since_last_pong = now - self.last_pong
    since_last_ping = now - self.last_ping
    assert self.ping_interval is not None
    assert self.ping_timeout is not None
    if (
        since_last_ping < 2 * self.ping_interval
        and since_last_pong > self.ping_timeout
    ):
        self.close()
        return

    self.write_ping(b"")
    self.last_ping = now
def set_nodelay(self, x: bool) -> None:
    """Pass the flag ``x`` through to the stream's ``set_nodelay``."""
    stream = self.stream
    stream.set_nodelay(x)
class WebSocketClientConnection(simple_httpclient._HTTPConnection):
    """WebSocket client connection.

    This class should not be instantiated directly; use the
    `websocket_connect` function instead.
    """

    # Set once the server has answered the upgrade request with a 101
    # response; reset to None by ``close``.
    protocol = None  # type: WebSocketProtocol

    def __init__(
        self,
        request: httpclient.HTTPRequest,
        on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None,
        compression_options: Optional[Dict[str, Any]] = None,
        ping_interval: Optional[float] = None,
        ping_timeout: Optional[float] = None,
        max_message_size: int = _default_max_message_size,
        subprotocols: Optional[List[str]] = None,
    ) -> None:
        """Prepare the upgrade request and start the HTTP connection.

        The ``ws``/``wss`` scheme of ``request.url`` is rewritten to
        ``http``/``https`` (any other scheme raises ``KeyError``) and the
        WebSocket handshake headers are added to the request.  A
        ``Sec-WebSocket-Protocol`` header is only sent when
        ``subprotocols`` contains at least one entry, and the
        ``permessage-deflate`` extension is only offered when
        ``compression_options`` is not ``None``.
        """
        self.connect_future = Future()  # type: Future[WebSocketClientConnection]
        self.read_queue = Queue(1)  # type: Queue[Union[None, str, bytes]]
        self.key = base64.b64encode(os.urandom(16))
        self._on_message_callback = on_message_callback
        self.close_code = None  # type: Optional[int]
        self.close_reason = None  # type: Optional[str]
        self.params = _WebSocketParams(
            ping_interval=ping_interval,
            ping_timeout=ping_timeout,
            max_message_size=max_message_size,
            compression_options=compression_options,
        )

        scheme, sep, rest = request.url.partition(":")
        scheme = {"ws": "http", "wss": "https"}[scheme]
        request.url = scheme + sep + rest
        request.headers.update(
            {
                "Upgrade": "websocket",
                "Connection": "Upgrade",
                "Sec-WebSocket-Key": self.key,
                "Sec-WebSocket-Version": "13",
            }
        )
        if subprotocols:
            # An empty list would otherwise produce an empty header value.
            request.headers["Sec-WebSocket-Protocol"] = ",".join(subprotocols)
        if compression_options is not None:
            # Always offer to let the server set our max_wbits (and even though
            # we don't offer it, we will accept a client_no_context_takeover
            # from the server).
            # TODO: set server parameters for deflate extension
            # if requested in self.compression_options.
            request.headers[
                "Sec-WebSocket-Extensions"
            ] = "permessage-deflate; client_max_window_bits"

        # Websocket connection is currently unable to follow redirects
        request.follow_redirects = False

        self.tcp_client = TCPClient()
        super().__init__(
            None,
            request,
            lambda: None,
            self._on_http_response,
            104857600,
            self.tcp_client,
            65536,
            104857600,
        )

    def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None:
        """Closes the websocket connection.

        ``code`` and ``reason`` are documented under
        `WebSocketHandler.close`.

        .. versionadded:: 3.2

        .. versionchanged:: 4.0

           Added the ``code`` and ``reason`` arguments.
        """
        if self.protocol is not None:
            self.protocol.close(code, reason)
            self.protocol = None  # type: ignore

    def on_connection_close(self) -> None:
        """Handle the underlying connection going away.

        Fails ``connect_future`` if it is still pending, delivers a
        ``None`` message to signal the close, and closes the TCP client
        before delegating to the base class.
        """
        if not self.connect_future.done():
            self.connect_future.set_exception(StreamClosedError())
        self._on_message(None)
        self.tcp_client.close()
        super().on_connection_close()

    def on_ws_connection_close(
        self, close_code: Optional[int] = None, close_reason: Optional[str] = None
    ) -> None:
        """Record the close code and reason, then run `on_connection_close`."""
        self.close_code = close_code
        self.close_reason = close_reason
        self.on_connection_close()

    def _on_http_response(self, response: httpclient.HTTPResponse) -> None:
        """Fail ``connect_future`` when a plain HTTP response arrives.

        The response's own error is used when present; otherwise a
        ``WebSocketError`` is set.  Nothing happens if the future is
        already done.
        """
        if not self.connect_future.done():
            if response.error:
                self.connect_future.set_exception(response.error)
            else:
                self.connect_future.set_exception(
                    WebSocketError("Non-websocket response")
                )

    async def headers_received(
        self,
        start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
        headers: httputil.HTTPHeaders,
    ) -> None:
        """Take over the connection once the server answers with 101.

        Any other status code is passed on to the base class.  For a 101
        response the request timeout is cancelled, the protocol object is
        created and fed the server headers, the stream is detached from
        the HTTP connection, the frame loop and pinging are started, and
        ``connect_future`` is resolved with this connection.
        """
        assert isinstance(start_line, httputil.ResponseStartLine)
        if start_line.code != 101:
            await super().headers_received(start_line, headers)
            return

        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

        self.headers = headers
        self.protocol = self.get_websocket_protocol()
        self.protocol._process_server_headers(self.key, self.headers)
        self.protocol.stream = self.connection.detach()

        IOLoop.current().add_callback(self.protocol._receive_frame_loop)
        self.protocol.start_pinging()

        # Once we've taken over the connection, clear the final callback
        # we set on the http request. This deactivates the error handling
        # in simple_httpclient that would otherwise interfere with our
        # ability to see exceptions.
        self.final_callback = None  # type: ignore

        future_set_result_unless_cancelled(self.connect_future, self)

    def write_message(
        self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False
    ) -> "Future[None]":
        """Sends a message to the WebSocket server.

        If the stream is closed, raises `WebSocketClosedError`.
        Returns a `.Future` which can be used for flow control.

        .. versionchanged:: 5.0
           Exception raised on a closed stream changed from `.StreamClosedError`
           to `WebSocketClosedError`.
        """
        if self.protocol is None:
            raise WebSocketClosedError("Client connection has been closed")
        return self.protocol.write_message(message, binary=binary)

    def read_message(
        self,
        callback: Optional[Callable[["Future[Union[None, str, bytes]]"], None]] = None,
    ) -> Awaitable[Union[None, str, bytes]]:
        """Reads a message from the WebSocket server.

        If on_message_callback was specified at WebSocket
        initialization, this function will never return messages.

        Returns a future whose result is the message, or None
        if the connection is closed.  If a callback argument
        is given it will be called with the future when it is
        ready.
        """

        awaitable = self.read_queue.get()
        if callback is not None:
            self.io_loop.add_future(asyncio.ensure_future(awaitable), callback)
        return awaitable

    def on_message(self, message: Union[str, bytes]) -> Optional[Awaitable[None]]:
        """Forward an incoming message to `_on_message`."""
        return self._on_message(message)

    def _on_message(
        self, message: Union[None, str, bytes]
    ) -> Optional[Awaitable[None]]:
        """Deliver ``message`` to the callback or to the read queue.

        With an ``on_message_callback`` the callback is invoked and
        ``None`` is returned; otherwise the result of putting the message
        on ``read_queue`` is returned.
        """
        if self._on_message_callback:
            self._on_message_callback(message)
            return None
        else:
            return self.read_queue.put(message)

    def ping(self, data: bytes = b"") -> None:
        """Send ping frame to the remote end.

        The data argument allows a small amount of data (up to 125
        bytes) to be sent as a part of the ping message. Note that not
        all websocket implementations expose this data to
        applications.

        Consider using the ``ping_interval`` argument to
        `websocket_connect` instead of sending pings manually.

        Raises `WebSocketClosedError` if the connection has no protocol
        object (for example after `close`).

        .. versionadded:: 5.1

        """
        data = utf8(data)
        if self.protocol is None:
            raise WebSocketClosedError()
        self.protocol.write_ping(data)

    def on_pong(self, data: bytes) -> None:
        """Hook invoked for received pongs; does nothing by default."""
        pass

    def on_ping(self, data: bytes) -> None:
        """Hook invoked for received pings; does nothing by default."""
        pass

    def get_websocket_protocol(self) -> WebSocketProtocol:
        """Create the protocol object; client frames are always masked."""
        return WebSocketProtocol13(self, mask_outgoing=True, params=self.params)

    @property
    def selected_subprotocol(self) -> Optional[str]:
        """The subprotocol selected by the server.

        ``None`` while no protocol object is attached (before the
        handshake has completed or after `close`).

        .. versionadded:: 5.1
        """
        if self.protocol is None:
            return None
        return self.protocol.selected_subprotocol

    def log_exception(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        """Log an uncaught exception to the application log."""
        assert typ is not None
        assert value is not None
        app_log.error("Uncaught exception %s", value, exc_info=(typ, value, tb))
def websocket_connect(
    url: Union[str, httpclient.HTTPRequest],
    callback: Optional[Callable[["Future[WebSocketClientConnection]"], None]] = None,
    connect_timeout: Optional[float] = None,
    on_message_callback: Optional[Callable[[Union[None, str, bytes]], None]] = None,
    compression_options: Optional[Dict[str, Any]] = None,
    ping_interval: Optional[float] = None,
    ping_timeout: Optional[float] = None,
    max_message_size: int = _default_max_message_size,
    subprotocols: Optional[List[str]] = None,
) -> "Awaitable[WebSocketClientConnection]":
    """Client-side websocket support.

    Takes a url (or an ``HTTPRequest``) and returns a Future whose result
    is a `WebSocketClientConnection`.  ``connect_timeout`` may only be
    given together with a url string, not with an ``HTTPRequest``.

    ``compression_options`` is interpreted in the same way as the
    return value of `.WebSocketHandler.get_compression_options`.

    The connection supports two styles of operation. In the coroutine
    style, the application typically calls
    `~.WebSocketClientConnection.read_message` in a loop::

        conn = yield websocket_connect(url)
        while True:
            msg = yield conn.read_message()
            if msg is None: break
            # Do something with msg

    In the callback style, pass an ``on_message_callback`` to
    ``websocket_connect``. In both styles, a message of ``None``
    indicates that the connection has been closed.

    ``subprotocols`` may be a list of strings specifying proposed
    subprotocols. The selected protocol may be found on the
    ``selected_subprotocol`` attribute of the connection object
    when the connection is complete.

    If ``callback`` is given, it is attached to the returned future on
    the current IOLoop.

    .. versionchanged:: 3.2
       Also accepts ``HTTPRequest`` objects in place of urls.

    .. versionchanged:: 4.1
       Added ``compression_options`` and ``on_message_callback``.

    .. versionchanged:: 4.5
       Added the ``ping_interval``, ``ping_timeout``, and ``max_message_size``
       arguments, which have the same meaning as in `WebSocketHandler`.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been removed.

    .. versionchanged:: 5.1
       Added the ``subprotocols`` argument.
    """
    if not isinstance(url, httpclient.HTTPRequest):
        request = httpclient.HTTPRequest(url, connect_timeout=connect_timeout)
    else:
        assert connect_timeout is None
        request = url
        # Copy and convert the headers dict/object (see comments in
        # AsyncHTTPClient.fetch)
        request.headers = httputil.HTTPHeaders(request.headers)

    proxied_request = cast(
        httpclient.HTTPRequest,
        httpclient._RequestProxy(request, httpclient.HTTPRequest._DEFAULTS),
    )
    connection = WebSocketClientConnection(
        proxied_request,
        on_message_callback=on_message_callback,
        compression_options=compression_options,
        ping_interval=ping_interval,
        ping_timeout=ping_timeout,
        max_message_size=max_message_size,
        subprotocols=subprotocols,
    )
    connect_future = connection.connect_future
    if callback is not None:
        IOLoop.current().add_future(connect_future, callback)
    return connect_future