Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_ws.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

449 statements  

1import asyncio 

2import base64 

3import binascii 

4import hashlib 

5import json 

6import sys 

from collections.abc import Callable, Container, Iterable

8from typing import Any, Final, Generic, Literal, Union, overload 

9 

10from multidict import CIMultiDict 

11 

12from . import hdrs 

13from ._websocket.reader import WebSocketDataQueue 

14from ._websocket.writer import DEFAULT_LIMIT 

15from .abc import AbstractStreamWriter 

16from .client_exceptions import WSMessageTypeError 

17from .helpers import ( 

18 calculate_timeout_when, 

19 frozen_dataclass_decorator, 

20 set_exception, 

21 set_result, 

22) 

23from .http import ( 

24 WS_CLOSED_MESSAGE, 

25 WS_CLOSING_MESSAGE, 

26 WS_KEY, 

27 WebSocketError, 

28 WebSocketReader, 

29 WebSocketWriter, 

30 WSCloseCode, 

31 WSMessageDecodeText, 

32 WSMessageNoDecodeText, 

33 WSMsgType, 

34 ws_ext_gen, 

35 ws_ext_parse, 

36) 

37from .http_websocket import _INTERNAL_RECEIVE_TYPES, WSMessageError 

38from .log import ws_logger 

39from .streams import EofStream 

40from .typedefs import JSONBytesEncoder, JSONDecoder, JSONEncoder 

41from .web_exceptions import HTTPBadRequest, HTTPException 

42from .web_request import BaseRequest 

43from .web_response import StreamResponse 

44 

45if sys.version_info >= (3, 13): 

46 from typing import TypeVar 

47else: 

48 from typing_extensions import TypeVar 

49 

50if sys.version_info >= (3, 11): 

51 import asyncio as async_timeout 

52 from typing import Self 

53else: 

54 import async_timeout 

55 from typing_extensions import Self 

56 

# Names exported by this module.
__all__ = (
    "WebSocketResponse",
    "WebSocketReady",
    "WSMsgType",
)

# WebSocketResponse.receive() counts calls made after the websocket has been
# closed; once the count reaches this value it raises RuntimeError instead of
# returning the "closed" message again.
THRESHOLD_CONNLOST_ACCESS: Final[int] = 5

# TypeVar for whether text messages are decoded to str (True) or kept as bytes (False)
_DecodeText = TypeVar("_DecodeText", bound=bool, covariant=True, default=Literal[True])

67 

68 

@frozen_dataclass_decorator
class WebSocketReady:
    """Result of ``WebSocketResponse.can_prepare()``.

    ``ok`` is True when the handshake pre-check succeeded; ``protocol`` is the
    sub-protocol selected by that check, or None when none was selected.
    """

    ok: bool
    protocol: str | None

    def __bool__(self) -> bool:
        """Return ``ok`` so the result can be used directly as a condition."""
        return self.ok

76 

77 

class WebSocketResponse(StreamResponse, Generic[_DecodeText]):
    """Server-side WebSocket response.

    ``prepare()`` performs the upgrade handshake against an incoming request;
    afterwards the ``send_*``, ``receive*`` and ``close`` coroutines operate on
    the upgraded connection. The object is also an async iterator yielding
    received messages until a close-type message arrives.
    """

    _length_check: bool = False
    # Sub-protocol selected during the handshake (None if none was selected).
    _ws_protocol: str | None = None
    # Writer/reader are created by prepare(); None means "not prepared yet".
    _writer: WebSocketWriter | None = None
    _reader: WebSocketDataQueue | None = None
    # _closed: close() or a ping/pong failure has run on our side.
    _closed: bool = False
    # _closing: a close was requested/observed (peer CLOSE, CLOSING, _cancel()).
    _closing: bool = False
    # Number of receive() calls made after the websocket was closed.
    _conn_lost: int = 0
    _close_code: int | None = None
    _loop: asyncio.AbstractEventLoop | None = None
    # True while receive() is blocked reading from the reader.
    _waiting: bool = False
    # Future close() waits on until a concurrently running receive() returns.
    _close_wait: asyncio.Future[None] | None = None
    _exception: BaseException | None = None
    # Loop time at which the next heartbeat PING is due.
    _heartbeat_when: float = 0.0
    _heartbeat_cb: asyncio.TimerHandle | None = None
    _pong_response_cb: asyncio.TimerHandle | None = None
    _ping_task: asyncio.Task[None] | None = None
    # True while a coalesced heartbeat reset is scheduled via call_soon().
    _need_heartbeat_reset: bool = False
    _heartbeat_reset_handle: asyncio.Handle | None = None

    def __init__(
        self,
        *,
        timeout: float = 10.0,
        receive_timeout: float | None = None,
        autoclose: bool = True,
        autoping: bool = True,
        heartbeat: float | None = None,
        protocols: Iterable[str] = (),
        compress: bool = True,
        max_msg_size: int = 4 * 1024 * 1024,
        writer_limit: int = DEFAULT_LIMIT,
        decode_text: bool = True,
    ) -> None:
        """Create an unprepared WebSocket response (HTTP status 101).

        Args:
            timeout: How long ``close()`` waits for the peer's CLOSE message.
            receive_timeout: Default timeout for ``receive()`` when no
                per-call timeout is given.
            autoclose: When a CLOSE message is received, call ``close()``
                automatically from ``receive()``.
            autoping: Answer PING messages with a PONG and do not hand
                PING/PONG messages to the caller of ``receive()``.
            heartbeat: Interval in seconds between heartbeat PINGs; a PONG is
                expected within half of that interval. None disables the
                heartbeat.
            protocols: Sub-protocols supported by the server.
            compress: If true, the compression extension offered by the
                client is negotiated during the handshake.
            max_msg_size: Passed to the WebSocket reader.
            writer_limit: Passed to the WebSocket writer as ``limit``.
            decode_text: Passed to the WebSocket reader; selects whether text
                messages are delivered as ``str`` or ``bytes``.
        """
        super().__init__(status=101)
        # The handshake tests membership in ``protocols`` once per protocol
        # offered by the client, and may run more than once (can_prepare()
        # followed by prepare()). A one-shot iterator would be exhausted by
        # the first test, so materialize anything that is not a real
        # container. Containers are stored as given.
        if not isinstance(protocols, Container):
            protocols = tuple(protocols)
        self._protocols = protocols
        self._timeout = timeout
        self._receive_timeout = receive_timeout
        self._autoclose = autoclose
        self._autoping = autoping
        self._heartbeat = heartbeat
        if heartbeat is not None:
            self._pong_heartbeat = heartbeat / 2.0
        self._compress: bool | int = compress
        self._max_msg_size = max_msg_size
        self._writer_limit = writer_limit
        self._decode_text = decode_text
        self._need_heartbeat_reset = False
        self._heartbeat_reset_handle = None

    def _cancel_heartbeat(self) -> None:
        """Cancel every pending heartbeat timer, reset handle and ping task."""
        self._cancel_pong_response_cb()
        if self._heartbeat_reset_handle is not None:
            self._heartbeat_reset_handle.cancel()
            self._heartbeat_reset_handle = None
        self._need_heartbeat_reset = False
        if self._heartbeat_cb is not None:
            self._heartbeat_cb.cancel()
            self._heartbeat_cb = None
        if self._ping_task is not None:
            self._ping_task.cancel()
            self._ping_task = None

    def _cancel_pong_response_cb(self) -> None:
        """Cancel the "PONG not received" timer, if one is pending."""
        if self._pong_response_cb is not None:
            self._pong_response_cb.cancel()
            self._pong_response_cb = None

    def _on_data_received(self) -> None:
        """Schedule a heartbeat reset after incoming data.

        Installed as the parser's ``data_received_cb`` when a heartbeat is
        configured.
        """
        if self._heartbeat is None or self._need_heartbeat_reset:
            return
        loop = self._loop
        assert loop is not None
        # Coalesce multiple chunks received in the same loop tick into a single
        # heartbeat reset. Resetting immediately per chunk increases timer churn.
        self._need_heartbeat_reset = True
        self._heartbeat_reset_handle = loop.call_soon(self._flush_heartbeat_reset)

    def _flush_heartbeat_reset(self) -> None:
        """Perform the heartbeat reset scheduled by ``_on_data_received()``."""
        self._heartbeat_reset_handle = None
        if not self._need_heartbeat_reset:
            return
        self._reset_heartbeat()
        self._need_heartbeat_reset = False

    def _get_timeout_ceil_threshold(self) -> float:
        """Return the protocol's timeout ceil threshold, or 5 without a request."""
        req = self._req
        return req._protocol._timeout_ceil_threshold if req is not None else 5

    def _reset_heartbeat(self) -> None:
        """Push the next heartbeat PING one full interval into the future."""
        if self._heartbeat is None:
            return
        self._cancel_pong_response_cb()
        timeout_ceil_threshold = self._get_timeout_ceil_threshold()
        loop = self._loop
        assert loop is not None
        now = loop.time()
        when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
        self._heartbeat_when = when
        if self._heartbeat_cb is None:
            # We do not cancel the previous heartbeat_cb here because
            # it generates a significant amount of TimerHandle churn
            # which causes asyncio to rebuild the heap frequently.
            # Instead _send_heartbeat() will reschedule the next
            # heartbeat if it fires too early.
            self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)

    def _send_heartbeat(self) -> None:
        """Timer callback: send a heartbeat PING and arm the PONG timeout."""
        self._heartbeat_cb = None

        # If heartbeat reset is pending (data is being received), skip sending
        # the ping and let the reset callback handle rescheduling the heartbeat.
        if self._need_heartbeat_reset:
            return

        loop = self._loop
        assert loop is not None and self._writer is not None
        now = loop.time()
        if now < self._heartbeat_when:
            # Heartbeat fired too early, reschedule
            self._heartbeat_cb = loop.call_at(
                self._heartbeat_when, self._send_heartbeat
            )
            return

        timeout_ceil_threshold = self._get_timeout_ceil_threshold()
        when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold)
        self._cancel_pong_response_cb()
        self._pong_response_cb = loop.call_at(when, self._pong_not_received)

        coro = self._writer.send_frame(b"", WSMsgType.PING)
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to send the ping
            # immediately to avoid having to schedule
            # the task on the event loop.
            ping_task = asyncio.Task(coro, loop=loop, eager_start=True)
        else:
            ping_task = loop.create_task(coro)

        if not ping_task.done():
            # Keep a reference so _cancel_heartbeat() can cancel the send.
            self._ping_task = ping_task
            ping_task.add_done_callback(self._ping_task_done)
        else:
            self._ping_task_done(ping_task)

    def _ping_task_done(self, task: "asyncio.Task[None]") -> None:
        """Callback for when the ping task completes."""
        if not task.cancelled() and (exc := task.exception()):
            self._handle_ping_pong_exception(exc)
        self._ping_task = None

    def _pong_not_received(self) -> None:
        """Timer callback: the PONG for the last heartbeat PING never arrived."""
        if self._req is not None and self._req.transport is not None:
            self._handle_ping_pong_exception(
                asyncio.TimeoutError(
                    f"No PONG received after {self._pong_heartbeat} seconds"
                )
            )

    def _handle_ping_pong_exception(self, exc: BaseException) -> None:
        """Handle exceptions raised during ping/pong processing."""
        if self._closed:
            return
        self._set_closed()
        self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
        self._exception = exc
        # Wake a receive() call that is currently blocked on the reader.
        if self._waiting and not self._closing and self._reader is not None:
            self._reader.feed_data(WSMessageError(data=exc, extra=None))

    def _set_closed(self) -> None:
        """Set the connection to closed.

        Cancel any heartbeat timers and set the closed flag.
        """
        self._closed = True
        self._cancel_heartbeat()

    async def prepare(self, request: BaseRequest) -> AbstractStreamWriter:
        """Perform the WebSocket handshake for ``request``.

        Returns the payload writer. If the response was already prepared the
        existing writer is returned without repeating the handshake.
        """
        # Check for an earlier prepare() first so that a repeated call is not
        # reported as a handshake error.
        if self._payload_writer is not None:
            return self._payload_writer

        protocol, writer = self._pre_start(request)
        payload_writer = await super().prepare(request)
        assert payload_writer is not None
        self._post_start(request, protocol, writer)
        await payload_writer.drain()
        return payload_writer

    def _handshake(
        self, request: BaseRequest
    ) -> tuple["CIMultiDict[str]", str | None, int, bool]:
        """Validate the upgrade request and build the response headers.

        Returns ``(response_headers, protocol, compress, notakeover)``.

        Raises:
            HTTPBadRequest: If the Upgrade/Connection headers, the WebSocket
                version or the Sec-WebSocket-Key are missing or invalid.
        """
        headers = request.headers
        if "websocket" != headers.get(hdrs.UPGRADE, "").lower().strip():
            raise HTTPBadRequest(
                text=(
                    f"No WebSocket UPGRADE hdr: {headers.get(hdrs.UPGRADE)}\n Can "
                    '"Upgrade" only to "WebSocket".'
                )
            )

        if "upgrade" not in headers.get(hdrs.CONNECTION, "").lower():
            raise HTTPBadRequest(
                text=f"No CONNECTION upgrade hdr: {headers.get(hdrs.CONNECTION)}"
            )

        # find common sub-protocol between client and server
        protocol: str | None = None
        if hdrs.SEC_WEBSOCKET_PROTOCOL in headers:
            req_protocols = [
                str(proto.strip())
                for proto in headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
            ]

            # The first client-listed protocol that the server knows wins.
            for proto in req_protocols:
                if proto in self._protocols:
                    protocol = proto
                    break
            else:
                # No overlap found: Return no protocol as per spec
                ws_logger.warning(
                    "%s: Client protocols %r don’t overlap server-known ones %r",
                    request.remote,
                    req_protocols,
                    self._protocols,
                )

        # check supported version
        version = headers.get(hdrs.SEC_WEBSOCKET_VERSION, "")
        if version not in ("13", "8", "7"):
            raise HTTPBadRequest(text=f"Unsupported version: {version}")

        # check client handshake for validity: the key must decode to 16 bytes
        key = headers.get(hdrs.SEC_WEBSOCKET_KEY)
        try:
            if not key or len(base64.b64decode(key)) != 16:
                raise HTTPBadRequest(text=f"Handshake error: {key!r}")
        except binascii.Error:
            raise HTTPBadRequest(text=f"Handshake error: {key!r}") from None

        accept_val = base64.b64encode(
            hashlib.sha1(key.encode() + WS_KEY).digest()
        ).decode()
        response_headers = CIMultiDict(
            {
                hdrs.UPGRADE: "websocket",
                hdrs.CONNECTION: "upgrade",
                hdrs.SEC_WEBSOCKET_ACCEPT: accept_val,
            }
        )

        notakeover = False
        compress = 0
        if self._compress:
            extensions = headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS)
            # Server side always get return with no exception.
            # If something happened, just drop compress extension
            compress, notakeover = ws_ext_parse(extensions, isserver=True)
            if compress:
                enabledext = ws_ext_gen(
                    compress=compress, isserver=True, server_notakeover=notakeover
                )
                response_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = enabledext

        if protocol:
            response_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = protocol
        return (
            response_headers,
            protocol,
            compress,
            notakeover,
        )

    def _pre_start(self, request: BaseRequest) -> tuple[str | None, WebSocketWriter]:
        """Run the handshake, set the response headers and create the writer.

        Raises:
            ConnectionResetError: If the request's transport is already gone.
        """
        self._loop = request._loop

        headers, protocol, compress, notakeover = self._handshake(request)

        self.set_status(101)
        self.headers.update(headers)
        self.force_close()
        # From here on ``_compress`` holds the negotiated value, not the flag
        # passed to the constructor.
        self._compress = compress
        transport = request._protocol.transport
        if transport is None:
            raise ConnectionResetError("Connection lost")
        writer = WebSocketWriter(
            request._protocol,
            transport,
            compress=compress,
            notakeover=notakeover,
            limit=self._writer_limit,
        )

        return protocol, writer

    def _post_start(
        self, request: BaseRequest, protocol: str | None, writer: WebSocketWriter
    ) -> None:
        """Store the handshake results and install the WebSocket parser."""
        self._ws_protocol = protocol
        self._writer = writer

        self._reset_heartbeat()

        loop = self._loop
        assert loop is not None
        self._reader = WebSocketDataQueue(request._protocol, 2**16, loop=loop)
        parser = WebSocketReader(
            self._reader,
            self._max_msg_size,
            compress=bool(self._compress),
            decode_text=self._decode_text,
        )
        # Only pay for the per-chunk callback when a heartbeat is configured.
        cb = None if self._heartbeat is None else self._on_data_received
        request.protocol.set_parser(parser, data_received_cb=cb)
        # disable HTTP keepalive for WebSocket
        request.protocol.keep_alive(False)

    def can_prepare(self, request: BaseRequest) -> WebSocketReady:
        """Check whether ``request`` can be upgraded, without upgrading it.

        Raises:
            RuntimeError: If the response has already been prepared.
        """
        if self._writer is not None:
            raise RuntimeError("Already started")
        try:
            _, protocol, _, _ = self._handshake(request)
        except HTTPException:
            return WebSocketReady(False, None)
        else:
            return WebSocketReady(True, protocol)

    @property
    def prepared(self) -> bool:
        """True once the WebSocket writer has been created by ``prepare()``."""
        return self._writer is not None

    @property
    def closed(self) -> bool:
        """True once the websocket has been marked closed."""
        return self._closed

    @property
    def close_code(self) -> int | None:
        """Close code recorded for the connection, or None if none is set."""
        return self._close_code

    @property
    def ws_protocol(self) -> str | None:
        """Sub-protocol selected during the handshake, or None."""
        return self._ws_protocol

    @property
    def compress(self) -> int | bool:
        """Compression setting; after the handshake, the negotiated value."""
        return self._compress

    def get_extra_info(self, name: str, default: Any = None) -> Any:
        """Get optional transport information.

        If no value associated with ``name`` is found, ``default`` is returned.
        """
        writer = self._writer
        if writer is None:
            return default
        return writer.transport.get_extra_info(name, default)

    def exception(self) -> BaseException | None:
        """Return the last exception recorded for the connection, if any."""
        return self._exception

    async def ping(self, message: bytes = b"") -> None:
        """Send a PING frame carrying ``message``."""
        if self._writer is None:
            raise RuntimeError("Call .prepare() first")
        await self._writer.send_frame(message, WSMsgType.PING)

    async def pong(self, message: bytes = b"") -> None:
        """Send a PONG frame carrying ``message``."""
        # unsolicited pong
        if self._writer is None:
            raise RuntimeError("Call .prepare() first")
        await self._writer.send_frame(message, WSMsgType.PONG)

    async def send_frame(
        self, message: bytes, opcode: WSMsgType, compress: int | None = None
    ) -> None:
        """Send a frame over the websocket."""
        if self._writer is None:
            raise RuntimeError("Call .prepare() first")
        await self._writer.send_frame(message, opcode, compress)

    async def send_str(self, data: str, compress: int | None = None) -> None:
        """Send ``data`` as a UTF-8 encoded TEXT frame.

        Raises:
            RuntimeError: If the response has not been prepared.
            TypeError: If ``data`` is not a ``str``.
        """
        if self._writer is None:
            raise RuntimeError("Call .prepare() first")
        if not isinstance(data, str):
            raise TypeError(f"data argument must be str ({type(data)!r})")
        await self._writer.send_frame(
            data.encode("utf-8"), WSMsgType.TEXT, compress=compress
        )

    async def send_bytes(self, data: bytes, compress: int | None = None) -> None:
        """Send ``data`` as a BINARY frame.

        Raises:
            RuntimeError: If the response has not been prepared.
            TypeError: If ``data`` is not bytes, bytearray or memoryview.
        """
        if self._writer is None:
            raise RuntimeError("Call .prepare() first")
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f"data argument must be byte-ish ({type(data)!r})")
        await self._writer.send_frame(data, WSMsgType.BINARY, compress=compress)

    async def send_json(
        self,
        data: Any,
        compress: int | None = None,
        *,
        dumps: JSONEncoder = json.dumps,
    ) -> None:
        """Serialize ``data`` with ``dumps`` and send it as a TEXT frame."""
        await self.send_str(dumps(data), compress=compress)

    async def send_json_bytes(
        self,
        data: Any,
        compress: int | None = None,
        *,
        dumps: JSONBytesEncoder,
    ) -> None:
        """Send JSON data using a bytes-returning encoder as a binary frame.

        Use this when your JSON encoder (like orjson) returns bytes
        instead of str, avoiding the encode/decode overhead.
        """
        await self.send_bytes(dumps(data), compress=compress)

    async def write_eof(self) -> None:  # type: ignore[override]
        """Close the websocket the first time it is called.

        Raises:
            RuntimeError: If the response has not been started.
        """
        if self._eof_sent:
            return
        if self._payload_writer is None:
            raise RuntimeError("Response has not been started")

        await self.close()
        self._eof_sent = True

    async def close(
        self, *, code: int = WSCloseCode.OK, message: bytes = b"", drain: bool = True
    ) -> bool:
        """Close websocket connection.

        Sends a CLOSE frame, then waits up to ``timeout`` (constructor
        argument) for the peer's CLOSE message before closing the transport.

        Returns:
            False if the websocket was already closed, True otherwise.

        Raises:
            RuntimeError: If the response has not been prepared.
        """
        if self._writer is None:
            raise RuntimeError("Call .prepare() first")

        if self._closed:
            return False
        self._set_closed()

        try:
            await self._writer.close(code, message)
            writer = self._payload_writer
            assert writer is not None
            if drain:
                await writer.drain()
        except (asyncio.CancelledError, asyncio.TimeoutError):
            self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
            raise
        except Exception as exc:
            # Any other failure is recorded rather than raised.
            self._exception = exc
            self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
            return True

        reader = self._reader
        assert reader is not None
        # we need to break `receive()` cycle before we can call
        # `reader.read()` as `close()` may be called from different task
        if self._waiting:
            assert self._loop is not None
            assert self._close_wait is None
            self._close_wait = self._loop.create_future()
            reader.feed_data(WS_CLOSING_MESSAGE)
            await self._close_wait

        if self._closing:
            self._close_transport()
            return True

        try:
            async with async_timeout.timeout(self._timeout):
                # Discard everything until the peer's CLOSE message arrives.
                while True:
                    msg = await reader.read()
                    if msg.type is WSMsgType.CLOSE:
                        self._set_code_close_transport(msg.data)
                        return True
        except asyncio.CancelledError:
            self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
            raise
        except Exception as exc:
            self._exception = exc
            self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
            return True

    def _set_closing(self, code: int) -> None:
        """Set the close code and mark the connection as closing."""
        self._closing = True
        self._close_code = code
        self._cancel_heartbeat()

    def _set_code_close_transport(self, code: int) -> None:
        """Set the close code and close the transport."""
        self._close_code = code
        self._close_transport()

    def _close_transport(self) -> None:
        """Close the transport."""
        if self._req is not None and self._req.transport is not None:
            self._req.transport.close()

    @overload
    async def receive(
        self: "WebSocketResponse[Literal[True]]", timeout: float | None = None
    ) -> WSMessageDecodeText: ...

    @overload
    async def receive(
        self: "WebSocketResponse[Literal[False]]", timeout: float | None = None
    ) -> WSMessageNoDecodeText: ...

    @overload
    async def receive(
        self: "WebSocketResponse[_DecodeText]", timeout: float | None = None
    ) -> WSMessageDecodeText | WSMessageNoDecodeText: ...

    async def receive(
        self, timeout: float | None = None
    ) -> WSMessageDecodeText | WSMessageNoDecodeText:
        """Wait for and return the next message.

        With ``autoping`` enabled, PING messages are answered and PING/PONG
        messages are not returned. A falsy ``timeout`` falls back to the
        ``receive_timeout`` given to the constructor.

        Raises:
            RuntimeError: If the response has not been prepared, if another
                ``receive()`` call is in progress, or if it is called
                repeatedly after the websocket was closed.
            asyncio.TimeoutError: If no message arrives within the timeout.
        """
        if self._reader is None:
            raise RuntimeError("Call .prepare() first")

        receive_timeout = timeout or self._receive_timeout
        while True:
            if self._waiting:
                raise RuntimeError("Concurrent call to receive() is not allowed")

            if self._closed:
                self._conn_lost += 1
                if self._conn_lost >= THRESHOLD_CONNLOST_ACCESS:
                    raise RuntimeError("WebSocket connection is closed.")
                return WS_CLOSED_MESSAGE
            elif self._closing:
                return WS_CLOSING_MESSAGE

            try:
                self._waiting = True
                try:
                    if receive_timeout:
                        # Entering the context manager and creating
                        # Timeout() object can take almost 50% of the
                        # run time in this loop so we avoid it if
                        # there is no read timeout.
                        async with async_timeout.timeout(receive_timeout):
                            msg = await self._reader.read()
                    else:
                        msg = await self._reader.read()
                finally:
                    self._waiting = False
                    # Release a close() call that is waiting for us to return.
                    if self._close_wait:
                        set_result(self._close_wait, None)
            except asyncio.TimeoutError:
                # Listed explicitly so the generic handler below does not
                # turn a timeout into an error message.
                raise
            except EofStream:
                self._close_code = WSCloseCode.OK
                await self.close()
                return WS_CLOSED_MESSAGE
            except WebSocketError as exc:
                self._close_code = exc.code
                await self.close(code=exc.code)
                return WSMessageError(data=exc)
            except Exception as exc:
                self._exception = exc
                self._set_closing(WSCloseCode.ABNORMAL_CLOSURE)
                await self.close()
                return WSMessageError(data=exc)

            if msg.type not in _INTERNAL_RECEIVE_TYPES:
                # If its not a close/closing/ping/pong message
                # we can return it immediately
                return msg

            if msg.type is WSMsgType.CLOSE:
                self._set_closing(msg.data)
                # Could be closed while awaiting reader.
                if not self._closed and self._autoclose:  # type: ignore[redundant-expr]
                    # The client is likely going to close the
                    # connection out from under us so we do not
                    # want to drain any pending writes as it will
                    # likely result writing to a broken pipe.
                    await self.close(drain=False)
            elif msg.type is WSMsgType.CLOSING:
                self._set_closing(WSCloseCode.OK)
            elif msg.type is WSMsgType.PING and self._autoping:
                await self.pong(msg.data)
                continue
            elif msg.type is WSMsgType.PONG and self._autoping:
                continue

            return msg

    @overload
    async def receive_str(
        self: "WebSocketResponse[Literal[True]]", *, timeout: float | None = None
    ) -> str: ...

    @overload
    async def receive_str(
        self: "WebSocketResponse[Literal[False]]", *, timeout: float | None = None
    ) -> bytes: ...

    @overload
    async def receive_str(
        self: "WebSocketResponse[_DecodeText]", *, timeout: float | None = None
    ) -> str | bytes: ...

    async def receive_str(self, *, timeout: float | None = None) -> str | bytes:
        """Receive TEXT message.

        Returns str when decode_text=True (default), bytes when decode_text=False.

        Raises:
            WSMessageTypeError: If the received message is not a TEXT message.
        """
        msg = await self.receive(timeout)
        if msg.type is not WSMsgType.TEXT:
            raise WSMessageTypeError(
                f"Received message {msg.type}:{msg.data!r} is not WSMsgType.TEXT"
            )
        return msg.data

    async def receive_bytes(self, *, timeout: float | None = None) -> bytes:
        """Receive BINARY message.

        Raises:
            WSMessageTypeError: If the received message is not a BINARY message.
        """
        msg = await self.receive(timeout)
        if msg.type is not WSMsgType.BINARY:
            raise WSMessageTypeError(
                f"Received message {msg.type}:{msg.data!r} is not WSMsgType.BINARY"
            )
        return msg.data

    @overload
    async def receive_json(
        self: "WebSocketResponse[Literal[True]]",
        *,
        loads: JSONDecoder = ...,
        timeout: float | None = None,
    ) -> Any: ...

    @overload
    async def receive_json(
        self: "WebSocketResponse[Literal[False]]",
        *,
        loads: Callable[[bytes], Any] = ...,
        timeout: float | None = None,
    ) -> Any: ...

    @overload
    async def receive_json(
        self: "WebSocketResponse[_DecodeText]",
        *,
        loads: JSONDecoder | Callable[[bytes], Any] = ...,
        timeout: float | None = None,
    ) -> Any: ...

    async def receive_json(
        self,
        *,
        loads: JSONDecoder | Callable[[bytes], Any] = json.loads,
        timeout: float | None = None,
    ) -> Any:
        """Receive a TEXT message and return it decoded with ``loads``."""
        data = await self.receive_str(timeout=timeout)
        return loads(data)  # type: ignore[arg-type]

    async def write(
        self, data: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
    ) -> None:
        """Always raise: raw writes are not supported on a websocket."""
        raise RuntimeError("Cannot call .write() for websocket")

    def __aiter__(self) -> Self:
        """Return self; messages are produced by ``__anext__()``."""
        return self

    @overload
    async def __anext__(
        self: "WebSocketResponse[Literal[True]]",
    ) -> WSMessageDecodeText: ...

    @overload
    async def __anext__(
        self: "WebSocketResponse[Literal[False]]",
    ) -> WSMessageNoDecodeText: ...

    @overload
    async def __anext__(
        self: "WebSocketResponse[_DecodeText]",
    ) -> WSMessageDecodeText | WSMessageNoDecodeText: ...

    async def __anext__(self) -> WSMessageDecodeText | WSMessageNoDecodeText:
        """Return the next message; stop on CLOSE, CLOSING or CLOSED."""
        msg = await self.receive()
        if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
            raise StopAsyncIteration
        return msg

    def _cancel(self, exc: BaseException) -> None:
        """Mark the websocket as closing and fail the reader with ``exc``."""
        # web_protocol calls this from connection_lost
        # or when the server is shutting down.
        self._closing = True
        self._cancel_heartbeat()
        if self._reader is not None:
            set_exception(self._reader, exc)