Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_ws.py: 28%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import base64
3import binascii
4import hashlib
5import json
6import sys
7from collections.abc import Callable, Iterable
8from typing import Any, Final, Generic, Literal, Union, overload
10from multidict import CIMultiDict
12from . import hdrs
13from ._websocket.reader import WebSocketDataQueue
14from .abc import AbstractStreamWriter
15from .client_exceptions import WSMessageTypeError
16from .helpers import (
17 DEFAULT_CHUNK_SIZE,
18 calculate_timeout_when,
19 frozen_dataclass_decorator,
20 set_exception,
21 set_result,
22)
23from .http import (
24 WS_CLOSED_MESSAGE,
25 WS_CLOSING_MESSAGE,
26 WS_KEY,
27 WebSocketError,
28 WebSocketReader,
29 WebSocketWriter,
30 WSCloseCode,
31 WSMessageDecodeText,
32 WSMessageNoDecodeText,
33 WSMsgType,
34 ws_ext_gen,
35 ws_ext_parse,
36)
37from .http_websocket import _INTERNAL_RECEIVE_TYPES, WSMessageError
38from .log import ws_logger
39from .streams import EofStream
40from .typedefs import JSONBytesEncoder, JSONDecoder, JSONEncoder
41from .web_exceptions import HTTPBadRequest, HTTPException
42from .web_request import BaseRequest
43from .web_response import StreamResponse
45if sys.version_info >= (3, 13):
46 from typing import TypeVar
47else:
48 from typing_extensions import TypeVar
50if sys.version_info >= (3, 11):
51 import asyncio as async_timeout
52 from typing import Self
53else:
54 import async_timeout
55 from typing_extensions import Self
# Names exported by ``from aiohttp.web_ws import *``.
__all__ = ("WebSocketResponse", "WebSocketReady", "WSMsgType")

# How many times receive() may be called on an already-closed socket
# (each call returns WS_CLOSED_MESSAGE) before it raises RuntimeError.
THRESHOLD_CONNLOST_ACCESS: Final[int] = 5

# TypeVar for whether text messages are decoded to str (True) or kept as bytes (False)
_DecodeText = TypeVar("_DecodeText", bound=bool, covariant=True, default=Literal[True])
@frozen_dataclass_decorator
class WebSocketReady:
    """Outcome of ``WebSocketResponse.can_prepare()``.

    ``ok`` tells whether the handshake check succeeded and ``protocol`` is
    the sub-protocol that was selected (``None`` when there is none).
    """

    ok: bool
    protocol: str | None

    def __bool__(self) -> bool:
        """Make the instance truthy exactly when ``ok`` is true."""
        return self.ok
class WebSocketResponse(StreamResponse, Generic[_DecodeText]):
    """Server-side WebSocket response.

    Performs the HTTP upgrade handshake in :meth:`prepare` and afterwards
    exposes ``send_*``, ``receive*`` and :meth:`close` for exchanging
    WebSocket frames.  An optional heartbeat sends PING frames and closes
    the connection when no data arrives within half the heartbeat interval.
    """

    # Class-level defaults; instances overwrite them as the connection
    # progresses through handshake, open, closing and closed states.
    _length_check: bool = False
    _ws_protocol: str | None = None
    _writer: WebSocketWriter | None = None
    _reader: WebSocketDataQueue | None = None
    _closed: bool = False
    _closing: bool = False
    _conn_lost: int = 0
    _close_code: int | None = None
    _loop: asyncio.AbstractEventLoop | None = None
    _waiting: bool = False
    _close_wait: asyncio.Future[None] | None = None
    _exception: BaseException | None = None
    _heartbeat_when: float = 0.0
    _heartbeat_cb: asyncio.TimerHandle | None = None
    _pong_response_cb: asyncio.TimerHandle | None = None
    _ping_task: asyncio.Task[None] | None = None
    _need_heartbeat_reset: bool = False
    _heartbeat_reset_handle: asyncio.Handle | None = None

    def __init__(
        self,
        *,
        timeout: float = 10.0,
        receive_timeout: float | None = None,
        autoclose: bool = True,
        autoping: bool = True,
        heartbeat: float | None = None,
        protocols: Iterable[str] = (),
        compress: bool = True,
        max_msg_size: int = 4 * 1024 * 1024,
        writer_limit: int = DEFAULT_CHUNK_SIZE,
        decode_text: bool = True,
    ) -> None:
        """Create an unprepared WebSocket response (HTTP status 101).

        :param timeout: seconds :meth:`close` waits for the peer's CLOSE frame.
        :param receive_timeout: default timeout for :meth:`receive` when the
            call itself passes none.
        :param autoclose: call :meth:`close` automatically when a CLOSE
            message is received.
        :param autoping: answer PING with PONG and swallow PONG messages
            inside :meth:`receive`.
        :param heartbeat: interval in seconds between PING frames; ``None``
            disables the heartbeat.  The PONG wait is half of this value.
        :param protocols: sub-protocols the server is willing to speak.
        :param compress: whether to negotiate the compression extension.
        :param max_msg_size: forwarded to ``WebSocketReader``.
        :param writer_limit: forwarded to ``WebSocketWriter`` as ``limit``.
        :param decode_text: forwarded to ``WebSocketReader``; controls whether
            text messages are decoded to ``str`` or kept as ``bytes``.
        """
        super().__init__(status=101)
        self._protocols = protocols
        self._timeout = timeout
        self._receive_timeout = receive_timeout
        self._autoclose = autoclose
        self._autoping = autoping
        self._heartbeat = heartbeat
        if heartbeat is not None:
            # Only defined when the heartbeat is enabled; every reader of
            # this attribute runs from heartbeat callbacks only.
            self._pong_heartbeat = heartbeat / 2.0
        self._compress: bool | int = compress
        self._max_msg_size = max_msg_size
        self._writer_limit = writer_limit
        self._decode_text = decode_text
        self._need_heartbeat_reset = False
        self._heartbeat_reset_handle = None

    def _cancel_heartbeat(self) -> None:
        """Cancel every pending heartbeat timer, handle and ping task."""
        self._cancel_pong_response_cb()
        if self._heartbeat_reset_handle is not None:
            self._heartbeat_reset_handle.cancel()
            self._heartbeat_reset_handle = None
        self._need_heartbeat_reset = False
        if self._heartbeat_cb is not None:
            self._heartbeat_cb.cancel()
            self._heartbeat_cb = None
        if self._ping_task is not None:
            self._ping_task.cancel()
            self._ping_task = None

    def _cancel_pong_response_cb(self) -> None:
        """Cancel the pending "PONG not received" timer, if any."""
        if self._pong_response_cb is not None:
            self._pong_response_cb.cancel()
            self._pong_response_cb = None

    def _on_data_received(self) -> None:
        """Schedule a heartbeat reset after incoming data.

        Registered as ``data_received_cb`` in :meth:`_post_start` when a
        heartbeat is configured.
        """
        if self._heartbeat is None or self._need_heartbeat_reset:
            return
        loop = self._loop
        assert loop is not None
        # Coalesce multiple chunks received in the same loop tick into a single
        # heartbeat reset. Resetting immediately per chunk increases timer churn.
        self._need_heartbeat_reset = True
        self._heartbeat_reset_handle = loop.call_soon(self._flush_heartbeat_reset)

    def _flush_heartbeat_reset(self) -> None:
        """Run the heartbeat reset scheduled by :meth:`_on_data_received`."""
        self._heartbeat_reset_handle = None
        if not self._need_heartbeat_reset:
            return
        self._reset_heartbeat()
        self._need_heartbeat_reset = False

    def _reset_heartbeat(self) -> None:
        """Push the next heartbeat deadline one interval into the future."""
        if self._heartbeat is None:
            return
        self._cancel_pong_response_cb()
        req = self._req
        timeout_ceil_threshold = (
            req._protocol._timeout_ceil_threshold if req is not None else 5
        )
        loop = self._loop
        assert loop is not None
        now = loop.time()
        when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
        self._heartbeat_when = when
        if self._heartbeat_cb is None:
            # We do not cancel the previous heartbeat_cb here because
            # it generates a significant amount of TimerHandle churn
            # which causes asyncio to rebuild the heap frequently.
            # Instead _send_heartbeat() will reschedule the next
            # heartbeat if it fires too early.
            self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)

    def _send_heartbeat(self) -> None:
        """Timer callback: send a PING and arm the PONG timeout."""
        self._heartbeat_cb = None

        # If heartbeat reset is pending (data is being received), skip sending
        # the ping and let the reset callback handle rescheduling the heartbeat.
        if self._need_heartbeat_reset:
            return

        loop = self._loop
        assert loop is not None and self._writer is not None
        now = loop.time()
        if now < self._heartbeat_when:
            # Heartbeat fired too early, reschedule
            self._heartbeat_cb = loop.call_at(
                self._heartbeat_when, self._send_heartbeat
            )
            return

        req = self._req
        timeout_ceil_threshold = (
            req._protocol._timeout_ceil_threshold if req is not None else 5
        )
        when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold)
        self._cancel_pong_response_cb()
        self._pong_response_cb = loop.call_at(when, self._pong_not_received)

        coro = self._writer.send_frame(b"", WSMsgType.PING)
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to send the ping
            # immediately to avoid having to schedule
            # the task on the event loop.
            ping_task = asyncio.Task(coro, loop=loop, eager_start=True)
        else:
            ping_task = loop.create_task(coro)

        if not ping_task.done():
            # Keep a reference so the task can be cancelled on close.
            self._ping_task = ping_task
            ping_task.add_done_callback(self._ping_task_done)
        else:
            # The eager task already finished; handle its result right away.
            self._ping_task_done(ping_task)

    def _ping_task_done(self, task: "asyncio.Task[None]") -> None:
        """Callback for when the ping task completes."""
        if not task.cancelled() and (exc := task.exception()):
            self._handle_ping_pong_exception(exc)
        self._ping_task = None

    def _pong_not_received(self) -> None:
        """Timer callback: treat the missing PONG as a timeout error."""
        if self._req is not None and self._req.transport is not None:
            self._handle_ping_pong_exception(
                asyncio.TimeoutError(
                    f"No PONG received after {self._pong_heartbeat} seconds"
                )
            )

    def _handle_ping_pong_exception(self, exc: BaseException) -> None:
        """Handle exceptions raised during ping/pong processing."""
        if self._closed:
            return
        self._set_closed()
        self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
        self._exception = exc
        if self._waiting and not self._closing and self._reader is not None:
            # Wake up a pending receive() with an error message.
            self._reader.feed_data(WSMessageError(data=exc, extra=None))

    def _set_closed(self) -> None:
        """Set the connection to closed.

        Cancel any heartbeat timers and set the closed flag.
        """
        self._closed = True
        self._cancel_heartbeat()

    async def prepare(self, request: BaseRequest) -> AbstractStreamWriter:
        """Perform the WebSocket handshake and start the response.

        Returns the payload writer.  Calling it again after a successful
        start returns the existing writer without redoing the handshake.
        """
        # make pre-check to don't hide it by do_handshake() exceptions
        if self._payload_writer is not None:
            return self._payload_writer

        protocol, writer = self._pre_start(request)
        payload_writer = await super().prepare(request)
        assert payload_writer is not None
        self._post_start(request, protocol, writer)
        await payload_writer.drain()
        return payload_writer

    def _handshake(
        self, request: BaseRequest
    ) -> tuple["CIMultiDict[str]", str | None, int, bool]:
        """Validate the upgrade request and build the response headers.

        Returns ``(response_headers, protocol, compress, notakeover)``.

        Raises ``HTTPBadRequest`` when the Upgrade/Connection headers, the
        WebSocket version or the ``Sec-WebSocket-Key`` value are not
        acceptable.
        """
        headers = request.headers
        if "websocket" != headers.get(hdrs.UPGRADE, "").lower().strip():
            raise HTTPBadRequest(
                text=(
                    f"No WebSocket UPGRADE hdr: {headers.get(hdrs.UPGRADE)}\n Can "
                    '"Upgrade" only to "WebSocket".'
                )
            )

        if "upgrade" not in headers.get(hdrs.CONNECTION, "").lower():
            raise HTTPBadRequest(
                text=f"No CONNECTION upgrade hdr: {headers.get(hdrs.CONNECTION)}"
            )

        # find common sub-protocol between client and server
        protocol: str | None = None
        if hdrs.SEC_WEBSOCKET_PROTOCOL in headers:
            req_protocols = [
                str(proto.strip())
                for proto in headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
            ]

            for proto in req_protocols:
                if proto in self._protocols:
                    protocol = proto
                    break
            else:
                # No overlap found: Return no protocol as per spec
                ws_logger.warning(
                    "%s: Client protocols %r don’t overlap server-known ones %r",
                    request.remote,
                    req_protocols,
                    self._protocols,
                )

        # check supported version
        version = headers.get(hdrs.SEC_WEBSOCKET_VERSION, "")
        if version not in ("13", "8", "7"):
            raise HTTPBadRequest(text=f"Unsupported version: {version}")

        # check client handshake for validity
        key = headers.get(hdrs.SEC_WEBSOCKET_KEY)
        try:
            if not key or len(base64.b64decode(key)) != 16:
                raise HTTPBadRequest(text=f"Handshake error: {key!r}")
        except (binascii.Error, ValueError):
            # b64decode() raises binascii.Error for malformed base64 but a
            # plain ValueError when the str contains non-ASCII characters;
            # both mean the client sent an invalid key, so answer with 400
            # instead of letting the error escape the handshake.
            raise HTTPBadRequest(text=f"Handshake error: {key!r}") from None

        accept_val = base64.b64encode(
            hashlib.sha1(key.encode() + WS_KEY).digest()
        ).decode()
        response_headers = CIMultiDict(
            {
                hdrs.UPGRADE: "websocket",
                hdrs.CONNECTION: "upgrade",
                hdrs.SEC_WEBSOCKET_ACCEPT: accept_val,
            }
        )

        notakeover = False
        compress = 0
        if self._compress:
            extensions = headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS)
            # Server side always get return with no exception.
            # If something happened, just drop compress extension
            compress, notakeover = ws_ext_parse(extensions, isserver=True)
            if compress:
                enabledext = ws_ext_gen(
                    compress=compress, isserver=True, server_notakeover=notakeover
                )
                response_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = enabledext

        if protocol:
            response_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = protocol
        return (
            response_headers,
            protocol,
            compress,
            notakeover,
        )

    def _pre_start(self, request: BaseRequest) -> tuple[str | None, WebSocketWriter]:
        """Run the handshake, set response headers and create the frame writer.

        Raises ``ConnectionResetError`` when the transport is already gone.
        """
        self._loop = request._loop

        headers, protocol, compress, notakeover = self._handshake(request)

        self.set_status(101)
        self.headers.update(headers)
        self.force_close()
        # From here on ``_compress`` holds the negotiated value rather than
        # the constructor flag.
        self._compress = compress
        transport = request._protocol.transport
        if transport is None:
            raise ConnectionResetError("Connection lost")
        writer = WebSocketWriter(
            request._protocol,
            transport,
            compress=compress,
            notakeover=notakeover,
            limit=self._writer_limit,
        )

        return protocol, writer

    def _post_start(
        self, request: BaseRequest, protocol: str | None, writer: WebSocketWriter
    ) -> None:
        """Install the writer, reader and parser once the response started."""
        self._ws_protocol = protocol
        self._writer = writer

        self._reset_heartbeat()

        loop = self._loop
        assert loop is not None
        self._reader = WebSocketDataQueue(
            request._protocol, DEFAULT_CHUNK_SIZE, loop=loop
        )
        parser = WebSocketReader(
            self._reader,
            self._max_msg_size,
            compress=bool(self._compress),
            decode_text=self._decode_text,
        )
        # The data callback is only needed to keep the heartbeat alive.
        cb = None if self._heartbeat is None else self._on_data_received
        request.protocol.set_parser(parser, data_received_cb=cb)
        # disable HTTP keepalive for WebSocket
        request.protocol.keep_alive(False)

    def can_prepare(self, request: BaseRequest) -> WebSocketReady:
        """Check whether ``request`` can be upgraded without starting it.

        Returns a falsy :class:`WebSocketReady` when the handshake would be
        rejected.  Raises ``RuntimeError`` if the response is already started.
        """
        if self._writer is not None:
            raise RuntimeError("Already started")
        try:
            _, protocol, _, _ = self._handshake(request)
        except HTTPException:
            return WebSocketReady(False, None)
        else:
            return WebSocketReady(True, protocol)

    @property
    def prepared(self) -> bool:
        """Whether the WebSocket writer has been set up."""
        return self._writer is not None

    @property
    def closed(self) -> bool:
        """Whether the connection has been marked closed."""
        return self._closed

    @property
    def close_code(self) -> int | None:
        """Close code recorded for the connection, ``None`` if not set."""
        return self._close_code

    @property
    def ws_protocol(self) -> str | None:
        """Sub-protocol selected during the handshake, if any."""
        return self._ws_protocol

    @property
    def compress(self) -> int | bool:
        """Compression setting; the negotiated value once prepared."""
        return self._compress

    def get_extra_info(self, name: str, default: Any = None) -> Any:
        """Get optional transport information.

        If no value associated with ``name`` is found, ``default`` is returned.
        """
        writer = self._writer
        if writer is None:
            return default
        return writer.transport.get_extra_info(name, default)

    def exception(self) -> BaseException | None:
        """Return the last exception recorded for the connection, if any."""
        return self._exception

    async def ping(self, message: bytes = b"") -> None:
        """Send a PING frame; raises ``RuntimeError`` if not prepared."""
        if self._writer is None:
            raise RuntimeError("Call .prepare() first")
        await self._writer.send_frame(message, WSMsgType.PING)

    async def pong(self, message: bytes = b"") -> None:
        """Send a PONG frame; raises ``RuntimeError`` if not prepared."""
        # unsolicited pong
        if self._writer is None:
            raise RuntimeError("Call .prepare() first")
        await self._writer.send_frame(message, WSMsgType.PONG)

    async def send_frame(
        self, message: bytes, opcode: WSMsgType, compress: int | None = None
    ) -> None:
        """Send a frame over the websocket."""
        if self._writer is None:
            raise RuntimeError("Call .prepare() first")
        await self._writer.send_frame(message, opcode, compress)

    async def send_str(self, data: str, compress: int | None = None) -> None:
        """Send ``data`` as a UTF-8 encoded TEXT frame.

        Raises ``RuntimeError`` if not prepared and ``TypeError`` if ``data``
        is not a ``str``.
        """
        if self._writer is None:
            raise RuntimeError("Call .prepare() first")
        if not isinstance(data, str):
            raise TypeError("data argument must be str (%r)" % type(data))
        await self._writer.send_frame(
            data.encode("utf-8"), WSMsgType.TEXT, compress=compress
        )

    async def send_bytes(self, data: bytes, compress: int | None = None) -> None:
        """Send ``data`` as a BINARY frame.

        Raises ``RuntimeError`` if not prepared and ``TypeError`` if ``data``
        is not bytes, bytearray or memoryview.
        """
        if self._writer is None:
            raise RuntimeError("Call .prepare() first")
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError("data argument must be byte-ish (%r)" % type(data))
        await self._writer.send_frame(data, WSMsgType.BINARY, compress=compress)

    async def send_json(
        self,
        data: Any,
        compress: int | None = None,
        *,
        dumps: JSONEncoder = json.dumps,
    ) -> None:
        """Serialize ``data`` with ``dumps`` and send it as a TEXT frame."""
        await self.send_str(dumps(data), compress=compress)

    async def send_json_bytes(
        self,
        data: Any,
        compress: int | None = None,
        *,
        dumps: JSONBytesEncoder,
    ) -> None:
        """Send JSON data using a bytes-returning encoder as a binary frame.

        Use this when your JSON encoder (like orjson) returns bytes
        instead of str, avoiding the encode/decode overhead.
        """
        await self.send_bytes(dumps(data), compress=compress)

    async def write_eof(self) -> None:  # type: ignore[override]
        """Close the WebSocket once; later calls are no-ops.

        Raises ``RuntimeError`` if the response has not been started.
        """
        if self._eof_sent:
            return
        if self._payload_writer is None:
            raise RuntimeError("Response has not been started")

        await self.close()
        self._eof_sent = True

    async def close(
        self, *, code: int = WSCloseCode.OK, message: bytes = b"", drain: bool = True
    ) -> bool:
        """Close websocket connection."""
        if self._writer is None:
            raise RuntimeError("Call .prepare() first")

        if self._closed:
            return False
        self._set_closed()

        try:
            await self._writer.close(code, message)
            writer = self._payload_writer
            assert writer is not None
            if drain:
                await writer.drain()
        except (asyncio.CancelledError, asyncio.TimeoutError):
            self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
            raise
        except Exception as exc:
            self._exception = exc
            self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
            return True

        reader = self._reader
        assert reader is not None
        # we need to break `receive()` cycle before we can call
        # `reader.read()` as `close()` may be called from different task
        if self._waiting:
            assert self._loop is not None
            assert self._close_wait is None
            self._close_wait = self._loop.create_future()
            reader.feed_data(WS_CLOSING_MESSAGE)
            await self._close_wait

        if self._closing:
            self._close_transport()
            return True

        try:
            # Wait, bounded by the close timeout, for the peer's CLOSE frame.
            async with async_timeout.timeout(self._timeout):
                while True:
                    msg = await reader.read()
                    if msg.type is WSMsgType.CLOSE:
                        self._set_code_close_transport(msg.data)
                        return True
        except asyncio.CancelledError:
            self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
            raise
        except Exception as exc:
            self._exception = exc
            self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
            return True

    def _set_closing(self, code: int) -> None:
        """Set the close code and mark the connection as closing."""
        self._closing = True
        self._close_code = code
        self._cancel_heartbeat()

    def _set_code_close_transport(self, code: int) -> None:
        """Set the close code and close the transport."""
        self._close_code = code
        self._close_transport()

    def _close_transport(self) -> None:
        """Close the transport."""
        if self._req is not None and self._req.transport is not None:
            self._req.transport.close()

    @overload
    async def receive(
        self: "WebSocketResponse[Literal[True]]", timeout: float | None = None
    ) -> WSMessageDecodeText: ...

    @overload
    async def receive(
        self: "WebSocketResponse[Literal[False]]", timeout: float | None = None
    ) -> WSMessageNoDecodeText: ...

    @overload
    async def receive(
        self: "WebSocketResponse[_DecodeText]", timeout: float | None = None
    ) -> WSMessageDecodeText | WSMessageNoDecodeText: ...

    async def receive(
        self, timeout: float | None = None
    ) -> WSMessageDecodeText | WSMessageNoDecodeText:
        """Wait for and return the next message.

        PING/PONG messages are handled internally when ``autoping`` is on.
        A falsy ``timeout`` falls back to the ``receive_timeout`` given to
        the constructor.

        Raises ``RuntimeError`` when called before :meth:`prepare`, when
        another ``receive()`` is already waiting, or when called repeatedly
        on a closed connection; ``asyncio.TimeoutError`` propagates when the
        read times out.
        """
        if self._reader is None:
            raise RuntimeError("Call .prepare() first")

        receive_timeout = timeout or self._receive_timeout
        while True:
            if self._waiting:
                raise RuntimeError("Concurrent call to receive() is not allowed")

            if self._closed:
                self._conn_lost += 1
                if self._conn_lost >= THRESHOLD_CONNLOST_ACCESS:
                    raise RuntimeError("WebSocket connection is closed.")
                return WS_CLOSED_MESSAGE
            elif self._closing:
                return WS_CLOSING_MESSAGE

            try:
                self._waiting = True
                try:
                    if receive_timeout:
                        # Entering the context manager and creating
                        # Timeout() object can take almost 50% of the
                        # run time in this loop so we avoid it if
                        # there is no read timeout.
                        async with async_timeout.timeout(receive_timeout):
                            msg = await self._reader.read()
                    else:
                        msg = await self._reader.read()
                finally:
                    self._waiting = False
                    if self._close_wait:
                        # Let a concurrent close() continue now that the
                        # read has finished.
                        set_result(self._close_wait, None)
            except asyncio.TimeoutError:
                raise
            except EofStream:
                self._close_code = WSCloseCode.OK
                await self.close()
                return WS_CLOSED_MESSAGE
            except WebSocketError as exc:
                self._close_code = exc.code
                await self.close(code=exc.code)
                return WSMessageError(data=exc)
            except Exception as exc:
                self._exception = exc
                self._set_closing(WSCloseCode.ABNORMAL_CLOSURE)
                await self.close()
                return WSMessageError(data=exc)

            if msg.type not in _INTERNAL_RECEIVE_TYPES:
                # If its not a close/closing/ping/pong message
                # we can return it immediately
                return msg

            if msg.type is WSMsgType.CLOSE:
                self._set_closing(msg.data)
                # Could be closed while awaiting reader.
                if not self._closed and self._autoclose:  # type: ignore[redundant-expr]
                    # The client is likely going to close the
                    # connection out from under us so we do not
                    # want to drain any pending writes as it will
                    # likely result writing to a broken pipe.
                    await self.close(drain=False)
            elif msg.type is WSMsgType.CLOSING:
                self._set_closing(WSCloseCode.OK)
            elif msg.type is WSMsgType.PING and self._autoping:
                await self.pong(msg.data)
                continue
            elif msg.type is WSMsgType.PONG and self._autoping:
                continue

            return msg

    @overload
    async def receive_str(
        self: "WebSocketResponse[Literal[True]]", *, timeout: float | None = None
    ) -> str: ...

    @overload
    async def receive_str(
        self: "WebSocketResponse[Literal[False]]", *, timeout: float | None = None
    ) -> bytes: ...

    @overload
    async def receive_str(
        self: "WebSocketResponse[_DecodeText]", *, timeout: float | None = None
    ) -> str | bytes: ...

    async def receive_str(self, *, timeout: float | None = None) -> str | bytes:
        """Receive TEXT message.

        Returns str when decode_text=True (default), bytes when decode_text=False.

        Raises ``WSMessageTypeError`` when the next message is not TEXT.
        """
        msg = await self.receive(timeout)
        if msg.type is not WSMsgType.TEXT:
            raise WSMessageTypeError(
                f"Received message {msg.type}:{msg.data!r} is not WSMsgType.TEXT"
            )
        return msg.data

    async def receive_bytes(self, *, timeout: float | None = None) -> bytes:
        """Receive a BINARY message and return its payload.

        Raises ``WSMessageTypeError`` when the next message is not BINARY.
        """
        msg = await self.receive(timeout)
        if msg.type is not WSMsgType.BINARY:
            raise WSMessageTypeError(
                f"Received message {msg.type}:{msg.data!r} is not WSMsgType.BINARY"
            )
        return msg.data

    @overload
    async def receive_json(
        self: "WebSocketResponse[Literal[True]]",
        *,
        loads: JSONDecoder = ...,
        timeout: float | None = None,
    ) -> Any: ...

    @overload
    async def receive_json(
        self: "WebSocketResponse[Literal[False]]",
        *,
        loads: Callable[[bytes], Any] = ...,
        timeout: float | None = None,
    ) -> Any: ...

    @overload
    async def receive_json(
        self: "WebSocketResponse[_DecodeText]",
        *,
        loads: JSONDecoder | Callable[[bytes], Any] = ...,
        timeout: float | None = None,
    ) -> Any: ...

    async def receive_json(
        self,
        *,
        loads: JSONDecoder | Callable[[bytes], Any] = json.loads,
        timeout: float | None = None,
    ) -> Any:
        """Receive a TEXT message and decode its payload with ``loads``."""
        data = await self.receive_str(timeout=timeout)
        return loads(data)  # type: ignore[arg-type]

    async def write(
        self, data: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
    ) -> None:
        """Always raise ``RuntimeError``: raw writes are not allowed here."""
        raise RuntimeError("Cannot call .write() for websocket")

    def __aiter__(self) -> Self:
        """Return ``self`` so the response can be used in ``async for``."""
        return self

    @overload
    async def __anext__(
        self: "WebSocketResponse[Literal[True]]",
    ) -> WSMessageDecodeText: ...

    @overload
    async def __anext__(
        self: "WebSocketResponse[Literal[False]]",
    ) -> WSMessageNoDecodeText: ...

    @overload
    async def __anext__(
        self: "WebSocketResponse[_DecodeText]",
    ) -> WSMessageDecodeText | WSMessageNoDecodeText: ...

    async def __anext__(self) -> WSMessageDecodeText | WSMessageNoDecodeText:
        """Return the next message; stop on CLOSE, CLOSING or CLOSED."""
        msg = await self.receive()
        if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
            raise StopAsyncIteration
        return msg

    def _cancel(self, exc: BaseException) -> None:
        """Mark the connection as closing and fail the reader with ``exc``."""
        # web_protocol calls this from connection_lost
        # or when the server is shutting down.
        self._closing = True
        self._cancel_heartbeat()
        if self._reader is not None:
            set_exception(self._reader, exc)