Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_ws.py: 28%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import base64
3import binascii
4import hashlib
5import json
6import sys
7from collections.abc import Callable, Iterable
8from typing import Any, Final, Generic, Literal, Union, overload
10from multidict import CIMultiDict
12from . import hdrs
13from ._websocket.reader import WebSocketDataQueue
14from ._websocket.writer import DEFAULT_LIMIT
15from .abc import AbstractStreamWriter
16from .client_exceptions import WSMessageTypeError
17from .helpers import (
18 calculate_timeout_when,
19 frozen_dataclass_decorator,
20 set_exception,
21 set_result,
22)
23from .http import (
24 WS_CLOSED_MESSAGE,
25 WS_CLOSING_MESSAGE,
26 WS_KEY,
27 WebSocketError,
28 WebSocketReader,
29 WebSocketWriter,
30 WSCloseCode,
31 WSMessageDecodeText,
32 WSMessageNoDecodeText,
33 WSMsgType,
34 ws_ext_gen,
35 ws_ext_parse,
36)
37from .http_websocket import _INTERNAL_RECEIVE_TYPES, WSMessageError
38from .log import ws_logger
39from .streams import EofStream
40from .typedefs import JSONBytesEncoder, JSONDecoder, JSONEncoder
41from .web_exceptions import HTTPBadRequest, HTTPException
42from .web_request import BaseRequest
43from .web_response import StreamResponse
45if sys.version_info >= (3, 13):
46 from typing import TypeVar
47else:
48 from typing_extensions import TypeVar
50if sys.version_info >= (3, 11):
51 import asyncio as async_timeout
52 from typing import Self
53else:
54 import async_timeout
55 from typing_extensions import Self
57__all__ = (
58 "WebSocketResponse",
59 "WebSocketReady",
60 "WSMsgType",
61)
63THRESHOLD_CONNLOST_ACCESS: Final[int] = 5
65# TypeVar for whether text messages are decoded to str (True) or kept as bytes (False)
66_DecodeText = TypeVar("_DecodeText", bound=bool, covariant=True, default=Literal[True])
69@frozen_dataclass_decorator
70class WebSocketReady:
71 ok: bool
72 protocol: str | None
74 def __bool__(self) -> bool:
75 return self.ok
78class WebSocketResponse(StreamResponse, Generic[_DecodeText]):
80 _length_check: bool = False
81 _ws_protocol: str | None = None
82 _writer: WebSocketWriter | None = None
83 _reader: WebSocketDataQueue | None = None
84 _closed: bool = False
85 _closing: bool = False
86 _conn_lost: int = 0
87 _close_code: int | None = None
88 _loop: asyncio.AbstractEventLoop | None = None
89 _waiting: bool = False
90 _close_wait: asyncio.Future[None] | None = None
91 _exception: BaseException | None = None
92 _heartbeat_when: float = 0.0
93 _heartbeat_cb: asyncio.TimerHandle | None = None
94 _pong_response_cb: asyncio.TimerHandle | None = None
95 _ping_task: asyncio.Task[None] | None = None
96 _need_heartbeat_reset: bool = False
97 _heartbeat_reset_handle: asyncio.Handle | None = None
99 def __init__(
100 self,
101 *,
102 timeout: float = 10.0,
103 receive_timeout: float | None = None,
104 autoclose: bool = True,
105 autoping: bool = True,
106 heartbeat: float | None = None,
107 protocols: Iterable[str] = (),
108 compress: bool = True,
109 max_msg_size: int = 4 * 1024 * 1024,
110 writer_limit: int = DEFAULT_LIMIT,
111 decode_text: bool = True,
112 ) -> None:
113 super().__init__(status=101)
114 self._protocols = protocols
115 self._timeout = timeout
116 self._receive_timeout = receive_timeout
117 self._autoclose = autoclose
118 self._autoping = autoping
119 self._heartbeat = heartbeat
120 if heartbeat is not None:
121 self._pong_heartbeat = heartbeat / 2.0
122 self._compress: bool | int = compress
123 self._max_msg_size = max_msg_size
124 self._writer_limit = writer_limit
125 self._decode_text = decode_text
126 self._need_heartbeat_reset = False
127 self._heartbeat_reset_handle = None
129 def _cancel_heartbeat(self) -> None:
130 self._cancel_pong_response_cb()
131 if self._heartbeat_reset_handle is not None:
132 self._heartbeat_reset_handle.cancel()
133 self._heartbeat_reset_handle = None
134 self._need_heartbeat_reset = False
135 if self._heartbeat_cb is not None:
136 self._heartbeat_cb.cancel()
137 self._heartbeat_cb = None
138 if self._ping_task is not None:
139 self._ping_task.cancel()
140 self._ping_task = None
142 def _cancel_pong_response_cb(self) -> None:
143 if self._pong_response_cb is not None:
144 self._pong_response_cb.cancel()
145 self._pong_response_cb = None
147 def _on_data_received(self) -> None:
148 if self._heartbeat is None or self._need_heartbeat_reset:
149 return
150 loop = self._loop
151 assert loop is not None
152 # Coalesce multiple chunks received in the same loop tick into a single
153 # heartbeat reset. Resetting immediately per chunk increases timer churn.
154 self._need_heartbeat_reset = True
155 self._heartbeat_reset_handle = loop.call_soon(self._flush_heartbeat_reset)
157 def _flush_heartbeat_reset(self) -> None:
158 self._heartbeat_reset_handle = None
159 if not self._need_heartbeat_reset:
160 return
161 self._reset_heartbeat()
162 self._need_heartbeat_reset = False
164 def _reset_heartbeat(self) -> None:
165 if self._heartbeat is None:
166 return
167 self._cancel_pong_response_cb()
168 req = self._req
169 timeout_ceil_threshold = (
170 req._protocol._timeout_ceil_threshold if req is not None else 5
171 )
172 loop = self._loop
173 assert loop is not None
174 now = loop.time()
175 when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
176 self._heartbeat_when = when
177 if self._heartbeat_cb is None:
178 # We do not cancel the previous heartbeat_cb here because
179 # it generates a significant amount of TimerHandle churn
180 # which causes asyncio to rebuild the heap frequently.
181 # Instead _send_heartbeat() will reschedule the next
182 # heartbeat if it fires too early.
183 self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)
185 def _send_heartbeat(self) -> None:
186 self._heartbeat_cb = None
188 # If heartbeat reset is pending (data is being received), skip sending
189 # the ping and let the reset callback handle rescheduling the heartbeat.
190 if self._need_heartbeat_reset:
191 return
193 loop = self._loop
194 assert loop is not None and self._writer is not None
195 now = loop.time()
196 if now < self._heartbeat_when:
197 # Heartbeat fired too early, reschedule
198 self._heartbeat_cb = loop.call_at(
199 self._heartbeat_when, self._send_heartbeat
200 )
201 return
203 req = self._req
204 timeout_ceil_threshold = (
205 req._protocol._timeout_ceil_threshold if req is not None else 5
206 )
207 when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold)
208 self._cancel_pong_response_cb()
209 self._pong_response_cb = loop.call_at(when, self._pong_not_received)
211 coro = self._writer.send_frame(b"", WSMsgType.PING)
212 if sys.version_info >= (3, 12):
213 # Optimization for Python 3.12, try to send the ping
214 # immediately to avoid having to schedule
215 # the task on the event loop.
216 ping_task = asyncio.Task(coro, loop=loop, eager_start=True)
217 else:
218 ping_task = loop.create_task(coro)
220 if not ping_task.done():
221 self._ping_task = ping_task
222 ping_task.add_done_callback(self._ping_task_done)
223 else:
224 self._ping_task_done(ping_task)
226 def _ping_task_done(self, task: "asyncio.Task[None]") -> None:
227 """Callback for when the ping task completes."""
228 if not task.cancelled() and (exc := task.exception()):
229 self._handle_ping_pong_exception(exc)
230 self._ping_task = None
232 def _pong_not_received(self) -> None:
233 if self._req is not None and self._req.transport is not None:
234 self._handle_ping_pong_exception(
235 asyncio.TimeoutError(
236 f"No PONG received after {self._pong_heartbeat} seconds"
237 )
238 )
240 def _handle_ping_pong_exception(self, exc: BaseException) -> None:
241 """Handle exceptions raised during ping/pong processing."""
242 if self._closed:
243 return
244 self._set_closed()
245 self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
246 self._exception = exc
247 if self._waiting and not self._closing and self._reader is not None:
248 self._reader.feed_data(WSMessageError(data=exc, extra=None))
250 def _set_closed(self) -> None:
251 """Set the connection to closed.
253 Cancel any heartbeat timers and set the closed flag.
254 """
255 self._closed = True
256 self._cancel_heartbeat()
258 async def prepare(self, request: BaseRequest) -> AbstractStreamWriter:
259 # make pre-check to don't hide it by do_handshake() exceptions
260 if self._payload_writer is not None:
261 return self._payload_writer
263 protocol, writer = self._pre_start(request)
264 payload_writer = await super().prepare(request)
265 assert payload_writer is not None
266 self._post_start(request, protocol, writer)
267 await payload_writer.drain()
268 return payload_writer
270 def _handshake(
271 self, request: BaseRequest
272 ) -> tuple["CIMultiDict[str]", str | None, int, bool]:
273 headers = request.headers
274 if "websocket" != headers.get(hdrs.UPGRADE, "").lower().strip():
275 raise HTTPBadRequest(
276 text=(
277 f"No WebSocket UPGRADE hdr: {headers.get(hdrs.UPGRADE)}\n Can "
278 '"Upgrade" only to "WebSocket".'
279 )
280 )
282 if "upgrade" not in headers.get(hdrs.CONNECTION, "").lower():
283 raise HTTPBadRequest(
284 text=f"No CONNECTION upgrade hdr: {headers.get(hdrs.CONNECTION)}"
285 )
287 # find common sub-protocol between client and server
288 protocol: str | None = None
289 if hdrs.SEC_WEBSOCKET_PROTOCOL in headers:
290 req_protocols = [
291 str(proto.strip())
292 for proto in headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
293 ]
295 for proto in req_protocols:
296 if proto in self._protocols:
297 protocol = proto
298 break
299 else:
300 # No overlap found: Return no protocol as per spec
301 ws_logger.warning(
302 "%s: Client protocols %r don’t overlap server-known ones %r",
303 request.remote,
304 req_protocols,
305 self._protocols,
306 )
308 # check supported version
309 version = headers.get(hdrs.SEC_WEBSOCKET_VERSION, "")
310 if version not in ("13", "8", "7"):
311 raise HTTPBadRequest(text=f"Unsupported version: {version}")
313 # check client handshake for validity
314 key = headers.get(hdrs.SEC_WEBSOCKET_KEY)
315 try:
316 if not key or len(base64.b64decode(key)) != 16:
317 raise HTTPBadRequest(text=f"Handshake error: {key!r}")
318 except binascii.Error:
319 raise HTTPBadRequest(text=f"Handshake error: {key!r}") from None
321 accept_val = base64.b64encode(
322 hashlib.sha1(key.encode() + WS_KEY).digest()
323 ).decode()
324 response_headers = CIMultiDict(
325 {
326 hdrs.UPGRADE: "websocket",
327 hdrs.CONNECTION: "upgrade",
328 hdrs.SEC_WEBSOCKET_ACCEPT: accept_val,
329 }
330 )
332 notakeover = False
333 compress = 0
334 if self._compress:
335 extensions = headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS)
336 # Server side always get return with no exception.
337 # If something happened, just drop compress extension
338 compress, notakeover = ws_ext_parse(extensions, isserver=True)
339 if compress:
340 enabledext = ws_ext_gen(
341 compress=compress, isserver=True, server_notakeover=notakeover
342 )
343 response_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = enabledext
345 if protocol:
346 response_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = protocol
347 return (
348 response_headers,
349 protocol,
350 compress,
351 notakeover,
352 )
354 def _pre_start(self, request: BaseRequest) -> tuple[str | None, WebSocketWriter]:
355 self._loop = request._loop
357 headers, protocol, compress, notakeover = self._handshake(request)
359 self.set_status(101)
360 self.headers.update(headers)
361 self.force_close()
362 self._compress = compress
363 transport = request._protocol.transport
364 if transport is None:
365 raise ConnectionResetError("Connection lost")
366 writer = WebSocketWriter(
367 request._protocol,
368 transport,
369 compress=compress,
370 notakeover=notakeover,
371 limit=self._writer_limit,
372 )
374 return protocol, writer
376 def _post_start(
377 self, request: BaseRequest, protocol: str | None, writer: WebSocketWriter
378 ) -> None:
379 self._ws_protocol = protocol
380 self._writer = writer
382 self._reset_heartbeat()
384 loop = self._loop
385 assert loop is not None
386 self._reader = WebSocketDataQueue(request._protocol, 2**16, loop=loop)
387 parser = WebSocketReader(
388 self._reader,
389 self._max_msg_size,
390 compress=bool(self._compress),
391 decode_text=self._decode_text,
392 )
393 cb = None if self._heartbeat is None else self._on_data_received
394 request.protocol.set_parser(parser, data_received_cb=cb)
395 # disable HTTP keepalive for WebSocket
396 request.protocol.keep_alive(False)
398 def can_prepare(self, request: BaseRequest) -> WebSocketReady:
399 if self._writer is not None:
400 raise RuntimeError("Already started")
401 try:
402 _, protocol, _, _ = self._handshake(request)
403 except HTTPException:
404 return WebSocketReady(False, None)
405 else:
406 return WebSocketReady(True, protocol)
408 @property
409 def prepared(self) -> bool:
410 return self._writer is not None
412 @property
413 def closed(self) -> bool:
414 return self._closed
416 @property
417 def close_code(self) -> int | None:
418 return self._close_code
420 @property
421 def ws_protocol(self) -> str | None:
422 return self._ws_protocol
424 @property
425 def compress(self) -> int | bool:
426 return self._compress
428 def get_extra_info(self, name: str, default: Any = None) -> Any:
429 """Get optional transport information.
431 If no value associated with ``name`` is found, ``default`` is returned.
432 """
433 writer = self._writer
434 if writer is None:
435 return default
436 return writer.transport.get_extra_info(name, default)
438 def exception(self) -> BaseException | None:
439 return self._exception
441 async def ping(self, message: bytes = b"") -> None:
442 if self._writer is None:
443 raise RuntimeError("Call .prepare() first")
444 await self._writer.send_frame(message, WSMsgType.PING)
446 async def pong(self, message: bytes = b"") -> None:
447 # unsolicited pong
448 if self._writer is None:
449 raise RuntimeError("Call .prepare() first")
450 await self._writer.send_frame(message, WSMsgType.PONG)
452 async def send_frame(
453 self, message: bytes, opcode: WSMsgType, compress: int | None = None
454 ) -> None:
455 """Send a frame over the websocket."""
456 if self._writer is None:
457 raise RuntimeError("Call .prepare() first")
458 await self._writer.send_frame(message, opcode, compress)
460 async def send_str(self, data: str, compress: int | None = None) -> None:
461 if self._writer is None:
462 raise RuntimeError("Call .prepare() first")
463 if not isinstance(data, str):
464 raise TypeError("data argument must be str (%r)" % type(data))
465 await self._writer.send_frame(
466 data.encode("utf-8"), WSMsgType.TEXT, compress=compress
467 )
469 async def send_bytes(self, data: bytes, compress: int | None = None) -> None:
470 if self._writer is None:
471 raise RuntimeError("Call .prepare() first")
472 if not isinstance(data, (bytes, bytearray, memoryview)):
473 raise TypeError("data argument must be byte-ish (%r)" % type(data))
474 await self._writer.send_frame(data, WSMsgType.BINARY, compress=compress)
476 async def send_json(
477 self,
478 data: Any,
479 compress: int | None = None,
480 *,
481 dumps: JSONEncoder = json.dumps,
482 ) -> None:
483 await self.send_str(dumps(data), compress=compress)
485 async def send_json_bytes(
486 self,
487 data: Any,
488 compress: int | None = None,
489 *,
490 dumps: JSONBytesEncoder,
491 ) -> None:
492 """Send JSON data using a bytes-returning encoder as a binary frame.
494 Use this when your JSON encoder (like orjson) returns bytes
495 instead of str, avoiding the encode/decode overhead.
496 """
497 await self.send_bytes(dumps(data), compress=compress)
499 async def write_eof(self) -> None: # type: ignore[override]
500 if self._eof_sent:
501 return
502 if self._payload_writer is None:
503 raise RuntimeError("Response has not been started")
505 await self.close()
506 self._eof_sent = True
508 async def close(
509 self, *, code: int = WSCloseCode.OK, message: bytes = b"", drain: bool = True
510 ) -> bool:
511 """Close websocket connection."""
512 if self._writer is None:
513 raise RuntimeError("Call .prepare() first")
515 if self._closed:
516 return False
517 self._set_closed()
519 try:
520 await self._writer.close(code, message)
521 writer = self._payload_writer
522 assert writer is not None
523 if drain:
524 await writer.drain()
525 except (asyncio.CancelledError, asyncio.TimeoutError):
526 self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
527 raise
528 except Exception as exc:
529 self._exception = exc
530 self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
531 return True
533 reader = self._reader
534 assert reader is not None
535 # we need to break `receive()` cycle before we can call
536 # `reader.read()` as `close()` may be called from different task
537 if self._waiting:
538 assert self._loop is not None
539 assert self._close_wait is None
540 self._close_wait = self._loop.create_future()
541 reader.feed_data(WS_CLOSING_MESSAGE)
542 await self._close_wait
544 if self._closing:
545 self._close_transport()
546 return True
548 try:
549 async with async_timeout.timeout(self._timeout):
550 while True:
551 msg = await reader.read()
552 if msg.type is WSMsgType.CLOSE:
553 self._set_code_close_transport(msg.data)
554 return True
555 except asyncio.CancelledError:
556 self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
557 raise
558 except Exception as exc:
559 self._exception = exc
560 self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
561 return True
563 def _set_closing(self, code: int) -> None:
564 """Set the close code and mark the connection as closing."""
565 self._closing = True
566 self._close_code = code
567 self._cancel_heartbeat()
569 def _set_code_close_transport(self, code: int) -> None:
570 """Set the close code and close the transport."""
571 self._close_code = code
572 self._close_transport()
574 def _close_transport(self) -> None:
575 """Close the transport."""
576 if self._req is not None and self._req.transport is not None:
577 self._req.transport.close()
579 @overload
580 async def receive(
581 self: "WebSocketResponse[Literal[True]]", timeout: float | None = None
582 ) -> WSMessageDecodeText: ...
584 @overload
585 async def receive(
586 self: "WebSocketResponse[Literal[False]]", timeout: float | None = None
587 ) -> WSMessageNoDecodeText: ...
589 @overload
590 async def receive(
591 self: "WebSocketResponse[_DecodeText]", timeout: float | None = None
592 ) -> WSMessageDecodeText | WSMessageNoDecodeText: ...
594 async def receive(
595 self, timeout: float | None = None
596 ) -> WSMessageDecodeText | WSMessageNoDecodeText:
597 if self._reader is None:
598 raise RuntimeError("Call .prepare() first")
600 receive_timeout = timeout or self._receive_timeout
601 while True:
602 if self._waiting:
603 raise RuntimeError("Concurrent call to receive() is not allowed")
605 if self._closed:
606 self._conn_lost += 1
607 if self._conn_lost >= THRESHOLD_CONNLOST_ACCESS:
608 raise RuntimeError("WebSocket connection is closed.")
609 return WS_CLOSED_MESSAGE
610 elif self._closing:
611 return WS_CLOSING_MESSAGE
613 try:
614 self._waiting = True
615 try:
616 if receive_timeout:
617 # Entering the context manager and creating
618 # Timeout() object can take almost 50% of the
619 # run time in this loop so we avoid it if
620 # there is no read timeout.
621 async with async_timeout.timeout(receive_timeout):
622 msg = await self._reader.read()
623 else:
624 msg = await self._reader.read()
625 finally:
626 self._waiting = False
627 if self._close_wait:
628 set_result(self._close_wait, None)
629 except asyncio.TimeoutError:
630 raise
631 except EofStream:
632 self._close_code = WSCloseCode.OK
633 await self.close()
634 return WS_CLOSED_MESSAGE
635 except WebSocketError as exc:
636 self._close_code = exc.code
637 await self.close(code=exc.code)
638 return WSMessageError(data=exc)
639 except Exception as exc:
640 self._exception = exc
641 self._set_closing(WSCloseCode.ABNORMAL_CLOSURE)
642 await self.close()
643 return WSMessageError(data=exc)
645 if msg.type not in _INTERNAL_RECEIVE_TYPES:
646 # If its not a close/closing/ping/pong message
647 # we can return it immediately
648 return msg
650 if msg.type is WSMsgType.CLOSE:
651 self._set_closing(msg.data)
652 # Could be closed while awaiting reader.
653 if not self._closed and self._autoclose: # type: ignore[redundant-expr]
654 # The client is likely going to close the
655 # connection out from under us so we do not
656 # want to drain any pending writes as it will
657 # likely result writing to a broken pipe.
658 await self.close(drain=False)
659 elif msg.type is WSMsgType.CLOSING:
660 self._set_closing(WSCloseCode.OK)
661 elif msg.type is WSMsgType.PING and self._autoping:
662 await self.pong(msg.data)
663 continue
664 elif msg.type is WSMsgType.PONG and self._autoping:
665 continue
667 return msg
669 @overload
670 async def receive_str(
671 self: "WebSocketResponse[Literal[True]]", *, timeout: float | None = None
672 ) -> str: ...
674 @overload
675 async def receive_str(
676 self: "WebSocketResponse[Literal[False]]", *, timeout: float | None = None
677 ) -> bytes: ...
679 @overload
680 async def receive_str(
681 self: "WebSocketResponse[_DecodeText]", *, timeout: float | None = None
682 ) -> str | bytes: ...
684 async def receive_str(self, *, timeout: float | None = None) -> str | bytes:
685 """Receive TEXT message.
687 Returns str when decode_text=True (default), bytes when decode_text=False.
688 """
689 msg = await self.receive(timeout)
690 if msg.type is not WSMsgType.TEXT:
691 raise WSMessageTypeError(
692 f"Received message {msg.type}:{msg.data!r} is not WSMsgType.TEXT"
693 )
694 return msg.data
696 async def receive_bytes(self, *, timeout: float | None = None) -> bytes:
697 msg = await self.receive(timeout)
698 if msg.type is not WSMsgType.BINARY:
699 raise WSMessageTypeError(
700 f"Received message {msg.type}:{msg.data!r} is not WSMsgType.BINARY"
701 )
702 return msg.data
704 @overload
705 async def receive_json(
706 self: "WebSocketResponse[Literal[True]]",
707 *,
708 loads: JSONDecoder = ...,
709 timeout: float | None = None,
710 ) -> Any: ...
712 @overload
713 async def receive_json(
714 self: "WebSocketResponse[Literal[False]]",
715 *,
716 loads: Callable[[bytes], Any] = ...,
717 timeout: float | None = None,
718 ) -> Any: ...
720 @overload
721 async def receive_json(
722 self: "WebSocketResponse[_DecodeText]",
723 *,
724 loads: JSONDecoder | Callable[[bytes], Any] = ...,
725 timeout: float | None = None,
726 ) -> Any: ...
728 async def receive_json(
729 self,
730 *,
731 loads: JSONDecoder | Callable[[bytes], Any] = json.loads,
732 timeout: float | None = None,
733 ) -> Any:
734 data = await self.receive_str(timeout=timeout)
735 return loads(data) # type: ignore[arg-type]
737 async def write(
738 self, data: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
739 ) -> None:
740 raise RuntimeError("Cannot call .write() for websocket")
742 def __aiter__(self) -> Self:
743 return self
745 @overload
746 async def __anext__(
747 self: "WebSocketResponse[Literal[True]]",
748 ) -> WSMessageDecodeText: ...
750 @overload
751 async def __anext__(
752 self: "WebSocketResponse[Literal[False]]",
753 ) -> WSMessageNoDecodeText: ...
755 @overload
756 async def __anext__(
757 self: "WebSocketResponse[_DecodeText]",
758 ) -> WSMessageDecodeText | WSMessageNoDecodeText: ...
760 async def __anext__(self) -> WSMessageDecodeText | WSMessageNoDecodeText:
761 msg = await self.receive()
762 if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
763 raise StopAsyncIteration
764 return msg
766 def _cancel(self, exc: BaseException) -> None:
767 # web_protocol calls this from connection_lost
768 # or when the server is shutting down.
769 self._closing = True
770 self._cancel_heartbeat()
771 if self._reader is not None:
772 set_exception(self._reader, exc)