Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websocket/_core.py: 23%

208 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:34 +0000

1import socket 

2import struct 

3import threading 

4import time 

5 

6from typing import Optional, Union 

7 

8# websocket modules 

9from ._abnf import * 

10from ._exceptions import * 

11from ._handshake import * 

12from ._http import * 

13from ._logging import * 

14from ._socket import * 

15from ._ssl_compat import * 

16from ._utils import * 

17 

18""" 

19_core.py 

20websocket - WebSocket client library for Python 

21 

22Copyright 2023 engn33r 

23 

24Licensed under the Apache License, Version 2.0 (the "License"); 

25you may not use this file except in compliance with the License. 

26You may obtain a copy of the License at 

27 

28 http://www.apache.org/licenses/LICENSE-2.0 

29 

30Unless required by applicable law or agreed to in writing, software 

31distributed under the License is distributed on an "AS IS" BASIS, 

32WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

33See the License for the specific language governing permissions and 

34limitations under the License. 

35""" 

36 

37__all__ = ['WebSocket', 'create_connection'] 

38 

39 

40class WebSocket: 

41 """ 

42 Low level WebSocket interface. 

43 

44 This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_ 

45 

46 We can connect to the websocket server and send/receive data. 

47 The following example is an echo client. 

48 

49 >>> import websocket 

50 >>> ws = websocket.WebSocket() 

51 >>> ws.connect("ws://echo.websocket.events") 

52 >>> ws.recv() 

53 'echo.websocket.events sponsored by Lob.com' 

54 >>> ws.send("Hello, Server") 

55 19 

56 >>> ws.recv() 

57 'Hello, Server' 

58 >>> ws.close() 

59 

60 Parameters 

61 ---------- 

62 get_mask_key: func 

63 A callable function to get new mask keys, see the 

64 WebSocket.set_mask_key's docstring for more information. 

65 sockopt: tuple 

66 Values for socket.setsockopt. 

67 sockopt must be tuple and each element is argument of sock.setsockopt. 

68 sslopt: dict 

69 Optional dict object for ssl socket options. See FAQ for details. 

70 fire_cont_frame: bool 

71 Fire recv event for each cont frame. Default is False. 

72 enable_multithread: bool 

73 If set to True, lock send method. 

74 skip_utf8_validation: bool 

75 Skip utf8 validation. 

76 """ 

77 

78 def __init__(self, get_mask_key=None, sockopt=None, sslopt=None, 

79 fire_cont_frame: bool = False, enable_multithread: bool = True, 

80 skip_utf8_validation: bool = False, **_): 

81 """ 

82 Initialize WebSocket object. 

83 

84 Parameters 

85 ---------- 

86 sslopt: dict 

87 Optional dict object for ssl socket options. See FAQ for details. 

88 """ 

89 self.sock_opt = sock_opt(sockopt, sslopt) 

90 self.handshake_response = None 

91 self.sock = None 

92 

93 self.connected = False 

94 self.get_mask_key = get_mask_key 

95 # These buffer over the build-up of a single frame. 

96 self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation) 

97 self.cont_frame = continuous_frame( 

98 fire_cont_frame, skip_utf8_validation) 

99 

100 if enable_multithread: 

101 self.lock = threading.Lock() 

102 self.readlock = threading.Lock() 

103 else: 

104 self.lock = NoLock() 

105 self.readlock = NoLock() 

106 

107 def __iter__(self): 

108 """ 

109 Allow iteration over websocket, implying sequential `recv` executions. 

110 """ 

111 while True: 

112 yield self.recv() 

113 

114 def __next__(self): 

115 return self.recv() 

116 

117 def next(self): 

118 return self.__next__() 

119 

120 def fileno(self): 

121 return self.sock.fileno() 

122 

123 def set_mask_key(self, func): 

124 """ 

125 Set function to create mask key. You can customize mask key generator. 

126 Mainly, this is for testing purpose. 

127 

128 Parameters 

129 ---------- 

130 func: func 

131 callable object. the func takes 1 argument as integer. 

132 The argument means length of mask key. 

133 This func must return string(byte array), 

134 which length is argument specified. 

135 """ 

136 self.get_mask_key = func 

137 

138 def gettimeout(self) -> float: 

139 """ 

140 Get the websocket timeout (in seconds) as an int or float 

141 

142 Returns 

143 ---------- 

144 timeout: int or float 

145 returns timeout value (in seconds). This value could be either float/integer. 

146 """ 

147 return self.sock_opt.timeout 

148 

149 def settimeout(self, timeout: Optional[float]): 

150 """ 

151 Set the timeout to the websocket. 

152 

153 Parameters 

154 ---------- 

155 timeout: int or float 

156 timeout time (in seconds). This value could be either float/integer. 

157 """ 

158 self.sock_opt.timeout = timeout 

159 if self.sock: 

160 self.sock.settimeout(timeout) 

161 

162 timeout = property(gettimeout, settimeout) 

163 

164 def getsubprotocol(self): 

165 """ 

166 Get subprotocol 

167 """ 

168 if self.handshake_response: 

169 return self.handshake_response.subprotocol 

170 else: 

171 return None 

172 

173 subprotocol = property(getsubprotocol) 

174 

175 def getstatus(self): 

176 """ 

177 Get handshake status 

178 """ 

179 if self.handshake_response: 

180 return self.handshake_response.status 

181 else: 

182 return None 

183 

184 status = property(getstatus) 

185 

186 def getheaders(self): 

187 """ 

188 Get handshake response header 

189 """ 

190 if self.handshake_response: 

191 return self.handshake_response.headers 

192 else: 

193 return None 

194 

195 def is_ssl(self): 

196 try: 

197 return isinstance(self.sock, ssl.SSLSocket) 

198 except: 

199 return False 

200 

201 headers = property(getheaders) 

202 

203 def connect(self, url, **options): 

204 """ 

205 Connect to url. url is websocket url scheme. 

206 ie. ws://host:port/resource 

207 You can customize using 'options'. 

208 If you set "header" list object, you can set your own custom header. 

209 

210 >>> ws = WebSocket() 

211 >>> ws.connect("ws://echo.websocket.events", 

212 ... header=["User-Agent: MyProgram", 

213 ... "x-custom: header"]) 

214 

215 Parameters 

216 ---------- 

217 header: list or dict 

218 Custom http header list or dict. 

219 cookie: str 

220 Cookie value. 

221 origin: str 

222 Custom origin url. 

223 connection: str 

224 Custom connection header value. 

225 Default value "Upgrade" set in _handshake.py 

226 suppress_origin: bool 

227 Suppress outputting origin header. 

228 host: str 

229 Custom host header string. 

230 timeout: int or float 

231 Socket timeout time. This value is an integer or float. 

232 If you set None for this value, it means "use default_timeout value" 

233 http_proxy_host: str 

234 HTTP proxy host name. 

235 http_proxy_port: str or int 

236 HTTP proxy port. Default is 80. 

237 http_no_proxy: list 

238 Whitelisted host names that don't use the proxy. 

239 http_proxy_auth: tuple 

240 HTTP proxy auth information. Tuple of username and password. Default is None. 

241 http_proxy_timeout: int or float 

242 HTTP proxy timeout, default is 60 sec as per python-socks. 

243 redirect_limit: int 

244 Number of redirects to follow. 

245 subprotocols: list 

246 List of available subprotocols. Default is None. 

247 socket: socket 

248 Pre-initialized stream socket. 

249 """ 

250 self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout) 

251 self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options), 

252 options.pop('socket', None)) 

253 

254 try: 

255 self.handshake_response = handshake(self.sock, url, *addrs, **options) 

256 for attempt in range(options.pop('redirect_limit', 3)): 

257 if self.handshake_response.status in SUPPORTED_REDIRECT_STATUSES: 

258 url = self.handshake_response.headers['location'] 

259 self.sock.close() 

260 self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options), 

261 options.pop('socket', None)) 

262 self.handshake_response = handshake(self.sock, url, *addrs, **options) 

263 self.connected = True 

264 except: 

265 if self.sock: 

266 self.sock.close() 

267 self.sock = None 

268 raise 

269 

270 def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int: 

271 """ 

272 Send the data as string. 

273 

274 Parameters 

275 ---------- 

276 payload: str 

277 Payload must be utf-8 string or unicode, 

278 If the opcode is OPCODE_TEXT. 

279 Otherwise, it must be string(byte array). 

280 opcode: int 

281 Operation code (opcode) to send. 

282 """ 

283 

284 frame = ABNF.create_frame(payload, opcode) 

285 return self.send_frame(frame) 

286 

287 def send_frame(self, frame) -> int: 

288 """ 

289 Send the data frame. 

290 

291 >>> ws = create_connection("ws://echo.websocket.events") 

292 >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT) 

293 >>> ws.send_frame(frame) 

294 >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0) 

295 >>> ws.send_frame(frame) 

296 >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1) 

297 >>> ws.send_frame(frame) 

298 

299 Parameters 

300 ---------- 

301 frame: ABNF frame 

302 frame data created by ABNF.create_frame 

303 """ 

304 if self.get_mask_key: 

305 frame.get_mask_key = self.get_mask_key 

306 data = frame.format() 

307 length = len(data) 

308 if (isEnabledForTrace()): 

309 trace("++Sent raw: " + repr(data)) 

310 trace("++Sent decoded: " + frame.__str__()) 

311 with self.lock: 

312 while data: 

313 l = self._send(data) 

314 data = data[l:] 

315 

316 return length 

317 

318 def send_binary(self, payload: bytes) -> int: 

319 """ 

320 Send a binary message (OPCODE_BINARY). 

321 

322 Parameters 

323 ---------- 

324 payload: bytes 

325 payload of message to send. 

326 """ 

327 return self.send(payload, ABNF.OPCODE_BINARY) 

328 

329 def ping(self, payload: Union[str, bytes] = ""): 

330 """ 

331 Send ping data. 

332 

333 Parameters 

334 ---------- 

335 payload: str 

336 data payload to send server. 

337 """ 

338 if isinstance(payload, str): 

339 payload = payload.encode("utf-8") 

340 self.send(payload, ABNF.OPCODE_PING) 

341 

342 def pong(self, payload: Union[str, bytes] = ""): 

343 """ 

344 Send pong data. 

345 

346 Parameters 

347 ---------- 

348 payload: str 

349 data payload to send server. 

350 """ 

351 if isinstance(payload, str): 

352 payload = payload.encode("utf-8") 

353 self.send(payload, ABNF.OPCODE_PONG) 

354 

355 def recv(self) -> Union[str, bytes]: 

356 """ 

357 Receive string data(byte array) from the server. 

358 

359 Returns 

360 ---------- 

361 data: string (byte array) value. 

362 """ 

363 with self.readlock: 

364 opcode, data = self.recv_data() 

365 if opcode == ABNF.OPCODE_TEXT: 

366 return data.decode("utf-8") 

367 elif opcode == ABNF.OPCODE_TEXT or opcode == ABNF.OPCODE_BINARY: 

368 return data 

369 else: 

370 return '' 

371 

372 def recv_data(self, control_frame: bool = False) -> tuple: 

373 """ 

374 Receive data with operation code. 

375 

376 Parameters 

377 ---------- 

378 control_frame: bool 

379 a boolean flag indicating whether to return control frame 

380 data, defaults to False 

381 

382 Returns 

383 ------- 

384 opcode, frame.data: tuple 

385 tuple of operation code and string(byte array) value. 

386 """ 

387 opcode, frame = self.recv_data_frame(control_frame) 

388 return opcode, frame.data 

389 

390 def recv_data_frame(self, control_frame: bool = False): 

391 """ 

392 Receive data with operation code. 

393 

394 If a valid ping message is received, a pong response is sent. 

395 

396 Parameters 

397 ---------- 

398 control_frame: bool 

399 a boolean flag indicating whether to return control frame 

400 data, defaults to False 

401 

402 Returns 

403 ------- 

404 frame.opcode, frame: tuple 

405 tuple of operation code and string(byte array) value. 

406 """ 

407 while True: 

408 frame = self.recv_frame() 

409 if (isEnabledForTrace()): 

410 trace("++Rcv raw: " + repr(frame.format())) 

411 trace("++Rcv decoded: " + frame.__str__()) 

412 if not frame: 

413 # handle error: 

414 # 'NoneType' object has no attribute 'opcode' 

415 raise WebSocketProtocolException( 

416 "Not a valid frame {frame}".format(frame=frame)) 

417 elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY, ABNF.OPCODE_CONT): 

418 self.cont_frame.validate(frame) 

419 self.cont_frame.add(frame) 

420 

421 if self.cont_frame.is_fire(frame): 

422 return self.cont_frame.extract(frame) 

423 

424 elif frame.opcode == ABNF.OPCODE_CLOSE: 

425 self.send_close() 

426 return frame.opcode, frame 

427 elif frame.opcode == ABNF.OPCODE_PING: 

428 if len(frame.data) < 126: 

429 self.pong(frame.data) 

430 else: 

431 raise WebSocketProtocolException( 

432 "Ping message is too long") 

433 if control_frame: 

434 return frame.opcode, frame 

435 elif frame.opcode == ABNF.OPCODE_PONG: 

436 if control_frame: 

437 return frame.opcode, frame 

438 

439 def recv_frame(self): 

440 """ 

441 Receive data as frame from server. 

442 

443 Returns 

444 ------- 

445 self.frame_buffer.recv_frame(): ABNF frame object 

446 """ 

447 return self.frame_buffer.recv_frame() 

448 

449 def send_close(self, status: int = STATUS_NORMAL, reason: bytes = b""): 

450 """ 

451 Send close data to the server. 

452 

453 Parameters 

454 ---------- 

455 status: int 

456 Status code to send. See STATUS_XXX. 

457 reason: str or bytes 

458 The reason to close. This must be string or UTF-8 bytes. 

459 """ 

460 if status < 0 or status >= ABNF.LENGTH_16: 

461 raise ValueError("code is invalid range") 

462 self.connected = False 

463 self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) 

464 

465 def close(self, status: int = STATUS_NORMAL, reason: bytes = b"", timeout: float = 3): 

466 """ 

467 Close Websocket object 

468 

469 Parameters 

470 ---------- 

471 status: int 

472 Status code to send. See VALID_CLOSE_STATUS in ABNF. 

473 reason: bytes 

474 The reason to close in UTF-8. 

475 timeout: int or float 

476 Timeout until receive a close frame. 

477 If None, it will wait forever until receive a close frame. 

478 """ 

479 if self.connected: 

480 if status < 0 or status >= ABNF.LENGTH_16: 

481 raise ValueError("code is invalid range") 

482 

483 try: 

484 self.connected = False 

485 self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) 

486 sock_timeout = self.sock.gettimeout() 

487 self.sock.settimeout(timeout) 

488 start_time = time.time() 

489 while timeout is None or time.time() - start_time < timeout: 

490 try: 

491 frame = self.recv_frame() 

492 if frame.opcode != ABNF.OPCODE_CLOSE: 

493 continue 

494 if isEnabledForError(): 

495 recv_status = struct.unpack("!H", frame.data[0:2])[0] 

496 if recv_status >= 3000 and recv_status <= 4999: 

497 debug("close status: " + repr(recv_status)) 

498 elif recv_status != STATUS_NORMAL: 

499 error("close status: " + repr(recv_status)) 

500 break 

501 except: 

502 break 

503 self.sock.settimeout(sock_timeout) 

504 self.sock.shutdown(socket.SHUT_RDWR) 

505 except: 

506 pass 

507 

508 self.shutdown() 

509 

510 def abort(self): 

511 """ 

512 Low-level asynchronous abort, wakes up other threads that are waiting in recv_* 

513 """ 

514 if self.connected: 

515 self.sock.shutdown(socket.SHUT_RDWR) 

516 

517 def shutdown(self): 

518 """ 

519 close socket, immediately. 

520 """ 

521 if self.sock: 

522 self.sock.close() 

523 self.sock = None 

524 self.connected = False 

525 

526 def _send(self, data: Union[str, bytes]): 

527 return send(self.sock, data) 

528 

529 def _recv(self, bufsize): 

530 try: 

531 return recv(self.sock, bufsize) 

532 except WebSocketConnectionClosedException: 

533 if self.sock: 

534 self.sock.close() 

535 self.sock = None 

536 self.connected = False 

537 raise 

538 

539 

540def create_connection(url: str, timeout=None, class_=WebSocket, **options): 

541 """ 

542 Connect to url and return websocket object. 

543 

544 Connect to url and return the WebSocket object. 

545 Passing optional timeout parameter will set the timeout on the socket. 

546 If no timeout is supplied, 

547 the global default timeout setting returned by getdefaulttimeout() is used. 

548 You can customize using 'options'. 

549 If you set "header" list object, you can set your own custom header. 

550 

551 >>> conn = create_connection("ws://echo.websocket.events", 

552 ... header=["User-Agent: MyProgram", 

553 ... "x-custom: header"]) 

554 

555 Parameters 

556 ---------- 

557 class_: class 

558 class to instantiate when creating the connection. It has to implement 

559 settimeout and connect. It's __init__ should be compatible with 

560 WebSocket.__init__, i.e. accept all of it's kwargs. 

561 header: list or dict 

562 custom http header list or dict. 

563 cookie: str 

564 Cookie value. 

565 origin: str 

566 custom origin url. 

567 suppress_origin: bool 

568 suppress outputting origin header. 

569 host: str 

570 custom host header string. 

571 timeout: int or float 

572 socket timeout time. This value could be either float/integer. 

573 If set to None, it uses the default_timeout value. 

574 http_proxy_host: str 

575 HTTP proxy host name. 

576 http_proxy_port: str or int 

577 HTTP proxy port. If not set, set to 80. 

578 http_no_proxy: list 

579 Whitelisted host names that don't use the proxy. 

580 http_proxy_auth: tuple 

581 HTTP proxy auth information. tuple of username and password. Default is None. 

582 http_proxy_timeout: int or float 

583 HTTP proxy timeout, default is 60 sec as per python-socks. 

584 enable_multithread: bool 

585 Enable lock for multithread. 

586 redirect_limit: int 

587 Number of redirects to follow. 

588 sockopt: tuple 

589 Values for socket.setsockopt. 

590 sockopt must be a tuple and each element is an argument of sock.setsockopt. 

591 sslopt: dict 

592 Optional dict object for ssl socket options. See FAQ for details. 

593 subprotocols: list 

594 List of available subprotocols. Default is None. 

595 skip_utf8_validation: bool 

596 Skip utf8 validation. 

597 socket: socket 

598 Pre-initialized stream socket. 

599 """ 

600 sockopt = options.pop("sockopt", []) 

601 sslopt = options.pop("sslopt", {}) 

602 fire_cont_frame = options.pop("fire_cont_frame", False) 

603 enable_multithread = options.pop("enable_multithread", True) 

604 skip_utf8_validation = options.pop("skip_utf8_validation", False) 

605 websock = class_(sockopt=sockopt, sslopt=sslopt, 

606 fire_cont_frame=fire_cont_frame, 

607 enable_multithread=enable_multithread, 

608 skip_utf8_validation=skip_utf8_validation, **options) 

609 websock.settimeout(timeout if timeout is not None else getdefaulttimeout()) 

610 websock.connect(url, **options) 

611 return websock