Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/socket.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

321 statements  

1"""0MQ Socket pure Python methods.""" 

2 

3# Copyright (C) PyZMQ Developers 

4# Distributed under the terms of the Modified BSD License. 

5 

6from __future__ import annotations 

7 

8import errno 

9import pickle 

10import random 

11import sys 

12from typing import ( 

13 Any, 

14 Callable, 

15 Generic, 

16 List, 

17 Sequence, 

18 TypeVar, 

19 Union, 

20 cast, 

21 overload, 

22) 

23from warnings import warn 

24 

25import zmq 

26from zmq._typing import Literal, TypeAlias 

27from zmq.backend import Socket as SocketBase 

28from zmq.error import ZMQBindError, ZMQError 

29from zmq.utils import jsonapi 

30from zmq.utils.interop import cast_int_addr 

31 

32from ..constants import SocketOption, SocketType, _OptType 

33from .attrsettr import AttributeSetter 

34from .poll import Poller 

35 

# Pickle protocol used by default when sending Python objects:
# pickle.DEFAULT_PROTOCOL when the pickle module defines it,
# otherwise pickle.HIGHEST_PROTOCOL.
if hasattr(pickle, "DEFAULT_PROTOCOL"):
    DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL
else:
    DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL

40 

_SocketType = TypeVar("_SocketType", bound="Socket")


class _SocketContext(Generic[_SocketType]):
    """Context manager that undoes a socket bind or connect on exit.

    Entering the context yields the wrapped socket.  Leaving it calls
    ``socket.unbind(addr)`` when ``kind`` is ``"bind"`` or
    ``socket.disconnect(addr)`` when ``kind`` is ``"connect"``, unless the
    socket has already been closed.

    Parameters
    ----------
    socket :
        The socket the operation was performed on.
    kind : str
        Either ``"bind"`` or ``"connect"``.
    addr : str
        The address that was bound / connected.

    Raises
    ------
    ValueError
        If ``kind`` is neither ``"bind"`` nor ``"connect"``.
    """

    socket: _SocketType
    kind: str
    addr: str

    def __repr__(self):
        return f"<SocketContext({self.kind}={self.addr!r})>"

    def __init__(
        self: _SocketContext[_SocketType], socket: _SocketType, kind: str, addr: str
    ):
        # Explicit check instead of ``assert`` so the validation is not
        # stripped when Python runs with -O.
        if kind not in ("bind", "connect"):
            raise ValueError(f"kind must be 'bind' or 'connect', not {kind!r}")
        self.socket = socket
        self.kind = kind
        self.addr = addr

    def __enter__(self: _SocketContext[_SocketType]) -> _SocketType:
        return self.socket

    def __exit__(self, *args):
        # nothing to undo once the socket itself is closed
        if self.socket.closed:
            return
        if self.kind == "bind":
            self.socket.unbind(self.addr)
        elif self.kind == "connect":
            self.socket.disconnect(self.addr)

72 

73 

# Type parameter of the generic ``Socket`` class declared below.
SocketReturnType = TypeVar("SocketReturnType")

75 

76 

77class Socket(SocketBase, AttributeSetter, Generic[SocketReturnType]): 

78 """The ZMQ socket object 

79 

80 To create a Socket, first create a Context:: 

81 

82 ctx = zmq.Context.instance() 

83 

84 then call ``ctx.socket(socket_type)``:: 

85 

86 s = ctx.socket(zmq.ROUTER) 

87 

88 .. versionadded:: 25 

89 

90 Sockets can now be shadowed by passing another Socket. 

91 This helps in creating an async copy of a sync socket or vice versa:: 

92 

93 s = zmq.Socket(async_socket) 

94 

95 Which previously had to be:: 

96 

97 s = zmq.Socket.shadow(async_socket.underlying) 

98 """ 

99 

100 _shadow = False 

101 _shadow_obj = None 

102 _monitor_socket = None 

103 _type_name = 'UNKNOWN' 

104 

# Overload: create a new socket of ``socket_type`` in a context.
@overload
def __init__(
    self: Socket[bytes],
    ctx_or_socket: zmq.Context,
    socket_type: int,
    *,
    copy_threshold: int | None = None,
): ...

# Overload: shadow an existing socket, given by keyword.
@overload
def __init__(
    self: Socket[bytes],
    *,
    shadow: Socket | int,
    copy_threshold: int | None = None,
): ...

# Overload: shadow an existing socket, given positionally.
@overload
def __init__(
    self: Socket[bytes],
    ctx_or_socket: Socket,
): ...

def __init__(
    self: Socket[bytes],
    ctx_or_socket: zmq.Context | Socket | None = None,
    socket_type: int = 0,
    *,
    shadow: Socket | int = 0,
    copy_threshold: int | None = None,
):
    """Create a socket, or shadow an existing one.

    Parameters
    ----------
    ctx_or_socket : zmq.Context or Socket, optional
        A context to create the socket in, or another Socket to shadow.
    socket_type : int
        The socket type, forwarded to the base class constructor.
    shadow : Socket or int
        A Socket (or integer address) to shadow; falsy means "not a shadow".
    copy_threshold : int, optional
        Forwarded unchanged to the base class constructor.
    """
    if isinstance(ctx_or_socket, zmq.Socket):
        # positional Socket(other_socket)
        shadow = ctx_or_socket
        ctx_or_socket = None

    shadow_address: int = 0

    if shadow:
        self._shadow = True
        # hold a reference to the shadow object
        self._shadow_obj = shadow
        if not isinstance(shadow, int):
            # prefer the object's ``underlying`` attribute when it has one;
            # otherwise pass the object itself on to cast_int_addr
            try:
                shadow = cast(int, shadow.underlying)
            except AttributeError:
                pass
        shadow_address = cast_int_addr(shadow)
    else:
        self._shadow = False

    super().__init__(
        ctx_or_socket,
        socket_type,
        shadow=shadow_address,
        copy_threshold=copy_threshold,
    )

    # Best effort: read the type back from the socket. If that fails for any
    # reason, ``type`` and ``_type_name`` are simply left untouched.
    try:
        socket_type = cast(int, self.get(zmq.TYPE))
    except Exception:
        pass
    else:
        try:
            # written straight into the instance __dict__
            self.__dict__["type"] = stype = SocketType(socket_type)
        except ValueError:
            # not a known SocketType member: fall back to the number as text
            self._type_name = str(socket_type)
        else:
            self._type_name = stype.name

174 

def __del__(self):
    """Warn about, then close, a socket that was never closed explicitly.

    Shadow sockets and sockets that are already closed are left alone.
    """
    if self._shadow or self.closed:
        return
    # warn can be None during process teardown
    if warn is not None:
        warn(
            f"Unclosed socket {self}",
            ResourceWarning,
            stacklevel=2,
            source=self,
        )
    self.close()

186 

187 _repr_cls = "zmq.Socket" 

188 

def __repr__(self):
    """Return ``<name(zmq.TYPE) at 0x...>``, with `` closed`` appended if closed."""
    klass = self.__class__
    # only honour _repr_cls set directly on this exact class; a class that
    # does not set it is shown under its own module-qualified name
    label = klass.__dict__.get("_repr_cls", None)
    if label is None:
        label = f"{klass.__module__}.{klass.__name__}"

    suffix = ' closed' if self._closed else ''
    return f"<{label}(zmq.{self._type_name}) at {hex(id(self))}{suffix}>"

199 

200 # socket as context manager: 

def __enter__(self: _SocketType) -> _SocketType:
    """Enter a ``with`` block; the socket itself is the bound value.

    .. versionadded:: 14.4
    """
    return self

207 

def __exit__(self, *args, **kwargs):
    """Close the socket when the ``with`` block ends.

    Returns None, so an exception raised inside the block is not suppressed.
    """
    self.close()

210 

211 # ------------------------------------------------------------------------- 

212 # Socket creation 

213 # ------------------------------------------------------------------------- 

214 

def __copy__(self: _SocketType, memo=None) -> _SocketType:
    """Return a shadow of this socket rather than an independent copy.

    ``memo`` is accepted and ignored.
    """
    klass = self.__class__
    return klass.shadow(self.underlying)

218 

219 __deepcopy__ = __copy__ 

220 

@classmethod
def shadow(cls: type[_SocketType], address: int | zmq.Socket) -> _SocketType:
    """Create an instance of this class that shadows an existing libzmq socket.

    Parameters
    ----------
    address : int or zmq.Socket
        A zmq.Socket, or an integer (or FFI pointer) giving the address
        of the libzmq socket.

    .. versionadded:: 14.1

    .. versionadded:: 25
        Support for shadowing `zmq.Socket` objects,
        instead of just integer addresses.
    """
    return cls(shadow=address)

235 

def close(self, linger=None) -> None:
    """Close the socket.

    When ``linger`` is given, the LINGER sockopt is set before closing.

    Note: closing a zmq Socket may not close the underlying sockets
    while undelivered messages remain. The underlying sockets are closed
    only once every message has been delivered, or discarded because the
    socket's LINGER timeout (default: forever) was reached.

    Call this to close the socket by hand. A socket that is never closed
    is closed automatically when it is garbage collected, in which case
    you may see a ResourceWarning about the unclosed socket.
    """
    owner = self.context
    if owner:
        # drop the owning context's record of this socket first
        owner._rm_socket(self)
    super().close(linger=linger)

256 

257 # ------------------------------------------------------------------------- 

258 # Connect/Bind context managers 

259 # ------------------------------------------------------------------------- 

260 

def _connect_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
    """Build the context manager that disconnects from ``addr`` on exit.

    .. versionadded:: 20.0
    """
    return _SocketContext(socket=self, kind='connect', addr=addr)

267 

def _bind_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
    """Build the context manager that unbinds on exit.

    The socket's LAST_ENDPOINT is used as the address to unbind when it can
    be read and decoded; otherwise the ``addr`` passed in is kept.

    .. versionadded:: 20.0
    """
    try:
        # retrieve last_endpoint
        # to support binding on random ports via
        # `socket.bind('tcp://127.0.0.1:0')`
        endpoint = self.get(zmq.LAST_ENDPOINT)
        addr = cast(bytes, endpoint).decode("utf8")
    except (AttributeError, ZMQError, UnicodeDecodeError):
        pass
    return _SocketContext(self, 'bind', addr)

281 

def bind(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
    """s.bind(addr)

    Bind the socket to an address.

    The socket then listens on a network port; peers reach it with
    ``Socket.connect(addr)``.

    Returns a context manager which will call unbind on exit.

    .. versionadded:: 20.0
        Can be used as a context manager.

    .. versionadded:: 26.0
        binding to port 0 can be used as a context manager
        for binding to a random port.
        The URL can be retrieved as `socket.last_endpoint`.

    Parameters
    ----------
    addr : str
        The address string. This has the form 'protocol://interface:port',
        for example 'tcp://127.0.0.1:5555'. Protocols supported include
        tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
        encoded to utf-8 first.

    Raises
    ------
    ZMQError
        If the underlying bind fails; the address is appended to the
        error's ``strerror`` before it is re-raised.
    """
    try:
        super().bind(addr)
    except ZMQError as err:
        # make the failing address visible in the error text
        err.strerror = err.strerror + f" (addr={addr!r})"
        raise
    else:
        return self._bind_cm(addr)

316 

def connect(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
    """s.connect(addr)

    Connect to a remote 0MQ socket.

    Returns a context manager which will call disconnect on exit.

    .. versionadded:: 20.0
        Can be used as a context manager.

    Parameters
    ----------
    addr : str
        The address string. This has the form 'protocol://interface:port',
        for example 'tcp://127.0.0.1:5555'. Protocols supported are
        tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
        encoded to utf-8 first.

    Raises
    ------
    ZMQError
        If the underlying connect fails; the address is appended to the
        error's ``strerror`` before it is re-raised.
    """
    try:
        super().connect(addr)
    except ZMQError as err:
        # make the failing address visible in the error text
        err.strerror = err.strerror + f" (addr={addr!r})"
        raise
    else:
        return self._connect_cm(addr)

342 

343 # ------------------------------------------------------------------------- 

344 # Deprecated aliases 

345 # ------------------------------------------------------------------------- 

346 

@property
def socket_type(self) -> int:
    """Deprecated alias for ``Socket.type``.

    Emits a DeprecationWarning and returns ``self.type``.
    """
    # stacklevel=2 attributes the warning to the code that read the
    # property, not to this line inside the library.
    warn(
        "Socket.socket_type is deprecated, use Socket.type",
        DeprecationWarning,
        stacklevel=2,
    )
    return cast(int, self.type)

351 

352 # ------------------------------------------------------------------------- 

353 # Hooks for sockopt completion 

354 # ------------------------------------------------------------------------- 

355 

def __dir__(self):
    """List the class's attribute names followed by every SocketOption member name."""
    return [*dir(self.__class__), *SocketOption.__members__]

360 

361 # ------------------------------------------------------------------------- 

362 # Getting/Setting options 

363 # ------------------------------------------------------------------------- 

364 setsockopt = SocketBase.set 

365 getsockopt = SocketBase.get 

366 

def __setattr__(self, key, value):
    """Override to allow setting zmq.[UN]SUBSCRIBE even though we have a subscribe method

    Keys already present in the instance ``__dict__`` are assigned directly.
    ``subscribe`` / ``unsubscribe`` (in any letter case) are turned into the
    matching socket option, with ``str`` values encoded as utf8. Everything
    else is delegated to the parent ``__setattr__``.
    """
    if key in self.__dict__:
        object.__setattr__(self, key, value)
        return

    lowered = key.lower()
    if lowered not in ('subscribe', 'unsubscribe'):
        super().__setattr__(key, value)
        return

    if isinstance(value, str):
        value = value.encode('utf8')
    option = zmq.SUBSCRIBE if lowered == 'subscribe' else zmq.UNSUBSCRIBE
    self.set(option, value)

382 

def fileno(self) -> int:
    """Return this socket's edge-triggered file descriptor (its ``FD`` option).

    The descriptor is read-only and signals both read and write events on
    the socket. Because it is edge-triggered, every available event must be
    consumed once one is detected; otherwise the read event will not fire
    again.

    .. versionadded:: 17.0
    """
    return self.FD

393 

def subscribe(self, topic: str | bytes) -> None:
    """Subscribe to a topic by setting the SUBSCRIBE option.

    A ``str`` topic is encoded as utf8 first.

    Only for SUB sockets.

    .. versionadded:: 15.3
    """
    payload = topic.encode('utf8') if isinstance(topic, str) else topic
    self.set(zmq.SUBSCRIBE, payload)

404 

def unsubscribe(self, topic: str | bytes) -> None:
    """Unsubscribe from a topic by setting the UNSUBSCRIBE option.

    A ``str`` topic is encoded as utf8 first.

    Only for SUB sockets.

    .. versionadded:: 15.3
    """
    payload = topic.encode('utf8') if isinstance(topic, str) else topic
    self.set(zmq.UNSUBSCRIBE, payload)

415 

def set_string(self, option: int, optval: str, encoding='utf-8') -> None:
    """Set a socket option from a unicode string.

    A thin wrapper around ``set`` that makes the encoding explicit.

    See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : int
        The name of the option to set. Can be any of: SUBSCRIBE,
        UNSUBSCRIBE, IDENTITY
    optval : str
        The value of the option to set.
    encoding : str
        The encoding to be used, default is utf8

    Raises
    ------
    TypeError
        If ``optval`` is not a ``str``.
    """
    if isinstance(optval, str):
        return self.set(option, optval.encode(encoding))
    raise TypeError(f"strings only, not {type(optval)}: {optval!r}")

436 

437 setsockopt_unicode = setsockopt_string = set_string 

438 

def get_string(self, option: int, encoding='utf-8') -> str:
    """Get a bytes-valued socket option, decoded to ``str``.

    See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : int
        The option to retrieve.
    encoding : str
        The encoding used to decode the value.

    Returns
    -------
    optval : str
        The value of the option as a unicode string.

    Raises
    ------
    TypeError
        If the option's declared type is not bytes.
    """
    declared = SocketOption(option)._opt_type
    if declared != _OptType.bytes:
        raise TypeError(f"option {option} will not return a string to be decoded")
    raw = cast(bytes, self.get(option))
    return raw.decode(encoding)

457 

458 getsockopt_unicode = getsockopt_string = get_string 

459 

def bind_to_random_port(
    self: _SocketType,
    addr: str,
    min_port: int = 49152,
    max_port: int = 65536,
    max_tries: int = 100,
) -> int:
    """Bind this socket to a random port in a range.

    When the port range is left at its defaults (and libzmq is at least
    3.2), the system chooses the port.

    Parameters
    ----------
    addr : str
        The address string without the port to pass to ``Socket.bind()``.
    min_port : int, optional
        The minimum port in the range of ports to try (inclusive).
    max_port : int, optional
        The maximum port in the range of ports to try (exclusive).
    max_tries : int, optional
        The maximum number of bind attempts to make.

    Returns
    -------
    port : int
        The port the socket was bound to.

    Raises
    ------
    ZMQBindError
        if `max_tries` reached before successful bind
    """
    if zmq.zmq_version_info() >= (3, 2) and (min_port, max_port) == (49152, 65536):
        # if LAST_ENDPOINT is supported, and min_port / max_port weren't specified,
        # we can bind to port 0 and let the OS do the work
        self.bind("%s:*" % addr)
        endpoint = cast(bytes, self.last_endpoint).decode('ascii', 'replace')
        _host, port_text = endpoint.rsplit(':', 1)
        return int(port_text)

    for _attempt in range(max_tries):
        try:
            port = random.randrange(min_port, max_port)
            self.bind(f'{addr}:{port}')
        except ZMQError as exc:
            code = exc.errno
            # a port already in use (or, on Windows, access denied) just
            # means "try another port"; anything else is a real failure
            if code == zmq.EADDRINUSE or (
                sys.platform == 'win32' and code == errno.EACCES
            ):
                continue
            raise
        return port
    raise ZMQBindError("Could not bind socket to random port.")

519 

def get_hwm(self) -> int:
    """Get the High Water Mark.

    On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM.
    On older libzmq it gets HWM.
    """
    if zmq.zmq_version_info()[0] < 3:
        return cast(int, self.get(zmq.HWM))

    # return sndhwm, fallback on rcvhwm
    try:
        return cast(int, self.get(zmq.SNDHWM))
    except zmq.ZMQError:
        pass
    return cast(int, self.get(zmq.RCVHWM))

536 

def set_hwm(self, value: int) -> None:
    """Set the High Water Mark.

    On libzmq ≥ 3, this sets both SNDHWM and RCVHWM. Both assignments are
    always attempted; if either fails, the most recent error is raised
    afterwards. On older libzmq it sets HWM.


    .. warning::

        New values only take effect for subsequent socket
        bind/connects.
    """
    if zmq.zmq_version_info()[0] < 3:
        self.set(zmq.HWM, value)
        return

    failure = None
    for attr_name in ("sndhwm", "rcvhwm"):
        try:
            setattr(self, attr_name, value)
        except Exception as exc:
            # remember the error but still try the other direction
            failure = exc

    if failure:
        raise failure

564 

565 hwm = property( 

566 get_hwm, 

567 set_hwm, 

568 None, 

569 """Property for High Water Mark. 

570 

571 Setting hwm sets both SNDHWM and RCVHWM as appropriate. 

572 It gets SNDHWM if available, otherwise RCVHWM. 

573 """, 

574 ) 

575 

576 # ------------------------------------------------------------------------- 

577 # Sending and receiving messages 

578 # ------------------------------------------------------------------------- 

579 

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    *,
    track: Literal[True],
    routing_id: int | None = ...,
    group: str | None = ...,
) -> zmq.MessageTracker: ...

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    *,
    track: Literal[False],
    routing_id: int | None = ...,
    group: str | None = ...,
) -> None: ...

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    *,
    copy: bool = ...,
    routing_id: int | None = ...,
    group: str | None = ...,
) -> None: ...

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    track: bool = ...,
    routing_id: int | None = ...,
    group: str | None = ...,
) -> zmq.MessageTracker | None: ...

def send(
    self,
    data: Any,
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    routing_id: int | None = None,
    group: str | None = None,
) -> zmq.MessageTracker | None:
    """Send a single zmq message frame on this socket.

    The message is queued, to be sent by the IO thread at a later time.

    With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
    otherwise, this waits until space is available.
    See :class:`Poller` for more general non-blocking I/O.

    Parameters
    ----------
    data : bytes, Frame, memoryview
        The content of the message. This can be any object that provides
        the Python buffer API (i.e. `memoryview(data)` can be called).
    flags : int
        0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
    copy : bool
        Should the message be sent in a copying or non-copying manner.
    track : bool
        Should the message be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)
    routing_id : int
        For use with SERVER sockets
    group : str
        For use with RADIO sockets

    Returns
    -------
    None : if `copy` or not track
        None if message was sent, raises an exception otherwise.
    MessageTracker : if track and not copy
        a MessageTracker object, whose `done` property will
        be False until the send is completed.

    Raises
    ------
    TypeError
        If a unicode object is passed
    ValueError
        If `track=True`, but an untracked Frame is passed.
    ZMQError
        If the send does not succeed for any reason (including
        if NOBLOCK is set and the outgoing queue is full).


    .. versionchanged:: 17.0

        DRAFT support for routing_id and group arguments.
    """
    has_routing_id = routing_id is not None
    has_group = group is not None

    # routing_id / group are attributes of a Frame, so anything else has
    # to be wrapped in one before they can be attached
    if (has_routing_id or has_group) and not isinstance(data, zmq.Frame):
        data = zmq.Frame(
            data,
            track=track,
            copy=copy or None,
            copy_threshold=self.copy_threshold,
        )
    if has_routing_id:
        data.routing_id = routing_id
    if has_group:
        data.group = group
    return super().send(data, flags=flags, copy=copy, track=track)

702 

def send_multipart(
    self,
    msg_parts: Sequence,
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    **kwargs,
):
    """Send a sequence of buffers as a multipart message.

    The zmq.SNDMORE flag is added to all msg parts before the last.

    Parameters
    ----------
    msg_parts : iterable
        The objects to send as a multipart message. Each element
        can be any sendable object (Frame, bytes, buffer-providers).
        Any iterable is accepted, including generators; it is consumed
        once up front.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
        SNDMORE is added automatically for frames before the last.
    copy : bool, optional
        Should the frame(s) be sent in a copying or non-copying manner.
        If copy=False, frames smaller than self.copy_threshold bytes
        will be copied anyway.
    track : bool, optional
        Should the frame(s) be tracked for notification that ZMQ has
        finished with it (ignored if copy=True).
    **kwargs
        Accepted but not used by this implementation.

    Returns
    -------
    None : if copy or not track
    MessageTracker : if track and not copy
        a MessageTracker object, whose `done` property will
        be False until the last send is completed.

    Raises
    ------
    TypeError
        If a part does not support the buffer interface. Nothing is sent
        in that case, because all parts are checked first.
    IndexError
        If ``msg_parts`` is empty.
    """
    # Materialize once: the parts are walked for validation and then again
    # for sending, and the last one is addressed by index, none of which
    # works on a one-shot iterator or a non-sliceable container.
    parts = list(msg_parts)
    if not parts:
        raise IndexError("send_multipart requires at least one message part")

    # typecheck parts before sending:
    for i, msg in enumerate(parts):
        if isinstance(msg, (zmq.Frame, bytes, memoryview)):
            continue
        try:
            memoryview(msg)
        except Exception:
            rmsg = repr(msg)
            if len(rmsg) > 32:
                rmsg = rmsg[:32] + '...'
            raise TypeError(
                "Frame %i (%s) does not support the buffer interface."
                % (
                    i,
                    rmsg,
                )
            )
    for msg in parts[:-1]:
        self.send(msg, zmq.SNDMORE | flags, copy=copy, track=track)
    # Send the last part without the extra SNDMORE flag.
    return self.send(parts[-1], flags, copy=copy, track=track)

759 

@overload
def recv_multipart(
    self, flags: int = ..., *, copy: Literal[True], track: bool = ...
) -> list[bytes]: ...

@overload
def recv_multipart(
    self, flags: int = ..., *, copy: Literal[False], track: bool = ...
) -> list[zmq.Frame]: ...

@overload
def recv_multipart(self, flags: int = ..., *, track: bool = ...) -> list[bytes]: ...

@overload
def recv_multipart(
    self, flags: int = 0, copy: bool = True, track: bool = False
) -> list[zmq.Frame] | list[bytes]: ...

def recv_multipart(
    self, flags: int = 0, copy: bool = True, track: bool = False
) -> list[zmq.Frame] | list[bytes]:
    """Receive a multipart message as a list of bytes or Frame objects

    Parameters
    ----------
    flags : int, optional
        Any valid flags for :func:`Socket.recv`.
    copy : bool, optional
        Should the message frame(s) be received in a copying or non-copying manner?
        If False a Frame object is returned for each part, if True a copy of
        the bytes is made for each frame.
    track : bool, optional
        Should the message frame(s) be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)

    Returns
    -------
    msg_parts : list
        A list of frames in the multipart message; either Frames or bytes,
        depending on `copy`.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    # the first frame is always received; RCVMORE then tells us whether
    # further frames of the same message are pending
    frames = [self.recv(flags, copy=copy, track=track)]
    while self.getsockopt(zmq.RCVMORE):
        frames.append(self.recv(flags, copy=copy, track=track))
    # cast List[Union] to Union[List]
    # how do we get mypy to recognize that return type is invariant on `copy`?
    return cast(Union[List[zmq.Frame], List[bytes]], frames)

814 

def _deserialize(
    self,
    recvd: bytes,
    load: Callable[[bytes], Any],
) -> Any:
    """Turn a received message into an object by applying ``load`` to it.

    Override in subclass (e.g. Futures) if recvd is not the raw bytes.

    This default implementation calls ``load(recvd)`` immediately and
    returns the result.

    Parameters
    ----------
    recvd:
        The object returned by self.recv
    load: callable
        Callable that deserializes bytes
    """
    return load(recvd)

836 

def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs):
    """Send a message with a custom serialization function.

    .. versionadded:: 17

    Parameters
    ----------
    msg : The message to be sent. Can be any object serializable by `serialize`.
    serialize : callable
        The serialization function to use.
        serialize(msg) should return an iterable of sendable message frames
        (e.g. bytes objects), which will be passed to send_multipart.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
    copy : bool, optional
        Whether to copy the frames.
    **kwargs
        Forwarded to send_multipart.

    Returns
    -------
    Whatever send_multipart returns.
    """
    return self.send_multipart(serialize(msg), flags=flags, copy=copy, **kwargs)

857 

def recv_serialized(self, deserialize, flags=0, copy=True):
    """Receive a message with a custom deserialization function.

    .. versionadded:: 17

    Parameters
    ----------
    deserialize : callable
        The deserialization function to use.
        deserialize will be called with one argument: the list of frames
        returned by recv_multipart() and can return any object.
    flags : int, optional
        Any valid flags for :func:`Socket.recv`.
    copy : bool, optional
        Whether to recv bytes or Frame objects.

    Returns
    -------
    obj : object
        The object returned by the deserialization function.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    return self._deserialize(
        self.recv_multipart(flags=flags, copy=copy), deserialize
    )

886 

def send_string(
    self,
    u: str,
    flags: int = 0,
    copy: bool = True,
    encoding: str = 'utf-8',
    **kwargs,
) -> zmq.Frame | None:
    """Send a Python unicode string as a message with an encoding.

    0MQ communicates with raw bytes, so you must encode/decode
    text (str) around 0MQ.

    Parameters
    ----------
    u : str
        The unicode string to send.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
    copy : bool, optional
        Forwarded to :func:`Socket.send`.
    encoding : str
        The encoding to be used
    **kwargs
        Forwarded to :func:`Socket.send`.

    Raises
    ------
    TypeError
        If ``u`` is not a ``str``.
    """
    # NOTE(review): the return annotation says zmq.Frame | None, but the
    # value comes straight from self.send, which is annotated as returning
    # zmq.MessageTracker | None — confirm which one is intended.
    if isinstance(u, str):
        return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)
    raise TypeError("str objects only")

912 

913 send_unicode = send_string 

914 

def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:
    """Receive a unicode string, as sent by send_string.

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.
    encoding : str
        The encoding to be used

    Returns
    -------
    s : str
        The Python unicode string that arrives as encoded bytes.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`Socket.recv` might fail
    """

    def _decode(buf):
        return buf.decode(encoding)

    return self._deserialize(self.recv(flags=flags), _decode)

937 

938 recv_unicode = recv_string 

939 

def send_pyobj(
    self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs
) -> zmq.Frame | None:
    """Send a Python object as a message using pickle to serialize.

    Parameters
    ----------
    obj : Python object
        The Python object to send.
    flags : int
        Any valid flags for :func:`Socket.send`.
    protocol : int
        The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL
        where defined, and pickle.HIGHEST_PROTOCOL elsewhere.
    **kwargs
        Forwarded to :func:`Socket.send`.
    """
    return self.send(pickle.dumps(obj, protocol), flags=flags, **kwargs)

957 

def recv_pyobj(self, flags: int = 0) -> Any:
    """Receive a Python object as a message using pickle to serialize.

    .. warning::

        The received bytes are handed to ``pickle.loads``, which can
        execute arbitrary code. Only use this with peers you trust.

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    return self._deserialize(self.recv(flags), pickle.loads)

978 

def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
    """Send a Python object as a message using json to serialize.

    ``routing_id`` and ``group`` keyword arguments, when given, are passed
    to :func:`Socket.send`; all other keyword arguments are passed on to
    json.dumps

    Parameters
    ----------
    obj : Python object
        The Python object to send
    flags : int
        Any valid flags for :func:`Socket.send`
    """
    # split off the keywords meant for send(); the rest configure the dump
    send_kwargs = {
        name: kwargs.pop(name) for name in ('routing_id', 'group') if name in kwargs
    }
    payload = jsonapi.dumps(obj, **kwargs)
    return self.send(payload, flags=flags, **send_kwargs)

997 

def recv_json(self, flags: int = 0, **kwargs) -> list | str | int | float | dict:
    """Receive a Python object as a message using json to serialize.

    Keyword arguments are passed on to json.loads

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """

    def _load(buf):
        return jsonapi.loads(buf, **kwargs)

    return self._deserialize(self.recv(flags), _load)

1020 

1021 _poller_class = Poller 

1022 

def poll(self, timeout: int | None = None, flags: int = zmq.POLLIN) -> int:
    """Poll the socket for events.

    See :class:`Poller` to wait for multiple sockets at once.

    Parameters
    ----------
    timeout : int
        The timeout (in milliseconds) to wait for an event. If unspecified
        (or specified None), will wait forever for an event.
    flags : int
        default: POLLIN.
        POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.

    Returns
    -------
    event_mask : int
        The poll event mask (POLLIN, POLLOUT),
        0 if the timeout was reached without an event.

    Raises
    ------
    ZMQError
        With ``zmq.ENOTSUP`` if the socket is closed.
    """
    if self.closed:
        raise ZMQError(zmq.ENOTSUP)

    # a throwaway poller watching only this socket
    poller = self._poller_class()
    poller.register(self, flags)
    ready = dict(poller.poll(timeout))
    # return 0 if no events, otherwise return event bitfield
    return ready.get(self, 0)

1052 

def get_monitor_socket(
    self: _SocketType, events: int | None = None, addr: str | None = None
) -> _SocketType:
    """Return a connected PAIR socket ready to receive the event notifications.

    If a monitor socket was created earlier and is still open, that same
    socket is returned and ``events`` / ``addr`` are not used.

    .. versionadded:: libzmq-4.0
    .. versionadded:: 14.0

    Parameters
    ----------
    events : int
        default: `zmq.EVENT_ALL`
        The bitmask defining which events are wanted.
    addr : str
        The optional endpoint for the monitoring sockets.
        Defaults to an inproc endpoint named after this socket's FD.

    Returns
    -------
    socket : zmq.Socket
        The PAIR socket, connected and ready to receive messages.

    Raises
    ------
    NotImplementedError
        If libzmq is older than version 4.
    """
    # safe-guard, method only available on libzmq >= 4
    if zmq.zmq_version_info() < (4,):
        raise NotImplementedError(
            "get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version()
        )

    # if already monitoring, return existing socket
    existing = self._monitor_socket
    if existing:
        if not existing.closed:
            return existing
        # the old monitor socket was closed: forget it and build a new one
        self._monitor_socket = None

    if addr is None:
        # create endpoint name from internal fd
        addr = f"inproc://monitor.s-{self.FD}"
    if events is None:
        # use all events
        events = zmq.EVENT_ALL

    # attach monitoring socket
    self.monitor(addr, events)
    # create new PAIR socket and connect it
    self._monitor_socket = self.context.socket(zmq.PAIR)
    self._monitor_socket.connect(addr)
    return self._monitor_socket

1099 

def disable_monitor(self) -> None:
    """Stop monitoring this socket.

    Drops this socket's reference to the PAIR socket created by
    get_monitor_socket, then calls ``self.monitor(None, 0)``.

    .. versionadded:: 14.4
    """
    self._monitor_socket = None
    self.monitor(None, 0)

1108 

1109 

# Alias for the ``Socket[bytes]`` specialization.
SyncSocket: TypeAlias = Socket[bytes]

# Names exported by a star-import of this module.
__all__ = ['Socket', 'SyncSocket']