Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_ws.py: 20%
210 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1"""WebSocket client for asyncio."""
3import asyncio
4import sys
5from typing import Any, Optional, cast
7from .client_exceptions import ClientError
8from .client_reqrep import ClientResponse
9from .helpers import call_later, set_result
10from .http import (
11 WS_CLOSED_MESSAGE,
12 WS_CLOSING_MESSAGE,
13 WebSocketError,
14 WSCloseCode,
15 WSMessage,
16 WSMsgType,
17)
18from .http_websocket import WebSocketWriter # WSMessage
19from .streams import EofStream, FlowControlDataQueue
20from .typedefs import (
21 DEFAULT_JSON_DECODER,
22 DEFAULT_JSON_ENCODER,
23 JSONDecoder,
24 JSONEncoder,
25)
27if sys.version_info >= (3, 11):
28 import asyncio as async_timeout
29else:
30 import async_timeout
33class ClientWebSocketResponse:
34 def __init__(
35 self,
36 reader: "FlowControlDataQueue[WSMessage]",
37 writer: WebSocketWriter,
38 protocol: Optional[str],
39 response: ClientResponse,
40 timeout: float,
41 autoclose: bool,
42 autoping: bool,
43 loop: asyncio.AbstractEventLoop,
44 *,
45 receive_timeout: Optional[float] = None,
46 heartbeat: Optional[float] = None,
47 compress: int = 0,
48 client_notakeover: bool = False,
49 ) -> None:
50 self._response = response
51 self._conn = response.connection
53 self._writer = writer
54 self._reader = reader
55 self._protocol = protocol
56 self._closed = False
57 self._closing = False
58 self._close_code: Optional[int] = None
59 self._timeout = timeout
60 self._receive_timeout = receive_timeout
61 self._autoclose = autoclose
62 self._autoping = autoping
63 self._heartbeat = heartbeat
64 self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
65 if heartbeat is not None:
66 self._pong_heartbeat = heartbeat / 2.0
67 self._pong_response_cb: Optional[asyncio.TimerHandle] = None
68 self._loop = loop
69 self._waiting: Optional[asyncio.Future[bool]] = None
70 self._exception: Optional[BaseException] = None
71 self._compress = compress
72 self._client_notakeover = client_notakeover
74 self._reset_heartbeat()
76 def _cancel_heartbeat(self) -> None:
77 if self._pong_response_cb is not None:
78 self._pong_response_cb.cancel()
79 self._pong_response_cb = None
81 if self._heartbeat_cb is not None:
82 self._heartbeat_cb.cancel()
83 self._heartbeat_cb = None
85 def _reset_heartbeat(self) -> None:
86 self._cancel_heartbeat()
88 if self._heartbeat is not None:
89 self._heartbeat_cb = call_later(
90 self._send_heartbeat,
91 self._heartbeat,
92 self._loop,
93 timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold
94 if self._conn is not None
95 else 5,
96 )
98 def _send_heartbeat(self) -> None:
99 if self._heartbeat is not None and not self._closed:
100 # fire-and-forget a task is not perfect but maybe ok for
101 # sending ping. Otherwise we need a long-living heartbeat
102 # task in the class.
103 self._loop.create_task(self._writer.ping())
105 if self._pong_response_cb is not None:
106 self._pong_response_cb.cancel()
107 self._pong_response_cb = call_later(
108 self._pong_not_received,
109 self._pong_heartbeat,
110 self._loop,
111 timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold
112 if self._conn is not None
113 else 5,
114 )
116 def _pong_not_received(self) -> None:
117 if not self._closed:
118 self._closed = True
119 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
120 self._exception = asyncio.TimeoutError()
121 self._response.close()
123 @property
124 def closed(self) -> bool:
125 return self._closed
127 @property
128 def close_code(self) -> Optional[int]:
129 return self._close_code
131 @property
132 def protocol(self) -> Optional[str]:
133 return self._protocol
135 @property
136 def compress(self) -> int:
137 return self._compress
139 @property
140 def client_notakeover(self) -> bool:
141 return self._client_notakeover
143 def get_extra_info(self, name: str, default: Any = None) -> Any:
144 """extra info from connection transport"""
145 conn = self._response.connection
146 if conn is None:
147 return default
148 transport = conn.transport
149 if transport is None:
150 return default
151 return transport.get_extra_info(name, default)
153 def exception(self) -> Optional[BaseException]:
154 return self._exception
156 async def ping(self, message: bytes = b"") -> None:
157 await self._writer.ping(message)
159 async def pong(self, message: bytes = b"") -> None:
160 await self._writer.pong(message)
162 async def send_str(self, data: str, compress: Optional[int] = None) -> None:
163 if not isinstance(data, str):
164 raise TypeError("data argument must be str (%r)" % type(data))
165 await self._writer.send(data, binary=False, compress=compress)
167 async def send_bytes(self, data: bytes, compress: Optional[int] = None) -> None:
168 if not isinstance(data, (bytes, bytearray, memoryview)):
169 raise TypeError("data argument must be byte-ish (%r)" % type(data))
170 await self._writer.send(data, binary=True, compress=compress)
172 async def send_json(
173 self,
174 data: Any,
175 compress: Optional[int] = None,
176 *,
177 dumps: JSONEncoder = DEFAULT_JSON_ENCODER,
178 ) -> None:
179 await self.send_str(dumps(data), compress=compress)
181 async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bool:
182 # we need to break `receive()` cycle first,
183 # `close()` may be called from different task
184 if self._waiting is not None and not self._closing:
185 self._closing = True
186 self._reader.feed_data(WS_CLOSING_MESSAGE, 0)
187 await self._waiting
189 if not self._closed:
190 self._cancel_heartbeat()
191 self._closed = True
192 try:
193 await self._writer.close(code, message)
194 except asyncio.CancelledError:
195 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
196 self._response.close()
197 raise
198 except Exception as exc:
199 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
200 self._exception = exc
201 self._response.close()
202 return True
204 if self._close_code:
205 self._response.close()
206 return True
208 while True:
209 try:
210 async with async_timeout.timeout(self._timeout):
211 msg = await self._reader.read()
212 except asyncio.CancelledError:
213 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
214 self._response.close()
215 raise
216 except Exception as exc:
217 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
218 self._exception = exc
219 self._response.close()
220 return True
222 if msg.type == WSMsgType.CLOSE:
223 self._close_code = msg.data
224 self._response.close()
225 return True
226 else:
227 return False
229 async def receive(self, timeout: Optional[float] = None) -> WSMessage:
230 while True:
231 if self._waiting is not None:
232 raise RuntimeError("Concurrent call to receive() is not allowed")
234 if self._closed:
235 return WS_CLOSED_MESSAGE
236 elif self._closing:
237 await self.close()
238 return WS_CLOSED_MESSAGE
240 try:
241 self._waiting = self._loop.create_future()
242 try:
243 async with async_timeout.timeout(timeout or self._receive_timeout):
244 msg = await self._reader.read()
245 self._reset_heartbeat()
246 finally:
247 waiter = self._waiting
248 self._waiting = None
249 set_result(waiter, True)
250 except (asyncio.CancelledError, asyncio.TimeoutError):
251 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
252 raise
253 except EofStream:
254 self._close_code = WSCloseCode.OK
255 await self.close()
256 return WSMessage(WSMsgType.CLOSED, None, None)
257 except ClientError:
258 self._closed = True
259 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
260 return WS_CLOSED_MESSAGE
261 except WebSocketError as exc:
262 self._close_code = exc.code
263 await self.close(code=exc.code)
264 return WSMessage(WSMsgType.ERROR, exc, None)
265 except Exception as exc:
266 self._exception = exc
267 self._closing = True
268 self._close_code = WSCloseCode.ABNORMAL_CLOSURE
269 await self.close()
270 return WSMessage(WSMsgType.ERROR, exc, None)
272 if msg.type == WSMsgType.CLOSE:
273 self._closing = True
274 self._close_code = msg.data
275 if not self._closed and self._autoclose:
276 await self.close()
277 elif msg.type == WSMsgType.CLOSING:
278 self._closing = True
279 elif msg.type == WSMsgType.PING and self._autoping:
280 await self.pong(msg.data)
281 continue
282 elif msg.type == WSMsgType.PONG and self._autoping:
283 continue
285 return msg
287 async def receive_str(self, *, timeout: Optional[float] = None) -> str:
288 msg = await self.receive(timeout)
289 if msg.type != WSMsgType.TEXT:
290 raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
291 return cast(str, msg.data)
293 async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes:
294 msg = await self.receive(timeout)
295 if msg.type != WSMsgType.BINARY:
296 raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
297 return cast(bytes, msg.data)
299 async def receive_json(
300 self,
301 *,
302 loads: JSONDecoder = DEFAULT_JSON_DECODER,
303 timeout: Optional[float] = None,
304 ) -> Any:
305 data = await self.receive_str(timeout=timeout)
306 return loads(data)
308 def __aiter__(self) -> "ClientWebSocketResponse":
309 return self
311 async def __anext__(self) -> WSMessage:
312 msg = await self.receive()
313 if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
314 raise StopAsyncIteration
315 return msg