Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/websocket/_core.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

225 statements  

1import socket 

2import struct 

3import threading 

4import time 

5from typing import Optional, Union 

6 

7# websocket modules 

8from ._abnf import ABNF, STATUS_NORMAL, continuous_frame, frame_buffer 

9from ._exceptions import WebSocketProtocolException, WebSocketConnectionClosedException 

10from ._handshake import SUPPORTED_REDIRECT_STATUSES, handshake 

11from ._http import connect, proxy_info 

12from ._logging import debug, error, trace, isEnabledForError, isEnabledForTrace 

13from ._socket import getdefaulttimeout, recv, send, sock_opt 

14from ._ssl_compat import ssl 

15from ._utils import NoLock 

16from ._dispatcher import DispatcherBase, WrappedDispatcher 

17 

18""" 

19_core.py 

20websocket - WebSocket client library for Python 

21 

22Copyright 2024 engn33r 

23 

24Licensed under the Apache License, Version 2.0 (the "License"); 

25you may not use this file except in compliance with the License. 

26You may obtain a copy of the License at 

27 

28 http://www.apache.org/licenses/LICENSE-2.0 

29 

30Unless required by applicable law or agreed to in writing, software 

31distributed under the License is distributed on an "AS IS" BASIS, 

32WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

33See the License for the specific language governing permissions and 

34limitations under the License. 

35""" 

36 

37__all__ = ["WebSocket", "create_connection"] 

38 

39 

class WebSocket:
    """
    Low level WebSocket interface.

    This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_

    We can connect to the websocket server and send/receive data.
    The following example is an echo client.

    >>> import websocket
    >>> ws = websocket.WebSocket()
    >>> ws.connect("ws://echo.websocket.events")
    >>> ws.recv()
    'echo.websocket.events sponsored by Lob.com'
    >>> ws.send("Hello, Server")
    19
    >>> ws.recv()
    'Hello, Server'
    >>> ws.close()

    Parameters
    ----------
    get_mask_key: func
        A callable function to get new mask keys, see the
        WebSocket.set_mask_key's docstring for more information.
    sockopt: tuple
        Values for socket.setsockopt.
        sockopt must be tuple and each element is argument of sock.setsockopt.
    sslopt: dict
        Optional dict object for ssl socket options. See FAQ for details.
    fire_cont_frame: bool
        Fire recv event for each cont frame. Default is False.
    enable_multithread: bool
        If True (the default), ``threading.Lock`` objects guard ``send_frame``
        and ``recv``; otherwise ``NoLock`` placeholders are used.
    skip_utf8_validation: bool
        Skip utf8 validation.
    dispatcher: DispatcherBase or WrappedDispatcher
        Optional. When set, outgoing bytes are written through
        ``dispatcher.send(sock, data)`` instead of the plain ``send`` helper.
    """

    def __init__(
        self,
        get_mask_key=None,
        sockopt=None,
        sslopt=None,
        fire_cont_frame: bool = False,
        enable_multithread: bool = True,
        skip_utf8_validation: bool = False,
        dispatcher: Optional[Union[DispatcherBase, WrappedDispatcher]] = None,
        **_,
    ):
        """
        Initialize WebSocket object.

        No network activity happens here; call ``connect`` to open the socket.
        Unknown keyword arguments are accepted and ignored.

        Parameters
        ----------
        sslopt: dict
            Optional dict object for ssl socket options. See FAQ for details.
        """
        self.sock_opt = sock_opt(sockopt, sslopt)
        self.handshake_response = None
        self.sock: Optional[socket.socket] = None

        self.connected = False
        self.get_mask_key = get_mask_key
        # These buffer over the build-up of a single frame.
        self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation)
        self.cont_frame = continuous_frame(fire_cont_frame, skip_utf8_validation)
        self.dispatcher = dispatcher

        # `lock` serialises writes in send_frame, `readlock` serialises recv.
        if enable_multithread:
            self.lock = threading.Lock()
            self.readlock = threading.Lock()
        else:
            self.lock = NoLock()
            self.readlock = NoLock()

    def __iter__(self):
        """
        Allow iteration over websocket, implying sequential `recv` executions.
        """
        while True:
            yield self.recv()

    def __next__(self):
        """Return the next received message (same as ``recv``)."""
        return self.recv()

    def next(self):
        """Alias of ``__next__``."""
        return self.__next__()

    def fileno(self):
        """Return the file descriptor of the underlying socket."""
        return self.sock.fileno()

    def set_mask_key(self, func):
        """
        Set function to create mask key. You can customize mask key generator.
        Mainly, this is for testing purpose.

        Parameters
        ----------
        func: func
            callable object. the func takes 1 argument as integer.
            The argument means length of mask key.
            This func must return string(byte array),
            which length is argument specified.
        """
        self.get_mask_key = func

    def gettimeout(self) -> Union[float, int, None]:
        """
        Get the websocket timeout (in seconds) as an int or float

        Returns
        -------
        timeout: int or float
            returns timeout value (in seconds). This value could be either float/integer.
        """
        return self.sock_opt.timeout

    def settimeout(self, timeout: Union[float, int, None]):
        """
        Set the timeout to the websocket.

        The value is stored in the socket options and, if a socket is
        currently open, applied to it immediately.

        Parameters
        ----------
        timeout: int or float
            timeout time (in seconds). This value could be either float/integer.
        """
        self.sock_opt.timeout = timeout
        if self.sock:
            self.sock.settimeout(timeout)

    timeout = property(gettimeout, settimeout)

    def getsubprotocol(self):
        """
        Get subprotocol from the handshake response, or None before a handshake.
        """
        if self.handshake_response:
            return self.handshake_response.subprotocol
        return None

    subprotocol = property(getsubprotocol)

    def getstatus(self):
        """
        Get handshake status, or None before a handshake.
        """
        if self.handshake_response:
            return self.handshake_response.status
        return None

    status = property(getstatus)

    def getheaders(self):
        """
        Get handshake response header, or None before a handshake.
        """
        if self.handshake_response:
            return self.handshake_response.headers
        return None

    headers = property(getheaders)

    def is_ssl(self):
        """Return True if the current socket is an ``ssl.SSLSocket``."""
        try:
            return isinstance(self.sock, ssl.SSLSocket)
        except Exception:
            # NOTE(review): presumably guards against the ssl compat shim not
            # exposing SSLSocket when SSL support is missing - confirm.
            return False

    def connect(self, url, **options):
        """
        Connect to url. url is websocket url scheme.
        ie. ws://host:port/resource
        You can customize using 'options'.
        If you set "header" list object, you can set your own custom header.

        >>> ws = WebSocket()
        >>> ws.connect("ws://echo.websocket.events",
        ...     header=["User-Agent: MyProgram",
        ...             "x-custom: header"])

        On any failure the socket is closed, ``self.sock`` is reset to None
        and the original exception is re-raised.

        Parameters
        ----------
        header: list or dict
            Custom http header list or dict.
        cookie: str
            Cookie value.
        origin: str
            Custom origin url.
        connection: str
            Custom connection header value.
            Default value "Upgrade" set in _handshake.py
        suppress_origin: bool
            Suppress outputting origin header.
        host: str
            Custom host header string.
        timeout: int or float
            Socket timeout time. This value is an integer or float.
            If you set None for this value, it means "use default_timeout value"
        http_proxy_host: str
            HTTP proxy host name.
        http_proxy_port: str or int
            HTTP proxy port. Default is 80.
        http_no_proxy: list
            Whitelisted host names that don't use the proxy.
        http_proxy_auth: tuple
            HTTP proxy auth information. Tuple of username and password. Default is None.
        http_proxy_timeout: int or float
            HTTP proxy timeout, default is 60 sec as per python-socks.
        redirect_limit: int
            Number of redirects to follow.
        subprotocols: list
            List of available subprotocols. Default is None.
        socket: socket
            Pre-initialized stream socket.
        """
        self.sock_opt.timeout = options.get("timeout", self.sock_opt.timeout)
        # proxy_info() is evaluated before "socket" is popped, so it still
        # sees the full option set.
        self.sock, addrs = connect(
            url, self.sock_opt, proxy_info(**options), options.pop("socket", None)
        )

        try:
            self.handshake_response = handshake(self.sock, url, *addrs, **options)
            for _ in range(options.pop("redirect_limit", 3)):
                if self.handshake_response.status not in SUPPORTED_REDIRECT_STATUSES:
                    # Not a redirect: nothing left to follow.
                    break
                url = self.handshake_response.headers["location"]
                self.sock.close()
                self.sock, addrs = connect(
                    url,
                    self.sock_opt,
                    proxy_info(**options),
                    options.pop("socket", None),
                )
                self.handshake_response = handshake(
                    self.sock, url, *addrs, **options
                )
            self.connected = True
        except BaseException:
            # Cleanup-and-reraise: never leave a half-open socket behind,
            # whatever interrupted the handshake.
            if self.sock:
                self.sock.close()
                self.sock = None
            raise

    def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int:
        """
        Send the data as string.

        Parameters
        ----------
        payload: str
            Payload must be utf-8 string or unicode,
            If the opcode is OPCODE_TEXT.
            Otherwise, it must be string(byte array).
        opcode: int
            Operation code (opcode) to send.

        Returns
        -------
        length: int
            Length of the formatted frame handed to the socket.
        """
        frame = ABNF.create_frame(payload, opcode)
        return self.send_frame(frame)

    def send_text(self, text_data: str) -> int:
        """
        Sends UTF-8 encoded text.
        """
        return self.send(text_data, ABNF.OPCODE_TEXT)

    def send_bytes(self, data: Union[bytes, bytearray]) -> int:
        """
        Sends a sequence of bytes.
        """
        return self.send(data, ABNF.OPCODE_BINARY)

    def send_frame(self, frame) -> int:
        """
        Send the data frame.

        >>> ws = create_connection("ws://echo.websocket.events")
        >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT)
        >>> ws.send_frame(frame)
        >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0)
        >>> ws.send_frame(cont_frame)
        >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1)
        >>> ws.send_frame(cont_frame)

        Parameters
        ----------
        frame: ABNF frame
            frame data created by ABNF.create_frame

        Returns
        -------
        length: int
            Length of the formatted frame.
        """
        if self.get_mask_key:
            frame.get_mask_key = self.get_mask_key
        data = frame.format()
        length = len(data)
        if isEnabledForTrace():
            trace(f"++Sent raw: {repr(data)}")
            trace(f"++Sent decoded: {frame.__str__()}")
        with self.lock:
            # _send may write only part of the buffer; loop until it is drained.
            while data:
                sent = self._send(data)
                data = data[sent:]

        return length

    def send_binary(self, payload: bytes) -> int:
        """
        Send a binary message (OPCODE_BINARY).

        Parameters
        ----------
        payload: bytes
            payload of message to send.
        """
        return self.send(payload, ABNF.OPCODE_BINARY)

    def ping(self, payload: Union[str, bytes] = ""):
        """
        Send ping data.

        Parameters
        ----------
        payload: str
            data payload to send server. A str is encoded as UTF-8 first.
        """
        if isinstance(payload, str):
            payload = payload.encode("utf-8")
        self.send(payload, ABNF.OPCODE_PING)

    def pong(self, payload: Union[str, bytes] = ""):
        """
        Send pong data.

        Parameters
        ----------
        payload: str
            data payload to send server. A str is encoded as UTF-8 first.
        """
        if isinstance(payload, str):
            payload = payload.encode("utf-8")
        self.send(payload, ABNF.OPCODE_PONG)

    def recv(self) -> Union[str, bytes]:
        """
        Receive string data(byte array) from the server.

        Returns
        -------
        data: str for text frames (bytes payloads are decoded as UTF-8),
            bytes for binary frames, and "" for any other opcode.
        """
        with self.readlock:
            opcode, data = self.recv_data()
        if opcode == ABNF.OPCODE_TEXT:
            data_received: Union[bytes, str] = data
            if isinstance(data_received, bytes):
                return data_received.decode("utf-8")
            elif isinstance(data_received, str):
                return data_received
        elif opcode == ABNF.OPCODE_BINARY:
            data_binary: bytes = data
            return data_binary
        else:
            return ""

    def recv_data(self, control_frame: bool = False) -> tuple:
        """
        Receive data with operation code.

        Parameters
        ----------
        control_frame: bool
            a boolean flag indicating whether to return control frame
            data, defaults to False

        Returns
        -------
        opcode, frame.data: tuple
            tuple of operation code and string(byte array) value.
        """
        opcode, frame = self.recv_data_frame(control_frame)
        return opcode, frame.data

    def recv_data_frame(self, control_frame: bool = False) -> tuple:
        """
        Receive data with operation code.

        If a valid ping message is received, a pong response is sent.
        A received close frame is answered with ``send_close`` and returned.

        Parameters
        ----------
        control_frame: bool
            a boolean flag indicating whether to return control frame
            data, defaults to False

        Returns
        -------
        frame.opcode, frame: tuple
            tuple of operation code and string(byte array) value.

        Raises
        ------
        WebSocketProtocolException
            If no frame was produced, or a ping payload is 126 bytes or longer.
        """
        while True:
            frame = self.recv_frame()
            if not frame:
                # Checked before tracing so that a missing frame is reported
                # as a protocol error rather than failing on frame.format():
                # 'NoneType' object has no attribute 'opcode'
                raise WebSocketProtocolException(f"Not a valid frame {frame}")
            if isEnabledForTrace():
                trace(f"++Rcv raw: {repr(frame.format())}")
                trace(f"++Rcv decoded: {frame.__str__()}")
            if frame.opcode in (
                ABNF.OPCODE_TEXT,
                ABNF.OPCODE_BINARY,
                ABNF.OPCODE_CONT,
            ):
                self.cont_frame.validate(frame)
                self.cont_frame.add(frame)

                if self.cont_frame.is_fire(frame):
                    return self.cont_frame.extract(frame)

            elif frame.opcode == ABNF.OPCODE_CLOSE:
                self.send_close()
                return frame.opcode, frame
            elif frame.opcode == ABNF.OPCODE_PING:
                if len(frame.data) < 126:
                    self.pong(frame.data)
                else:
                    raise WebSocketProtocolException("Ping message is too long")
                if control_frame:
                    return frame.opcode, frame
            elif frame.opcode == ABNF.OPCODE_PONG:
                if control_frame:
                    return frame.opcode, frame

    def recv_frame(self):
        """
        Receive data as frame from server.

        Returns
        -------
        self.frame_buffer.recv_frame(): ABNF frame object
        """
        return self.frame_buffer.recv_frame()

    def send_close(self, status: int = STATUS_NORMAL, reason: Union[str, bytes] = b""):
        """
        Send close data to the server.

        Marks the connection as no longer connected before sending; the
        socket itself is left open.

        Parameters
        ----------
        status: int
            Status code to send. See STATUS_XXX.
        reason: str or bytes
            The reason to close. This must be string or UTF-8 bytes.
            A str is encoded as UTF-8 before being sent.

        Raises
        ------
        ValueError
            If status is negative or does not fit in 16 bits.
        """
        if status < 0 or status >= ABNF.LENGTH_16:
            raise ValueError("code is invalid range")
        if isinstance(reason, str):
            reason = reason.encode("utf-8")
        self.connected = False
        self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)

    def close(
        self,
        status: int = STATUS_NORMAL,
        reason: Union[str, bytes] = b"",
        timeout: Union[int, float, None] = 3,
    ):
        """
        Close Websocket object

        Sends a close frame, waits (best effort) for the peer's close frame,
        then shuts the socket down. Does nothing if not connected. Ordinary
        errors during the closing handshake are ignored; the socket is always
        released.

        Parameters
        ----------
        status: int
            Status code to send. See VALID_CLOSE_STATUS in ABNF.
        reason: str or bytes
            The reason to close in UTF-8. A str is encoded as UTF-8.
        timeout: int or float
            Timeout until receive a close frame.
            If None, it will wait forever until receive a close frame.

        Raises
        ------
        ValueError
            If status is negative or does not fit in 16 bits.
        """
        if not self.connected:
            return
        if status < 0 or status >= ABNF.LENGTH_16:
            raise ValueError("code is invalid range")

        try:
            self.connected = False
            if isinstance(reason, str):
                reason = reason.encode("utf-8")
            self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)
            sock_timeout = self.sock.gettimeout()
            self.sock.settimeout(timeout)
            # Monotonic clock: the deadline must not move with wall-clock changes.
            start_time = time.monotonic()
            while timeout is None or time.monotonic() - start_time < timeout:
                try:
                    frame = self.recv_frame()
                    if frame.opcode != ABNF.OPCODE_CLOSE:
                        continue
                    if isEnabledForError():
                        recv_status = struct.unpack("!H", frame.data[0:2])[0]
                        if 3000 <= recv_status <= 4999:
                            debug(f"close status: {repr(recv_status)}")
                        elif recv_status != STATUS_NORMAL:
                            error(f"close status: {repr(recv_status)}")
                    break
                except Exception:
                    # Timeout, closed connection, malformed close payload:
                    # stop waiting and proceed to tear the socket down.
                    break
            self.sock.settimeout(sock_timeout)
            self.sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            # Best effort: the closing handshake must never prevent shutdown.
            pass
        finally:
            # Runs even for KeyboardInterrupt/SystemExit, which now propagate.
            self.shutdown()

    def abort(self):
        """
        Low-level asynchronous abort, wakes up other threads that are waiting in recv_*
        """
        if self.connected:
            self.sock.shutdown(socket.SHUT_RDWR)

    def shutdown(self):
        """
        close socket, immediately.
        """
        if self.sock:
            self.sock.close()
            self.sock = None
            self.connected = False

    def _send(self, data: Union[str, bytes]):
        """Write data via the dispatcher when one is set, else via ``send``."""
        if self.dispatcher:
            return self.dispatcher.send(self.sock, data)
        return send(self.sock, data)

    def _recv(self, bufsize):
        """
        Read up to bufsize from the socket.

        If the connection turns out to be closed, release the socket and
        reset the connection state before re-raising.
        """
        try:
            return recv(self.sock, bufsize)
        except WebSocketConnectionClosedException:
            if self.sock:
                self.sock.close()
            self.sock = None
            self.connected = False
            raise

575 

576 

def create_connection(url: str, timeout=None, class_=WebSocket, **options):
    """
    Build a websocket object, connect it to url and return it.

    Constructor-level settings are split off from 'options' and given to
    class_; whatever remains is passed to both the constructor and connect().
    When timeout is None the value returned by getdefaulttimeout() is used.
    If you set "header" list object, you can set your own custom header.

    >>> conn = create_connection("ws://echo.websocket.events",
    ...      header=["User-Agent: MyProgram",
    ...              "x-custom: header"])

    Parameters
    ----------
    class_: class
        class to instantiate when creating the connection. It has to implement
        settimeout and connect. Its __init__ should be compatible with
        WebSocket.__init__, i.e. accept all of its kwargs.
    header: list or dict
        custom http header list or dict.
    cookie: str
        Cookie value.
    origin: str
        custom origin url.
    suppress_origin: bool
        suppress outputting origin header.
    host: str
        custom host header string.
    timeout: int or float
        socket timeout time. This value could be either float/integer.
        If set to None, it uses the default_timeout value.
    http_proxy_host: str
        HTTP proxy host name.
    http_proxy_port: str or int
        HTTP proxy port. If not set, set to 80.
    http_no_proxy: list
        Whitelisted host names that don't use the proxy.
    http_proxy_auth: tuple
        HTTP proxy auth information. tuple of username and password. Default is None.
    http_proxy_timeout: int or float
        HTTP proxy timeout, default is 60 sec as per python-socks.
    enable_multithread: bool
        Enable lock for multithread.
    redirect_limit: int
        Number of redirects to follow.
    sockopt: tuple
        Values for socket.setsockopt.
        sockopt must be a tuple and each element is an argument of sock.setsockopt.
    sslopt: dict
        Optional dict object for ssl socket options. See FAQ for details.
    subprotocols: list
        List of available subprotocols. Default is None.
    skip_utf8_validation: bool
        Skip utf8 validation.
    socket: socket
        Pre-initialized stream socket.
    """
    # Constructor-only settings and their fallbacks, in the order they are
    # handed to class_. Built per call so the mutable fallbacks are fresh.
    constructor_defaults = (
        ("sockopt", []),
        ("sslopt", {}),
        ("fire_cont_frame", False),
        ("enable_multithread", True),
        ("skip_utf8_validation", False),
    )
    constructor_kwargs = {
        key: options.pop(key, fallback) for key, fallback in constructor_defaults
    }

    websock = class_(**constructor_kwargs, **options)

    effective_timeout = getdefaulttimeout() if timeout is None else timeout
    websock.settimeout(effective_timeout)

    websock.connect(url, **options)
    return websock

652 return websock