Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/websocket/_core.py: 23%

207 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1import socket 

2import struct 

3import threading 

4import time 

5 

6# websocket modules 

7from ._abnf import * 

8from ._exceptions import * 

9from ._handshake import * 

10from ._http import * 

11from ._logging import * 

12from ._socket import * 

13from ._ssl_compat import * 

14from ._utils import * 

15 

16""" 

17_core.py 

18websocket - WebSocket client library for Python 

19 

20Copyright 2022 engn33r 

21 

22Licensed under the Apache License, Version 2.0 (the "License"); 

23you may not use this file except in compliance with the License. 

24You may obtain a copy of the License at 

25 

26 http://www.apache.org/licenses/LICENSE-2.0 

27 

28Unless required by applicable law or agreed to in writing, software 

29distributed under the License is distributed on an "AS IS" BASIS, 

30WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

31See the License for the specific language governing permissions and 

32limitations under the License. 

33""" 

34 

35__all__ = ['WebSocket', 'create_connection'] 

36 

37 

38class WebSocket: 

39 """ 

40 Low level WebSocket interface. 

41 

42 This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_ 

43 

44 We can connect to the websocket server and send/receive data. 

45 The following example is an echo client. 

46 

47 >>> import websocket 

48 >>> ws = websocket.WebSocket() 

49 >>> ws.connect("ws://echo.websocket.events") 

50 >>> ws.recv() 

51 'echo.websocket.events sponsored by Lob.com' 

52 >>> ws.send("Hello, Server") 

53 19 

54 >>> ws.recv() 

55 'Hello, Server' 

56 >>> ws.close() 

57 

58 Parameters 

59 ---------- 

60 get_mask_key: func 

61 A callable function to get new mask keys, see the 

62 WebSocket.set_mask_key's docstring for more information. 

63 sockopt: tuple 

64 Values for socket.setsockopt. 

65 sockopt must be tuple and each element is argument of sock.setsockopt. 

66 sslopt: dict 

67 Optional dict object for ssl socket options. See FAQ for details. 

68 fire_cont_frame: bool 

69 Fire recv event for each cont frame. Default is False. 

70 enable_multithread: bool 

71 If set to True, lock send method. 

72 skip_utf8_validation: bool 

73 Skip utf8 validation. 

74 """ 

75 

76 def __init__(self, get_mask_key=None, sockopt=None, sslopt=None, 

77 fire_cont_frame=False, enable_multithread=True, 

78 skip_utf8_validation=False, **_): 

79 """ 

80 Initialize WebSocket object. 

81 

82 Parameters 

83 ---------- 

84 sslopt: dict 

85 Optional dict object for ssl socket options. See FAQ for details. 

86 """ 

87 self.sock_opt = sock_opt(sockopt, sslopt) 

88 self.handshake_response = None 

89 self.sock = None 

90 

91 self.connected = False 

92 self.get_mask_key = get_mask_key 

93 # These buffer over the build-up of a single frame. 

94 self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation) 

95 self.cont_frame = continuous_frame( 

96 fire_cont_frame, skip_utf8_validation) 

97 

98 if enable_multithread: 

99 self.lock = threading.Lock() 

100 self.readlock = threading.Lock() 

101 else: 

102 self.lock = NoLock() 

103 self.readlock = NoLock() 

104 

105 def __iter__(self): 

106 """ 

107 Allow iteration over websocket, implying sequential `recv` executions. 

108 """ 

109 while True: 

110 yield self.recv() 

111 

112 def __next__(self): 

113 return self.recv() 

114 

115 def next(self): 

116 return self.__next__() 

117 

118 def fileno(self): 

119 return self.sock.fileno() 

120 

121 def set_mask_key(self, func): 

122 """ 

123 Set function to create mask key. You can customize mask key generator. 

124 Mainly, this is for testing purpose. 

125 

126 Parameters 

127 ---------- 

128 func: func 

129 callable object. the func takes 1 argument as integer. 

130 The argument means length of mask key. 

131 This func must return string(byte array), 

132 which length is argument specified. 

133 """ 

134 self.get_mask_key = func 

135 

136 def gettimeout(self): 

137 """ 

138 Get the websocket timeout (in seconds) as an int or float 

139 

140 Returns 

141 ---------- 

142 timeout: int or float 

143 returns timeout value (in seconds). This value could be either float/integer. 

144 """ 

145 return self.sock_opt.timeout 

146 

147 def settimeout(self, timeout): 

148 """ 

149 Set the timeout to the websocket. 

150 

151 Parameters 

152 ---------- 

153 timeout: int or float 

154 timeout time (in seconds). This value could be either float/integer. 

155 """ 

156 self.sock_opt.timeout = timeout 

157 if self.sock: 

158 self.sock.settimeout(timeout) 

159 

160 timeout = property(gettimeout, settimeout) 

161 

162 def getsubprotocol(self): 

163 """ 

164 Get subprotocol 

165 """ 

166 if self.handshake_response: 

167 return self.handshake_response.subprotocol 

168 else: 

169 return None 

170 

171 subprotocol = property(getsubprotocol) 

172 

173 def getstatus(self): 

174 """ 

175 Get handshake status 

176 """ 

177 if self.handshake_response: 

178 return self.handshake_response.status 

179 else: 

180 return None 

181 

182 status = property(getstatus) 

183 

184 def getheaders(self): 

185 """ 

186 Get handshake response header 

187 """ 

188 if self.handshake_response: 

189 return self.handshake_response.headers 

190 else: 

191 return None 

192 

193 def is_ssl(self): 

194 try: 

195 return isinstance(self.sock, ssl.SSLSocket) 

196 except: 

197 return False 

198 

199 headers = property(getheaders) 

200 

201 def connect(self, url, **options): 

202 """ 

203 Connect to url. url is websocket url scheme. 

204 ie. ws://host:port/resource 

205 You can customize using 'options'. 

206 If you set "header" list object, you can set your own custom header. 

207 

208 >>> ws = WebSocket() 

209 >>> ws.connect("ws://echo.websocket.events", 

210 ... header=["User-Agent: MyProgram", 

211 ... "x-custom: header"]) 

212 

213 Parameters 

214 ---------- 

215 header: list or dict 

216 Custom http header list or dict. 

217 cookie: str 

218 Cookie value. 

219 origin: str 

220 Custom origin url. 

221 connection: str 

222 Custom connection header value. 

223 Default value "Upgrade" set in _handshake.py 

224 suppress_origin: bool 

225 Suppress outputting origin header. 

226 host: str 

227 Custom host header string. 

228 timeout: int or float 

229 Socket timeout time. This value is an integer or float. 

230 If you set None for this value, it means "use default_timeout value" 

231 http_proxy_host: str 

232 HTTP proxy host name. 

233 http_proxy_port: str or int 

234 HTTP proxy port. Default is 80. 

235 http_no_proxy: list 

236 Whitelisted host names that don't use the proxy. 

237 http_proxy_auth: tuple 

238 HTTP proxy auth information. Tuple of username and password. Default is None. 

239 http_proxy_timeout: int or float 

240 HTTP proxy timeout, default is 60 sec as per python-socks. 

241 redirect_limit: int 

242 Number of redirects to follow. 

243 subprotocols: list 

244 List of available subprotocols. Default is None. 

245 socket: socket 

246 Pre-initialized stream socket. 

247 """ 

248 self.sock_opt.timeout = options.get('timeout', self.sock_opt.timeout) 

249 self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options), 

250 options.pop('socket', None)) 

251 

252 try: 

253 self.handshake_response = handshake(self.sock, url, *addrs, **options) 

254 for attempt in range(options.pop('redirect_limit', 3)): 

255 if self.handshake_response.status in SUPPORTED_REDIRECT_STATUSES: 

256 url = self.handshake_response.headers['location'] 

257 self.sock.close() 

258 self.sock, addrs = connect(url, self.sock_opt, proxy_info(**options), 

259 options.pop('socket', None)) 

260 self.handshake_response = handshake(self.sock, url, *addrs, **options) 

261 self.connected = True 

262 except: 

263 if self.sock: 

264 self.sock.close() 

265 self.sock = None 

266 raise 

267 

268 def send(self, payload, opcode=ABNF.OPCODE_TEXT): 

269 """ 

270 Send the data as string. 

271 

272 Parameters 

273 ---------- 

274 payload: str 

275 Payload must be utf-8 string or unicode, 

276 If the opcode is OPCODE_TEXT. 

277 Otherwise, it must be string(byte array). 

278 opcode: int 

279 Operation code (opcode) to send. 

280 """ 

281 

282 frame = ABNF.create_frame(payload, opcode) 

283 return self.send_frame(frame) 

284 

285 def send_frame(self, frame): 

286 """ 

287 Send the data frame. 

288 

289 >>> ws = create_connection("ws://echo.websocket.events") 

290 >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT) 

291 >>> ws.send_frame(frame) 

292 >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0) 

293 >>> ws.send_frame(frame) 

294 >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1) 

295 >>> ws.send_frame(frame) 

296 

297 Parameters 

298 ---------- 

299 frame: ABNF frame 

300 frame data created by ABNF.create_frame 

301 """ 

302 if self.get_mask_key: 

303 frame.get_mask_key = self.get_mask_key 

304 data = frame.format() 

305 length = len(data) 

306 if (isEnabledForTrace()): 

307 trace("++Sent raw: " + repr(data)) 

308 trace("++Sent decoded: " + frame.__str__()) 

309 with self.lock: 

310 while data: 

311 l = self._send(data) 

312 data = data[l:] 

313 

314 return length 

315 

316 def send_binary(self, payload): 

317 """ 

318 Send a binary message (OPCODE_BINARY). 

319 

320 Parameters 

321 ---------- 

322 payload: bytes 

323 payload of message to send. 

324 """ 

325 return self.send(payload, ABNF.OPCODE_BINARY) 

326 

327 def ping(self, payload=""): 

328 """ 

329 Send ping data. 

330 

331 Parameters 

332 ---------- 

333 payload: str 

334 data payload to send server. 

335 """ 

336 if isinstance(payload, str): 

337 payload = payload.encode("utf-8") 

338 self.send(payload, ABNF.OPCODE_PING) 

339 

340 def pong(self, payload=""): 

341 """ 

342 Send pong data. 

343 

344 Parameters 

345 ---------- 

346 payload: str 

347 data payload to send server. 

348 """ 

349 if isinstance(payload, str): 

350 payload = payload.encode("utf-8") 

351 self.send(payload, ABNF.OPCODE_PONG) 

352 

353 def recv(self): 

354 """ 

355 Receive string data(byte array) from the server. 

356 

357 Returns 

358 ---------- 

359 data: string (byte array) value. 

360 """ 

361 with self.readlock: 

362 opcode, data = self.recv_data() 

363 if opcode == ABNF.OPCODE_TEXT: 

364 return data.decode("utf-8") 

365 elif opcode == ABNF.OPCODE_TEXT or opcode == ABNF.OPCODE_BINARY: 

366 return data 

367 else: 

368 return '' 

369 

370 def recv_data(self, control_frame=False): 

371 """ 

372 Receive data with operation code. 

373 

374 Parameters 

375 ---------- 

376 control_frame: bool 

377 a boolean flag indicating whether to return control frame 

378 data, defaults to False 

379 

380 Returns 

381 ------- 

382 opcode, frame.data: tuple 

383 tuple of operation code and string(byte array) value. 

384 """ 

385 opcode, frame = self.recv_data_frame(control_frame) 

386 return opcode, frame.data 

387 

388 def recv_data_frame(self, control_frame=False): 

389 """ 

390 Receive data with operation code. 

391 

392 If a valid ping message is received, a pong response is sent. 

393 

394 Parameters 

395 ---------- 

396 control_frame: bool 

397 a boolean flag indicating whether to return control frame 

398 data, defaults to False 

399 

400 Returns 

401 ------- 

402 frame.opcode, frame: tuple 

403 tuple of operation code and string(byte array) value. 

404 """ 

405 while True: 

406 frame = self.recv_frame() 

407 if (isEnabledForTrace()): 

408 trace("++Rcv raw: " + repr(frame.format())) 

409 trace("++Rcv decoded: " + frame.__str__()) 

410 if not frame: 

411 # handle error: 

412 # 'NoneType' object has no attribute 'opcode' 

413 raise WebSocketProtocolException( 

414 "Not a valid frame {frame}".format(frame=frame)) 

415 elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY, ABNF.OPCODE_CONT): 

416 self.cont_frame.validate(frame) 

417 self.cont_frame.add(frame) 

418 

419 if self.cont_frame.is_fire(frame): 

420 return self.cont_frame.extract(frame) 

421 

422 elif frame.opcode == ABNF.OPCODE_CLOSE: 

423 self.send_close() 

424 return frame.opcode, frame 

425 elif frame.opcode == ABNF.OPCODE_PING: 

426 if len(frame.data) < 126: 

427 self.pong(frame.data) 

428 else: 

429 raise WebSocketProtocolException( 

430 "Ping message is too long") 

431 if control_frame: 

432 return frame.opcode, frame 

433 elif frame.opcode == ABNF.OPCODE_PONG: 

434 if control_frame: 

435 return frame.opcode, frame 

436 

437 def recv_frame(self): 

438 """ 

439 Receive data as frame from server. 

440 

441 Returns 

442 ------- 

443 self.frame_buffer.recv_frame(): ABNF frame object 

444 """ 

445 return self.frame_buffer.recv_frame() 

446 

447 def send_close(self, status=STATUS_NORMAL, reason=b""): 

448 """ 

449 Send close data to the server. 

450 

451 Parameters 

452 ---------- 

453 status: int 

454 Status code to send. See STATUS_XXX. 

455 reason: str or bytes 

456 The reason to close. This must be string or UTF-8 bytes. 

457 """ 

458 if status < 0 or status >= ABNF.LENGTH_16: 

459 raise ValueError("code is invalid range") 

460 self.connected = False 

461 self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) 

462 

463 def close(self, status=STATUS_NORMAL, reason=b"", timeout=3): 

464 """ 

465 Close Websocket object 

466 

467 Parameters 

468 ---------- 

469 status: int 

470 Status code to send. See STATUS_XXX. 

471 reason: bytes 

472 The reason to close in UTF-8. 

473 timeout: int or float 

474 Timeout until receive a close frame. 

475 If None, it will wait forever until receive a close frame. 

476 """ 

477 if self.connected: 

478 if status < 0 or status >= ABNF.LENGTH_16: 

479 raise ValueError("code is invalid range") 

480 

481 try: 

482 self.connected = False 

483 self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) 

484 sock_timeout = self.sock.gettimeout() 

485 self.sock.settimeout(timeout) 

486 start_time = time.time() 

487 while timeout is None or time.time() - start_time < timeout: 

488 try: 

489 frame = self.recv_frame() 

490 if frame.opcode != ABNF.OPCODE_CLOSE: 

491 continue 

492 if isEnabledForError(): 

493 recv_status = struct.unpack("!H", frame.data[0:2])[0] 

494 if recv_status >= 3000 and recv_status <= 4999: 

495 debug("close status: " + repr(recv_status)) 

496 elif recv_status != STATUS_NORMAL: 

497 error("close status: " + repr(recv_status)) 

498 break 

499 except: 

500 break 

501 self.sock.settimeout(sock_timeout) 

502 self.sock.shutdown(socket.SHUT_RDWR) 

503 except: 

504 pass 

505 

506 self.shutdown() 

507 

508 def abort(self): 

509 """ 

510 Low-level asynchronous abort, wakes up other threads that are waiting in recv_* 

511 """ 

512 if self.connected: 

513 self.sock.shutdown(socket.SHUT_RDWR) 

514 

515 def shutdown(self): 

516 """ 

517 close socket, immediately. 

518 """ 

519 if self.sock: 

520 self.sock.close() 

521 self.sock = None 

522 self.connected = False 

523 

524 def _send(self, data): 

525 return send(self.sock, data) 

526 

527 def _recv(self, bufsize): 

528 try: 

529 return recv(self.sock, bufsize) 

530 except WebSocketConnectionClosedException: 

531 if self.sock: 

532 self.sock.close() 

533 self.sock = None 

534 self.connected = False 

535 raise 

536 

537 

538def create_connection(url, timeout=None, class_=WebSocket, **options): 

539 """ 

540 Connect to url and return websocket object. 

541 

542 Connect to url and return the WebSocket object. 

543 Passing optional timeout parameter will set the timeout on the socket. 

544 If no timeout is supplied, 

545 the global default timeout setting returned by getdefaulttimeout() is used. 

546 You can customize using 'options'. 

547 If you set "header" list object, you can set your own custom header. 

548 

549 >>> conn = create_connection("ws://echo.websocket.events", 

550 ... header=["User-Agent: MyProgram", 

551 ... "x-custom: header"]) 

552 

553 Parameters 

554 ---------- 

555 class_: class 

556 class to instantiate when creating the connection. It has to implement 

557 settimeout and connect. It's __init__ should be compatible with 

558 WebSocket.__init__, i.e. accept all of it's kwargs. 

559 header: list or dict 

560 custom http header list or dict. 

561 cookie: str 

562 Cookie value. 

563 origin: str 

564 custom origin url. 

565 suppress_origin: bool 

566 suppress outputting origin header. 

567 host: str 

568 custom host header string. 

569 timeout: int or float 

570 socket timeout time. This value could be either float/integer. 

571 If set to None, it uses the default_timeout value. 

572 http_proxy_host: str 

573 HTTP proxy host name. 

574 http_proxy_port: str or int 

575 HTTP proxy port. If not set, set to 80. 

576 http_no_proxy: list 

577 Whitelisted host names that don't use the proxy. 

578 http_proxy_auth: tuple 

579 HTTP proxy auth information. tuple of username and password. Default is None. 

580 http_proxy_timeout: int or float 

581 HTTP proxy timeout, default is 60 sec as per python-socks. 

582 enable_multithread: bool 

583 Enable lock for multithread. 

584 redirect_limit: int 

585 Number of redirects to follow. 

586 sockopt: tuple 

587 Values for socket.setsockopt. 

588 sockopt must be a tuple and each element is an argument of sock.setsockopt. 

589 sslopt: dict 

590 Optional dict object for ssl socket options. See FAQ for details. 

591 subprotocols: list 

592 List of available subprotocols. Default is None. 

593 skip_utf8_validation: bool 

594 Skip utf8 validation. 

595 socket: socket 

596 Pre-initialized stream socket. 

597 """ 

598 sockopt = options.pop("sockopt", []) 

599 sslopt = options.pop("sslopt", {}) 

600 fire_cont_frame = options.pop("fire_cont_frame", False) 

601 enable_multithread = options.pop("enable_multithread", True) 

602 skip_utf8_validation = options.pop("skip_utf8_validation", False) 

603 websock = class_(sockopt=sockopt, sslopt=sslopt, 

604 fire_cont_frame=fire_cont_frame, 

605 enable_multithread=enable_multithread, 

606 skip_utf8_validation=skip_utf8_validation, **options) 

607 websock.settimeout(timeout if timeout is not None else getdefaulttimeout()) 

608 websock.connect(url, **options) 

609 return websock