Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_ws.py: 22%
212 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1"""WebSocket client for asyncio."""
3import asyncio
4import dataclasses
5from typing import Any, Optional, cast
7import async_timeout
8from typing_extensions import Final
10from .client_exceptions import ClientError
11from .client_reqrep import ClientResponse
12from .helpers import call_later, set_result
13from .http import (
14 WS_CLOSED_MESSAGE,
15 WS_CLOSING_MESSAGE,
16 WebSocketError,
17 WSCloseCode,
18 WSMessage,
19 WSMsgType,
20)
21from .http_websocket import WebSocketWriter # WSMessage
22from .streams import EofStream, FlowControlDataQueue
23from .typedefs import (
24 DEFAULT_JSON_DECODER,
25 DEFAULT_JSON_ENCODER,
26 JSONDecoder,
27 JSONEncoder,
28)
31@dataclasses.dataclass(frozen=True)
32class ClientWSTimeout:
33 ws_receive: Optional[float] = None
34 ws_close: Optional[float] = None
37DEFAULT_WS_CLIENT_TIMEOUT: Final[ClientWSTimeout] = ClientWSTimeout(
38 ws_receive=None, ws_close=10.0
39)
42class ClientWebSocketResponse:
43 def __init__(
44 self,
45 reader: "FlowControlDataQueue[WSMessage]",
46 writer: WebSocketWriter,
47 protocol: Optional[str],
48 response: ClientResponse,
49 timeout: ClientWSTimeout,
50 autoclose: bool,
51 autoping: bool,
52 loop: asyncio.AbstractEventLoop,
53 *,
54 heartbeat: Optional[float] = None,
55 compress: int = 0,
56 client_notakeover: bool = False,
57 ) -> None:
58 self._response = response
59 self._conn = response.connection
61 self._writer = writer
62 self._reader = reader
63 self._protocol = protocol
64 self._closed = False
65 self._closing = False
66 self._close_code: Optional[int] = None
67 self._timeout: ClientWSTimeout = timeout
68 self._autoclose = autoclose
69 self._autoping = autoping
70 self._heartbeat = heartbeat
71 self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
72 if heartbeat is not None:
73 self._pong_heartbeat = heartbeat / 2.0
74 self._pong_response_cb: Optional[asyncio.TimerHandle] = None
75 self._loop = loop
76 self._waiting: Optional[asyncio.Future[bool]] = None
77 self._exception: Optional[BaseException] = None
78 self._compress = compress
79 self._client_notakeover = client_notakeover
81 self._reset_heartbeat()
83 def _cancel_heartbeat(self) -> None:
84 if self._pong_response_cb is not None:
85 self._pong_response_cb.cancel()
86 self._pong_response_cb = None
88 if self._heartbeat_cb is not None:
89 self._heartbeat_cb.cancel()
90 self._heartbeat_cb = None
92 def _reset_heartbeat(self) -> None:
93 self._cancel_heartbeat()
95 if self._heartbeat is not None:
96 self._heartbeat_cb = call_later(
97 self._send_heartbeat,
98 self._heartbeat,
99 self._loop,
100 timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold
101 if self._conn is not None
102 else 5,
103 )
105 def _send_heartbeat(self) -> None:
106 if self._heartbeat is not None and not self._closed:
107 # fire-and-forget a task is not perfect but maybe ok for
108 # sending ping. Otherwise we need a long-living heartbeat
109 # task in the class.
110 self._loop.create_task(self._writer.ping())
112 if self._pong_response_cb is not None:
113 self._pong_response_cb.cancel()
114 self._pong_response_cb = call_later(
115 self._pong_not_received,
116 self._pong_heartbeat,
117 self._loop,
118 timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold
119 if self._conn is not None
120 else 5,
121 )
123 def _pong_not_received(self) -> None:
124 if not self._closed:
125 self._closed = True
126 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
127 self._exception = asyncio.TimeoutError()
128 self._response.close()
130 @property
131 def closed(self) -> bool:
132 return self._closed
134 @property
135 def close_code(self) -> Optional[int]:
136 return self._close_code
138 @property
139 def protocol(self) -> Optional[str]:
140 return self._protocol
142 @property
143 def compress(self) -> int:
144 return self._compress
146 @property
147 def client_notakeover(self) -> bool:
148 return self._client_notakeover
150 def get_extra_info(self, name: str, default: Any = None) -> Any:
151 """extra info from connection transport"""
152 conn = self._response.connection
153 if conn is None:
154 return default
155 transport = conn.transport
156 if transport is None:
157 return default
158 return transport.get_extra_info(name, default)
160 def exception(self) -> Optional[BaseException]:
161 return self._exception
163 async def ping(self, message: bytes = b"") -> None:
164 await self._writer.ping(message)
166 async def pong(self, message: bytes = b"") -> None:
167 await self._writer.pong(message)
169 async def send_str(self, data: str, compress: Optional[int] = None) -> None:
170 if not isinstance(data, str):
171 raise TypeError("data argument must be str (%r)" % type(data))
172 await self._writer.send(data, binary=False, compress=compress)
174 async def send_bytes(self, data: bytes, compress: Optional[int] = None) -> None:
175 if not isinstance(data, (bytes, bytearray, memoryview)):
176 raise TypeError("data argument must be byte-ish (%r)" % type(data))
177 await self._writer.send(data, binary=True, compress=compress)
179 async def send_json(
180 self,
181 data: Any,
182 compress: Optional[int] = None,
183 *,
184 dumps: JSONEncoder = DEFAULT_JSON_ENCODER,
185 ) -> None:
186 await self.send_str(dumps(data), compress=compress)
188 async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bool:
189 # we need to break `receive()` cycle first,
190 # `close()` may be called from different task
191 if self._waiting is not None and not self._closed:
192 self._reader.feed_data(WS_CLOSING_MESSAGE, 0)
193 await self._waiting
195 if not self._closed:
196 self._cancel_heartbeat()
197 self._closed = True
198 try:
199 await self._writer.close(code, message)
200 except asyncio.CancelledError:
201 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
202 self._response.close()
203 raise
204 except Exception as exc:
205 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
206 self._exception = exc
207 self._response.close()
208 return True
210 if self._closing:
211 self._response.close()
212 return True
214 while True:
215 try:
216 async with async_timeout.timeout(self._timeout.ws_close):
217 msg = await self._reader.read()
218 except asyncio.CancelledError:
219 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
220 self._response.close()
221 raise
222 except Exception as exc:
223 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
224 self._exception = exc
225 self._response.close()
226 return True
228 if msg.type == WSMsgType.CLOSE:
229 self._close_code = msg.data
230 self._response.close()
231 return True
232 else:
233 return False
235 async def receive(self, timeout: Optional[float] = None) -> WSMessage:
236 while True:
237 if self._waiting is not None:
238 raise RuntimeError("Concurrent call to receive() is not allowed")
240 if self._closed:
241 return WS_CLOSED_MESSAGE
242 elif self._closing:
243 await self.close()
244 return WS_CLOSED_MESSAGE
246 try:
247 self._waiting = self._loop.create_future()
248 try:
249 async with async_timeout.timeout(
250 timeout or self._timeout.ws_receive
251 ):
252 msg = await self._reader.read()
253 self._reset_heartbeat()
254 finally:
255 waiter = self._waiting
256 self._waiting = None
257 set_result(waiter, True)
258 except (asyncio.CancelledError, asyncio.TimeoutError):
259 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
260 raise
261 except EofStream:
262 self._close_code = WSCloseCode.OK
263 await self.close()
264 return WSMessage(WSMsgType.CLOSED, None, None)
265 except ClientError:
266 self._closed = True
267 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
268 return WS_CLOSED_MESSAGE
269 except WebSocketError as exc:
270 self._close_code = exc.code
271 await self.close(code=exc.code)
272 return WSMessage(WSMsgType.ERROR, exc, None)
273 except Exception as exc:
274 self._exception = exc
275 self._closing = True
276 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
277 await self.close()
278 return WSMessage(WSMsgType.ERROR, exc, None)
280 if msg.type == WSMsgType.CLOSE:
281 self._closing = True
282 self._close_code = msg.data
283 if not self._closed and self._autoclose:
284 await self.close()
285 elif msg.type == WSMsgType.CLOSING:
286 self._closing = True
287 elif msg.type == WSMsgType.PING and self._autoping:
288 await self.pong(msg.data)
289 continue
290 elif msg.type == WSMsgType.PONG and self._autoping:
291 continue
293 return msg
295 async def receive_str(self, *, timeout: Optional[float] = None) -> str:
296 msg = await self.receive(timeout)
297 if msg.type != WSMsgType.TEXT:
298 raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
299 return cast(str, msg.data)
301 async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes:
302 msg = await self.receive(timeout)
303 if msg.type != WSMsgType.BINARY:
304 raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
305 return cast(bytes, msg.data)
307 async def receive_json(
308 self,
309 *,
310 loads: JSONDecoder = DEFAULT_JSON_DECODER,
311 timeout: Optional[float] = None,
312 ) -> Any:
313 data = await self.receive_str(timeout=timeout)
314 return loads(data)
316 def __aiter__(self) -> "ClientWebSocketResponse":
317 return self
319 async def __anext__(self) -> WSMessage:
320 msg = await self.receive()
321 if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
322 raise StopAsyncIteration
323 return msg