Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_ws.py: 20%

210 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1"""WebSocket client for asyncio.""" 

2 

3import asyncio 

4import sys 

5from typing import Any, Optional, cast 

6 

7from .client_exceptions import ClientError 

8from .client_reqrep import ClientResponse 

9from .helpers import call_later, set_result 

10from .http import ( 

11 WS_CLOSED_MESSAGE, 

12 WS_CLOSING_MESSAGE, 

13 WebSocketError, 

14 WSCloseCode, 

15 WSMessage, 

16 WSMsgType, 

17) 

18from .http_websocket import WebSocketWriter # WSMessage 

19from .streams import EofStream, FlowControlDataQueue 

20from .typedefs import ( 

21 DEFAULT_JSON_DECODER, 

22 DEFAULT_JSON_ENCODER, 

23 JSONDecoder, 

24 JSONEncoder, 

25) 

26 

27if sys.version_info >= (3, 11): 

28 import asyncio as async_timeout 

29else: 

30 import async_timeout 

31 

32 

33class ClientWebSocketResponse: 

34 def __init__( 

35 self, 

36 reader: "FlowControlDataQueue[WSMessage]", 

37 writer: WebSocketWriter, 

38 protocol: Optional[str], 

39 response: ClientResponse, 

40 timeout: float, 

41 autoclose: bool, 

42 autoping: bool, 

43 loop: asyncio.AbstractEventLoop, 

44 *, 

45 receive_timeout: Optional[float] = None, 

46 heartbeat: Optional[float] = None, 

47 compress: int = 0, 

48 client_notakeover: bool = False, 

49 ) -> None: 

50 self._response = response 

51 self._conn = response.connection 

52 

53 self._writer = writer 

54 self._reader = reader 

55 self._protocol = protocol 

56 self._closed = False 

57 self._closing = False 

58 self._close_code: Optional[int] = None 

59 self._timeout = timeout 

60 self._receive_timeout = receive_timeout 

61 self._autoclose = autoclose 

62 self._autoping = autoping 

63 self._heartbeat = heartbeat 

64 self._heartbeat_cb: Optional[asyncio.TimerHandle] = None 

65 if heartbeat is not None: 

66 self._pong_heartbeat = heartbeat / 2.0 

67 self._pong_response_cb: Optional[asyncio.TimerHandle] = None 

68 self._loop = loop 

69 self._waiting: Optional[asyncio.Future[bool]] = None 

70 self._exception: Optional[BaseException] = None 

71 self._compress = compress 

72 self._client_notakeover = client_notakeover 

73 

74 self._reset_heartbeat() 

75 

76 def _cancel_heartbeat(self) -> None: 

77 if self._pong_response_cb is not None: 

78 self._pong_response_cb.cancel() 

79 self._pong_response_cb = None 

80 

81 if self._heartbeat_cb is not None: 

82 self._heartbeat_cb.cancel() 

83 self._heartbeat_cb = None 

84 

85 def _reset_heartbeat(self) -> None: 

86 self._cancel_heartbeat() 

87 

88 if self._heartbeat is not None: 

89 self._heartbeat_cb = call_later( 

90 self._send_heartbeat, 

91 self._heartbeat, 

92 self._loop, 

93 timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold 

94 if self._conn is not None 

95 else 5, 

96 ) 

97 

98 def _send_heartbeat(self) -> None: 

99 if self._heartbeat is not None and not self._closed: 

100 # fire-and-forget a task is not perfect but maybe ok for 

101 # sending ping. Otherwise we need a long-living heartbeat 

102 # task in the class. 

103 self._loop.create_task(self._writer.ping()) 

104 

105 if self._pong_response_cb is not None: 

106 self._pong_response_cb.cancel() 

107 self._pong_response_cb = call_later( 

108 self._pong_not_received, 

109 self._pong_heartbeat, 

110 self._loop, 

111 timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold 

112 if self._conn is not None 

113 else 5, 

114 ) 

115 

116 def _pong_not_received(self) -> None: 

117 if not self._closed: 

118 self._closed = True 

119 self._close_code = WSCloseCode.ABNORMAL_CLOSURE 

120 self._exception = asyncio.TimeoutError() 

121 self._response.close() 

122 

123 @property 

124 def closed(self) -> bool: 

125 return self._closed 

126 

127 @property 

128 def close_code(self) -> Optional[int]: 

129 return self._close_code 

130 

131 @property 

132 def protocol(self) -> Optional[str]: 

133 return self._protocol 

134 

135 @property 

136 def compress(self) -> int: 

137 return self._compress 

138 

139 @property 

140 def client_notakeover(self) -> bool: 

141 return self._client_notakeover 

142 

143 def get_extra_info(self, name: str, default: Any = None) -> Any: 

144 """extra info from connection transport""" 

145 conn = self._response.connection 

146 if conn is None: 

147 return default 

148 transport = conn.transport 

149 if transport is None: 

150 return default 

151 return transport.get_extra_info(name, default) 

152 

153 def exception(self) -> Optional[BaseException]: 

154 return self._exception 

155 

156 async def ping(self, message: bytes = b"") -> None: 

157 await self._writer.ping(message) 

158 

159 async def pong(self, message: bytes = b"") -> None: 

160 await self._writer.pong(message) 

161 

162 async def send_str(self, data: str, compress: Optional[int] = None) -> None: 

163 if not isinstance(data, str): 

164 raise TypeError("data argument must be str (%r)" % type(data)) 

165 await self._writer.send(data, binary=False, compress=compress) 

166 

167 async def send_bytes(self, data: bytes, compress: Optional[int] = None) -> None: 

168 if not isinstance(data, (bytes, bytearray, memoryview)): 

169 raise TypeError("data argument must be byte-ish (%r)" % type(data)) 

170 await self._writer.send(data, binary=True, compress=compress) 

171 

172 async def send_json( 

173 self, 

174 data: Any, 

175 compress: Optional[int] = None, 

176 *, 

177 dumps: JSONEncoder = DEFAULT_JSON_ENCODER, 

178 ) -> None: 

179 await self.send_str(dumps(data), compress=compress) 

180 

181 async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bool: 

182 # we need to break `receive()` cycle first, 

183 # `close()` may be called from different task 

184 if self._waiting is not None and not self._closing: 

185 self._closing = True 

186 self._reader.feed_data(WS_CLOSING_MESSAGE, 0) 

187 await self._waiting 

188 

189 if not self._closed: 

190 self._cancel_heartbeat() 

191 self._closed = True 

192 try: 

193 await self._writer.close(code, message) 

194 except asyncio.CancelledError: 

195 self._close_code = WSCloseCode.ABNORMAL_CLOSURE 

196 self._response.close() 

197 raise 

198 except Exception as exc: 

199 self._close_code = WSCloseCode.ABNORMAL_CLOSURE 

200 self._exception = exc 

201 self._response.close() 

202 return True 

203 

204 if self._close_code: 

205 self._response.close() 

206 return True 

207 

208 while True: 

209 try: 

210 async with async_timeout.timeout(self._timeout): 

211 msg = await self._reader.read() 

212 except asyncio.CancelledError: 

213 self._close_code = WSCloseCode.ABNORMAL_CLOSURE 

214 self._response.close() 

215 raise 

216 except Exception as exc: 

217 self._close_code = WSCloseCode.ABNORMAL_CLOSURE 

218 self._exception = exc 

219 self._response.close() 

220 return True 

221 

222 if msg.type == WSMsgType.CLOSE: 

223 self._close_code = msg.data 

224 self._response.close() 

225 return True 

226 else: 

227 return False 

228 

229 async def receive(self, timeout: Optional[float] = None) -> WSMessage: 

230 while True: 

231 if self._waiting is not None: 

232 raise RuntimeError("Concurrent call to receive() is not allowed") 

233 

234 if self._closed: 

235 return WS_CLOSED_MESSAGE 

236 elif self._closing: 

237 await self.close() 

238 return WS_CLOSED_MESSAGE 

239 

240 try: 

241 self._waiting = self._loop.create_future() 

242 try: 

243 async with async_timeout.timeout(timeout or self._receive_timeout): 

244 msg = await self._reader.read() 

245 self._reset_heartbeat() 

246 finally: 

247 waiter = self._waiting 

248 self._waiting = None 

249 set_result(waiter, True) 

250 except (asyncio.CancelledError, asyncio.TimeoutError): 

251 self._close_code = WSCloseCode.ABNORMAL_CLOSURE 

252 raise 

253 except EofStream: 

254 self._close_code = WSCloseCode.OK 

255 await self.close() 

256 return WSMessage(WSMsgType.CLOSED, None, None) 

257 except ClientError: 

258 self._closed = True 

259 self._close_code = WSCloseCode.ABNORMAL_CLOSURE 

260 return WS_CLOSED_MESSAGE 

261 except WebSocketError as exc: 

262 self._close_code = exc.code 

263 await self.close(code=exc.code) 

264 return WSMessage(WSMsgType.ERROR, exc, None) 

265 except Exception as exc: 

266 self._exception = exc 

267 self._closing = True 

268 self._close_code = WSCloseCode.ABNORMAL_CLOSURE 

269 await self.close() 

270 return WSMessage(WSMsgType.ERROR, exc, None) 

271 

272 if msg.type == WSMsgType.CLOSE: 

273 self._closing = True 

274 self._close_code = msg.data 

275 if not self._closed and self._autoclose: 

276 await self.close() 

277 elif msg.type == WSMsgType.CLOSING: 

278 self._closing = True 

279 elif msg.type == WSMsgType.PING and self._autoping: 

280 await self.pong(msg.data) 

281 continue 

282 elif msg.type == WSMsgType.PONG and self._autoping: 

283 continue 

284 

285 return msg 

286 

287 async def receive_str(self, *, timeout: Optional[float] = None) -> str: 

288 msg = await self.receive(timeout) 

289 if msg.type != WSMsgType.TEXT: 

290 raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str") 

291 return cast(str, msg.data) 

292 

293 async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes: 

294 msg = await self.receive(timeout) 

295 if msg.type != WSMsgType.BINARY: 

296 raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes") 

297 return cast(bytes, msg.data) 

298 

299 async def receive_json( 

300 self, 

301 *, 

302 loads: JSONDecoder = DEFAULT_JSON_DECODER, 

303 timeout: Optional[float] = None, 

304 ) -> Any: 

305 data = await self.receive_str(timeout=timeout) 

306 return loads(data) 

307 

308 def __aiter__(self) -> "ClientWebSocketResponse": 

309 return self 

310 

311 async def __anext__(self) -> WSMessage: 

312 msg = await self.receive() 

313 if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED): 

314 raise StopAsyncIteration 

315 return msg