Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/client_ws.py: 19%

206 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:56 +0000

1"""WebSocket client for asyncio.""" 

2 

3import asyncio 

4from typing import Any, Optional, cast 

5 

6import async_timeout 

7 

8from .client_exceptions import ClientError 

9from .client_reqrep import ClientResponse 

10from .helpers import call_later, set_result 

11from .http import ( 

12 WS_CLOSED_MESSAGE, 

13 WS_CLOSING_MESSAGE, 

14 WebSocketError, 

15 WSCloseCode, 

16 WSMessage, 

17 WSMsgType, 

18) 

19from .http_websocket import WebSocketWriter # WSMessage 

20from .streams import EofStream, FlowControlDataQueue 

21from .typedefs import ( 

22 DEFAULT_JSON_DECODER, 

23 DEFAULT_JSON_ENCODER, 

24 JSONDecoder, 

25 JSONEncoder, 

26) 

27 

28 

class ClientWebSocketResponse:
    """Client-side handle for an established WebSocket connection.

    Wraps a message reader and a frame writer and tracks the closing
    handshake state (``closed`` / ``close_code``).  When ``heartbeat`` is
    given, a PING is sent after that many seconds without a received
    message, and the connection is closed abnormally if no message arrives
    within ``heartbeat / 2`` seconds of the PING.

    Instances are async-iterable: iteration yields received messages and
    stops on CLOSE, CLOSING or CLOSED.
    """

    def __init__(
        self,
        reader: "FlowControlDataQueue[WSMessage]",
        writer: WebSocketWriter,
        protocol: Optional[str],
        response: ClientResponse,
        timeout: float,
        autoclose: bool,
        autoping: bool,
        loop: asyncio.AbstractEventLoop,
        *,
        receive_timeout: Optional[float] = None,
        heartbeat: Optional[float] = None,
        compress: int = 0,
        client_notakeover: bool = False,
    ) -> None:
        """Store the connection parts and arm the heartbeat timer.

        :param reader: queue that incoming messages are read from.
        :param writer: object used to send frames (ping/pong/send/close).
        :param protocol: value exposed through the ``protocol`` property.
        :param response: HTTP response owning the connection; it is closed
            when the WebSocket is torn down.
        :param timeout: seconds ``close()`` waits for each message while
            expecting the peer's CLOSE frame.
        :param autoclose: when true, ``receive()`` calls ``close()`` itself
            after a CLOSE message arrives.
        :param autoping: when true, ``receive()`` answers PING with PONG
            and does not return PING/PONG messages to the caller.
        :param loop: event loop used for timers, futures and tasks.
        :param receive_timeout: default timeout for ``receive()``.
        :param heartbeat: ping interval in seconds, or ``None`` to disable.
        :param compress: value exposed through the ``compress`` property.
        :param client_notakeover: value exposed through the
            ``client_notakeover`` property.
        """
        self._response = response
        self._conn = response.connection

        self._writer = writer
        self._reader = reader
        self._protocol = protocol
        self._closed = False
        self._closing = False
        self._close_code: Optional[int] = None
        self._timeout = timeout
        self._receive_timeout = receive_timeout
        self._autoclose = autoclose
        self._autoping = autoping
        self._heartbeat = heartbeat
        self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
        if heartbeat is not None:
            # Only defined when heartbeats are enabled; every reader of
            # this attribute checks ``self._heartbeat is not None`` first.
            self._pong_heartbeat = heartbeat / 2.0
        self._pong_response_cb: Optional[asyncio.TimerHandle] = None
        # Strong reference to the in-flight heartbeat ping task; the event
        # loop itself only keeps weak references to tasks.
        self._ping_task: Optional["asyncio.Task[None]"] = None
        self._loop = loop
        self._waiting: Optional[asyncio.Future[bool]] = None
        self._exception: Optional[BaseException] = None
        self._compress = compress
        self._client_notakeover = client_notakeover

        self._reset_heartbeat()

    def _cancel_heartbeat(self) -> None:
        """Cancel the pending pong-timeout and heartbeat timers, if any."""
        if self._pong_response_cb is not None:
            self._pong_response_cb.cancel()
            self._pong_response_cb = None

        if self._heartbeat_cb is not None:
            self._heartbeat_cb.cancel()
            self._heartbeat_cb = None

    def _reset_heartbeat(self) -> None:
        """Restart the heartbeat countdown (no-op when heartbeat is off)."""
        self._cancel_heartbeat()

        if self._heartbeat is not None:
            self._heartbeat_cb = call_later(
                self._send_heartbeat, self._heartbeat, self._loop
            )

    def _send_heartbeat(self) -> None:
        """Timer callback: send a PING and start the pong-timeout timer."""
        if self._heartbeat is None or self._closed:
            return

        # The ping is written from a separate task because this callback
        # is synchronous.  Keep a reference so the task cannot be garbage
        # collected before it finishes, and observe its outcome so that a
        # failed write is handled instead of merely being logged as
        # "Task exception was never retrieved".  Only the most recent
        # ping task is tracked.
        ping_task = self._loop.create_task(self._writer.ping())
        self._ping_task = ping_task
        ping_task.add_done_callback(self._ping_task_done)

        if self._pong_response_cb is not None:
            self._pong_response_cb.cancel()
        self._pong_response_cb = call_later(
            self._pong_not_received, self._pong_heartbeat, self._loop
        )

    def _ping_task_done(self, task: "asyncio.Task[None]") -> None:
        """Release a finished heartbeat ping task and handle its failure.

        If the ping raised while the socket is still open, the connection
        is closed abnormally and the exception is stored for
        ``exception()``, mirroring what ``_pong_not_received`` does.
        """
        if self._ping_task is task:
            self._ping_task = None

        if task.cancelled():
            # ``task.exception()`` would raise CancelledError here.
            return

        exc = task.exception()
        if exc is not None and not self._closed:
            self._cancel_heartbeat()
            self._closed = True
            self._close_code = WSCloseCode.ABNORMAL_CLOSURE
            self._exception = exc
            self._response.close()

    def _pong_not_received(self) -> None:
        """Timer callback: nothing arrived after a PING, drop the connection."""
        if not self._closed:
            self._closed = True
            self._close_code = WSCloseCode.ABNORMAL_CLOSURE
            self._exception = asyncio.TimeoutError()
            self._response.close()

    @property
    def closed(self) -> bool:
        """Whether the WebSocket has been closed."""
        return self._closed

    @property
    def close_code(self) -> Optional[int]:
        """Close code recorded so far, or ``None`` if none was recorded."""
        return self._close_code

    @property
    def protocol(self) -> Optional[str]:
        """The ``protocol`` value given at construction."""
        return self._protocol

    @property
    def compress(self) -> int:
        """The ``compress`` value given at construction."""
        return self._compress

    @property
    def client_notakeover(self) -> bool:
        """The ``client_notakeover`` value given at construction."""
        return self._client_notakeover

    def get_extra_info(self, name: str, default: Any = None) -> Any:
        """Return extra info *name* from the connection transport.

        *default* is returned when the response has no connection or the
        connection has no transport.
        """
        conn = self._response.connection
        if conn is None:
            return default
        transport = conn.transport
        if transport is None:
            return default
        return transport.get_extra_info(name, default)

    def exception(self) -> Optional[BaseException]:
        """Return the exception recorded for this connection, if any."""
        return self._exception

    async def ping(self, message: bytes = b"") -> None:
        """Send a PING carrying *message* through the writer."""
        await self._writer.ping(message)

    async def pong(self, message: bytes = b"") -> None:
        """Send a PONG carrying *message* through the writer."""
        await self._writer.pong(message)

    async def send_str(self, data: str, compress: Optional[int] = None) -> None:
        """Send *data* as a text message.

        :raises TypeError: if *data* is not a ``str``.
        """
        if not isinstance(data, str):
            raise TypeError("data argument must be str (%r)" % type(data))
        await self._writer.send(data, binary=False, compress=compress)

    async def send_bytes(self, data: bytes, compress: Optional[int] = None) -> None:
        """Send *data* as a binary message.

        :raises TypeError: if *data* is not ``bytes``, ``bytearray`` or
            ``memoryview``.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError("data argument must be byte-ish (%r)" % type(data))
        await self._writer.send(data, binary=True, compress=compress)

    async def send_json(
        self,
        data: Any,
        compress: Optional[int] = None,
        *,
        dumps: JSONEncoder = DEFAULT_JSON_ENCODER,
    ) -> None:
        """Serialize *data* with *dumps* and send it as a text message."""
        await self.send_str(dumps(data), compress=compress)

    async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bool:
        """Close the WebSocket, performing the closing handshake.

        Sends a close frame with *code* and *message*, then reads messages
        until a CLOSE message arrives, a read fails, or a single read
        exceeds the close timeout.

        :returns: ``True`` if this call closed the socket, ``False`` if it
            was already closed.
        :raises asyncio.CancelledError: re-raised after the underlying
            response has been closed.
        """
        # we need to break `receive()` cycle first,
        # `close()` may be called from different task
        if self._waiting is not None and not self._closed:
            self._reader.feed_data(WS_CLOSING_MESSAGE, 0)
            await self._waiting

        if not self._closed:
            self._cancel_heartbeat()
            self._closed = True
            try:
                await self._writer.close(code, message)
            except asyncio.CancelledError:
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                self._response.close()
                raise
            except Exception as exc:
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                self._exception = exc
                self._response.close()
                return True

            if self._closing:
                # A CLOSE/CLOSING message was already seen by receive();
                # there is nothing further to wait for.
                self._response.close()
                return True

            # Wait for the CLOSE message, discarding anything else that
            # is read meanwhile.
            while True:
                try:
                    async with async_timeout.timeout(self._timeout):
                        msg = await self._reader.read()
                except asyncio.CancelledError:
                    self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                    self._response.close()
                    raise
                except Exception as exc:
                    self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                    self._exception = exc
                    self._response.close()
                    return True

                if msg.type == WSMsgType.CLOSE:
                    self._close_code = msg.data
                    self._response.close()
                    return True
        else:
            return False

    async def receive(self, timeout: Optional[float] = None) -> WSMessage:
        """Wait for and return the next message.

        With ``autoping`` enabled, PING messages are answered with PONG
        and neither PING nor PONG messages are returned.  Read failures
        are reported as CLOSED or ERROR messages rather than raised,
        except for cancellation and timeout.

        :param timeout: seconds to wait for one message; a falsy value
            (``None`` or ``0``) selects the instance's ``receive_timeout``.
        :raises RuntimeError: if another ``receive()`` call is in progress.
        :raises asyncio.TimeoutError: if no message arrives in time.
        :raises asyncio.CancelledError: if the waiting task is cancelled.
        """
        while True:
            if self._waiting is not None:
                raise RuntimeError("Concurrent call to receive() is not allowed")

            if self._closed:
                return WS_CLOSED_MESSAGE
            elif self._closing:
                await self.close()
                return WS_CLOSED_MESSAGE

            try:
                # `close()` awaits this future to know that the pending
                # read has been released.
                self._waiting = self._loop.create_future()
                try:
                    async with async_timeout.timeout(timeout or self._receive_timeout):
                        msg = await self._reader.read()
                    self._reset_heartbeat()
                finally:
                    waiter = self._waiting
                    self._waiting = None
                    set_result(waiter, True)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                raise
            except EofStream:
                self._close_code = WSCloseCode.OK
                await self.close()
                return WSMessage(WSMsgType.CLOSED, None, None)
            except ClientError:
                self._closed = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                return WS_CLOSED_MESSAGE
            except WebSocketError as exc:
                self._close_code = exc.code
                await self.close(code=exc.code)
                return WSMessage(WSMsgType.ERROR, exc, None)
            except Exception as exc:
                self._exception = exc
                self._closing = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                await self.close()
                return WSMessage(WSMsgType.ERROR, exc, None)

            if msg.type == WSMsgType.CLOSE:
                self._closing = True
                self._close_code = msg.data
                if not self._closed and self._autoclose:
                    await self.close()
            elif msg.type == WSMsgType.CLOSING:
                self._closing = True
            elif msg.type == WSMsgType.PING and self._autoping:
                await self.pong(msg.data)
                continue
            elif msg.type == WSMsgType.PONG and self._autoping:
                continue

            return msg

    async def receive_str(self, *, timeout: Optional[float] = None) -> str:
        """Receive one message and return its text payload.

        :raises TypeError: if the received message is not a TEXT message.
        """
        msg = await self.receive(timeout)
        if msg.type != WSMsgType.TEXT:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
        return cast(str, msg.data)

    async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes:
        """Receive one message and return its binary payload.

        :raises TypeError: if the received message is not a BINARY message.
        """
        msg = await self.receive(timeout)
        if msg.type != WSMsgType.BINARY:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
        return cast(bytes, msg.data)

    async def receive_json(
        self,
        *,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        timeout: Optional[float] = None,
    ) -> Any:
        """Receive one text message and decode it with *loads*."""
        data = await self.receive_str(timeout=timeout)
        return loads(data)

    def __aiter__(self) -> "ClientWebSocketResponse":
        """Return ``self``; the instance is its own async iterator."""
        return self

    async def __anext__(self) -> WSMessage:
        """Return the next message, stopping on CLOSE, CLOSING or CLOSED."""
        msg = await self.receive()
        if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
            raise StopAsyncIteration
        return msg

298 if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED): 

299 raise StopAsyncIteration 

300 return msg