Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/sugar/socket.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

323 statements  

1"""0MQ Socket pure Python methods.""" 

2 

3# Copyright (C) PyZMQ Developers 

4# Distributed under the terms of the Modified BSD License. 

5 

6from __future__ import annotations 

7 

8import errno 

9import pickle 

10import random 

11import sys 

12from collections.abc import Sequence 

13from typing import ( 

14 Any, 

15 Callable, 

16 Generic, 

17 Literal, 

18 TypeVar, 

19 Union, 

20 cast, 

21 overload, 

22) 

23from warnings import warn 

24 

25import zmq 

26from zmq._typing import TypeAlias 

27from zmq.backend import Socket as SocketBase 

28from zmq.error import ZMQBindError, ZMQError 

29from zmq.utils import jsonapi 

30from zmq.utils.interop import cast_int_addr 

31 

32from ..constants import SocketOption, SocketType, _OptType 

33from .attrsettr import AttributeSetter 

34from .poll import Poller 

35 

36try: 

37 DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL 

38except AttributeError: 

39 DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL 

40 

41_SocketType = TypeVar("_SocketType", bound="Socket") 

42 

43_JSONType: TypeAlias = "int | str | bool | list[_JSONType] | dict[str, _JSONType]" 

44 

45 

46class _SocketContext(Generic[_SocketType]): 

47 """Context Manager for socket bind/unbind""" 

48 

49 socket: _SocketType 

50 kind: str 

51 addr: str 

52 

53 def __repr__(self): 

54 return f"<SocketContext({self.kind}={self.addr!r})>" 

55 

56 def __init__( 

57 self: _SocketContext[_SocketType], socket: _SocketType, kind: str, addr: str 

58 ): 

59 assert kind in {"bind", "connect"} 

60 self.socket = socket 

61 self.kind = kind 

62 self.addr = addr 

63 

64 def __enter__(self: _SocketContext[_SocketType]) -> _SocketType: 

65 return self.socket 

66 

67 def __exit__(self, *args): 

68 if self.socket.closed: 

69 return 

70 if self.kind == "bind": 

71 self.socket.unbind(self.addr) 

72 elif self.kind == "connect": 

73 self.socket.disconnect(self.addr) 

74 

75 

76SocketReturnType = TypeVar("SocketReturnType") 

77 

78 

79class Socket(SocketBase, AttributeSetter, Generic[SocketReturnType]): 

80 """The ZMQ socket object 

81 

82 To create a Socket, first create a Context:: 

83 

84 ctx = zmq.Context.instance() 

85 

86 then call ``ctx.socket(socket_type)``:: 

87 

88 s = ctx.socket(zmq.ROUTER) 

89 

90 .. versionadded:: 25 

91 

92 Sockets can now be shadowed by passing another Socket. 

93 This helps in creating an async copy of a sync socket or vice versa:: 

94 

95 s = zmq.Socket(async_socket) 

96 

97 Which previously had to be:: 

98 

99 s = zmq.Socket.shadow(async_socket.underlying) 

100 """ 

101 

102 _shadow = False 

103 _shadow_obj = None 

104 _monitor_socket = None 

105 _type_name = 'UNKNOWN' 

106 

107 @overload 

108 def __init__( 

109 self: Socket[bytes], 

110 ctx_or_socket: zmq.Context, 

111 socket_type: int, 

112 *, 

113 copy_threshold: int | None = None, 

114 ): ... 

115 

116 @overload 

117 def __init__( 

118 self: Socket[bytes], 

119 *, 

120 shadow: Socket | int, 

121 copy_threshold: int | None = None, 

122 ): ... 

123 

124 @overload 

125 def __init__( 

126 self: Socket[bytes], 

127 ctx_or_socket: Socket, 

128 ): ... 

129 

130 def __init__( 

131 self: Socket[bytes], 

132 ctx_or_socket: zmq.Context | Socket | None = None, 

133 socket_type: int = 0, 

134 *, 

135 shadow: Socket | int = 0, 

136 copy_threshold: int | None = None, 

137 ): 

138 shadow_context: zmq.Context | None = None 

139 if isinstance(ctx_or_socket, zmq.Socket): 

140 # positional Socket(other_socket) 

141 shadow = ctx_or_socket 

142 ctx_or_socket = None 

143 

144 shadow_address: int = 0 

145 

146 if shadow: 

147 self._shadow = True 

148 # hold a reference to the shadow object 

149 self._shadow_obj = shadow 

150 if not isinstance(shadow, int): 

151 if isinstance(shadow, zmq.Socket): 

152 shadow_context = shadow.context 

153 try: 

154 shadow = cast(int, shadow.underlying) 

155 except AttributeError: 

156 pass 

157 shadow_address = cast_int_addr(shadow) 

158 else: 

159 self._shadow = False 

160 

161 super().__init__( 

162 ctx_or_socket, 

163 socket_type, 

164 shadow=shadow_address, 

165 copy_threshold=copy_threshold, 

166 ) 

167 if self._shadow_obj and shadow_context: 

168 # keep self.context reference if shadowing a Socket object 

169 self.context = shadow_context 

170 

171 try: 

172 socket_type = cast(int, self.get(zmq.TYPE)) 

173 except Exception: 

174 pass 

175 else: 

176 try: 

177 self.__dict__["type"] = stype = SocketType(socket_type) 

178 except ValueError: 

179 self._type_name = str(socket_type) 

180 else: 

181 self._type_name = stype.name 

182 

183 def __del__(self): 

184 if not self._shadow and not self.closed: 

185 if warn is not None: 

186 # warn can be None during process teardown 

187 warn( 

188 f"Unclosed socket {self}", 

189 ResourceWarning, 

190 stacklevel=2, 

191 source=self, 

192 ) 

193 self.close() 

194 

195 _repr_cls = "zmq.Socket" 

196 

197 def __repr__(self): 

198 cls = self.__class__ 

199 # look up _repr_cls on exact class, not inherited 

200 _repr_cls = cls.__dict__.get("_repr_cls", None) 

201 if _repr_cls is None: 

202 _repr_cls = f"{cls.__module__}.{cls.__name__}" 

203 

204 closed = ' closed' if self._closed else '' 

205 

206 return f"<{_repr_cls}(zmq.{self._type_name}) at {hex(id(self))}{closed}>" 

207 

208 # socket as context manager: 

209 def __enter__(self: _SocketType) -> _SocketType: 

210 """Sockets are context managers 

211 

212 .. versionadded:: 14.4 

213 """ 

214 return self 

215 

216 def __exit__(self, *args, **kwargs): 

217 self.close() 

218 

219 # ------------------------------------------------------------------------- 

220 # Socket creation 

221 # ------------------------------------------------------------------------- 

222 

223 def __copy__(self: _SocketType, memo=None) -> _SocketType: 

224 """Copying a Socket creates a shadow copy""" 

225 return self.__class__.shadow(self.underlying) 

226 

227 __deepcopy__ = __copy__ 

228 

229 @classmethod 

230 def shadow(cls: type[_SocketType], address: int | zmq.Socket) -> _SocketType: 

231 """Shadow an existing libzmq socket 

232 

233 address is a zmq.Socket or an integer (or FFI pointer) 

234 representing the address of the libzmq socket. 

235 

236 .. versionadded:: 14.1 

237 

238 .. versionadded:: 25 

239 Support for shadowing `zmq.Socket` objects, 

240 instead of just integer addresses. 

241 """ 

242 return cls(shadow=address) 

243 

244 def close(self, linger=None) -> None: 

245 """ 

246 Close the socket. 

247 

248 If linger is specified, LINGER sockopt will be set prior to closing. 

249 

250 Note: closing a zmq Socket may not close the underlying sockets 

251 if there are undelivered messages. 

252 Only after all messages are delivered or discarded by reaching the socket's LINGER timeout 

253 (default: forever) 

254 will the underlying sockets be closed. 

255 

256 This can be called to close the socket by hand. If this is not 

257 called, the socket will automatically be closed when it is 

258 garbage collected, 

259 in which case you may see a ResourceWarning about the unclosed socket. 

260 """ 

261 if self.context: 

262 self.context._rm_socket(self) 

263 super().close(linger=linger) 

264 

265 # ------------------------------------------------------------------------- 

266 # Connect/Bind context managers 

267 # ------------------------------------------------------------------------- 

268 

269 def _connect_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]: 

270 """Context manager to disconnect on exit 

271 

272 .. versionadded:: 20.0 

273 """ 

274 return _SocketContext(self, 'connect', addr) 

275 

276 def _bind_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]: 

277 """Context manager to unbind on exit 

278 

279 .. versionadded:: 20.0 

280 """ 

281 try: 

282 # retrieve last_endpoint 

283 # to support binding on random ports via 

284 # `socket.bind('tcp://127.0.0.1:0')` 

285 addr = cast(bytes, self.get(zmq.LAST_ENDPOINT)).decode("utf8") 

286 except (AttributeError, ZMQError, UnicodeDecodeError): 

287 pass 

288 return _SocketContext(self, 'bind', addr) 

289 

290 def bind(self: _SocketType, addr: str) -> _SocketContext[_SocketType]: 

291 """s.bind(addr) 

292 

293 Bind the socket to an address. 

294 

295 This causes the socket to listen on a network port. Sockets on the 

296 other side of this connection will use ``Socket.connect(addr)`` to 

297 connect to this socket. 

298 

299 Returns a context manager which will call unbind on exit. 

300 

301 .. versionadded:: 20.0 

302 Can be used as a context manager. 

303 

304 .. versionadded:: 26.0 

305 binding to port 0 can be used as a context manager 

306 for binding to a random port. 

307 The URL can be retrieved as `socket.last_endpoint`. 

308 

309 Parameters 

310 ---------- 

311 addr : str 

312 The address string. This has the form 'protocol://interface:port', 

313 for example 'tcp://127.0.0.1:5555'. Protocols supported include 

314 tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is 

315 encoded to utf-8 first. 

316 

317 """ 

318 try: 

319 super().bind(addr) 

320 except ZMQError as e: 

321 e.strerror += f" (addr={addr!r})" 

322 raise 

323 return self._bind_cm(addr) 

324 

325 def connect(self: _SocketType, addr: str) -> _SocketContext[_SocketType]: 

326 """s.connect(addr) 

327 

328 Connect to a remote 0MQ socket. 

329 

330 Returns a context manager which will call disconnect on exit. 

331 

332 .. versionadded:: 20.0 

333 Can be used as a context manager. 

334 

335 Parameters 

336 ---------- 

337 addr : str 

338 The address string. This has the form 'protocol://interface:port', 

339 for example 'tcp://127.0.0.1:5555'. Protocols supported are 

340 tcp, udp, pgm, inproc and ipc. If the address is unicode, it is 

341 encoded to utf-8 first. 

342 

343 """ 

344 try: 

345 super().connect(addr) 

346 except ZMQError as e: 

347 e.strerror += f" (addr={addr!r})" 

348 raise 

349 return self._connect_cm(addr) 

350 

351 # ------------------------------------------------------------------------- 

352 # Deprecated aliases 

353 # ------------------------------------------------------------------------- 

354 

355 @property 

356 def socket_type(self) -> int: 

357 warn("Socket.socket_type is deprecated, use Socket.type", DeprecationWarning) 

358 return cast(int, self.type) 

359 

360 # ------------------------------------------------------------------------- 

361 # Hooks for sockopt completion 

362 # ------------------------------------------------------------------------- 

363 

364 def __dir__(self): 

365 keys = dir(self.__class__) 

366 keys.extend(SocketOption.__members__) 

367 return keys 

368 

369 # ------------------------------------------------------------------------- 

370 # Getting/Setting options 

371 # ------------------------------------------------------------------------- 

372 setsockopt = SocketBase.set 

373 getsockopt = SocketBase.get 

374 

375 def __setattr__(self, key, value): 

376 """Override to allow setting zmq.[UN]SUBSCRIBE even though we have a subscribe method""" 

377 if key in self.__dict__: 

378 object.__setattr__(self, key, value) 

379 return 

380 _key = key.lower() 

381 if _key in ('subscribe', 'unsubscribe'): 

382 if isinstance(value, str): 

383 value = value.encode('utf8') 

384 if _key == 'subscribe': 

385 self.set(zmq.SUBSCRIBE, value) 

386 else: 

387 self.set(zmq.UNSUBSCRIBE, value) 

388 return 

389 super().__setattr__(key, value) 

390 

391 def fileno(self) -> int: 

392 """Return edge-triggered file descriptor for this socket. 

393 

394 This is a read-only edge-triggered file descriptor for both read and write events on this socket. 

395 It is important that all available events be consumed when an event is detected, 

396 otherwise the read event will not trigger again. 

397 

398 .. versionadded:: 17.0 

399 """ 

400 return self.FD 

401 

402 def subscribe(self, topic: str | bytes) -> None: 

403 """Subscribe to a topic 

404 

405 Only for SUB sockets. 

406 

407 .. versionadded:: 15.3 

408 """ 

409 if isinstance(topic, str): 

410 topic = topic.encode('utf8') 

411 self.set(zmq.SUBSCRIBE, topic) 

412 

413 def unsubscribe(self, topic: str | bytes) -> None: 

414 """Unsubscribe from a topic 

415 

416 Only for SUB sockets. 

417 

418 .. versionadded:: 15.3 

419 """ 

420 if isinstance(topic, str): 

421 topic = topic.encode('utf8') 

422 self.set(zmq.UNSUBSCRIBE, topic) 

423 

424 def set_string(self, option: int, optval: str, encoding='utf-8') -> None: 

425 """Set socket options with a unicode object. 

426 

427 This is simply a wrapper for setsockopt to protect from encoding ambiguity. 

428 

429 See the 0MQ documentation for details on specific options. 

430 

431 Parameters 

432 ---------- 

433 option : int 

434 The name of the option to set. Can be any of: SUBSCRIBE, 

435 UNSUBSCRIBE, IDENTITY 

436 optval : str 

437 The value of the option to set. 

438 encoding : str 

439 The encoding to be used, default is utf8 

440 """ 

441 if not isinstance(optval, str): 

442 raise TypeError(f"strings only, not {type(optval)}: {optval!r}") 

443 return self.set(option, optval.encode(encoding)) 

444 

445 setsockopt_unicode = setsockopt_string = set_string 

446 

447 def get_string(self, option: int, encoding='utf-8') -> str: 

448 """Get the value of a socket option. 

449 

450 See the 0MQ documentation for details on specific options. 

451 

452 Parameters 

453 ---------- 

454 option : int 

455 The option to retrieve. 

456 

457 Returns 

458 ------- 

459 optval : str 

460 The value of the option as a unicode string. 

461 """ 

462 if SocketOption(option)._opt_type != _OptType.bytes: 

463 raise TypeError(f"option {option} will not return a string to be decoded") 

464 return cast(bytes, self.get(option)).decode(encoding) 

465 

466 getsockopt_unicode = getsockopt_string = get_string 

467 

468 def bind_to_random_port( 

469 self: _SocketType, 

470 addr: str, 

471 min_port: int = 49152, 

472 max_port: int = 65536, 

473 max_tries: int = 100, 

474 ) -> int: 

475 """Bind this socket to a random port in a range. 

476 

477 If the port range is unspecified, the system will choose the port. 

478 

479 Parameters 

480 ---------- 

481 addr : str 

482 The address string without the port to pass to ``Socket.bind()``. 

483 min_port : int, optional 

484 The minimum port in the range of ports to try (inclusive). 

485 max_port : int, optional 

486 The maximum port in the range of ports to try (exclusive). 

487 max_tries : int, optional 

488 The maximum number of bind attempts to make. 

489 

490 Returns 

491 ------- 

492 port : int 

493 The port the socket was bound to. 

494 

495 Raises 

496 ------ 

497 ZMQBindError 

498 if `max_tries` reached before successful bind 

499 """ 

500 if min_port == 49152 and max_port == 65536: 

501 # if LAST_ENDPOINT is supported, and min_port / max_port weren't specified, 

502 # we can bind to port 0 and let the OS do the work 

503 self.bind(f"{addr}:*") 

504 url = cast(bytes, self.last_endpoint).decode('ascii', 'replace') 

505 _, port_s = url.rsplit(':', 1) 

506 return int(port_s) 

507 

508 for i in range(max_tries): 

509 try: 

510 port = random.randrange(min_port, max_port) 

511 self.bind(f'{addr}:{port}') 

512 except ZMQError as exception: 

513 en = exception.errno 

514 if en == zmq.EADDRINUSE: 

515 continue 

516 elif sys.platform == 'win32' and en == errno.EACCES: 

517 continue 

518 else: 

519 raise 

520 else: 

521 return port 

522 raise ZMQBindError("Could not bind socket to random port.") 

523 

524 def get_hwm(self) -> int: 

525 """Get the High Water Mark. 

526 

527 On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM 

528 """ 

529 # return sndhwm, fallback on rcvhwm 

530 try: 

531 return cast(int, self.get(zmq.SNDHWM)) 

532 except zmq.ZMQError: 

533 pass 

534 

535 return cast(int, self.get(zmq.RCVHWM)) 

536 

537 def set_hwm(self, value: int) -> None: 

538 """Set the High Water Mark. 

539 

540 On libzmq ≥ 3, this sets both SNDHWM and RCVHWM 

541 

542 

543 .. warning:: 

544 

545 New values only take effect for subsequent socket 

546 bind/connects. 

547 """ 

548 raised = None 

549 try: 

550 self.sndhwm = value 

551 except Exception as e: 

552 raised = e 

553 try: 

554 self.rcvhwm = value 

555 except Exception as e: 

556 raised = e 

557 

558 if raised: 

559 raise raised 

560 

561 hwm = property( 

562 get_hwm, 

563 set_hwm, 

564 None, 

565 """Property for High Water Mark. 

566 

567 Setting hwm sets both SNDHWM and RCVHWM as appropriate. 

568 It gets SNDHWM if available, otherwise RCVHWM. 

569 """, 

570 ) 

571 

572 # ------------------------------------------------------------------------- 

573 # Sending and receiving messages 

574 # ------------------------------------------------------------------------- 

575 

576 @overload 

577 def send( 

578 self, 

579 data: Any, 

580 flags: int = ..., 

581 copy: bool = ..., 

582 *, 

583 track: Literal[True], 

584 routing_id: int | None = ..., 

585 group: str | None = ..., 

586 ) -> zmq.MessageTracker: ... 

587 

588 @overload 

589 def send( 

590 self, 

591 data: Any, 

592 flags: int = ..., 

593 copy: bool = ..., 

594 *, 

595 track: Literal[False], 

596 routing_id: int | None = ..., 

597 group: str | None = ..., 

598 ) -> None: ... 

599 

600 @overload 

601 def send( 

602 self, 

603 data: Any, 

604 flags: int = ..., 

605 *, 

606 copy: bool = ..., 

607 routing_id: int | None = ..., 

608 group: str | None = ..., 

609 ) -> None: ... 

610 

611 @overload 

612 def send( 

613 self, 

614 data: Any, 

615 flags: int = ..., 

616 copy: bool = ..., 

617 track: bool = ..., 

618 routing_id: int | None = ..., 

619 group: str | None = ..., 

620 ) -> zmq.MessageTracker | None: ... 

621 

622 def send( 

623 self, 

624 data: Any, 

625 flags: int = 0, 

626 copy: bool = True, 

627 track: bool = False, 

628 routing_id: int | None = None, 

629 group: str | None = None, 

630 ) -> zmq.MessageTracker | None: 

631 """Send a single zmq message frame on this socket. 

632 

633 This queues the message to be sent by the IO thread at a later time. 

634 

635 With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full; 

636 otherwise, this waits until space is available. 

637 See :class:`Poller` for more general non-blocking I/O. 

638 

639 Parameters 

640 ---------- 

641 data : bytes, Frame, memoryview 

642 The content of the message. This can be any object that provides 

643 the Python buffer API (i.e. `memoryview(data)` can be called). 

644 flags : int 

645 0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE. 

646 copy : bool 

647 Should the message be sent in a copying or non-copying manner. 

648 track : bool 

649 Should the message be tracked for notification that ZMQ has 

650 finished with it? (ignored if copy=True) 

651 routing_id : int 

652 For use with SERVER sockets 

653 group : str 

654 For use with RADIO sockets 

655 

656 Returns 

657 ------- 

658 None : if `copy` or not track 

659 None if message was sent, raises an exception otherwise. 

660 MessageTracker : if track and not copy 

661 a MessageTracker object, whose `done` property will 

662 be False until the send is completed. 

663 

664 Raises 

665 ------ 

666 TypeError 

667 If a unicode object is passed 

668 ValueError 

669 If `track=True`, but an untracked Frame is passed. 

670 ZMQError 

671 If the send does not succeed for any reason (including 

672 if NOBLOCK is set and the outgoing queue is full). 

673 

674 

675 .. versionchanged:: 17.0 

676 

677 DRAFT support for routing_id and group arguments. 

678 """ 

679 if routing_id is not None: 

680 if not isinstance(data, zmq.Frame): 

681 data = zmq.Frame( 

682 data, 

683 track=track, 

684 copy=copy or None, 

685 copy_threshold=self.copy_threshold, 

686 ) 

687 data.routing_id = routing_id 

688 if group is not None: 

689 if not isinstance(data, zmq.Frame): 

690 data = zmq.Frame( 

691 data, 

692 track=track, 

693 copy=copy or None, 

694 copy_threshold=self.copy_threshold, 

695 ) 

696 data.group = group 

697 return super().send(data, flags=flags, copy=copy, track=track) 

698 

699 def send_multipart( 

700 self, 

701 msg_parts: Sequence, 

702 flags: int = 0, 

703 copy: bool = True, 

704 track: bool = False, 

705 **kwargs, 

706 ): 

707 """Send a sequence of buffers as a multipart message. 

708 

709 The zmq.SNDMORE flag is added to all msg parts before the last. 

710 

711 Parameters 

712 ---------- 

713 msg_parts : iterable 

714 A sequence of objects to send as a multipart message. Each element 

715 can be any sendable object (Frame, bytes, buffer-providers) 

716 flags : int, optional 

717 Any valid flags for :func:`Socket.send`. 

718 SNDMORE is added automatically for frames before the last. 

719 copy : bool, optional 

720 Should the frame(s) be sent in a copying or non-copying manner. 

721 If copy=False, frames smaller than self.copy_threshold bytes 

722 will be copied anyway. 

723 track : bool, optional 

724 Should the frame(s) be tracked for notification that ZMQ has 

725 finished with it (ignored if copy=True). 

726 

727 Returns 

728 ------- 

729 None : if copy or not track 

730 MessageTracker : if track and not copy 

731 a MessageTracker object, whose `done` property will 

732 be False until the last send is completed. 

733 """ 

734 # typecheck parts before sending: 

735 for i, msg in enumerate(msg_parts): 

736 if isinstance(msg, (zmq.Frame, bytes, memoryview)): 

737 continue 

738 try: 

739 memoryview(msg) 

740 except Exception: 

741 rmsg = repr(msg) 

742 if len(rmsg) > 32: 

743 rmsg = rmsg[:32] + '...' 

744 raise TypeError( 

745 f"Frame {i} ({rmsg}) does not support the buffer interface." 

746 ) 

747 for msg in msg_parts[:-1]: 

748 self.send(msg, zmq.SNDMORE | flags, copy=copy, track=track) 

749 # Send the last part without the extra SNDMORE flag. 

750 return self.send(msg_parts[-1], flags, copy=copy, track=track) 

751 

752 @overload 

753 def recv_multipart( 

754 self, flags: int = ..., *, copy: Literal[True], track: bool = ... 

755 ) -> list[bytes]: ... 

756 

757 @overload 

758 def recv_multipart( 

759 self, flags: int = ..., *, copy: Literal[False], track: bool = ... 

760 ) -> list[zmq.Frame]: ... 

761 

762 @overload 

763 def recv_multipart(self, flags: int = ..., *, track: bool = ...) -> list[bytes]: ... 

764 

765 @overload 

766 def recv_multipart( 

767 self, flags: int = 0, copy: bool = True, track: bool = False 

768 ) -> list[zmq.Frame] | list[bytes]: ... 

769 

770 def recv_multipart( 

771 self, flags: int = 0, copy: bool = True, track: bool = False 

772 ) -> list[zmq.Frame] | list[bytes]: 

773 """Receive a multipart message as a list of bytes or Frame objects 

774 

775 Parameters 

776 ---------- 

777 flags : int, optional 

778 Any valid flags for :func:`Socket.recv`. 

779 copy : bool, optional 

780 Should the message frame(s) be received in a copying or non-copying manner? 

781 If False a Frame object is returned for each part, if True a copy of 

782 the bytes is made for each frame. 

783 track : bool, optional 

784 Should the message frame(s) be tracked for notification that ZMQ has 

785 finished with it? (ignored if copy=True) 

786 

787 Returns 

788 ------- 

789 msg_parts : list 

790 A list of frames in the multipart message; either Frames or bytes, 

791 depending on `copy`. 

792 

793 Raises 

794 ------ 

795 ZMQError 

796 for any of the reasons :func:`~Socket.recv` might fail 

797 """ 

798 parts = [self.recv(flags, copy=copy, track=track)] 

799 # have first part already, only loop while more to receive 

800 while self.getsockopt(zmq.RCVMORE): 

801 part = self.recv(flags, copy=copy, track=track) 

802 parts.append(part) 

803 # cast List[Union] to Union[List] 

804 # how do we get mypy to recognize that return type is invariant on `copy`? 

805 return cast(Union[list[zmq.Frame], list[bytes]], parts) 

806 

807 def _deserialize( 

808 self, 

809 recvd: bytes, 

810 load: Callable[[bytes], Any], 

811 ) -> Any: 

812 """Deserialize a received message 

813 

814 Override in subclass (e.g. Futures) if recvd is not the raw bytes. 

815 

816 The default implementation expects bytes and returns the deserialized message immediately. 

817 

818 Parameters 

819 ---------- 

820 

821 load: callable 

822 Callable that deserializes bytes 

823 recvd: 

824 The object returned by self.recv 

825 

826 """ 

827 return load(recvd) 

828 

829 def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs): 

830 """Send a message with a custom serialization function. 

831 

832 .. versionadded:: 17 

833 

834 Parameters 

835 ---------- 

836 msg : The message to be sent. Can be any object serializable by `serialize`. 

837 serialize : callable 

838 The serialization function to use. 

839 serialize(msg) should return an iterable of sendable message frames 

840 (e.g. bytes objects), which will be passed to send_multipart. 

841 flags : int, optional 

842 Any valid flags for :func:`Socket.send`. 

843 copy : bool, optional 

844 Whether to copy the frames. 

845 

846 """ 

847 frames = serialize(msg) 

848 return self.send_multipart(frames, flags=flags, copy=copy, **kwargs) 

849 

850 def recv_serialized(self, deserialize, flags=0, copy=True): 

851 """Receive a message with a custom deserialization function. 

852 

853 .. versionadded:: 17 

854 

855 Parameters 

856 ---------- 

857 deserialize : callable 

858 The deserialization function to use. 

859 deserialize will be called with one argument: the list of frames 

860 returned by recv_multipart() and can return any object. 

861 flags : int, optional 

862 Any valid flags for :func:`Socket.recv`. 

863 copy : bool, optional 

864 Whether to recv bytes or Frame objects. 

865 

866 Returns 

867 ------- 

868 obj : object 

869 The object returned by the deserialization function. 

870 

871 Raises 

872 ------ 

873 ZMQError 

874 for any of the reasons :func:`~Socket.recv` might fail 

875 """ 

876 frames = self.recv_multipart(flags=flags, copy=copy) 

877 return self._deserialize(frames, deserialize) 

878 

879 def send_string( 

880 self, 

881 u: str, 

882 flags: int = 0, 

883 copy: bool = True, 

884 encoding: str = 'utf-8', 

885 **kwargs, 

886 ) -> zmq.Frame | None: 

887 """Send a Python unicode string as a message with an encoding. 

888 

889 0MQ communicates with raw bytes, so you must encode/decode 

890 text (str) around 0MQ. 

891 

892 Parameters 

893 ---------- 

894 u : str 

895 The unicode string to send. 

896 flags : int, optional 

897 Any valid flags for :func:`Socket.send`. 

898 encoding : str 

899 The encoding to be used 

900 """ 

901 if not isinstance(u, str): 

902 raise TypeError("str objects only") 

903 return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs) 

904 

905 send_unicode = send_string 

906 

907 def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str: 

908 """Receive a unicode string, as sent by send_string. 

909 

910 Parameters 

911 ---------- 

912 flags : int 

913 Any valid flags for :func:`Socket.recv`. 

914 encoding : str 

915 The encoding to be used 

916 

917 Returns 

918 ------- 

919 s : str 

920 The Python unicode string that arrives as encoded bytes. 

921 

922 Raises 

923 ------ 

924 ZMQError 

925 for any of the reasons :func:`Socket.recv` might fail 

926 """ 

927 msg = self.recv(flags=flags) 

928 return self._deserialize(msg, lambda buf: buf.decode(encoding)) 

929 

930 recv_unicode = recv_string 

931 

932 def send_pyobj( 

933 self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs 

934 ) -> zmq.Frame | None: 

935 """ 

936 Send a Python object as a message using pickle to serialize. 

937 

938 .. warning:: 

939 

940 Never deserialize an untrusted message with pickle, 

941 which can involve arbitrary code execution. 

942 Make sure to authenticate the sources of messages 

943 before unpickling them, e.g. with transport-level security 

944 (e.g. CURVE, ZAP, or IPC permissions) 

945 or signed messages. 

946 

947 Parameters 

948 ---------- 

949 obj : Python object 

950 The Python object to send. 

951 flags : int 

952 Any valid flags for :func:`Socket.send`. 

953 protocol : int 

954 The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL 

955 where defined, and pickle.HIGHEST_PROTOCOL elsewhere. 

956 """ 

957 msg = pickle.dumps(obj, protocol) 

958 return self.send(msg, flags=flags, **kwargs) 

959 

960 def recv_pyobj(self, flags: int = 0) -> Any: 

961 """ 

962 Receive a Python object as a message using UNSAFE pickle to serialize. 

963 

964 .. warning:: 

965 

966 Never deserialize an untrusted message with pickle, 

967 which can involve arbitrary code execution. 

968 Make sure to authenticate the sources of messages 

969 before unpickling them, e.g. with transport-level security 

970 (such as CURVE or IPC permissions) 

971 or authenticating messages themselves before deserializing. 

972 

973 Parameters 

974 ---------- 

975 flags : int 

976 Any valid flags for :func:`Socket.recv`. 

977 

978 Returns 

979 ------- 

980 obj : Python object 

981 The Python object that arrives as a message. 

982 

983 Raises 

984 ------ 

985 ZMQError 

986 for any of the reasons :func:`~Socket.recv` might fail 

987 """ 

988 msg = self.recv(flags) 

989 return self._deserialize(msg, pickle.loads) 

990 

991 def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None: 

992 """Send a Python object as a message using json to serialize. 

993 

994 Keyword arguments are passed on to json.dumps 

995 

996 Parameters 

997 ---------- 

998 obj : Python object 

999 The Python object to send 

1000 flags : int 

1001 Any valid flags for :func:`Socket.send` 

1002 """ 

1003 send_kwargs = {} 

1004 for key in ('routing_id', 'group'): 

1005 if key in kwargs: 

1006 send_kwargs[key] = kwargs.pop(key) 

1007 msg = jsonapi.dumps(obj, **kwargs) 

1008 return self.send(msg, flags=flags, **send_kwargs) 

1009 

1010 def recv_json(self, flags: int = 0, **kwargs) -> _JSONType: 

1011 """Receive a Python object as a message using json to serialize. 

1012 

1013 Keyword arguments are passed on to json.loads 

1014 

1015 Parameters 

1016 ---------- 

1017 flags : int 

1018 Any valid flags for :func:`Socket.recv`. 

1019 

1020 Returns 

1021 ------- 

1022 obj : Python object 

1023 The Python object that arrives as a message. 

1024 

1025 Raises 

1026 ------ 

1027 ZMQError 

1028 for any of the reasons :func:`~Socket.recv` might fail 

1029 """ 

1030 msg = self.recv(flags) 

1031 return self._deserialize(msg, lambda buf: jsonapi.loads(buf, **kwargs)) 

1032 

1033 _poller_class = Poller 

1034 

1035 def poll(self, timeout: int | None = None, flags: int = zmq.POLLIN) -> int: 

1036 """Poll the socket for events. 

1037 

1038 See :class:`Poller` to wait for multiple sockets at once. 

1039 

1040 Parameters 

1041 ---------- 

1042 timeout : int 

1043 The timeout (in milliseconds) to wait for an event. If unspecified 

1044 (or specified None), will wait forever for an event. 

1045 flags : int 

1046 default: POLLIN. 

1047 POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for. 

1048 

1049 Returns 

1050 ------- 

1051 event_mask : int 

1052 The poll event mask (POLLIN, POLLOUT), 

1053 0 if the timeout was reached without an event. 

1054 """ 

1055 

1056 if self.closed: 

1057 raise ZMQError(zmq.ENOTSUP) 

1058 

1059 p = self._poller_class() 

1060 p.register(self, flags) 

1061 evts = dict(p.poll(timeout)) 

1062 # return 0 if no events, otherwise return event bitfield 

1063 return evts.get(self, 0) 

1064 

1065 def get_monitor_socket( 

1066 self: _SocketType, events: int | None = None, addr: str | None = None 

1067 ) -> _SocketType: 

1068 """Return a connected PAIR socket ready to receive the event notifications. 

1069 

1070 .. versionadded:: libzmq-4.0 

1071 .. versionadded:: 14.0 

1072 

1073 Parameters 

1074 ---------- 

1075 events : int 

1076 default: `zmq.EVENT_ALL` 

1077 The bitmask defining which events are wanted. 

1078 addr : str 

1079 The optional endpoint for the monitoring sockets. 

1080 

1081 Returns 

1082 ------- 

1083 socket : zmq.Socket 

1084 The PAIR socket, connected and ready to receive messages. 

1085 """ 

1086 # safe-guard, method only available on libzmq >= 4 

1087 if zmq.zmq_version_info() < (4,): 

1088 raise NotImplementedError( 

1089 f"get_monitor_socket requires libzmq >= 4, have {zmq.zmq_version()}" 

1090 ) 

1091 

1092 # if already monitoring, return existing socket 

1093 if self._monitor_socket: 

1094 if self._monitor_socket.closed: 

1095 self._monitor_socket = None 

1096 else: 

1097 return self._monitor_socket 

1098 

1099 if addr is None: 

1100 # create endpoint name from internal fd 

1101 addr = f"inproc://monitor.s-{self.FD}" 

1102 if events is None: 

1103 # use all events 

1104 events = zmq.EVENT_ALL 

1105 # attach monitoring socket 

1106 self.monitor(addr, events) 

1107 # create new PAIR socket and connect it 

1108 self._monitor_socket = self.context.socket(zmq.PAIR) 

1109 self._monitor_socket.connect(addr) 

1110 return self._monitor_socket 

1111 

1112 def disable_monitor(self) -> None: 

1113 """Shutdown the PAIR socket (created using get_monitor_socket) 

1114 that is serving socket events. 

1115 

1116 .. versionadded:: 14.4 

1117 """ 

1118 self._monitor_socket = None 

1119 self.monitor(None, 0) 

1120 

1121 

1122SyncSocket: TypeAlias = Socket[bytes] 

1123 

1124__all__ = ['Socket', 'SyncSocket']