Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/client_ws.py: 22%

212 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1"""WebSocket client for asyncio.""" 

2 

3import asyncio 

4import dataclasses 

5from typing import Any, Optional, cast 

6 

7import async_timeout 

8from typing_extensions import Final 

9 

10from .client_exceptions import ClientError 

11from .client_reqrep import ClientResponse 

12from .helpers import call_later, set_result 

13from .http import ( 

14 WS_CLOSED_MESSAGE, 

15 WS_CLOSING_MESSAGE, 

16 WebSocketError, 

17 WSCloseCode, 

18 WSMessage, 

19 WSMsgType, 

20) 

21from .http_websocket import WebSocketWriter # WSMessage 

22from .streams import EofStream, FlowControlDataQueue 

23from .typedefs import ( 

24 DEFAULT_JSON_DECODER, 

25 DEFAULT_JSON_ENCODER, 

26 JSONDecoder, 

27 JSONEncoder, 

28) 

29 

30 

@dataclasses.dataclass(frozen=True)
class ClientWSTimeout:
    """Immutable pair of timeouts used by the WebSocket client.

    Instances are frozen: attributes cannot be reassigned after creation.
    Both fields default to ``None``.
    """

    # Limit applied while waiting for an incoming message.
    ws_receive: Optional[float] = None
    # Limit applied while waiting for the peer during the close handshake.
    ws_close: Optional[float] = None

35 

36 

# Module-level default timeouts: no limit configured for receiving,
# 10.0 for the close handshake.
DEFAULT_WS_CLIENT_TIMEOUT: Final[ClientWSTimeout] = ClientWSTimeout(
    ws_receive=None,
    ws_close=10.0,
)

40 

41 

class ClientWebSocketResponse:
    """Client-side WebSocket connection wrapped around an HTTP response.

    The object reads messages from ``reader``, sends frames through
    ``writer`` and tracks the closing/closed state together with the close
    code and the last stored exception.  When ``heartbeat`` is given, a
    ping is scheduled after every ``heartbeat`` seconds without a received
    message, and the connection is closed abnormally if no message arrives
    within ``heartbeat / 2`` seconds after that ping.
    """

    def __init__(
        self,
        reader: "FlowControlDataQueue[WSMessage]",
        writer: WebSocketWriter,
        protocol: Optional[str],
        response: ClientResponse,
        timeout: ClientWSTimeout,
        autoclose: bool,
        autoping: bool,
        loop: asyncio.AbstractEventLoop,
        *,
        heartbeat: Optional[float] = None,
        compress: int = 0,
        client_notakeover: bool = False,
    ) -> None:
        """Store the collaborators and start the heartbeat timer if enabled.

        :param reader: queue the incoming messages are read from.
        :param writer: object used to send ping/pong/data/close frames.
        :param protocol: value exposed by the ``protocol`` property.
        :param response: HTTP response owning the connection; it is closed
            when the WebSocket is closed.
        :param timeout: receive/close timeouts.
        :param autoclose: call ``close()`` automatically when a CLOSE
            message is received.
        :param autoping: answer PING with PONG and swallow PONG messages
            inside ``receive()``.
        :param loop: event loop used for futures, tasks and timers.
        :param heartbeat: heartbeat interval, ``None`` disables it.
        :param compress: value exposed by the ``compress`` property.
        :param client_notakeover: value exposed by the
            ``client_notakeover`` property.
        """
        self._response = response
        self._conn = response.connection

        self._writer = writer
        self._reader = reader
        self._protocol = protocol
        self._closed = False
        self._closing = False
        self._close_code: Optional[int] = None
        self._timeout: ClientWSTimeout = timeout
        self._autoclose = autoclose
        self._autoping = autoping
        self._heartbeat = heartbeat
        self._heartbeat_cb: Optional[asyncio.TimerHandle] = None
        if heartbeat is not None:
            # Only defined (and only read) when the heartbeat is enabled.
            self._pong_heartbeat = heartbeat / 2.0
        self._pong_response_cb: Optional[asyncio.TimerHandle] = None
        # Strong reference to the most recent heartbeat ping task.
        self._ping_task: Optional["asyncio.Task[None]"] = None
        self._loop = loop
        self._waiting: Optional[asyncio.Future[bool]] = None
        self._exception: Optional[BaseException] = None
        self._compress = compress
        self._client_notakeover = client_notakeover

        self._reset_heartbeat()

    def _ceil_threshold(self) -> float:
        """Return the connector's ``_timeout_ceil_threshold``, or 5 if detached."""
        if self._conn is not None:
            return self._conn._connector._timeout_ceil_threshold
        return 5

    def _cancel_heartbeat(self) -> None:
        """Cancel and forget the pending pong-wait and heartbeat timers."""
        if self._pong_response_cb is not None:
            self._pong_response_cb.cancel()
            self._pong_response_cb = None

        if self._heartbeat_cb is not None:
            self._heartbeat_cb.cancel()
            self._heartbeat_cb = None

    def _set_closed(self) -> None:
        """Mark the connection closed and release the heartbeat timers.

        Every code path that flips ``_closed`` goes through here so that no
        timer handle stays scheduled once the connection is closed.
        """
        self._closed = True
        self._cancel_heartbeat()

    def _reset_heartbeat(self) -> None:
        """Restart the heartbeat countdown (no-op when heartbeat is disabled)."""
        self._cancel_heartbeat()

        if self._heartbeat is not None:
            self._heartbeat_cb = call_later(
                self._send_heartbeat,
                self._heartbeat,
                self._loop,
                timeout_ceil_threshold=self._ceil_threshold(),
            )

    def _send_heartbeat(self) -> None:
        """Timer callback: send a ping and arm the pong-wait timer."""
        if self._heartbeat is not None and not self._closed:
            # Fire-and-forget is not perfect but acceptable for sending a
            # ping; the alternative is a long-living heartbeat task.  The
            # task is stored because the event loop itself keeps only weak
            # references to tasks.
            self._ping_task = self._loop.create_task(self._writer.ping())

            if self._pong_response_cb is not None:
                self._pong_response_cb.cancel()
            self._pong_response_cb = call_later(
                self._pong_not_received,
                self._pong_heartbeat,
                self._loop,
                timeout_ceil_threshold=self._ceil_threshold(),
            )

    def _pong_not_received(self) -> None:
        """Timer callback: nothing arrived after the ping, close abnormally."""
        if not self._closed:
            self._set_closed()
            self._close_code = WSCloseCode.ABNORMAL_CLOSURE
            self._exception = asyncio.TimeoutError()
            self._response.close()

    @property
    def closed(self) -> bool:
        """``True`` once the connection has been marked closed."""
        return self._closed

    @property
    def close_code(self) -> Optional[int]:
        """Close code recorded so far, or ``None`` if none was recorded."""
        return self._close_code

    @property
    def protocol(self) -> Optional[str]:
        """The ``protocol`` value passed to the constructor."""
        return self._protocol

    @property
    def compress(self) -> int:
        """The ``compress`` value passed to the constructor."""
        return self._compress

    @property
    def client_notakeover(self) -> bool:
        """The ``client_notakeover`` value passed to the constructor."""
        return self._client_notakeover

    def get_extra_info(self, name: str, default: Any = None) -> Any:
        """Return extra info from the connection transport.

        ``default`` is returned when the response has no connection or the
        connection has no transport; otherwise the lookup is delegated to
        ``transport.get_extra_info(name, default)``.
        """
        conn = self._response.connection
        if conn is None:
            return default
        transport = conn.transport
        if transport is None:
            return default
        return transport.get_extra_info(name, default)

    def exception(self) -> Optional[BaseException]:
        """Return the last exception stored by this object, if any."""
        return self._exception

    async def ping(self, message: bytes = b"") -> None:
        """Send a PING through the writer with the given payload."""
        await self._writer.ping(message)

    async def pong(self, message: bytes = b"") -> None:
        """Send a PONG through the writer with the given payload."""
        await self._writer.pong(message)

    async def send_str(self, data: str, compress: Optional[int] = None) -> None:
        """Send ``data`` as a non-binary message.

        :raises TypeError: if ``data`` is not a ``str``.
        """
        if not isinstance(data, str):
            raise TypeError("data argument must be str (%r)" % type(data))
        await self._writer.send(data, binary=False, compress=compress)

    async def send_bytes(self, data: bytes, compress: Optional[int] = None) -> None:
        """Send ``data`` as a binary message.

        :raises TypeError: if ``data`` is not ``bytes``, ``bytearray`` or
            ``memoryview``.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError("data argument must be byte-ish (%r)" % type(data))
        await self._writer.send(data, binary=True, compress=compress)

    async def send_json(
        self,
        data: Any,
        compress: Optional[int] = None,
        *,
        dumps: JSONEncoder = DEFAULT_JSON_ENCODER,
    ) -> None:
        """Serialize ``data`` with ``dumps`` and send it via ``send_str()``."""
        await self.send_str(dumps(data), compress=compress)

    async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bool:
        """Close the connection, performing the close handshake if needed.

        :param code: close code handed to the writer.
        :param message: close payload handed to the writer.
        :return: ``True`` if this call closed the connection (whether the
            handshake succeeded or failed), ``False`` if it was already
            closed.
        :raises asyncio.CancelledError: re-raised after closing the
            response when the call is cancelled mid-handshake.
        """
        # We need to break the `receive()` cycle first,
        # `close()` may be called from a different task.
        if self._waiting is not None and not self._closed:
            self._reader.feed_data(WS_CLOSING_MESSAGE, 0)
            await self._waiting

        if not self._closed:
            self._set_closed()
            try:
                await self._writer.close(code, message)
            except asyncio.CancelledError:
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                self._response.close()
                raise
            except Exception as exc:
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                self._exception = exc
                self._response.close()
                return True

            if self._closing:
                # Closing was already flagged by `receive()`, so there is
                # no reply to wait for.
                self._response.close()
                return True

            # Read (and discard) messages until the peer's CLOSE shows up,
            # each read bounded by the configured close timeout.
            while True:
                try:
                    async with async_timeout.timeout(self._timeout.ws_close):
                        msg = await self._reader.read()
                except asyncio.CancelledError:
                    self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                    self._response.close()
                    raise
                except Exception as exc:
                    self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                    self._exception = exc
                    self._response.close()
                    return True

                if msg.type == WSMsgType.CLOSE:
                    self._close_code = msg.data
                    self._response.close()
                    return True
        else:
            return False

    async def receive(self, timeout: Optional[float] = None) -> WSMessage:
        """Wait for and return the next message.

        PING/PONG messages are handled internally when ``autoping`` is on.
        A falsy ``timeout`` (``None`` or ``0``) falls back to the configured
        ``ws_receive`` timeout.

        :return: the received message; ``WS_CLOSED_MESSAGE`` (or an
            equivalent CLOSED message) when the connection is or becomes
            closed; an ERROR message carrying the exception on a
            ``WebSocketError`` or any other unexpected error.
        :raises RuntimeError: if another ``receive()`` call is in progress.
        :raises asyncio.TimeoutError: if the read times out.
        :raises asyncio.CancelledError: if the call is cancelled.
        """
        while True:
            if self._waiting is not None:
                raise RuntimeError("Concurrent call to receive() is not allowed")

            if self._closed:
                return WS_CLOSED_MESSAGE
            elif self._closing:
                await self.close()
                return WS_CLOSED_MESSAGE

            try:
                # `_waiting` lets a concurrent `close()` know a read is in
                # flight and wait until it has been unwound.
                self._waiting = self._loop.create_future()
                try:
                    async with async_timeout.timeout(
                        timeout or self._timeout.ws_receive
                    ):
                        msg = await self._reader.read()
                    self._reset_heartbeat()
                finally:
                    waiter = self._waiting
                    self._waiting = None
                    set_result(waiter, True)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                raise
            except EofStream:
                self._close_code = WSCloseCode.OK
                await self.close()
                return WSMessage(WSMsgType.CLOSED, None, None)
            except ClientError:
                # No handshake is attempted here; `_set_closed()` also
                # cancels the heartbeat timers, which a later `close()`
                # would skip because the connection is already closed.
                self._set_closed()
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                return WS_CLOSED_MESSAGE
            except WebSocketError as exc:
                self._close_code = exc.code
                await self.close(code=exc.code)
                return WSMessage(WSMsgType.ERROR, exc, None)
            except Exception as exc:
                self._exception = exc
                self._closing = True
                self._close_code = WSCloseCode.ABNORMAL_CLOSURE
                await self.close()
                return WSMessage(WSMsgType.ERROR, exc, None)

            if msg.type == WSMsgType.CLOSE:
                self._closing = True
                self._close_code = msg.data
                if not self._closed and self._autoclose:
                    await self.close()
            elif msg.type == WSMsgType.CLOSING:
                self._closing = True
            elif msg.type == WSMsgType.PING and self._autoping:
                await self.pong(msg.data)
                continue
            elif msg.type == WSMsgType.PONG and self._autoping:
                continue

            return msg

    async def receive_str(self, *, timeout: Optional[float] = None) -> str:
        """Receive the next message and return its text payload.

        :raises TypeError: if the received message is not a TEXT message.
        """
        msg = await self.receive(timeout)
        if msg.type != WSMsgType.TEXT:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
        return cast(str, msg.data)

    async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes:
        """Receive the next message and return its binary payload.

        :raises TypeError: if the received message is not a BINARY message.
        """
        msg = await self.receive(timeout)
        if msg.type != WSMsgType.BINARY:
            raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
        return cast(bytes, msg.data)

    async def receive_json(
        self,
        *,
        loads: JSONDecoder = DEFAULT_JSON_DECODER,
        timeout: Optional[float] = None,
    ) -> Any:
        """Receive a text message and decode it with ``loads``."""
        data = await self.receive_str(timeout=timeout)
        return loads(data)

    def __aiter__(self) -> "ClientWebSocketResponse":
        """Return ``self``; the object is its own async iterator."""
        return self

    async def __anext__(self) -> WSMessage:
        """Return the next message, stopping on CLOSE, CLOSING or CLOSED."""
        msg = await self.receive()
        if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
            raise StopAsyncIteration
        return msg