Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_ws.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

448 statements  

1import asyncio 

2import base64 

3import binascii 

4import hashlib 

5import json 

6import sys 

7from collections.abc import Callable, Iterable 

8from typing import Any, Final, Generic, Literal, Union, overload 

9 

10from multidict import CIMultiDict 

11 

12from . import hdrs 

13from ._websocket.reader import WebSocketDataQueue 

14from .abc import AbstractStreamWriter 

15from .client_exceptions import WSMessageTypeError 

16from .helpers import ( 

17 DEFAULT_CHUNK_SIZE, 

18 calculate_timeout_when, 

19 frozen_dataclass_decorator, 

20 set_exception, 

21 set_result, 

22) 

23from .http import ( 

24 WS_CLOSED_MESSAGE, 

25 WS_CLOSING_MESSAGE, 

26 WS_KEY, 

27 WebSocketError, 

28 WebSocketReader, 

29 WebSocketWriter, 

30 WSCloseCode, 

31 WSMessageDecodeText, 

32 WSMessageNoDecodeText, 

33 WSMsgType, 

34 ws_ext_gen, 

35 ws_ext_parse, 

36) 

37from .http_websocket import _INTERNAL_RECEIVE_TYPES, WSMessageError 

38from .log import ws_logger 

39from .streams import EofStream 

40from .typedefs import JSONBytesEncoder, JSONDecoder, JSONEncoder 

41from .web_exceptions import HTTPBadRequest, HTTPException 

42from .web_request import BaseRequest 

43from .web_response import StreamResponse 

44 

45if sys.version_info >= (3, 13): 

46 from typing import TypeVar 

47else: 

48 from typing_extensions import TypeVar 

49 

50if sys.version_info >= (3, 11): 

51 import asyncio as async_timeout 

52 from typing import Self 

53else: 

54 import async_timeout 

55 from typing_extensions import Self 

56 

# Names exported by ``from <this module> import *``.
__all__ = (
    "WebSocketResponse",
    "WebSocketReady",
    "WSMsgType",
)

# Number of receive() calls on an already-closed connection at which
# WebSocketResponse.receive() stops returning the CLOSED message and raises
# RuntimeError instead.
THRESHOLD_CONNLOST_ACCESS: Final[int] = 5

# TypeVar for whether text messages are decoded to str (True) or kept as bytes (False)
_DecodeText = TypeVar("_DecodeText", bound=bool, covariant=True, default=Literal[True])

67 

68 

@frozen_dataclass_decorator
class WebSocketReady:
    """Result of a WebSocket handshake pre-check.

    Truthiness of an instance mirrors ``ok``.
    """

    # Whether the handshake check succeeded.
    ok: bool
    # Sub-protocol chosen during the check, or None when none was selected.
    protocol: str | None

    def __bool__(self) -> bool:
        """Return ``ok`` so the object can be used directly in conditions."""
        return self.ok

76 

77 

78class WebSocketResponse(StreamResponse, Generic[_DecodeText]): 

79 

# Class-level defaults; instances overwrite them as the connection progresses.

# NOTE(review): not referenced in this class — presumably read by the
# StreamResponse base class; confirm.
_length_check: bool = False
# Sub-protocol selected by the handshake; stored by _post_start().
_ws_protocol: str | None = None
# Frame writer; None until _post_start() runs (see the `prepared` property).
_writer: WebSocketWriter | None = None
# Incoming message queue; created in _post_start().
_reader: WebSocketDataQueue | None = None
# Set to True by _set_closed().
_closed: bool = False
# Set to True by _set_closing() and _cancel().
_closing: bool = False
# Counts receive() calls made after the connection was closed.
_conn_lost: int = 0
# Close code recorded for the connection, if any.
_close_code: int | None = None
# Event loop taken from the request in _pre_start().
_loop: asyncio.AbstractEventLoop | None = None
# True while receive() is awaiting the reader.
_waiting: bool = False
# Future created by close() to wait for an in-flight receive() to finish.
_close_wait: asyncio.Future[None] | None = None
# Last exception recorded by close/receive/ping-pong handling.
_exception: BaseException | None = None
# Loop time at which the next heartbeat is due.
_heartbeat_when: float = 0.0
# Timer that fires _send_heartbeat().
_heartbeat_cb: asyncio.TimerHandle | None = None
# Timer that fires _pong_not_received().
_pong_response_cb: asyncio.TimerHandle | None = None
# Task sending the heartbeat PING while it has not completed yet.
_ping_task: asyncio.Task[None] | None = None
# True while a coalesced heartbeat reset is pending.
_need_heartbeat_reset: bool = False
# Handle of the call_soon() callback that performs the pending reset.
_heartbeat_reset_handle: asyncio.Handle | None = None

98 

def __init__(
    self,
    *,
    timeout: float = 10.0,
    receive_timeout: float | None = None,
    autoclose: bool = True,
    autoping: bool = True,
    heartbeat: float | None = None,
    protocols: Iterable[str] = (),
    compress: bool = True,
    max_msg_size: int = 4 * 1024 * 1024,
    writer_limit: int = DEFAULT_CHUNK_SIZE,
    decode_text: bool = True,
) -> None:
    """Store the WebSocket options; the response status is fixed at 101.

    All options are keyword-only and are kept on private attributes for
    later use by the handshake, send, receive and close paths.
    """
    super().__init__(status=101)

    # Handshake / negotiation options.
    self._protocols = protocols
    self._compress: bool | int = compress

    # Timeouts and automatic behaviours.
    self._timeout = timeout
    self._receive_timeout = receive_timeout
    self._autoclose = autoclose
    self._autoping = autoping

    # Heartbeat: the PONG deadline is half the heartbeat interval and is
    # only defined when a heartbeat was requested.
    self._heartbeat = heartbeat
    if heartbeat is not None:
        self._pong_heartbeat = heartbeat / 2.0
    self._need_heartbeat_reset = False
    self._heartbeat_reset_handle = None

    # Reader / writer tuning.
    self._max_msg_size = max_msg_size
    self._writer_limit = writer_limit
    self._decode_text = decode_text

128 

129 def _cancel_heartbeat(self) -> None: 

130 self._cancel_pong_response_cb() 

131 if self._heartbeat_reset_handle is not None: 

132 self._heartbeat_reset_handle.cancel() 

133 self._heartbeat_reset_handle = None 

134 self._need_heartbeat_reset = False 

135 if self._heartbeat_cb is not None: 

136 self._heartbeat_cb.cancel() 

137 self._heartbeat_cb = None 

138 if self._ping_task is not None: 

139 self._ping_task.cancel() 

140 self._ping_task = None 

141 

142 def _cancel_pong_response_cb(self) -> None: 

143 if self._pong_response_cb is not None: 

144 self._pong_response_cb.cancel() 

145 self._pong_response_cb = None 

146 

147 def _on_data_received(self) -> None: 

148 if self._heartbeat is None or self._need_heartbeat_reset: 

149 return 

150 loop = self._loop 

151 assert loop is not None 

152 # Coalesce multiple chunks received in the same loop tick into a single 

153 # heartbeat reset. Resetting immediately per chunk increases timer churn. 

154 self._need_heartbeat_reset = True 

155 self._heartbeat_reset_handle = loop.call_soon(self._flush_heartbeat_reset) 

156 

157 def _flush_heartbeat_reset(self) -> None: 

158 self._heartbeat_reset_handle = None 

159 if not self._need_heartbeat_reset: 

160 return 

161 self._reset_heartbeat() 

162 self._need_heartbeat_reset = False 

163 

164 def _reset_heartbeat(self) -> None: 

165 if self._heartbeat is None: 

166 return 

167 self._cancel_pong_response_cb() 

168 req = self._req 

169 timeout_ceil_threshold = ( 

170 req._protocol._timeout_ceil_threshold if req is not None else 5 

171 ) 

172 loop = self._loop 

173 assert loop is not None 

174 now = loop.time() 

175 when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold) 

176 self._heartbeat_when = when 

177 if self._heartbeat_cb is None: 

178 # We do not cancel the previous heartbeat_cb here because 

179 # it generates a significant amount of TimerHandle churn 

180 # which causes asyncio to rebuild the heap frequently. 

181 # Instead _send_heartbeat() will reschedule the next 

182 # heartbeat if it fires too early. 

183 self._heartbeat_cb = loop.call_at(when, self._send_heartbeat) 

184 

185 def _send_heartbeat(self) -> None: 

186 self._heartbeat_cb = None 

187 

188 # If heartbeat reset is pending (data is being received), skip sending 

189 # the ping and let the reset callback handle rescheduling the heartbeat. 

190 if self._need_heartbeat_reset: 

191 return 

192 

193 loop = self._loop 

194 assert loop is not None and self._writer is not None 

195 now = loop.time() 

196 if now < self._heartbeat_when: 

197 # Heartbeat fired too early, reschedule 

198 self._heartbeat_cb = loop.call_at( 

199 self._heartbeat_when, self._send_heartbeat 

200 ) 

201 return 

202 

203 req = self._req 

204 timeout_ceil_threshold = ( 

205 req._protocol._timeout_ceil_threshold if req is not None else 5 

206 ) 

207 when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold) 

208 self._cancel_pong_response_cb() 

209 self._pong_response_cb = loop.call_at(when, self._pong_not_received) 

210 

211 coro = self._writer.send_frame(b"", WSMsgType.PING) 

212 if sys.version_info >= (3, 12): 

213 # Optimization for Python 3.12, try to send the ping 

214 # immediately to avoid having to schedule 

215 # the task on the event loop. 

216 ping_task = asyncio.Task(coro, loop=loop, eager_start=True) 

217 else: 

218 ping_task = loop.create_task(coro) 

219 

220 if not ping_task.done(): 

221 self._ping_task = ping_task 

222 ping_task.add_done_callback(self._ping_task_done) 

223 else: 

224 self._ping_task_done(ping_task) 

225 

226 def _ping_task_done(self, task: "asyncio.Task[None]") -> None: 

227 """Callback for when the ping task completes.""" 

228 if not task.cancelled() and (exc := task.exception()): 

229 self._handle_ping_pong_exception(exc) 

230 self._ping_task = None 

231 

232 def _pong_not_received(self) -> None: 

233 if self._req is not None and self._req.transport is not None: 

234 self._handle_ping_pong_exception( 

235 asyncio.TimeoutError( 

236 f"No PONG received after {self._pong_heartbeat} seconds" 

237 ) 

238 ) 

239 

240 def _handle_ping_pong_exception(self, exc: BaseException) -> None: 

241 """Handle exceptions raised during ping/pong processing.""" 

242 if self._closed: 

243 return 

244 self._set_closed() 

245 self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE) 

246 self._exception = exc 

247 if self._waiting and not self._closing and self._reader is not None: 

248 self._reader.feed_data(WSMessageError(data=exc, extra=None)) 

249 

250 def _set_closed(self) -> None: 

251 """Set the connection to closed. 

252 

253 Cancel any heartbeat timers and set the closed flag. 

254 """ 

255 self._closed = True 

256 self._cancel_heartbeat() 

257 

async def prepare(self, request: BaseRequest) -> AbstractStreamWriter:
    """Perform the handshake and start the WebSocket response.

    Returns the existing payload writer when the response was already
    prepared.
    """
    # Check for an already prepared response first so that this case is
    # not masked by an exception raised during the handshake.
    existing = self._payload_writer
    if existing is not None:
        return existing

    ws_protocol, ws_writer = self._pre_start(request)
    stream_writer = await super().prepare(request)
    assert stream_writer is not None
    self._post_start(request, ws_protocol, ws_writer)
    await stream_writer.drain()
    return stream_writer

269 

def _handshake(
    self, request: BaseRequest
) -> tuple["CIMultiDict[str]", str | None, int, bool]:
    """Validate the upgrade request and build the handshake response headers.

    Returns ``(response_headers, protocol, compress, notakeover)``.
    Raises HTTPBadRequest when the Upgrade/Connection headers, the protocol
    version or the Sec-WebSocket-Key are not acceptable.
    """
    headers = request.headers

    upgrade_value = headers.get(hdrs.UPGRADE, "").lower().strip()
    if upgrade_value != "websocket":
        raise HTTPBadRequest(
            text=(
                f"No WebSocket UPGRADE hdr: {headers.get(hdrs.UPGRADE)}\n Can "
                '"Upgrade" only to "WebSocket".'
            )
        )

    connection_value = headers.get(hdrs.CONNECTION, "").lower()
    if "upgrade" not in connection_value:
        raise HTTPBadRequest(
            text=f"No CONNECTION upgrade hdr: {headers.get(hdrs.CONNECTION)}"
        )

    # Pick the first client-offered sub-protocol the server also knows.
    protocol: str | None = None
    if hdrs.SEC_WEBSOCKET_PROTOCOL in headers:
        req_protocols = [
            proto.strip()
            for proto in headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
        ]
        protocol = next(
            (proto for proto in req_protocols if proto in self._protocols), None
        )
        if protocol is None:
            # No overlap found: Return no protocol as per spec
            ws_logger.warning(
                "%s: Client protocols %r don’t overlap server-known ones %r",
                request.remote,
                req_protocols,
                self._protocols,
            )

    # Only these protocol versions are accepted.
    version = headers.get(hdrs.SEC_WEBSOCKET_VERSION, "")
    if version not in ("13", "8", "7"):
        raise HTTPBadRequest(text=f"Unsupported version: {version}")

    # The client key must be present and base64-decode to exactly 16 bytes.
    key = headers.get(hdrs.SEC_WEBSOCKET_KEY)
    try:
        if not key or len(base64.b64decode(key)) != 16:
            raise HTTPBadRequest(text=f"Handshake error: {key!r}")
    except binascii.Error:
        raise HTTPBadRequest(text=f"Handshake error: {key!r}") from None

    key_digest = hashlib.sha1(key.encode() + WS_KEY).digest()
    accept_val = base64.b64encode(key_digest).decode()
    response_headers = CIMultiDict(
        {
            hdrs.UPGRADE: "websocket",
            hdrs.CONNECTION: "upgrade",
            hdrs.SEC_WEBSOCKET_ACCEPT: accept_val,
        }
    )

    notakeover = False
    compress = 0
    if self._compress:
        # On the server side parsing returns without raising; if the
        # extension header is unusable compression is simply not enabled.
        offered_extensions = headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS)
        compress, notakeover = ws_ext_parse(offered_extensions, isserver=True)
        if compress:
            response_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = ws_ext_gen(
                compress=compress, isserver=True, server_notakeover=notakeover
            )

    if protocol:
        response_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = protocol
    return response_headers, protocol, compress, notakeover

353 

def _pre_start(self, request: BaseRequest) -> tuple[str | None, WebSocketWriter]:
    """Run the handshake and create the frame writer.

    Returns the selected sub-protocol and the new ``WebSocketWriter``.
    Raises ConnectionResetError when the transport is already gone.
    """
    self._loop = request._loop

    response_headers, ws_protocol, compress, notakeover = self._handshake(request)

    self.set_status(101)
    self.headers.update(response_headers)
    self.force_close()
    self._compress = compress

    transport = request._protocol.transport
    if transport is None:
        raise ConnectionResetError("Connection lost")

    return ws_protocol, WebSocketWriter(
        request._protocol,
        transport,
        compress=compress,
        notakeover=notakeover,
        limit=self._writer_limit,
    )

375 

def _post_start(
    self, request: BaseRequest, protocol: str | None, writer: WebSocketWriter
) -> None:
    """Install the reader/parser on the connection after the handshake."""
    self._ws_protocol = protocol
    self._writer = writer

    self._reset_heartbeat()

    event_loop = self._loop
    assert event_loop is not None
    self._reader = WebSocketDataQueue(
        request._protocol, DEFAULT_CHUNK_SIZE, loop=event_loop
    )
    ws_parser = WebSocketReader(
        self._reader,
        self._max_msg_size,
        compress=bool(self._compress),
        decode_text=self._decode_text,
    )
    # The data-received hook is only installed when a heartbeat is configured.
    data_cb = self._on_data_received if self._heartbeat is not None else None
    request.protocol.set_parser(ws_parser, data_received_cb=data_cb)
    # disable HTTP keepalive for WebSocket
    request.protocol.keep_alive(False)

399 

def can_prepare(self, request: BaseRequest) -> WebSocketReady:
    """Check whether *request* can be upgraded, without starting anything.

    Returns a ``WebSocketReady`` carrying the handshake outcome and the
    selected sub-protocol. Raises RuntimeError if the response was
    already started.
    """
    if self._writer is not None:
        raise RuntimeError("Already started")
    try:
        handshake_result = self._handshake(request)
    except HTTPException:
        return WebSocketReady(False, None)
    _, protocol, _, _ = handshake_result
    return WebSocketReady(True, protocol)

409 

@property
def prepared(self) -> bool:
    """True once the frame writer has been created."""
    ws_writer = self._writer
    return ws_writer is not None

413 

@property
def closed(self) -> bool:
    """Value of the internal closed flag."""
    is_closed = self._closed
    return is_closed

417 

@property
def close_code(self) -> int | None:
    """Close code recorded for the connection, or None if none was set."""
    recorded_code = self._close_code
    return recorded_code

421 

@property
def ws_protocol(self) -> str | None:
    """Sub-protocol stored after the handshake, or None."""
    selected = self._ws_protocol
    return selected

425 

@property
def compress(self) -> int | bool:
    """Current compression setting as stored on the instance."""
    setting = self._compress
    return setting

429 

def get_extra_info(self, name: str, default: Any = None) -> Any:
    """Look up optional transport information by ``name``.

    ``default`` is returned when the response has no writer yet; otherwise
    the lookup is delegated to the writer's transport with the same
    ``default``.
    """
    ws_writer = self._writer
    return (
        default
        if ws_writer is None
        else ws_writer.transport.get_extra_info(name, default)
    )

439 

def exception(self) -> BaseException | None:
    """Return the last recorded exception, or None."""
    recorded = self._exception
    return recorded

442 

async def ping(self, message: bytes = b"") -> None:
    """Send a PING frame carrying ``message``.

    Raises RuntimeError when the response has not been prepared.
    """
    ws_writer = self._writer
    if ws_writer is None:
        raise RuntimeError("Call .prepare() first")
    await ws_writer.send_frame(message, WSMsgType.PING)

447 

async def pong(self, message: bytes = b"") -> None:
    """Send a PONG frame carrying ``message`` (may be unsolicited).

    Raises RuntimeError when the response has not been prepared.
    """
    ws_writer = self._writer
    if ws_writer is None:
        raise RuntimeError("Call .prepare() first")
    await ws_writer.send_frame(message, WSMsgType.PONG)

453 

async def send_frame(
    self, message: bytes, opcode: WSMsgType, compress: int | None = None
) -> None:
    """Send one frame with the given opcode through the frame writer.

    Raises RuntimeError when the response has not been prepared.
    """
    ws_writer = self._writer
    if ws_writer is None:
        raise RuntimeError("Call .prepare() first")
    await ws_writer.send_frame(message, opcode, compress)

461 

async def send_str(self, data: str, compress: int | None = None) -> None:
    """Send ``data`` as a UTF-8 encoded TEXT frame.

    Raises RuntimeError when the response has not been prepared and
    TypeError when ``data`` is not a ``str``.
    """
    ws_writer = self._writer
    if ws_writer is None:
        raise RuntimeError("Call .prepare() first")
    if not isinstance(data, str):
        raise TypeError("data argument must be str (%r)" % type(data))
    encoded = data.encode("utf-8")
    await ws_writer.send_frame(encoded, WSMsgType.TEXT, compress=compress)

470 

async def send_bytes(self, data: bytes, compress: int | None = None) -> None:
    """Send ``data`` as a BINARY frame.

    Raises RuntimeError when the response has not been prepared and
    TypeError when ``data`` is not bytes, bytearray or memoryview.
    """
    ws_writer = self._writer
    if ws_writer is None:
        raise RuntimeError("Call .prepare() first")
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError("data argument must be byte-ish (%r)" % type(data))
    await ws_writer.send_frame(data, WSMsgType.BINARY, compress=compress)

477 

async def send_json(
    self,
    data: Any,
    compress: int | None = None,
    *,
    dumps: JSONEncoder = json.dumps,
) -> None:
    """Serialize ``data`` with ``dumps`` and send the result via ``send_str``."""
    serialized = dumps(data)
    await self.send_str(serialized, compress=compress)

486 

async def send_json_bytes(
    self,
    data: Any,
    compress: int | None = None,
    *,
    dumps: JSONBytesEncoder,
) -> None:
    """Serialize ``data`` with a bytes-returning encoder and send it as binary.

    Intended for JSON encoders (such as orjson) whose output is already
    ``bytes``; the result is passed to ``send_bytes`` unchanged, so no
    str round-trip takes place.
    """
    serialized = dumps(data)
    await self.send_bytes(serialized, compress=compress)

500 

async def write_eof(self) -> None:  # type: ignore[override]
    """Finish the response by closing the WebSocket.

    Does nothing if EOF was already sent; raises RuntimeError when the
    response has not been started.
    """
    if self._eof_sent:
        return
    if self._payload_writer is None:
        raise RuntimeError("Response has not been started")

    # The EOF flag is set only after close() has completed.
    await self.close()
    self._eof_sent = True

509 

async def close(
    self, *, code: int = WSCloseCode.OK, message: bytes = b"", drain: bool = True
) -> bool:
    """Close websocket connection.

    Returns False when the connection was already closed, True otherwise.
    Raises RuntimeError when the response has not been prepared.
    Cancellation (and a timeout while sending the close frame) is
    re-raised after the transport has been closed abnormally.
    """
    ws_writer = self._writer
    if ws_writer is None:
        raise RuntimeError("Call .prepare() first")

    if self._closed:
        return False
    self._set_closed()

    # Step 1: send our close frame and optionally flush the payload writer.
    try:
        await ws_writer.close(code, message)
        payload_writer = self._payload_writer
        assert payload_writer is not None
        if drain:
            await payload_writer.drain()
    except (asyncio.CancelledError, asyncio.TimeoutError):
        self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
        raise
    except Exception as exc:
        self._exception = exc
        self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
        return True

    reader = self._reader
    assert reader is not None
    # Step 2: close() may run in a different task than receive(); the
    # pending receive() has to be woken up and finished before this
    # method may read from the reader itself.
    if self._waiting:
        assert self._loop is not None
        assert self._close_wait is None
        self._close_wait = self._loop.create_future()
        reader.feed_data(WS_CLOSING_MESSAGE)
        await self._close_wait

    if self._closing:
        self._close_transport()
        return True

    # Step 3: wait, bounded by the close timeout, for the peer's CLOSE.
    try:
        async with async_timeout.timeout(self._timeout):
            msg = await reader.read()
            while msg.type is not WSMsgType.CLOSE:
                msg = await reader.read()
            self._set_code_close_transport(msg.data)
            return True
    except asyncio.CancelledError:
        self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
        raise
    except Exception as exc:
        self._exception = exc
        self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
        return True

564 

565 def _set_closing(self, code: int) -> None: 

566 """Set the close code and mark the connection as closing.""" 

567 self._closing = True 

568 self._close_code = code 

569 self._cancel_heartbeat() 

570 

571 def _set_code_close_transport(self, code: int) -> None: 

572 """Set the close code and close the transport.""" 

573 self._close_code = code 

574 self._close_transport() 

575 

576 def _close_transport(self) -> None: 

577 """Close the transport.""" 

578 if self._req is not None and self._req.transport is not None: 

579 self._req.transport.close() 

580 

@overload
async def receive(
    self: "WebSocketResponse[Literal[True]]", timeout: float | None = None
) -> WSMessageDecodeText: ...


@overload
async def receive(
    self: "WebSocketResponse[Literal[False]]", timeout: float | None = None
) -> WSMessageNoDecodeText: ...


@overload
async def receive(
    self: "WebSocketResponse[_DecodeText]", timeout: float | None = None
) -> WSMessageDecodeText | WSMessageNoDecodeText: ...


async def receive(
    self, timeout: float | None = None
) -> WSMessageDecodeText | WSMessageNoDecodeText:
    """Wait for and return the next message.

    A falsy ``timeout`` falls back to the instance receive timeout.
    PING/PONG frames are handled internally while autoping is enabled.
    Raises RuntimeError when the response is not prepared, when another
    receive() is already waiting, or when a closed connection is read
    THRESHOLD_CONNLOST_ACCESS times; asyncio.TimeoutError is propagated.
    """
    if self._reader is None:
        raise RuntimeError("Call .prepare() first")

    effective_timeout = timeout or self._receive_timeout
    while True:
        if self._waiting:
            raise RuntimeError("Concurrent call to receive() is not allowed")

        if self._closed:
            self._conn_lost += 1
            if self._conn_lost >= THRESHOLD_CONNLOST_ACCESS:
                raise RuntimeError("WebSocket connection is closed.")
            return WS_CLOSED_MESSAGE
        if self._closing:
            return WS_CLOSING_MESSAGE

        try:
            self._waiting = True
            try:
                if effective_timeout:
                    # Building and entering the timeout context can take
                    # close to half of this loop's run time, so it is only
                    # done when a read timeout is actually in effect.
                    async with async_timeout.timeout(effective_timeout):
                        incoming = await self._reader.read()
                else:
                    incoming = await self._reader.read()
            finally:
                self._waiting = False
                # Release a close() call that waits for this read to end.
                if self._close_wait:
                    set_result(self._close_wait, None)
        except asyncio.TimeoutError:
            raise
        except EofStream:
            self._close_code = WSCloseCode.OK
            await self.close()
            return WS_CLOSED_MESSAGE
        except WebSocketError as exc:
            self._close_code = exc.code
            await self.close(code=exc.code)
            return WSMessageError(data=exc)
        except Exception as exc:
            self._exception = exc
            self._set_closing(WSCloseCode.ABNORMAL_CLOSURE)
            await self.close()
            return WSMessageError(data=exc)

        kind = incoming.type
        if kind not in _INTERNAL_RECEIVE_TYPES:
            # Anything that is not close/closing/ping/pong goes straight
            # back to the caller.
            return incoming

        if kind is WSMsgType.CLOSE:
            self._set_closing(incoming.data)
            # Could be closed while awaiting reader.
            if not self._closed and self._autoclose:  # type: ignore[redundant-expr]
                # The peer is likely to drop the connection right away, so
                # pending writes are not drained: doing so would probably
                # end up writing to a broken pipe.
                await self.close(drain=False)
        elif kind is WSMsgType.CLOSING:
            self._set_closing(WSCloseCode.OK)
        elif self._autoping:
            if kind is WSMsgType.PING:
                await self.pong(incoming.data)
                continue
            if kind is WSMsgType.PONG:
                continue

        return incoming

670 

@overload
async def receive_str(
    self: "WebSocketResponse[Literal[True]]", *, timeout: float | None = None
) -> str: ...


@overload
async def receive_str(
    self: "WebSocketResponse[Literal[False]]", *, timeout: float | None = None
) -> bytes: ...


@overload
async def receive_str(
    self: "WebSocketResponse[_DecodeText]", *, timeout: float | None = None
) -> str | bytes: ...


async def receive_str(self, *, timeout: float | None = None) -> str | bytes:
    """Receive the next message and return its payload if it is TEXT.

    The payload is ``str`` when decode_text=True (default) and ``bytes``
    when decode_text=False. Raises WSMessageTypeError for any other
    message type.
    """
    msg = await self.receive(timeout)
    if msg.type is WSMsgType.TEXT:
        return msg.data
    raise WSMessageTypeError(
        f"Received message {msg.type}:{msg.data!r} is not WSMsgType.TEXT"
    )

697 

async def receive_bytes(self, *, timeout: float | None = None) -> bytes:
    """Receive the next message and return its payload if it is BINARY.

    Raises WSMessageTypeError for any other message type.
    """
    msg = await self.receive(timeout)
    if msg.type is WSMsgType.BINARY:
        return msg.data
    raise WSMessageTypeError(
        f"Received message {msg.type}:{msg.data!r} is not WSMsgType.BINARY"
    )

705 

@overload
async def receive_json(
    self: "WebSocketResponse[Literal[True]]",
    *,
    loads: JSONDecoder = ...,
    timeout: float | None = None,
) -> Any: ...


@overload
async def receive_json(
    self: "WebSocketResponse[Literal[False]]",
    *,
    loads: Callable[[bytes], Any] = ...,
    timeout: float | None = None,
) -> Any: ...


@overload
async def receive_json(
    self: "WebSocketResponse[_DecodeText]",
    *,
    loads: JSONDecoder | Callable[[bytes], Any] = ...,
    timeout: float | None = None,
) -> Any: ...


async def receive_json(
    self,
    *,
    loads: JSONDecoder | Callable[[bytes], Any] = json.loads,
    timeout: float | None = None,
) -> Any:
    """Receive a TEXT message via ``receive_str`` and decode it with ``loads``."""
    return loads(await self.receive_str(timeout=timeout))  # type: ignore[arg-type]

738 

async def write(
    self, data: Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
) -> None:
    """Always raise RuntimeError: raw writes are not allowed on a websocket."""
    raise RuntimeError("Cannot call .write() for websocket")

743 

744 def __aiter__(self) -> Self: 

745 return self 

746 

@overload
async def __anext__(
    self: "WebSocketResponse[Literal[True]]",
) -> WSMessageDecodeText: ...


@overload
async def __anext__(
    self: "WebSocketResponse[Literal[False]]",
) -> WSMessageNoDecodeText: ...


@overload
async def __anext__(
    self: "WebSocketResponse[_DecodeText]",
) -> WSMessageDecodeText | WSMessageNoDecodeText: ...


async def __anext__(self) -> WSMessageDecodeText | WSMessageNoDecodeText:
    """Return the next message; stop iterating on CLOSE, CLOSING or CLOSED."""
    incoming = await self.receive()
    end_of_stream_types = (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED)
    if incoming.type in end_of_stream_types:
        raise StopAsyncIteration
    return incoming

767 

768 def _cancel(self, exc: BaseException) -> None: 

769 # web_protocol calls this from connection_lost 

770 # or when the server is shutting down. 

771 self._closing = True 

772 self._cancel_heartbeat() 

773 if self._reader is not None: 

774 set_exception(self._reader, exc)