Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/socket.py: 33%

318 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-04-10 06:20 +0000

1"""0MQ Socket pure Python methods.""" 

2 

3# Copyright (C) PyZMQ Developers 

4# Distributed under the terms of the Modified BSD License. 

5 

6 

7import errno 

8import pickle 

9import random 

10import sys 

11from typing import ( 

12 Any, 

13 Callable, 

14 Dict, 

15 Generic, 

16 List, 

17 Optional, 

18 Sequence, 

19 Type, 

20 TypeVar, 

21 Union, 

22 cast, 

23 overload, 

24) 

25from warnings import warn 

26 

27import zmq 

28from zmq._typing import Literal 

29from zmq.backend import Socket as SocketBase 

30from zmq.error import ZMQBindError, ZMQError 

31from zmq.utils import jsonapi 

32from zmq.utils.interop import cast_int_addr 

33 

34from ..constants import SocketOption, SocketType, _OptType 

35from .attrsettr import AttributeSetter 

36from .poll import Poller 

37 

38try: 

39 DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL 

40except AttributeError: 

41 DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL 

42 

43T = TypeVar("T", bound="Socket") 

44 

45 

46class _SocketContext(Generic[T]): 

47 """Context Manager for socket bind/unbind""" 

48 

49 socket: T 

50 kind: str 

51 addr: str 

52 

53 def __repr__(self): 

54 return f"<SocketContext({self.kind}={self.addr!r})>" 

55 

56 def __init__(self: "_SocketContext[T]", socket: T, kind: str, addr: str): 

57 assert kind in {"bind", "connect"} 

58 self.socket = socket 

59 self.kind = kind 

60 self.addr = addr 

61 

62 def __enter__(self: "_SocketContext[T]") -> T: 

63 return self.socket 

64 

65 def __exit__(self, *args): 

66 if self.socket.closed: 

67 return 

68 if self.kind == "bind": 

69 self.socket.unbind(self.addr) 

70 elif self.kind == "connect": 

71 self.socket.disconnect(self.addr) 

72 

73 

74ST = TypeVar("ST") 

75 

76 

77class Socket(SocketBase, AttributeSetter, Generic[ST]): 

78 """The ZMQ socket object 

79 

80 To create a Socket, first create a Context:: 

81 

82 ctx = zmq.Context.instance() 

83 

84 then call ``ctx.socket(socket_type)``:: 

85 

86 s = ctx.socket(zmq.ROUTER) 

87 

88 .. versionadded:: 25 

89 

90 Sockets can now be shadowed by passing another Socket. 

91 This helps in creating an async copy of a sync socket or vice versa:: 

92 

93 s = zmq.Socket(async_socket) 

94 

95 Which previously had to be:: 

96 

97 s = zmq.Socket.shadow(async_socket.underlying) 

98 """ 

99 

100 _shadow = False 

101 _shadow_obj = None 

102 _monitor_socket = None 

103 _type_name = 'UNKNOWN' 

104 

105 @overload 

106 def __init__( 

107 self: "Socket[bytes]", 

108 ctx_or_socket: "zmq.Context", 

109 socket_type: int, 

110 *, 

111 copy_threshold: Optional[int] = None, 

112 ): 

113 ... 

114 

115 @overload 

116 def __init__( 

117 self: "Socket[bytes]", 

118 *, 

119 shadow: Union["Socket", int], 

120 copy_threshold: Optional[int] = None, 

121 ): 

122 ... 

123 

124 @overload 

125 def __init__( 

126 self: "Socket[bytes]", 

127 ctx_or_socket: "Socket", 

128 ): 

129 ... 

130 

131 def __init__( 

132 self: "Socket[bytes]", 

133 ctx_or_socket: Optional[Union["zmq.Context", "Socket"]] = None, 

134 socket_type: int = 0, 

135 *, 

136 shadow: Union["Socket", int] = 0, 

137 copy_threshold: Optional[int] = None, 

138 ): 

139 if isinstance(ctx_or_socket, zmq.Socket): 

140 # positional Socket(other_socket) 

141 shadow = ctx_or_socket 

142 ctx_or_socket = None 

143 

144 shadow_address: int = 0 

145 

146 if shadow: 

147 self._shadow = True 

148 # hold a reference to the shadow object 

149 self._shadow_obj = shadow 

150 if not isinstance(shadow, int): 

151 try: 

152 shadow = cast(int, shadow.underlying) 

153 except AttributeError: 

154 pass 

155 shadow_address = cast_int_addr(shadow) 

156 else: 

157 self._shadow = False 

158 

159 super().__init__( 

160 ctx_or_socket, 

161 socket_type, 

162 shadow=shadow_address, 

163 copy_threshold=copy_threshold, 

164 ) 

165 

166 try: 

167 socket_type = cast(int, self.get(zmq.TYPE)) 

168 except Exception: 

169 pass 

170 else: 

171 try: 

172 self.__dict__["type"] = stype = SocketType(socket_type) 

173 except ValueError: 

174 self._type_name = str(socket_type) 

175 else: 

176 self._type_name = stype.name 

177 

178 def __del__(self): 

179 if not self._shadow and not self.closed: 

180 if warn is not None: 

181 # warn can be None during process teardown 

182 warn( 

183 f"Unclosed socket {self}", 

184 ResourceWarning, 

185 stacklevel=2, 

186 source=self, 

187 ) 

188 self.close() 

189 

190 _repr_cls = "zmq.Socket" 

191 

192 def __repr__(self): 

193 cls = self.__class__ 

194 # look up _repr_cls on exact class, not inherited 

195 _repr_cls = cls.__dict__.get("_repr_cls", None) 

196 if _repr_cls is None: 

197 _repr_cls = f"{cls.__module__}.{cls.__name__}" 

198 

199 closed = ' closed' if self._closed else '' 

200 

201 return f"<{_repr_cls}(zmq.{self._type_name}) at {hex(id(self))}{closed}>" 

202 

203 # socket as context manager: 

204 def __enter__(self: T) -> T: 

205 """Sockets are context managers 

206 

207 .. versionadded:: 14.4 

208 """ 

209 return self 

210 

211 def __exit__(self, *args, **kwargs): 

212 self.close() 

213 

214 # ------------------------------------------------------------------------- 

215 # Socket creation 

216 # ------------------------------------------------------------------------- 

217 

218 def __copy__(self: T, memo=None) -> T: 

219 """Copying a Socket creates a shadow copy""" 

220 return self.__class__.shadow(self.underlying) 

221 

222 __deepcopy__ = __copy__ 

223 

224 @classmethod 

225 def shadow(cls: Type[T], address: Union[int, "zmq.Socket"]) -> T: 

226 """Shadow an existing libzmq socket 

227 

228 address is a zmq.Socket or an integer (or FFI pointer) 

229 representing the address of the libzmq socket. 

230 

231 .. versionadded:: 14.1 

232 

233 .. versionadded:: 25 

234 Support for shadowing `zmq.Socket` objects, 

235 instead of just integer addresses. 

236 """ 

237 return cls(shadow=address) 

238 

239 def close(self, linger=None) -> None: 

240 """ 

241 Close the socket. 

242 

243 If linger is specified, LINGER sockopt will be set prior to closing. 

244 

245 Note: closing a zmq Socket may not close the underlying sockets 

246 if there are undelivered messages. 

247 Only after all messages are delivered or discarded by reaching the socket's LINGER timeout 

248 (default: forever) 

249 will the underlying sockets be closed. 

250 

251 This can be called to close the socket by hand. If this is not 

252 called, the socket will automatically be closed when it is 

253 garbage collected, 

254 in which case you may see a ResourceWarning about the unclosed socket. 

255 """ 

256 if self.context: 

257 self.context._rm_socket(self) 

258 super().close(linger=linger) 

259 

260 # ------------------------------------------------------------------------- 

261 # Connect/Bind context managers 

262 # ------------------------------------------------------------------------- 

263 

264 def _connect_cm(self: T, addr: str) -> _SocketContext[T]: 

265 """Context manager to disconnect on exit 

266 

267 .. versionadded:: 20.0 

268 """ 

269 return _SocketContext(self, 'connect', addr) 

270 

271 def _bind_cm(self: T, addr: str) -> _SocketContext[T]: 

272 """Context manager to unbind on exit 

273 

274 .. versionadded:: 20.0 

275 """ 

276 return _SocketContext(self, 'bind', addr) 

277 

278 def bind(self: T, addr: str) -> _SocketContext[T]: 

279 """s.bind(addr) 

280 

281 Bind the socket to an address. 

282 

283 This causes the socket to listen on a network port. Sockets on the 

284 other side of this connection will use ``Socket.connect(addr)`` to 

285 connect to this socket. 

286 

287 Returns a context manager which will call unbind on exit. 

288 

289 .. versionadded:: 20.0 

290 Can be used as a context manager. 

291 

292 Parameters 

293 ---------- 

294 addr : str 

295 The address string. This has the form 'protocol://interface:port', 

296 for example 'tcp://127.0.0.1:5555'. Protocols supported include 

297 tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is 

298 encoded to utf-8 first. 

299 

300 """ 

301 super().bind(addr) 

302 return self._bind_cm(addr) 

303 

304 def connect(self: T, addr: str) -> _SocketContext[T]: 

305 """s.connect(addr) 

306 

307 Connect to a remote 0MQ socket. 

308 

309 Returns a context manager which will call disconnect on exit. 

310 

311 .. versionadded:: 20.0 

312 Can be used as a context manager. 

313 

314 Parameters 

315 ---------- 

316 addr : str 

317 The address string. This has the form 'protocol://interface:port', 

318 for example 'tcp://127.0.0.1:5555'. Protocols supported are 

319 tcp, udp, pgm, inproc and ipc. If the address is unicode, it is 

320 encoded to utf-8 first. 

321 

322 """ 

323 super().connect(addr) 

324 return self._connect_cm(addr) 

325 

326 # ------------------------------------------------------------------------- 

327 # Deprecated aliases 

328 # ------------------------------------------------------------------------- 

329 

330 @property 

331 def socket_type(self) -> int: 

332 warn("Socket.socket_type is deprecated, use Socket.type", DeprecationWarning) 

333 return cast(int, self.type) 

334 

335 # ------------------------------------------------------------------------- 

336 # Hooks for sockopt completion 

337 # ------------------------------------------------------------------------- 

338 

339 def __dir__(self): 

340 keys = dir(self.__class__) 

341 keys.extend(SocketOption.__members__) 

342 return keys 

343 

344 # ------------------------------------------------------------------------- 

345 # Getting/Setting options 

346 # ------------------------------------------------------------------------- 

347 setsockopt = SocketBase.set 

348 getsockopt = SocketBase.get 

349 

350 def __setattr__(self, key, value): 

351 """Override to allow setting zmq.[UN]SUBSCRIBE even though we have a subscribe method""" 

352 if key in self.__dict__: 

353 object.__setattr__(self, key, value) 

354 return 

355 _key = key.lower() 

356 if _key in ('subscribe', 'unsubscribe'): 

357 if isinstance(value, str): 

358 value = value.encode('utf8') 

359 if _key == 'subscribe': 

360 self.set(zmq.SUBSCRIBE, value) 

361 else: 

362 self.set(zmq.UNSUBSCRIBE, value) 

363 return 

364 super().__setattr__(key, value) 

365 

366 def fileno(self) -> int: 

367 """Return edge-triggered file descriptor for this socket. 

368 

369 This is a read-only edge-triggered file descriptor for both read and write events on this socket. 

370 It is important that all available events be consumed when an event is detected, 

371 otherwise the read event will not trigger again. 

372 

373 .. versionadded:: 17.0 

374 """ 

375 return self.FD 

376 

377 def subscribe(self, topic: Union[str, bytes]) -> None: 

378 """Subscribe to a topic 

379 

380 Only for SUB sockets. 

381 

382 .. versionadded:: 15.3 

383 """ 

384 if isinstance(topic, str): 

385 topic = topic.encode('utf8') 

386 self.set(zmq.SUBSCRIBE, topic) 

387 

388 def unsubscribe(self, topic: Union[str, bytes]) -> None: 

389 """Unsubscribe from a topic 

390 

391 Only for SUB sockets. 

392 

393 .. versionadded:: 15.3 

394 """ 

395 if isinstance(topic, str): 

396 topic = topic.encode('utf8') 

397 self.set(zmq.UNSUBSCRIBE, topic) 

398 

399 def set_string(self, option: int, optval: str, encoding='utf-8') -> None: 

400 """Set socket options with a unicode object. 

401 

402 This is simply a wrapper for setsockopt to protect from encoding ambiguity. 

403 

404 See the 0MQ documentation for details on specific options. 

405 

406 Parameters 

407 ---------- 

408 option : int 

409 The name of the option to set. Can be any of: SUBSCRIBE, 

410 UNSUBSCRIBE, IDENTITY 

411 optval : str 

412 The value of the option to set. 

413 encoding : str 

414 The encoding to be used, default is utf8 

415 """ 

416 if not isinstance(optval, str): 

417 raise TypeError(f"strings only, not {type(optval)}: {optval!r}") 

418 return self.set(option, optval.encode(encoding)) 

419 

420 setsockopt_unicode = setsockopt_string = set_string 

421 

422 def get_string(self, option: int, encoding='utf-8') -> str: 

423 """Get the value of a socket option. 

424 

425 See the 0MQ documentation for details on specific options. 

426 

427 Parameters 

428 ---------- 

429 option : int 

430 The option to retrieve. 

431 

432 Returns 

433 ------- 

434 optval : str 

435 The value of the option as a unicode string. 

436 """ 

437 if SocketOption(option)._opt_type != _OptType.bytes: 

438 raise TypeError(f"option {option} will not return a string to be decoded") 

439 return cast(bytes, self.get(option)).decode(encoding) 

440 

441 getsockopt_unicode = getsockopt_string = get_string 

442 

443 def bind_to_random_port( 

444 self: T, 

445 addr: str, 

446 min_port: int = 49152, 

447 max_port: int = 65536, 

448 max_tries: int = 100, 

449 ) -> int: 

450 """Bind this socket to a random port in a range. 

451 

452 If the port range is unspecified, the system will choose the port. 

453 

454 Parameters 

455 ---------- 

456 addr : str 

457 The address string without the port to pass to ``Socket.bind()``. 

458 min_port : int, optional 

459 The minimum port in the range of ports to try (inclusive). 

460 max_port : int, optional 

461 The maximum port in the range of ports to try (exclusive). 

462 max_tries : int, optional 

463 The maximum number of bind attempts to make. 

464 

465 Returns 

466 ------- 

467 port : int 

468 The port the socket was bound to. 

469 

470 Raises 

471 ------ 

472 ZMQBindError 

473 if `max_tries` reached before successful bind 

474 """ 

475 if ( 

476 (zmq.zmq_version_info() >= (3, 2)) 

477 and min_port == 49152 

478 and max_port == 65536 

479 ): 

480 # if LAST_ENDPOINT is supported, and min_port / max_port weren't specified, 

481 # we can bind to port 0 and let the OS do the work 

482 self.bind("%s:*" % addr) 

483 url = cast(bytes, self.last_endpoint).decode('ascii', 'replace') 

484 _, port_s = url.rsplit(':', 1) 

485 return int(port_s) 

486 

487 for i in range(max_tries): 

488 try: 

489 port = random.randrange(min_port, max_port) 

490 self.bind(f'{addr}:{port}') 

491 except ZMQError as exception: 

492 en = exception.errno 

493 if en == zmq.EADDRINUSE: 

494 continue 

495 elif sys.platform == 'win32' and en == errno.EACCES: 

496 continue 

497 else: 

498 raise 

499 else: 

500 return port 

501 raise ZMQBindError("Could not bind socket to random port.") 

502 

503 def get_hwm(self) -> int: 

504 """Get the High Water Mark. 

505 

506 On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM 

507 """ 

508 major = zmq.zmq_version_info()[0] 

509 if major >= 3: 

510 # return sndhwm, fallback on rcvhwm 

511 try: 

512 return cast(int, self.get(zmq.SNDHWM)) 

513 except zmq.ZMQError: 

514 pass 

515 

516 return cast(int, self.get(zmq.RCVHWM)) 

517 else: 

518 return cast(int, self.get(zmq.HWM)) 

519 

520 def set_hwm(self, value: int) -> None: 

521 """Set the High Water Mark. 

522 

523 On libzmq ≥ 3, this sets both SNDHWM and RCVHWM 

524 

525 

526 .. warning:: 

527 

528 New values only take effect for subsequent socket 

529 bind/connects. 

530 """ 

531 major = zmq.zmq_version_info()[0] 

532 if major >= 3: 

533 raised = None 

534 try: 

535 self.sndhwm = value 

536 except Exception as e: 

537 raised = e 

538 try: 

539 self.rcvhwm = value 

540 except Exception as e: 

541 raised = e 

542 

543 if raised: 

544 raise raised 

545 else: 

546 self.set(zmq.HWM, value) 

547 

548 hwm = property( 

549 get_hwm, 

550 set_hwm, 

551 None, 

552 """Property for High Water Mark. 

553 

554 Setting hwm sets both SNDHWM and RCVHWM as appropriate. 

555 It gets SNDHWM if available, otherwise RCVHWM. 

556 """, 

557 ) 

558 

559 # ------------------------------------------------------------------------- 

560 # Sending and receiving messages 

561 # ------------------------------------------------------------------------- 

562 

563 @overload 

564 def send( 

565 self, 

566 data: Any, 

567 flags: int = ..., 

568 copy: bool = ..., 

569 *, 

570 track: Literal[True], 

571 routing_id: Optional[int] = ..., 

572 group: Optional[str] = ..., 

573 ) -> "zmq.MessageTracker": 

574 ... 

575 

576 @overload 

577 def send( 

578 self, 

579 data: Any, 

580 flags: int = ..., 

581 copy: bool = ..., 

582 *, 

583 track: Literal[False], 

584 routing_id: Optional[int] = ..., 

585 group: Optional[str] = ..., 

586 ) -> None: 

587 ... 

588 

589 @overload 

590 def send( 

591 self, 

592 data: Any, 

593 flags: int = ..., 

594 *, 

595 copy: bool = ..., 

596 routing_id: Optional[int] = ..., 

597 group: Optional[str] = ..., 

598 ) -> None: 

599 ... 

600 

601 @overload 

602 def send( 

603 self, 

604 data: Any, 

605 flags: int = ..., 

606 copy: bool = ..., 

607 track: bool = ..., 

608 routing_id: Optional[int] = ..., 

609 group: Optional[str] = ..., 

610 ) -> Optional["zmq.MessageTracker"]: 

611 ... 

612 

613 def send( 

614 self, 

615 data: Any, 

616 flags: int = 0, 

617 copy: bool = True, 

618 track: bool = False, 

619 routing_id: Optional[int] = None, 

620 group: Optional[str] = None, 

621 ) -> Optional["zmq.MessageTracker"]: 

622 """Send a single zmq message frame on this socket. 

623 

624 This queues the message to be sent by the IO thread at a later time. 

625 

626 With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full; 

627 otherwise, this waits until space is available. 

628 See :class:`Poller` for more general non-blocking I/O. 

629 

630 Parameters 

631 ---------- 

632 data : bytes, Frame, memoryview 

633 The content of the message. This can be any object that provides 

634 the Python buffer API (i.e. `memoryview(data)` can be called). 

635 flags : int 

636 0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE. 

637 copy : bool 

638 Should the message be sent in a copying or non-copying manner. 

639 track : bool 

640 Should the message be tracked for notification that ZMQ has 

641 finished with it? (ignored if copy=True) 

642 routing_id : int 

643 For use with SERVER sockets 

644 group : str 

645 For use with RADIO sockets 

646 

647 Returns 

648 ------- 

649 None : if `copy` or not track 

650 None if message was sent, raises an exception otherwise. 

651 MessageTracker : if track and not copy 

652 a MessageTracker object, whose `pending` property will 

653 be True until the send is completed. 

654 

655 Raises 

656 ------ 

657 TypeError 

658 If a unicode object is passed 

659 ValueError 

660 If `track=True`, but an untracked Frame is passed. 

661 ZMQError 

662 If the send does not succeed for any reason (including 

663 if NOBLOCK is set and the outgoing queue is full). 

664 

665 

666 .. versionchanged:: 17.0 

667 

668 DRAFT support for routing_id and group arguments. 

669 """ 

670 if routing_id is not None: 

671 if not isinstance(data, zmq.Frame): 

672 data = zmq.Frame( 

673 data, 

674 track=track, 

675 copy=copy or None, 

676 copy_threshold=self.copy_threshold, 

677 ) 

678 data.routing_id = routing_id 

679 if group is not None: 

680 if not isinstance(data, zmq.Frame): 

681 data = zmq.Frame( 

682 data, 

683 track=track, 

684 copy=copy or None, 

685 copy_threshold=self.copy_threshold, 

686 ) 

687 data.group = group 

688 return super().send(data, flags=flags, copy=copy, track=track) 

689 

690 def send_multipart( 

691 self, 

692 msg_parts: Sequence, 

693 flags: int = 0, 

694 copy: bool = True, 

695 track: bool = False, 

696 **kwargs, 

697 ): 

698 """Send a sequence of buffers as a multipart message. 

699 

700 The zmq.SNDMORE flag is added to all msg parts before the last. 

701 

702 Parameters 

703 ---------- 

704 msg_parts : iterable 

705 A sequence of objects to send as a multipart message. Each element 

706 can be any sendable object (Frame, bytes, buffer-providers) 

707 flags : int, optional 

708 Any valid flags for :func:`Socket.send`. 

709 SNDMORE is added automatically for frames before the last. 

710 copy : bool, optional 

711 Should the frame(s) be sent in a copying or non-copying manner. 

712 If copy=False, frames smaller than self.copy_threshold bytes 

713 will be copied anyway. 

714 track : bool, optional 

715 Should the frame(s) be tracked for notification that ZMQ has 

716 finished with it (ignored if copy=True). 

717 

718 Returns 

719 ------- 

720 None : if copy or not track 

721 MessageTracker : if track and not copy 

722 a MessageTracker object, whose `pending` property will 

723 be True until the last send is completed. 

724 """ 

725 # typecheck parts before sending: 

726 for i, msg in enumerate(msg_parts): 

727 if isinstance(msg, (zmq.Frame, bytes, memoryview)): 

728 continue 

729 try: 

730 memoryview(msg) 

731 except Exception: 

732 rmsg = repr(msg) 

733 if len(rmsg) > 32: 

734 rmsg = rmsg[:32] + '...' 

735 raise TypeError( 

736 "Frame %i (%s) does not support the buffer interface." 

737 % ( 

738 i, 

739 rmsg, 

740 ) 

741 ) 

742 for msg in msg_parts[:-1]: 

743 self.send(msg, zmq.SNDMORE | flags, copy=copy, track=track) 

744 # Send the last part without the extra SNDMORE flag. 

745 return self.send(msg_parts[-1], flags, copy=copy, track=track) 

746 

747 @overload 

748 def recv_multipart( 

749 self, flags: int = ..., *, copy: Literal[True], track: bool = ... 

750 ) -> List[bytes]: 

751 ... 

752 

753 @overload 

754 def recv_multipart( 

755 self, flags: int = ..., *, copy: Literal[False], track: bool = ... 

756 ) -> List[zmq.Frame]: 

757 ... 

758 

759 @overload 

760 def recv_multipart(self, flags: int = ..., *, track: bool = ...) -> List[bytes]: 

761 ... 

762 

763 @overload 

764 def recv_multipart( 

765 self, flags: int = 0, copy: bool = True, track: bool = False 

766 ) -> Union[List[zmq.Frame], List[bytes]]: 

767 ... 

768 

769 def recv_multipart( 

770 self, flags: int = 0, copy: bool = True, track: bool = False 

771 ) -> Union[List[zmq.Frame], List[bytes]]: 

772 """Receive a multipart message as a list of bytes or Frame objects 

773 

774 Parameters 

775 ---------- 

776 flags : int, optional 

777 Any valid flags for :func:`Socket.recv`. 

778 copy : bool, optional 

779 Should the message frame(s) be received in a copying or non-copying manner? 

780 If False a Frame object is returned for each part, if True a copy of 

781 the bytes is made for each frame. 

782 track : bool, optional 

783 Should the message frame(s) be tracked for notification that ZMQ has 

784 finished with it? (ignored if copy=True) 

785 

786 Returns 

787 ------- 

788 msg_parts : list 

789 A list of frames in the multipart message; either Frames or bytes, 

790 depending on `copy`. 

791 

792 Raises 

793 ------ 

794 ZMQError 

795 for any of the reasons :func:`~Socket.recv` might fail 

796 """ 

797 parts = [self.recv(flags, copy=copy, track=track)] 

798 # have first part already, only loop while more to receive 

799 while self.getsockopt(zmq.RCVMORE): 

800 part = self.recv(flags, copy=copy, track=track) 

801 parts.append(part) 

802 # cast List[Union] to Union[List] 

803 # how do we get mypy to recognize that return type is invariant on `copy`? 

804 return cast(Union[List[zmq.Frame], List[bytes]], parts) 

805 

806 def _deserialize( 

807 self, 

808 recvd: bytes, 

809 load: Callable[[bytes], Any], 

810 ) -> Any: 

811 """Deserialize a received message 

812 

813 Override in subclass (e.g. Futures) if recvd is not the raw bytes. 

814 

815 The default implementation expects bytes and returns the deserialized message immediately. 

816 

817 Parameters 

818 ---------- 

819 

820 load: callable 

821 Callable that deserializes bytes 

822 recvd: 

823 The object returned by self.recv 

824 

825 """ 

826 return load(recvd) 

827 

828 def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs): 

829 """Send a message with a custom serialization function. 

830 

831 .. versionadded:: 17 

832 

833 Parameters 

834 ---------- 

835 msg : The message to be sent. Can be any object serializable by `serialize`. 

836 serialize : callable 

837 The serialization function to use. 

838 serialize(msg) should return an iterable of sendable message frames 

839 (e.g. bytes objects), which will be passed to send_multipart. 

840 flags : int, optional 

841 Any valid flags for :func:`Socket.send`. 

842 copy : bool, optional 

843 Whether to copy the frames. 

844 

845 """ 

846 frames = serialize(msg) 

847 return self.send_multipart(frames, flags=flags, copy=copy, **kwargs) 

848 

849 def recv_serialized(self, deserialize, flags=0, copy=True): 

850 """Receive a message with a custom deserialization function. 

851 

852 .. versionadded:: 17 

853 

854 Parameters 

855 ---------- 

856 deserialize : callable 

857 The deserialization function to use. 

858 deserialize will be called with one argument: the list of frames 

859 returned by recv_multipart() and can return any object. 

860 flags : int, optional 

861 Any valid flags for :func:`Socket.recv`. 

862 copy : bool, optional 

863 Whether to recv bytes or Frame objects. 

864 

865 Returns 

866 ------- 

867 obj : object 

868 The object returned by the deserialization function. 

869 

870 Raises 

871 ------ 

872 ZMQError 

873 for any of the reasons :func:`~Socket.recv` might fail 

874 """ 

875 frames = self.recv_multipart(flags=flags, copy=copy) 

876 return self._deserialize(frames, deserialize) 

877 

878 def send_string( 

879 self, 

880 u: str, 

881 flags: int = 0, 

882 copy: bool = True, 

883 encoding: str = 'utf-8', 

884 **kwargs, 

885 ) -> Optional["zmq.Frame"]: 

886 """Send a Python unicode string as a message with an encoding. 

887 

888 0MQ communicates with raw bytes, so you must encode/decode 

889 text (str) around 0MQ. 

890 

891 Parameters 

892 ---------- 

893 u : str 

894 The unicode string to send. 

895 flags : int, optional 

896 Any valid flags for :func:`Socket.send`. 

897 encoding : str [default: 'utf-8'] 

898 The encoding to be used 

899 """ 

900 if not isinstance(u, str): 

901 raise TypeError("str objects only") 

902 return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs) 

903 

904 send_unicode = send_string 

905 

906 def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str: 

907 """Receive a unicode string, as sent by send_string. 

908 

909 Parameters 

910 ---------- 

911 flags : int 

912 Any valid flags for :func:`Socket.recv`. 

913 encoding : str [default: 'utf-8'] 

914 The encoding to be used 

915 

916 Returns 

917 ------- 

918 s : str 

919 The Python unicode string that arrives as encoded bytes. 

920 

921 Raises 

922 ------ 

923 ZMQError 

924 for any of the reasons :func:`~Socket.recv` might fail 

925 """ 

926 msg = self.recv(flags=flags) 

927 return self._deserialize(msg, lambda buf: buf.decode(encoding)) 

928 

929 recv_unicode = recv_string 

930 

931 def send_pyobj( 

932 self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs 

933 ) -> Optional[zmq.Frame]: 

934 """Send a Python object as a message using pickle to serialize. 

935 

936 Parameters 

937 ---------- 

938 obj : Python object 

939 The Python object to send. 

940 flags : int 

941 Any valid flags for :func:`Socket.send`. 

942 protocol : int 

943 The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL 

944 where defined, and pickle.HIGHEST_PROTOCOL elsewhere. 

945 """ 

946 msg = pickle.dumps(obj, protocol) 

947 return self.send(msg, flags=flags, **kwargs) 

948 

949 def recv_pyobj(self, flags: int = 0) -> Any: 

950 """Receive a Python object as a message using pickle to serialize. 

951 

952 Parameters 

953 ---------- 

954 flags : int 

955 Any valid flags for :func:`Socket.recv`. 

956 

957 Returns 

958 ------- 

959 obj : Python object 

960 The Python object that arrives as a message. 

961 

962 Raises 

963 ------ 

964 ZMQError 

965 for any of the reasons :func:`~Socket.recv` might fail 

966 """ 

967 msg = self.recv(flags) 

968 return self._deserialize(msg, pickle.loads) 

969 

970 def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None: 

971 """Send a Python object as a message using json to serialize. 

972 

973 Keyword arguments are passed on to json.dumps 

974 

975 Parameters 

976 ---------- 

977 obj : Python object 

978 The Python object to send 

979 flags : int 

980 Any valid flags for :func:`Socket.send` 

981 """ 

982 send_kwargs = {} 

983 for key in ('routing_id', 'group'): 

984 if key in kwargs: 

985 send_kwargs[key] = kwargs.pop(key) 

986 msg = jsonapi.dumps(obj, **kwargs) 

987 return self.send(msg, flags=flags, **send_kwargs) 

988 

989 def recv_json(self, flags: int = 0, **kwargs) -> Union[List, str, int, float, Dict]: 

990 """Receive a Python object as a message using json to serialize. 

991 

992 Keyword arguments are passed on to json.loads 

993 

994 Parameters 

995 ---------- 

996 flags : int 

997 Any valid flags for :func:`Socket.recv`. 

998 

999 Returns 

1000 ------- 

1001 obj : Python object 

1002 The Python object that arrives as a message. 

1003 

1004 Raises 

1005 ------ 

1006 ZMQError 

1007 for any of the reasons :func:`~Socket.recv` might fail 

1008 """ 

1009 msg = self.recv(flags) 

1010 return self._deserialize(msg, lambda buf: jsonapi.loads(buf, **kwargs)) 

1011 

1012 _poller_class = Poller 

1013 

1014 def poll(self, timeout=None, flags=zmq.POLLIN) -> int: 

1015 """Poll the socket for events. 

1016 See :class:`Poller` to wait for multiple sockets at once. 

1017 

1018 Parameters 

1019 ---------- 

1020 timeout : int [default: None] 

1021 The timeout (in milliseconds) to wait for an event. If unspecified 

1022 (or specified None), will wait forever for an event. 

1023 flags : int [default: POLLIN] 

1024 POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for. 

1025 

1026 Returns 

1027 ------- 

1028 event_mask : int 

1029 The poll event mask (POLLIN, POLLOUT), 

1030 0 if the timeout was reached without an event. 

1031 """ 

1032 

1033 if self.closed: 

1034 raise ZMQError(zmq.ENOTSUP) 

1035 

1036 p = self._poller_class() 

1037 p.register(self, flags) 

1038 evts = dict(p.poll(timeout)) 

1039 # return 0 if no events, otherwise return event bitfield 

1040 return evts.get(self, 0) 

1041 

1042 def get_monitor_socket( 

1043 self: T, events: Optional[int] = None, addr: Optional[str] = None 

1044 ) -> T: 

1045 """Return a connected PAIR socket ready to receive the event notifications. 

1046 

1047 .. versionadded:: libzmq-4.0 

1048 .. versionadded:: 14.0 

1049 

1050 Parameters 

1051 ---------- 

1052 events : int [default: ZMQ_EVENT_ALL] 

1053 The bitmask defining which events are wanted. 

1054 addr : string [default: None] 

1055 The optional endpoint for the monitoring sockets. 

1056 

1057 Returns 

1058 ------- 

1059 socket : (PAIR) 

1060 The socket is already connected and ready to receive messages. 

1061 """ 

1062 # safe-guard, method only available on libzmq >= 4 

1063 if zmq.zmq_version_info() < (4,): 

1064 raise NotImplementedError( 

1065 "get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version() 

1066 ) 

1067 

1068 # if already monitoring, return existing socket 

1069 if self._monitor_socket: 

1070 if self._monitor_socket.closed: 

1071 self._monitor_socket = None 

1072 else: 

1073 return self._monitor_socket 

1074 

1075 if addr is None: 

1076 # create endpoint name from internal fd 

1077 addr = f"inproc://monitor.s-{self.FD}" 

1078 if events is None: 

1079 # use all events 

1080 events = zmq.EVENT_ALL 

1081 # attach monitoring socket 

1082 self.monitor(addr, events) 

1083 # create new PAIR socket and connect it 

1084 self._monitor_socket = self.context.socket(zmq.PAIR) 

1085 self._monitor_socket.connect(addr) 

1086 return self._monitor_socket 

1087 

1088 def disable_monitor(self) -> None: 

1089 """Shutdown the PAIR socket (created using get_monitor_socket) 

1090 that is serving socket events. 

1091 

1092 .. versionadded:: 14.4 

1093 """ 

1094 self._monitor_socket = None 

1095 self.monitor(None, 0) 

1096 

1097 

1098__all__ = ['Socket']