Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/socket.py: 32%

326 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1"""0MQ Socket pure Python methods.""" 

2 

3# Copyright (C) PyZMQ Developers 

4# Distributed under the terms of the Modified BSD License. 

5 

6 

7import errno 

8import pickle 

9import random 

10import sys 

11from typing import ( 

12 Any, 

13 Callable, 

14 Dict, 

15 Generic, 

16 List, 

17 Optional, 

18 Sequence, 

19 Type, 

20 TypeVar, 

21 Union, 

22 cast, 

23 overload, 

24) 

25from warnings import warn 

26 

27import zmq 

28from zmq._typing import Literal 

29from zmq.backend import Socket as SocketBase 

30from zmq.error import ZMQBindError, ZMQError 

31from zmq.utils import jsonapi 

32from zmq.utils.interop import cast_int_addr 

33 

34from ..constants import SocketOption, SocketType, _OptType 

35from .attrsettr import AttributeSetter 

36from .poll import Poller 

37 

# Use pickle's default protocol when the module defines one; otherwise fall
# back to the highest protocol this interpreter supports.
if hasattr(pickle, "DEFAULT_PROTOCOL"):
    DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL
else:
    DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL

42 

43T = TypeVar("T", bound="Socket") 

44 

45 

class _SocketContext(Generic[T]):
    """Context manager that undoes a bind or connect when the block exits.

    Entering yields the wrapped socket. On exit, a still-open socket is
    unbound (``kind == "bind"``) or disconnected (``kind == "connect"``)
    from ``addr``; a socket that is already closed is left alone.
    """

    # the socket whose bind/connect is being managed
    socket: T
    # either "bind" or "connect"
    kind: str
    # the address that was bound / connected
    addr: str

    def __init__(self: "_SocketContext[T]", socket: T, kind: str, addr: str):
        assert kind in {"bind", "connect"}
        self.socket, self.kind, self.addr = socket, kind, addr

    def __repr__(self):
        return f"<SocketContext({self.kind}={self.addr!r})>"

    def __enter__(self: "_SocketContext[T]") -> T:
        return self.socket

    def __exit__(self, *args):
        sock = self.socket
        if not sock.closed:
            # nothing to undo on a closed socket
            if self.kind == "bind":
                sock.unbind(self.addr)
            elif self.kind == "connect":
                sock.disconnect(self.addr)

72 

73 

74ST = TypeVar("ST") 

75 

76 

77class Socket(SocketBase, AttributeSetter, Generic[ST]): 

78 """The ZMQ socket object 

79 

80 To create a Socket, first create a Context:: 

81 

82 ctx = zmq.Context.instance() 

83 

84 then call ``ctx.socket(socket_type)``:: 

85 

86 s = ctx.socket(zmq.ROUTER) 

87 

88 .. versionadded:: 25 

89 

90 Sockets can now be shadowed by passing another Socket. 

91 This helps in creating an async copy of a sync socket or vice versa:: 

92 

93 s = zmq.Socket(async_socket) 

94 

95 Which previously had to be:: 

96 

97 s = zmq.Socket.shadow(async_socket.underlying) 

98 """ 

99 

100 _shadow = False 

101 _shadow_obj = None 

102 _monitor_socket = None 

103 _type_name = 'UNKNOWN' 

104 

@overload
def __init__(
    self: "Socket[bytes]",
    ctx_or_socket: "zmq.Context",
    socket_type: int,
    *,
    copy_threshold: Optional[int] = None,
):
    ...

@overload
def __init__(
    self: "Socket[bytes]",
    *,
    shadow: Union["Socket", int],
    copy_threshold: Optional[int] = None,
):
    ...

@overload
def __init__(
    self: "Socket[bytes]",
    ctx_or_socket: "Socket",
):
    ...

def __init__(
    self: "Socket[bytes]",
    ctx_or_socket: Optional[Union["zmq.Context", "Socket"]] = None,
    socket_type: int = 0,
    *,
    shadow: Union["Socket", int] = 0,
    copy_threshold: Optional[int] = None,
):
    """Create a socket, or shadow an existing one.

    Parameters
    ----------
    ctx_or_socket : zmq.Context or zmq.Socket, optional
        The context to create the socket in. If a ``zmq.Socket`` is given
        positionally, it is treated as the socket to shadow instead.
    socket_type : int
        The socket type passed on to the base class.
    shadow : zmq.Socket or int
        An existing socket (or its address) to shadow. A falsy value means
        "do not shadow".
    copy_threshold : int, optional
        Passed through unchanged to the base class.
    """
    if isinstance(ctx_or_socket, zmq.Socket):
        # positional Socket(other_socket) is shorthand for shadow=other_socket
        shadow, ctx_or_socket = ctx_or_socket, None

    shadow_address: int = 0

    if not shadow:
        self._shadow = False
    else:
        self._shadow = True
        # keep the shadowed object alive for as long as this socket exists
        self._shadow_obj = shadow
        if not isinstance(shadow, int):
            # Socket-like objects expose their address as `.underlying`;
            # anything without it is handed to cast_int_addr as-is
            try:
                shadow = cast(int, shadow.underlying)
            except AttributeError:
                pass
        shadow_address = cast_int_addr(shadow)

    super().__init__(
        ctx_or_socket,
        socket_type,
        shadow=shadow_address,
        copy_threshold=copy_threshold,
    )

    # Record the socket type for repr(); best-effort, any failure is ignored
    try:
        raw_type = cast(int, self.get(zmq.TYPE))
    except Exception:
        return

    try:
        known_type = SocketType(raw_type)
    except ValueError:
        # not a member of SocketType: fall back to the raw number
        self._type_name = str(raw_type)
        return

    self.__dict__["type"] = known_type
    self._type_name = known_type.name

177 

178 def __del__(self): 

179 if not self._shadow and not self.closed: 

180 if warn is not None: 

181 # warn can be None during process teardown 

182 warn( 

183 f"Unclosed socket {self}", 

184 ResourceWarning, 

185 stacklevel=2, 

186 source=self, 

187 ) 

188 self.close() 

189 

190 _repr_cls = "zmq.Socket" 

191 

192 def __repr__(self): 

193 cls = self.__class__ 

194 # look up _repr_cls on exact class, not inherited 

195 _repr_cls = cls.__dict__.get("_repr_cls", None) 

196 if _repr_cls is None: 

197 _repr_cls = f"{cls.__module__}.{cls.__name__}" 

198 

199 closed = ' closed' if self._closed else '' 

200 

201 return f"<{_repr_cls}(zmq.{self._type_name}) at {hex(id(self))}{closed}>" 

202 

203 # socket as context manager: 

def __enter__(self: T) -> T:
    """Enter a ``with`` block; the socket itself is the managed object.

    .. versionadded:: 14.4
    """
    return self

210 

211 def __exit__(self, *args, **kwargs): 

212 self.close() 

213 

214 # ------------------------------------------------------------------------- 

215 # Socket creation 

216 # ------------------------------------------------------------------------- 

217 

def __copy__(self: T, memo=None) -> T:
    """Return a shadow of this socket rather than an independent copy.

    ``memo`` is accepted so the same function can serve as ``__deepcopy__``;
    it is not used.
    """
    klass = self.__class__
    return klass.shadow(self.underlying)

221 

222 __deepcopy__ = __copy__ 

223 

@classmethod
def shadow(cls: Type[T], address: Union[int, "zmq.Socket"]) -> T:
    """Create a socket of this class that shadows an existing libzmq socket.

    Parameters
    ----------
    address : int or zmq.Socket
        A zmq.Socket, or an integer (or FFI pointer) representing the
        address of the libzmq socket.

    .. versionadded:: 14.1

    .. versionadded:: 25
        Support for shadowing `zmq.Socket` objects,
        instead of just integer addresses.
    """
    shadowing = cls(shadow=address)
    return shadowing

238 

def close(self, linger=None) -> None:
    """
    Close the socket.

    If linger is specified, the LINGER sockopt is set before closing.

    Note: closing a zmq Socket may not close the underlying sockets
    while there are undelivered messages. The underlying sockets are only
    closed once all messages are delivered, or discarded because the
    socket's LINGER timeout (default: forever) was reached.

    Call this to close the socket by hand. A socket that is never closed
    is closed automatically when it is garbage collected, in which case
    you may see a ResourceWarning about the unclosed socket.
    """
    owner = self.context
    if owner:
        # unregister from the owning context before closing
        owner._rm_socket(self)
    super().close(linger=linger)

259 

260 # ------------------------------------------------------------------------- 

261 # Connect/Bind context managers 

262 # ------------------------------------------------------------------------- 

263 

def _connect_cm(self: T, addr: str) -> _SocketContext[T]:
    """Build the context manager that disconnects from ``addr`` on exit.

    .. versionadded:: 20.0
    """
    return _SocketContext(socket=self, kind='connect', addr=addr)

270 

def _bind_cm(self: T, addr: str) -> _SocketContext[T]:
    """Build the context manager that unbinds from ``addr`` on exit.

    .. versionadded:: 20.0
    """
    return _SocketContext(socket=self, kind='bind', addr=addr)

277 

def bind(self: T, addr: str) -> _SocketContext[T]:
    """s.bind(addr)

    Bind the socket to an address.

    This causes the socket to listen on a network port. Sockets on the
    other side of this connection will use ``Socket.connect(addr)`` to
    connect to this socket.

    Returns a context manager which will call unbind on exit.

    .. versionadded:: 20.0
        Can be used as a context manager.

    Parameters
    ----------
    addr : str
        The address string. This has the form 'protocol://interface:port',
        for example 'tcp://127.0.0.1:5555'. Protocols supported include
        tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
        encoded to utf-8 first.

    Raises
    ------
    ZMQError
        Re-raised from the underlying bind, with the address appended to
        the error's ``strerror``.
    """
    try:
        super().bind(addr)
    except ZMQError as err:
        # make the failing address visible in the error message
        err.strerror = err.strerror + f" (addr={addr!r})"
        raise
    unbind_on_exit = self._bind_cm(addr)
    return unbind_on_exit

307 

def connect(self: T, addr: str) -> _SocketContext[T]:
    """s.connect(addr)

    Connect to a remote 0MQ socket.

    Returns a context manager which will call disconnect on exit.

    .. versionadded:: 20.0
        Can be used as a context manager.

    Parameters
    ----------
    addr : str
        The address string. This has the form 'protocol://interface:port',
        for example 'tcp://127.0.0.1:5555'. Protocols supported are
        tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
        encoded to utf-8 first.

    Raises
    ------
    ZMQError
        Re-raised from the underlying connect, with the address appended
        to the error's ``strerror``.
    """
    try:
        super().connect(addr)
    except ZMQError as err:
        # make the failing address visible in the error message
        err.strerror = err.strerror + f" (addr={addr!r})"
        raise
    disconnect_on_exit = self._connect_cm(addr)
    return disconnect_on_exit

333 

334 # ------------------------------------------------------------------------- 

335 # Deprecated aliases 

336 # ------------------------------------------------------------------------- 

337 

@property
def socket_type(self) -> int:
    """Deprecated alias for ``Socket.type``.

    Emits a DeprecationWarning on every access and returns ``self.type``.
    """
    warn(
        "Socket.socket_type is deprecated, use Socket.type",
        DeprecationWarning,
        # attribute the warning to the code reading the property,
        # not to this line inside the library
        stacklevel=2,
    )
    return cast(int, self.type)

342 

343 # ------------------------------------------------------------------------- 

344 # Hooks for sockopt completion 

345 # ------------------------------------------------------------------------- 

346 

def __dir__(self):
    """List the class attributes plus every SocketOption member name.

    This is the hook that makes socket options show up in completion.
    """
    names = dir(self.__class__)
    names += SocketOption.__members__
    return names

351 

352 # ------------------------------------------------------------------------- 

353 # Getting/Setting options 

354 # ------------------------------------------------------------------------- 

355 setsockopt = SocketBase.set 

356 getsockopt = SocketBase.get 

357 

def __setattr__(self, key, value):
    """Override to allow setting zmq.[UN]SUBSCRIBE even though we have a subscribe method

    Keys already present in the instance ``__dict__`` are set as plain
    attributes. ``subscribe`` / ``unsubscribe`` (any letter case) are
    turned into the matching socket option, with str values encoded as
    utf8. Everything else is delegated to the parent implementation.
    """
    if key in self.__dict__:
        object.__setattr__(self, key, value)
        return

    lowered = key.lower()
    if lowered not in ('subscribe', 'unsubscribe'):
        super().__setattr__(key, value)
        return

    if isinstance(value, str):
        value = value.encode('utf8')
    option = zmq.SUBSCRIBE if lowered == 'subscribe' else zmq.UNSUBSCRIBE
    self.set(option, value)

373 

def fileno(self) -> int:
    """Return edge-triggered file descriptor for this socket.

    This is a read-only edge-triggered file descriptor for both read and
    write events on this socket. All available events must be consumed
    once an event is detected, otherwise the read event will not trigger
    again.

    The value is the socket's ``FD`` attribute.

    .. versionadded:: 17.0
    """
    descriptor = self.FD
    return descriptor

384 

def subscribe(self, topic: Union[str, bytes]) -> None:
    """Subscribe to a topic

    Only for SUB sockets.

    A str topic is encoded as utf8 before the SUBSCRIBE option is set.

    .. versionadded:: 15.3
    """
    prefix = topic.encode('utf8') if isinstance(topic, str) else topic
    self.set(zmq.SUBSCRIBE, prefix)

395 

def unsubscribe(self, topic: Union[str, bytes]) -> None:
    """Unsubscribe from a topic

    Only for SUB sockets.

    A str topic is encoded as utf8 before the UNSUBSCRIBE option is set.

    .. versionadded:: 15.3
    """
    prefix = topic.encode('utf8') if isinstance(topic, str) else topic
    self.set(zmq.UNSUBSCRIBE, prefix)

406 

def set_string(self, option: int, optval: str, encoding='utf-8') -> None:
    """Set socket options with a unicode object.

    This is simply a wrapper for setsockopt to protect from encoding ambiguity.

    See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : int
        The name of the option to set. Can be any of: SUBSCRIBE,
        UNSUBSCRIBE, IDENTITY
    optval : str
        The value of the option to set.
    encoding : str
        The encoding to be used, default is utf8

    Raises
    ------
    TypeError
        If `optval` is not a str.
    """
    if isinstance(optval, str):
        encoded = optval.encode(encoding)
        return self.set(option, encoded)
    raise TypeError(f"strings only, not {type(optval)}: {optval!r}")

427 

428 setsockopt_unicode = setsockopt_string = set_string 

429 

def get_string(self, option: int, encoding='utf-8') -> str:
    """Get the value of a socket option.

    See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : int
        The option to retrieve.
    encoding : str
        The encoding used to decode the option's bytes value.

    Returns
    -------
    optval : str
        The value of the option as a unicode string.

    Raises
    ------
    TypeError
        If the option's value type is not bytes.
    """
    value_kind = SocketOption(option)._opt_type
    if value_kind != _OptType.bytes:
        raise TypeError(f"option {option} will not return a string to be decoded")
    raw = cast(bytes, self.get(option))
    return raw.decode(encoding)

448 

449 getsockopt_unicode = getsockopt_string = get_string 

450 

def bind_to_random_port(
    self: T,
    addr: str,
    min_port: int = 49152,
    max_port: int = 65536,
    max_tries: int = 100,
) -> int:
    """Bind this socket to a random port in a range.

    If the port range is unspecified, the system will choose the port.

    Parameters
    ----------
    addr : str
        The address string without the port to pass to ``Socket.bind()``.
    min_port : int, optional
        The minimum port in the range of ports to try (inclusive).
    max_port : int, optional
        The maximum port in the range of ports to try (exclusive).
    max_tries : int, optional
        The maximum number of bind attempts to make.

    Returns
    -------
    port : int
        The port the socket was bound to.

    Raises
    ------
    ZMQBindError
        if `max_tries` reached before successful bind
    ZMQError
        for a bind failure other than "address in use" (or "access denied"
        on Windows)
    """
    # With the default range on libzmq >= 3.2 the wildcard port can be
    # used: the OS picks the port and LAST_ENDPOINT reports it.
    os_picks_port = (
        (zmq.zmq_version_info() >= (3, 2))
        and min_port == 49152
        and max_port == 65536
    )
    if os_picks_port:
        self.bind("%s:*" % addr)
        endpoint = cast(bytes, self.last_endpoint).decode('ascii', 'replace')
        _host, port_text = endpoint.rsplit(':', 1)
        return int(port_text)

    for _attempt in range(max_tries):
        try:
            port = random.randrange(min_port, max_port)
            self.bind(f'{addr}:{port}')
        except ZMQError as exc:
            in_use = exc.errno == zmq.EADDRINUSE
            denied_on_windows = sys.platform == 'win32' and exc.errno == errno.EACCES
            if not (in_use or denied_on_windows):
                raise
            # port is taken: try another one
            continue
        return port
    raise ZMQBindError("Could not bind socket to random port.")

510 

def get_hwm(self) -> int:
    """Get the High Water Mark.

    On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM.
    On older libzmq the single HWM option is read.
    """
    if zmq.zmq_version_info()[0] < 3:
        return cast(int, self.get(zmq.HWM))

    # prefer sndhwm, fall back on rcvhwm
    try:
        return cast(int, self.get(zmq.SNDHWM))
    except zmq.ZMQError:
        pass

    return cast(int, self.get(zmq.RCVHWM))

527 

def set_hwm(self, value: int) -> None:
    """Set the High Water Mark.

    On libzmq ≥ 3, this sets both SNDHWM and RCVHWM. Both are always
    attempted; if either fails, the most recent failure is raised
    afterwards. On older libzmq the single HWM option is set.

    .. warning::

        New values only take effect for subsequent socket
        bind/connects.
    """
    if zmq.zmq_version_info()[0] < 3:
        self.set(zmq.HWM, value)
        return

    failure = None
    for attr in ("sndhwm", "rcvhwm"):
        try:
            setattr(self, attr, value)
        except Exception as exc:
            # remember the error but still try the other direction
            failure = exc

    if failure:
        raise failure

555 

556 hwm = property( 

557 get_hwm, 

558 set_hwm, 

559 None, 

560 """Property for High Water Mark. 

561 

562 Setting hwm sets both SNDHWM and RCVHWM as appropriate. 

563 It gets SNDHWM if available, otherwise RCVHWM. 

564 """, 

565 ) 

566 

567 # ------------------------------------------------------------------------- 

568 # Sending and receiving messages 

569 # ------------------------------------------------------------------------- 

570 

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    *,
    track: Literal[True],
    routing_id: Optional[int] = ...,
    group: Optional[str] = ...,
) -> "zmq.MessageTracker":
    ...

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    *,
    track: Literal[False],
    routing_id: Optional[int] = ...,
    group: Optional[str] = ...,
) -> None:
    ...

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    *,
    copy: bool = ...,
    routing_id: Optional[int] = ...,
    group: Optional[str] = ...,
) -> None:
    ...

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    track: bool = ...,
    routing_id: Optional[int] = ...,
    group: Optional[str] = ...,
) -> Optional["zmq.MessageTracker"]:
    ...

def send(
    self,
    data: Any,
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    routing_id: Optional[int] = None,
    group: Optional[str] = None,
) -> Optional["zmq.MessageTracker"]:
    """Send a single zmq message frame on this socket.

    This queues the message to be sent by the IO thread at a later time.

    With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
    otherwise, this waits until space is available.
    See :class:`Poller` for more general non-blocking I/O.

    Parameters
    ----------
    data : bytes, Frame, memoryview
        The content of the message. This can be any object that provides
        the Python buffer API (i.e. `memoryview(data)` can be called).
    flags : int
        0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
    copy : bool
        Should the message be sent in a copying or non-copying manner.
    track : bool
        Should the message be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)
    routing_id : int
        For use with SERVER sockets
    group : str
        For use with RADIO sockets

    Returns
    -------
    None : if `copy` or not track
        None if message was sent, raises an exception otherwise.
    MessageTracker : if track and not copy
        a MessageTracker object, whose `pending` property will
        be True until the send is completed.

    Raises
    ------
    TypeError
        If a unicode object is passed
    ValueError
        If `track=True`, but an untracked Frame is passed.
    ZMQError
        If the send does not succeed for any reason (including
        if NOBLOCK is set and the outgoing queue is full).


    .. versionchanged:: 17.0

        DRAFT support for routing_id and group arguments.
    """
    if routing_id is not None or group is not None:
        # routing_id / group are attributes of a Frame, so plain data has
        # to be wrapped in one before they can be attached
        if not isinstance(data, zmq.Frame):
            data = zmq.Frame(
                data,
                track=track,
                copy=copy or None,
                copy_threshold=self.copy_threshold,
            )
        if routing_id is not None:
            data.routing_id = routing_id
        if group is not None:
            data.group = group
    return super().send(data, flags=flags, copy=copy, track=track)

697 

def send_multipart(
    self,
    msg_parts: Sequence,
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    **kwargs,
):
    """Send a sequence of buffers as a multipart message.

    The zmq.SNDMORE flag is added to all msg parts before the last.

    Parameters
    ----------
    msg_parts : iterable
        The objects to send as a multipart message. Each element
        can be any sendable object (Frame, bytes, buffer-providers).
        Any iterable is accepted, including one-shot iterators and
        generators; it is consumed once up front.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
        SNDMORE is added automatically for frames before the last.
    copy : bool, optional
        Should the frame(s) be sent in a copying or non-copying manner.
        If copy=False, frames smaller than self.copy_threshold bytes
        will be copied anyway.
    track : bool, optional
        Should the frame(s) be tracked for notification that ZMQ has
        finished with it (ignored if copy=True).
    **kwargs
        Accepted but not used by this implementation.

    Returns
    -------
    None : if copy or not track
    MessageTracker : if track and not copy
        a MessageTracker object, whose `pending` property will
        be True until the last send is completed.

    Raises
    ------
    TypeError
        If a part does not support the buffer interface. All parts are
        checked before anything is sent.
    IndexError
        If `msg_parts` is empty.
    """
    # Materialize once: the parts are walked twice (validation, then
    # sending) and indexed, which a generator or iterator cannot support.
    parts = list(msg_parts)

    # typecheck parts before sending:
    for i, msg in enumerate(parts):
        if isinstance(msg, (zmq.Frame, bytes, memoryview)):
            continue
        try:
            memoryview(msg)
        except Exception:
            rmsg = repr(msg)
            if len(rmsg) > 32:
                rmsg = rmsg[:32] + '...'
            raise TypeError(
                "Frame %i (%s) does not support the buffer interface."
                % (
                    i,
                    rmsg,
                )
            )
    for msg in parts[:-1]:
        self.send(msg, zmq.SNDMORE | flags, copy=copy, track=track)
    # Send the last part without the extra SNDMORE flag.
    return self.send(parts[-1], flags, copy=copy, track=track)

754 

@overload
def recv_multipart(
    self, flags: int = ..., *, copy: Literal[True], track: bool = ...
) -> List[bytes]:
    ...

@overload
def recv_multipart(
    self, flags: int = ..., *, copy: Literal[False], track: bool = ...
) -> List[zmq.Frame]:
    ...

@overload
def recv_multipart(self, flags: int = ..., *, track: bool = ...) -> List[bytes]:
    ...

@overload
def recv_multipart(
    self, flags: int = 0, copy: bool = True, track: bool = False
) -> Union[List[zmq.Frame], List[bytes]]:
    ...

def recv_multipart(
    self, flags: int = 0, copy: bool = True, track: bool = False
) -> Union[List[zmq.Frame], List[bytes]]:
    """Receive a multipart message as a list of bytes or Frame objects

    Parameters
    ----------
    flags : int, optional
        Any valid flags for :func:`Socket.recv`.
    copy : bool, optional
        Should the message frame(s) be received in a copying or non-copying manner?
        If False a Frame object is returned for each part, if True a copy of
        the bytes is made for each frame.
    track : bool, optional
        Should the message frame(s) be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)

    Returns
    -------
    msg_parts : list
        A list of frames in the multipart message; either Frames or bytes,
        depending on `copy`.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    # the first frame is always received; RCVMORE says whether more follow
    frames = [self.recv(flags, copy=copy, track=track)]
    while self.getsockopt(zmq.RCVMORE):
        frames.append(self.recv(flags, copy=copy, track=track))
    # cast List[Union] to Union[List]: the element type depends only on `copy`
    return cast(Union[List[zmq.Frame], List[bytes]], frames)

813 

814 def _deserialize( 

815 self, 

816 recvd: bytes, 

817 load: Callable[[bytes], Any], 

818 ) -> Any: 

819 """Deserialize a received message 

820 

821 Override in subclass (e.g. Futures) if recvd is not the raw bytes. 

822 

823 The default implementation expects bytes and returns the deserialized message immediately. 

824 

825 Parameters 

826 ---------- 

827 

828 load: callable 

829 Callable that deserializes bytes 

830 recvd: 

831 The object returned by self.recv 

832 

833 """ 

834 return load(recvd) 

835 

def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs):
    """Send a message with a custom serialization function.

    .. versionadded:: 17

    Parameters
    ----------
    msg : The message to be sent. Can be any object serializable by `serialize`.
    serialize : callable
        The serialization function to use.
        serialize(msg) should return an iterable of sendable message frames
        (e.g. bytes objects), which will be passed to send_multipart.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
    copy : bool, optional
        Whether to copy the frames.
    **kwargs
        Forwarded to send_multipart.

    Returns
    -------
    Whatever send_multipart returns.
    """
    return self.send_multipart(serialize(msg), flags=flags, copy=copy, **kwargs)

856 

def recv_serialized(self, deserialize, flags=0, copy=True):
    """Receive a message with a custom deserialization function.

    .. versionadded:: 17

    Parameters
    ----------
    deserialize : callable
        The deserialization function to use.
        deserialize will be called with one argument: the list of frames
        returned by recv_multipart() and can return any object.
    flags : int, optional
        Any valid flags for :func:`Socket.recv`.
    copy : bool, optional
        Whether to recv bytes or Frame objects.

    Returns
    -------
    obj : object
        The object returned by the deserialization function.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    received = self.recv_multipart(flags=flags, copy=copy)
    # routed through _deserialize so subclasses can adapt what was received
    return self._deserialize(received, deserialize)

885 

def send_string(
    self,
    u: str,
    flags: int = 0,
    copy: bool = True,
    encoding: str = 'utf-8',
    **kwargs,
) -> Optional["zmq.MessageTracker"]:
    """Send a Python unicode string as a message with an encoding.

    0MQ communicates with raw bytes, so you must encode/decode
    text (str) around 0MQ.

    Parameters
    ----------
    u : str
        The unicode string to send.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
    copy : bool, optional
        Passed on to :func:`Socket.send`.
    encoding : str [default: 'utf-8']
        The encoding to be used
    **kwargs
        Forwarded to :func:`Socket.send`.

    Returns
    -------
    Whatever :func:`Socket.send` returns for the encoded bytes.

    Raises
    ------
    TypeError
        If `u` is not a str.
    """
    if not isinstance(u, str):
        raise TypeError("str objects only")
    return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)

911 

912 send_unicode = send_string 

913 

def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:
    """Receive a unicode string, as sent by send_string.

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.
    encoding : str [default: 'utf-8']
        The encoding to be used

    Returns
    -------
    s : str
        The Python unicode string that arrives as encoded bytes.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """

    def _decode(buf):
        return buf.decode(encoding)

    received = self.recv(flags=flags)
    return self._deserialize(received, _decode)

936 

937 recv_unicode = recv_string 

938 

def send_pyobj(
    self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs
) -> Optional["zmq.MessageTracker"]:
    """Send a Python object as a message using pickle to serialize.

    Parameters
    ----------
    obj : Python object
        The Python object to send.
    flags : int
        Any valid flags for :func:`Socket.send`.
    protocol : int
        The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL
        where defined, and pickle.HIGHEST_PROTOCOL elsewhere.
    **kwargs
        Forwarded to :func:`Socket.send`.

    Returns
    -------
    Whatever :func:`Socket.send` returns for the pickled bytes.
    """
    pickled = pickle.dumps(obj, protocol)
    return self.send(pickled, flags=flags, **kwargs)

956 

def recv_pyobj(self, flags: int = 0) -> Any:
    """Receive a Python object as a message using pickle to serialize.

    .. warning::

        The received bytes are passed to ``pickle.loads``. Unpickling can
        execute arbitrary code, so only use this with peers you trust.

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    received = self.recv(flags)
    # SECURITY: pickle.loads on data from the wire; see the warning above
    return self._deserialize(received, pickle.loads)

977 

def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
    """Send a Python object as a message using json to serialize.

    Keyword arguments are passed on to json.dumps, except ``routing_id``
    and ``group``, which are passed on to :func:`Socket.send` instead.

    Parameters
    ----------
    obj : Python object
        The Python object to send
    flags : int
        Any valid flags for :func:`Socket.send`
    """
    # split off the keywords meant for send(); the rest go to the serializer
    send_kwargs = {
        name: kwargs.pop(name) for name in ('routing_id', 'group') if name in kwargs
    }
    payload = jsonapi.dumps(obj, **kwargs)
    return self.send(payload, flags=flags, **send_kwargs)

996 

def recv_json(self, flags: int = 0, **kwargs) -> Union[List, str, int, float, Dict]:
    """Receive a Python object as a message using json to serialize.

    Keyword arguments are passed on to json.loads

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """

    def _loads(buf):
        return jsonapi.loads(buf, **kwargs)

    received = self.recv(flags)
    return self._deserialize(received, _loads)

1019 

1020 _poller_class = Poller 

1021 

def poll(self, timeout=None, flags=zmq.POLLIN) -> int:
    """Poll the socket for events.
    See :class:`Poller` to wait for multiple sockets at once.

    Parameters
    ----------
    timeout : int [default: None]
        The timeout (in milliseconds) to wait for an event. If unspecified
        (or specified None), will wait forever for an event.
    flags : int [default: POLLIN]
        POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.

    Returns
    -------
    event_mask : int
        The poll event mask (POLLIN, POLLOUT),
        0 if the timeout was reached without an event.

    Raises
    ------
    ZMQError
        With ENOTSUP if the socket is closed.
    """
    if self.closed:
        raise ZMQError(zmq.ENOTSUP)

    # a throwaway poller watching only this socket
    poller = self._poller_class()
    poller.register(self, flags)
    ready = dict(poller.poll(timeout))
    # return 0 if no events, otherwise return event bitfield
    return ready.get(self, 0)

1049 

def get_monitor_socket(
    self: T, events: Optional[int] = None, addr: Optional[str] = None
) -> T:
    """Return a connected PAIR socket ready to receive the event notifications.

    If a monitor socket already exists and is still open, it is returned
    as-is and `events` / `addr` are not applied again.

    .. versionadded:: libzmq-4.0
    .. versionadded:: 14.0

    Parameters
    ----------
    events : int [default: ZMQ_EVENT_ALL]
        The bitmask defining which events are wanted.
    addr : string [default: None]
        The optional endpoint for the monitoring sockets.

    Returns
    -------
    socket : (PAIR)
        The socket is already connected and ready to receive messages.

    Raises
    ------
    NotImplementedError
        If libzmq is older than version 4.
    """
    # safe-guard, method only available on libzmq >= 4
    if zmq.zmq_version_info() < (4,):
        raise NotImplementedError(
            "get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version()
        )

    # reuse the existing monitor socket unless it has been closed
    existing = self._monitor_socket
    if existing:
        if not existing.closed:
            return existing
        self._monitor_socket = None

    if addr is None:
        # create endpoint name from internal fd
        addr = f"inproc://monitor.s-{self.FD}"
    if events is None:
        # use all events
        events = zmq.EVENT_ALL

    # attach monitoring socket
    self.monitor(addr, events)
    # create new PAIR socket and connect it
    monitor_sock = self.context.socket(zmq.PAIR)
    self._monitor_socket = monitor_sock
    monitor_sock.connect(addr)
    return monitor_sock

1095 

def disable_monitor(self) -> None:
    """Shutdown the PAIR socket (created using get_monitor_socket)
    that is serving socket events.

    The stored reference to the monitor socket is dropped first, then
    monitoring is turned off by calling ``monitor`` with no address and
    an event mask of 0.

    .. versionadded:: 14.4
    """
    self._monitor_socket = None
    no_address, no_events = None, 0
    self.monitor(no_address, no_events)

1104 

1105 

1106__all__ = ['Socket']