Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_ws.py: 19%
206 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:56 +0000
1"""WebSocket client for asyncio."""
3import asyncio
4from typing import Any, Optional, cast
6import async_timeout
8from .client_exceptions import ClientError
9from .client_reqrep import ClientResponse
10from .helpers import call_later, set_result
11from .http import (
12 WS_CLOSED_MESSAGE,
13 WS_CLOSING_MESSAGE,
14 WebSocketError,
15 WSCloseCode,
16 WSMessage,
17 WSMsgType,
18)
19from .http_websocket import WebSocketWriter # WSMessage
20from .streams import EofStream, FlowControlDataQueue
21from .typedefs import (
22 DEFAULT_JSON_DECODER,
23 DEFAULT_JSON_ENCODER,
24 JSONDecoder,
25 JSONEncoder,
26)
class ClientWebSocketResponse:
    """Client-side WebSocket connection.

    Sends frames through a ``WebSocketWriter``, reads parsed ``WSMessage``
    objects from a ``FlowControlDataQueue``, tracks the state of the closing
    handshake and, when ``heartbeat`` is set, periodically pings the peer and
    closes the connection if no message arrives in time.
    """

    def __init__(
        self,
        reader: "FlowControlDataQueue[WSMessage]",
        writer: WebSocketWriter,
        protocol: Optional[str],
        response: ClientResponse,
        timeout: float,
        autoclose: bool,
        autoping: bool,
        loop: asyncio.AbstractEventLoop,
        *,
        receive_timeout: Optional[float] = None,
        heartbeat: Optional[float] = None,
        compress: int = 0,
        client_notakeover: bool = False,
    ) -> None:
        """Store the connection objects and start the heartbeat, if any.

        :param reader: queue that incoming messages are read from.
        :param writer: writer used for ping/pong/data/close frames.
        :param protocol: value exposed through the ``protocol`` property.
        :param response: handshake response; its ``close()`` is used to
            drop the connection.
        :param timeout: timeout applied in ``close()`` to each read while
            waiting for the peer's CLOSE message.
        :param autoclose: when true, ``receive()`` calls ``close()`` itself
            on an incoming CLOSE message.
        :param autoping: when true, ``receive()`` answers PING with a pong
            and does not return PING/PONG messages to the caller.
        :param loop: event loop used for futures, tasks and timers.
        :param receive_timeout: default timeout for ``receive()``.
        :param heartbeat: ping interval passed to ``call_later``
            (presumably seconds -- confirm against ``helpers.call_later``);
            ``None`` disables the heartbeat.
        :param compress: value exposed through the ``compress`` property.
        :param client_notakeover: value exposed through the
            ``client_notakeover`` property.
        """
        self._response = response
        self._conn = response.connection

        self._writer = writer
        self._reader = reader
        self._protocol = protocol
        self._closed = False
        self._closing = False
        self._close_code: Optional[int] = None
        self._timeout = timeout
        self._receive_timeout = receive_timeout
        self._autoclose = autoclose
        self._autoping = autoping
        self._heartbeat = heartbeat
        self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
        if heartbeat is not None:
            # The peer gets half of the heartbeat interval to react to a ping.
            self._pong_heartbeat = heartbeat / 2.0
        self._pong_response_cb: Optional[asyncio.TimerHandle] = None
        # Strong reference to the in-flight heartbeat ping task (see
        # _send_heartbeat); must exist before the heartbeat is started.
        self._ping_task: Optional["asyncio.Task[None]"] = None
        self._loop = loop
        # Future created by receive() while it is blocked on the reader;
        # close() awaits it to let a concurrent receive() finish first.
        self._waiting: Optional[asyncio.Future[bool]] = None
        self._exception: Optional[BaseException] = None
        self._compress = compress
        self._client_notakeover = client_notakeover

        self._reset_heartbeat()

    def _cancel_heartbeat(self) -> None:
        """Cancel the pending pong-timeout and heartbeat timers, if any."""
        if self._pong_response_cb is not None:
            self._pong_response_cb.cancel()
            self._pong_response_cb = None

        if self._heartbeat_cb is not None:
            self._heartbeat_cb.cancel()
            self._heartbeat_cb = None

    def _reset_heartbeat(self) -> None:
        """Drop the current timers and, if enabled, schedule the next ping."""
        self._cancel_heartbeat()

        if self._heartbeat is not None:
            self._heartbeat_cb = call_later(
                self._send_heartbeat, self._heartbeat, self._loop
            )

    def _send_heartbeat(self) -> None:
        """Timer callback: send a ping and arm the pong-timeout timer."""
        if self._heartbeat is None or self._closed:
            return

        # The ping is sent from its own task because this callback is
        # synchronous.  The event loop only keeps weak references to tasks,
        # so hold a strong one until the task finishes; otherwise the ping
        # could be garbage-collected before it is written.
        ping_task = self._loop.create_task(self._writer.ping())
        self._ping_task = ping_task
        ping_task.add_done_callback(self._ping_task_done)

        if self._pong_response_cb is not None:
            self._pong_response_cb.cancel()
        self._pong_response_cb = call_later(
            self._pong_not_received, self._pong_heartbeat, self._loop
        )

    def _ping_task_done(self, task: "asyncio.Task[None]") -> None:
        """Release the reference to a finished heartbeat ping task.

        The task's exception is intentionally not retrieved here, so a failed
        ping is still reported by asyncio exactly as it was before.
        """
        # A newer ping may already have replaced the stored task.
        if self._ping_task is task:
            self._ping_task = None

    def _pong_not_received(self) -> None:
        """Timer callback: no message arrived in time after a ping.

        Marks the socket closed with ``ABNORMAL_CLOSURE``, records an
        ``asyncio.TimeoutError`` and closes the underlying response.
        """
        if not self._closed:
            self._closed = True
            self._close_code = WSCloseCode.ABNORMAL_CLOSURE
            self._exception = asyncio.TimeoutError()
            self._response.close()

    @property
    def closed(self) -> bool:
        """Whether the connection has been marked closed."""
        return self._closed

    @property
    def close_code(self) -> Optional[int]:
        """Last recorded close code, or ``None`` if none was recorded."""
        return self._close_code

    @property
    def protocol(self) -> Optional[str]:
        """The ``protocol`` value passed to the constructor."""
        return self._protocol

    @property
    def compress(self) -> int:
        """The ``compress`` value passed to the constructor."""
        return self._compress

    @property
    def client_notakeover(self) -> bool:
        """The ``client_notakeover`` value passed to the constructor."""
        return self._client_notakeover

    def get_extra_info(self, name: str, default: Any = None) -> Any:
        """Return extra info from the connection transport.

        ``default`` is returned when the response has no connection or the
        connection has no transport.
        """
        conn = self._response.connection
        if conn is None:
            return default
        transport = conn.transport
        if transport is None:
            return default
        return transport.get_extra_info(name, default)

    def exception(self) -> Optional[BaseException]:
        """Return the last recorded exception, or ``None``."""
        return self._exception

    async def ping(self, message: bytes = b"") -> None:
        """Send a ping with the given payload."""
        await self._writer.ping(message)

    async def pong(self, message: bytes = b"") -> None:
        """Send a pong with the given payload."""
        await self._writer.pong(message)

    async def send_str(self, data: str, compress: Optional[int] = None) -> None:
        """Send ``data`` as a non-binary message.

        :raises TypeError: if ``data`` is not a ``str``.
        """
        if not isinstance(data, str):
            raise TypeError("data argument must be str (%r)" % type(data))
        await self._writer.send(data, binary=False, compress=compress)

    async def send_bytes(self, data: bytes, compress: Optional[int] = None) -> None:
        """Send ``data`` as a binary message.

        :raises TypeError: if ``data`` is not ``bytes``, ``bytearray`` or
            ``memoryview``.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError("data argument must be byte-ish (%r)" % type(data))
        await self._writer.send(data, binary=True, compress=compress)

    async def send_json(
        self,
        data: Any,
        compress: Optional[int] = None,
        *,
        dumps: JSONEncoder = DEFAULT_JSON_ENCODER,
    ) -> None:
        """Serialize ``data`` with ``dumps`` and send it via ``send_str``."""
        await self.send_str(dumps(data), compress=compress)

    async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bool:
        """Perform the closing handshake.

        Sends a close frame and, unless the peer's CLOSE was already seen,
        reads messages until a CLOSE arrives; each read is limited by the
        ``timeout`` given to the constructor.

        :returns: ``True`` if this call closed the connection, ``False`` if
            it was already closed.
        :raises asyncio.CancelledError: re-raised after the response has
            been closed and the close code set to ``ABNORMAL_CLOSURE``.
        """
        # we need to break `receive()` cycle first,
        # `close()` may be called from different task
        if self._waiting is not None and not self._closed:
            self._reader.feed_data(WS_CLOSING_MESSAGE, 0)
            await self._waiting

        if not self._closed:
            self._cancel_heartbeat()
            self._closed = True
            try:
                await self._writer.close(code, message)
            except asyncio.CancelledError:
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                self._response.close()
                raise
            except Exception as exc:
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                self._exception = exc
                self._response.close()
                return True

            # receive() has already seen CLOSE/CLOSING, so there is nothing
            # left to wait for.
            if self._closing:
                self._response.close()
                return True

            # Discard incoming messages until the peer's CLOSE shows up.
            while True:
                try:
                    async with async_timeout.timeout(self._timeout):
                        msg = await self._reader.read()
                except asyncio.CancelledError:
                    self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                    self._response.close()
                    raise
                except Exception as exc:
                    # Includes the timeout raised by async_timeout above.
                    self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                    self._exception = exc
                    self._response.close()
                    return True

                if msg.type == WSMsgType.CLOSE:
                    self._close_code = msg.data
                    self._response.close()
                    return True
        else:
            return False

    async def receive(self, timeout: Optional[float] = None) -> WSMessage:
        """Wait for and return the next message.

        :param timeout: timeout for this call; a falsy value (``None`` or
            ``0``) falls back to the constructor's ``receive_timeout``.
        :returns: the received message, ``WS_CLOSED_MESSAGE`` (or an
            equivalent CLOSED message) once the connection is closed, or an
            ERROR message wrapping the exception that broke the read.
        :raises RuntimeError: if another ``receive()`` call is in progress.
        :raises asyncio.TimeoutError: if the timeout expires.
        :raises asyncio.CancelledError: if the call is cancelled.
        """
        while True:
            if self._waiting is not None:
                raise RuntimeError("Concurrent call to receive() is not allowed")

            if self._closed:
                return WS_CLOSED_MESSAGE
            elif self._closing:
                await self.close()
                return WS_CLOSED_MESSAGE

            try:
                self._waiting = self._loop.create_future()
                try:
                    async with async_timeout.timeout(timeout or self._receive_timeout):
                        msg = await self._reader.read()
                    # Any incoming message counts as a sign of life.
                    self._reset_heartbeat()
                finally:
                    # Release a close() that may be awaiting this future.
                    waiter = self._waiting
                    self._waiting = None
                    set_result(waiter, True)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                raise
            except EofStream:
                self._close_code = WSCloseCode.OK
                await self.close()
                return WSMessage(WSMsgType.CLOSED, None, None)
            except ClientError:
                # The socket is marked closed without going through close(),
                # so stop the heartbeat timers here; otherwise they stay
                # scheduled (and keep this object alive) until they fire.
                self._cancel_heartbeat()
                self._closed = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                return WS_CLOSED_MESSAGE
            except WebSocketError as exc:
                self._close_code = exc.code
                await self.close(code=exc.code)
                return WSMessage(WSMsgType.ERROR, exc, None)
            except Exception as exc:
                self._exception = exc
                self._closing = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                await self.close()
                return WSMessage(WSMsgType.ERROR, exc, None)

            if msg.type == WSMsgType.CLOSE:
                self._closing = True
                self._close_code = msg.data
                if not self._closed and self._autoclose:
                    await self.close()
            elif msg.type == WSMsgType.CLOSING:
                self._closing = True
            elif msg.type == WSMsgType.PING and self._autoping:
                await self.pong(msg.data)
                continue
            elif msg.type == WSMsgType.PONG and self._autoping:
                continue

            return msg

    async def receive_str(self, *, timeout: Optional[float] = None) -> str:
        """Receive a message and return its data as ``str``.

        :raises TypeError: if the received message is not of type TEXT.
        """
        msg = await self.receive(timeout)
        if msg.type != WSMsgType.TEXT:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
        return cast(str, msg.data)

    async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes:
        """Receive a message and return its data as ``bytes``.

        :raises TypeError: if the received message is not of type BINARY.
        """
        msg = await self.receive(timeout)
        if msg.type != WSMsgType.BINARY:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
        return cast(bytes, msg.data)

    async def receive_json(
        self,
        *,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        timeout: Optional[float] = None,
    ) -> Any:
        """Receive a text message and decode it with ``loads``."""
        data = await self.receive_str(timeout=timeout)
        return loads(data)

    def __aiter__(self) -> "ClientWebSocketResponse":
        """Return ``self``; messages are produced by ``__anext__``."""
        return self

    async def __anext__(self) -> WSMessage:
        """Return the next message; stop on CLOSE, CLOSING or CLOSED."""
        msg = await self.receive()
        if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
            raise StopAsyncIteration
        return msg