Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/sugar/socket.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

322 statements  

1"""0MQ Socket pure Python methods.""" 

2 

3# Copyright (C) PyZMQ Developers 

4# Distributed under the terms of the Modified BSD License. 

5 

6from __future__ import annotations 

7 

8import errno 

9import pickle 

10import random 

11import sys 

12from typing import ( 

13 Any, 

14 Callable, 

15 Generic, 

16 List, 

17 Literal, 

18 Sequence, 

19 TypeVar, 

20 Union, 

21 cast, 

22 overload, 

23) 

24from warnings import warn 

25 

26import zmq 

27from zmq._typing import TypeAlias 

28from zmq.backend import Socket as SocketBase 

29from zmq.error import ZMQBindError, ZMQError 

30from zmq.utils import jsonapi 

31from zmq.utils.interop import cast_int_addr 

32 

33from ..constants import SocketOption, SocketType, _OptType 

34from .attrsettr import AttributeSetter 

35from .poll import Poller 

36 

37try: 

38 DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL 

39except AttributeError: 

40 DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL 

41 

42_SocketType = TypeVar("_SocketType", bound="Socket") 

43 

44_JSONType: TypeAlias = "int | str | bool | list[_JSONType] | dict[str, _JSONType]" 

45 

46 

class _SocketContext(Generic[_SocketType]):
    """Context manager that undoes one bind or connect when the block exits.

    Entering yields the wrapped socket. Exiting calls ``unbind(addr)`` or
    ``disconnect(addr)`` on it, depending on ``kind``, unless the socket
    has already been closed.
    """

    socket: _SocketType
    kind: str
    addr: str

    def __init__(
        self: _SocketContext[_SocketType], socket: _SocketType, kind: str, addr: str
    ):
        # internal invariant: only these two kinds are ever created
        assert kind in {"bind", "connect"}
        self.socket, self.kind, self.addr = socket, kind, addr

    def __repr__(self):
        return f"<SocketContext({self.kind}={self.addr!r})>"

    def __enter__(self: _SocketContext[_SocketType]) -> _SocketType:
        return self.socket

    def __exit__(self, *args):
        sock = self.socket
        if not sock.closed:
            if self.kind == "bind":
                sock.unbind(self.addr)
            elif self.kind == "connect":
                sock.disconnect(self.addr)

75 

76 

# Type parameter of the generic ``Socket`` class; the annotations in this
# module bind it to ``bytes`` (``Socket[bytes]``).
SocketReturnType = TypeVar("SocketReturnType")

78 

79 

80class Socket(SocketBase, AttributeSetter, Generic[SocketReturnType]): 

81 """The ZMQ socket object 

82 

83 To create a Socket, first create a Context:: 

84 

85 ctx = zmq.Context.instance() 

86 

87 then call ``ctx.socket(socket_type)``:: 

88 

89 s = ctx.socket(zmq.ROUTER) 

90 

91 .. versionadded:: 25 

92 

93 Sockets can now be shadowed by passing another Socket. 

94 This helps in creating an async copy of a sync socket or vice versa:: 

95 

96 s = zmq.Socket(async_socket) 

97 

98 Which previously had to be:: 

99 

100 s = zmq.Socket.shadow(async_socket.underlying) 

101 """ 

102 

103 _shadow = False 

104 _shadow_obj = None 

105 _monitor_socket = None 

106 _type_name = 'UNKNOWN' 

107 

@overload
def __init__(
    self: Socket[bytes],
    ctx_or_socket: zmq.Context,
    socket_type: int,
    *,
    copy_threshold: int | None = None,
): ...

@overload
def __init__(
    self: Socket[bytes],
    *,
    shadow: Socket | int,
    copy_threshold: int | None = None,
): ...

@overload
def __init__(
    self: Socket[bytes],
    ctx_or_socket: Socket,
): ...

def __init__(
    self: Socket[bytes],
    ctx_or_socket: zmq.Context | Socket | None = None,
    socket_type: int = 0,
    *,
    shadow: Socket | int = 0,
    copy_threshold: int | None = None,
):
    """Create a socket, or shadow an existing one.

    Parameters
    ----------
    ctx_or_socket : zmq.Context or zmq.Socket, optional
        The context to create the socket in. If a ``zmq.Socket`` is
        passed here instead, it is treated as ``shadow`` and ``None`` is
        forwarded to the base class as the context.
    socket_type : int
        Forwarded to the base class initializer.
    shadow : zmq.Socket or int
        A socket object, or an address accepted by ``cast_int_addr``,
        to shadow. A falsy value (the default) means "not a shadow".
    copy_threshold : int, optional
        Forwarded to the base class initializer.
    """
    shadow_context: zmq.Context | None = None
    if isinstance(ctx_or_socket, zmq.Socket):
        # positional Socket(other_socket)
        shadow = ctx_or_socket
        ctx_or_socket = None

    shadow_address: int = 0

    if shadow:
        self._shadow = True
        # hold a reference to the shadow object
        self._shadow_obj = shadow
        if not isinstance(shadow, int):
            if isinstance(shadow, zmq.Socket):
                # remember the shadowed socket's context; restored below
                shadow_context = shadow.context
            try:
                # objects exposing `.underlying` are shadowed through it;
                # anything else is passed to cast_int_addr unchanged
                shadow = cast(int, shadow.underlying)
            except AttributeError:
                pass
        shadow_address = cast_int_addr(shadow)
    else:
        self._shadow = False

    super().__init__(
        ctx_or_socket,
        socket_type,
        shadow=shadow_address,
        copy_threshold=copy_threshold,
    )
    if self._shadow_obj and shadow_context:
        # keep self.context reference if shadowing a Socket object
        self.context = shadow_context

    # Record the socket type for __repr__. If the TYPE option cannot be
    # read, _type_name keeps its class-level default.
    try:
        socket_type = cast(int, self.get(zmq.TYPE))
    except Exception:
        pass
    else:
        try:
            # written straight into __dict__, bypassing __setattr__
            self.__dict__["type"] = stype = SocketType(socket_type)
        except ValueError:
            # not a known SocketType member: fall back to the raw number
            self._type_name = str(socket_type)
        else:
            self._type_name = stype.name

183 

def __del__(self):
    """Warn about, then close, a socket that was never closed explicitly.

    Shadow sockets and sockets that are already closed are left alone.
    """
    if self._shadow or self.closed:
        return
    # warn can be None during process teardown
    if warn is not None:
        warn(
            f"Unclosed socket {self}",
            ResourceWarning,
            stacklevel=2,
            source=self,
        )
    self.close()

195 

_repr_cls = "zmq.Socket"

def __repr__(self):
    """Return ``<class(zmq.TYPE) at 0xADDR[ closed]>``.

    The class label is ``_repr_cls`` only when the exact class of this
    instance defines it; otherwise the class's module and name are used.
    """
    klass = self.__class__
    # consult the class's own namespace so an inherited label is ignored
    label = klass.__dict__.get("_repr_cls", None)
    if label is None:
        label = f"{klass.__module__}.{klass.__name__}"

    suffix = ' closed' if self._closed else ''
    return f"<{label}(zmq.{self._type_name}) at {hex(id(self))}{suffix}>"

208 

209 # socket as context manager: 

def __enter__(self: _SocketType) -> _SocketType:
    """Enter a ``with`` block; the socket itself is the managed object.

    .. versionadded:: 14.4
    """
    return self

216 

def __exit__(self, *args, **kwargs):
    """Leave a ``with`` block by closing the socket; arguments are ignored."""
    self.close()

219 

220 # ------------------------------------------------------------------------- 

221 # Socket creation 

222 # ------------------------------------------------------------------------- 

223 

def __copy__(self: _SocketType, memo=None) -> _SocketType:
    """Copying a Socket creates a shadow copy

    The copy is built with ``shadow(self.underlying)`` on the same class.
    ``memo`` is accepted so this function can also serve as
    ``__deepcopy__``; it is not used.
    """
    return self.__class__.shadow(self.underlying)

# deep copies are shadow copies too
__deepcopy__ = __copy__

229 

@classmethod
def shadow(cls: type[_SocketType], address: int | zmq.Socket) -> _SocketType:
    """Shadow an existing libzmq socket

    address is a zmq.Socket or an integer (or FFI pointer)
    representing the address of the libzmq socket.

    Equivalent to calling ``cls(shadow=address)``.

    .. versionadded:: 14.1

    .. versionadded:: 25
        Support for shadowing `zmq.Socket` objects,
        instead of just integer addresses.
    """
    return cls(shadow=address)

244 

def close(self, linger=None) -> None:
    """
    Close the socket.

    When ``linger`` is given, the LINGER sockopt is set before closing.

    Note: closing a zmq Socket may not close the underlying sockets
    while there are undelivered messages. The underlying sockets are
    closed only once all messages are delivered, or discarded because
    the socket's LINGER timeout (default: forever) was reached.

    Call this to close the socket by hand. A socket that is never
    closed is closed automatically when it is garbage collected,
    in which case you may see a ResourceWarning about the unclosed socket.
    """
    ctx = self.context
    if ctx:
        # deregister from the owning context before closing
        ctx._rm_socket(self)
    super().close(linger=linger)

265 

266 # ------------------------------------------------------------------------- 

267 # Connect/Bind context managers 

268 # ------------------------------------------------------------------------- 

269 

def _connect_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
    """Context manager to disconnect on exit

    Wraps this socket and ``addr`` in a ``_SocketContext`` of kind
    ``'connect'``; nothing is connected here.

    .. versionadded:: 20.0
    """
    return _SocketContext(self, 'connect', addr)

276 

def _bind_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
    """Context manager to unbind on exit

    The address to unbind is the socket's LAST_ENDPOINT when it can be
    read and decoded, which supports binding on random ports via
    ``socket.bind('tcp://127.0.0.1:0')``; otherwise ``addr`` is used.

    .. versionadded:: 20.0
    """
    try:
        raw_endpoint = self.get(zmq.LAST_ENDPOINT)
        resolved = cast(bytes, raw_endpoint).decode("utf8")
    except (AttributeError, ZMQError, UnicodeDecodeError):
        # fall back to the address the caller asked for
        resolved = addr
    return _SocketContext(self, 'bind', resolved)

290 

def bind(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
    """s.bind(addr)

    Bind the socket to an address.

    This causes the socket to listen on a network port. Sockets on the
    other side of this connection will use ``Socket.connect(addr)`` to
    connect to this socket.

    Returns a context manager which will call unbind on exit.

    .. versionadded:: 20.0
        Can be used as a context manager.

    .. versionadded:: 26.0
        binding to port 0 can be used as a context manager
        for binding to a random port.
        The URL can be retrieved as `socket.last_endpoint`.

    Parameters
    ----------
    addr : str
        The address string. This has the form 'protocol://interface:port',
        for example 'tcp://127.0.0.1:5555'. Protocols supported include
        tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
        encoded to utf-8 first.

    Raises
    ------
    ZMQError
        Re-raised from the underlying bind, with the address appended
        to its ``strerror``.
    """
    try:
        super().bind(addr)
    except ZMQError as err:
        # say which address failed before letting the error propagate
        err.strerror = err.strerror + f" (addr={addr!r})"
        raise
    else:
        return self._bind_cm(addr)

325 

def connect(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
    """s.connect(addr)

    Connect to a remote 0MQ socket.

    Returns a context manager which will call disconnect on exit.

    .. versionadded:: 20.0
        Can be used as a context manager.

    Parameters
    ----------
    addr : str
        The address string. This has the form 'protocol://interface:port',
        for example 'tcp://127.0.0.1:5555'. Protocols supported are
        tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
        encoded to utf-8 first.

    Raises
    ------
    ZMQError
        Re-raised from the underlying connect, with the address
        appended to its ``strerror``.
    """
    try:
        super().connect(addr)
    except ZMQError as err:
        # say which address failed before letting the error propagate
        err.strerror = err.strerror + f" (addr={addr!r})"
        raise
    else:
        return self._connect_cm(addr)

351 

352 # ------------------------------------------------------------------------- 

353 # Deprecated aliases 

354 # ------------------------------------------------------------------------- 

355 

@property
def socket_type(self) -> int:
    """Deprecated alias for ``Socket.type``.

    Emits a ``DeprecationWarning`` on every access and returns
    ``self.type``.
    """
    # stacklevel=2 attributes the warning to the code that accessed the
    # property, so it is reported at (and filtered by) the caller's location
    warn(
        "Socket.socket_type is deprecated, use Socket.type",
        DeprecationWarning,
        stacklevel=2,
    )
    return cast(int, self.type)

360 

361 # ------------------------------------------------------------------------- 

362 # Hooks for sockopt completion 

363 # ------------------------------------------------------------------------- 

364 

def __dir__(self):
    """List the class's attributes followed by every ``SocketOption`` name.

    This is the hook that makes socket options show up in completion.
    """
    return [*dir(self.__class__), *SocketOption.__members__]

369 

370 # ------------------------------------------------------------------------- 

371 # Getting/Setting options 

372 # ------------------------------------------------------------------------- 

# setsockopt / getsockopt are additional names for the `set` / `get`
# functions of the backend socket base class.
setsockopt = SocketBase.set
getsockopt = SocketBase.get

375 

def __setattr__(self, key, value):
    """Override to allow setting zmq.[UN]SUBSCRIBE even though we have a subscribe method

    Keys already present in the instance ``__dict__`` are assigned
    directly. ``subscribe`` / ``unsubscribe`` (any letter case) are
    turned into the matching socket option, with ``str`` values encoded
    as utf8. Everything else goes to the parent ``__setattr__``.
    """
    if key in self.__dict__:
        object.__setattr__(self, key, value)
        return

    lowered = key.lower()
    if lowered == 'subscribe':
        option = zmq.SUBSCRIBE
    elif lowered == 'unsubscribe':
        option = zmq.UNSUBSCRIBE
    else:
        super().__setattr__(key, value)
        return

    payload = value.encode('utf8') if isinstance(value, str) else value
    self.set(option, payload)

391 

def fileno(self) -> int:
    """Return edge-triggered file descriptor for this socket.

    This is a read-only edge-triggered file descriptor for both read and write events on this socket.
    It is important that all available events be consumed when an event is detected,
    otherwise the read event will not trigger again.

    The value returned is this socket's ``FD`` attribute.

    .. versionadded:: 17.0
    """
    return self.FD

402 

def subscribe(self, topic: str | bytes) -> None:
    """Subscribe to a topic

    Only for SUB sockets. A ``str`` topic is encoded as utf8 before the
    SUBSCRIBE option is set; ``bytes`` are passed through unchanged.

    .. versionadded:: 15.3
    """
    payload = topic.encode('utf8') if isinstance(topic, str) else topic
    self.set(zmq.SUBSCRIBE, payload)

413 

def unsubscribe(self, topic: str | bytes) -> None:
    """Unsubscribe from a topic

    Only for SUB sockets. A ``str`` topic is encoded as utf8 before the
    UNSUBSCRIBE option is set; ``bytes`` are passed through unchanged.

    .. versionadded:: 15.3
    """
    payload = topic.encode('utf8') if isinstance(topic, str) else topic
    self.set(zmq.UNSUBSCRIBE, payload)

424 

def set_string(self, option: int, optval: str, encoding='utf-8') -> None:
    """Set a socket option from a unicode string.

    A thin wrapper around ``set`` that encodes the value explicitly, to
    protect from encoding ambiguity.

    See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : int
        The name of the option to set. Can be any of: SUBSCRIBE,
        UNSUBSCRIBE, IDENTITY
    optval : str
        The value of the option to set.
    encoding : str
        The encoding to be used, default is utf8

    Raises
    ------
    TypeError
        If ``optval`` is not a ``str``.
    """
    if isinstance(optval, str):
        encoded = optval.encode(encoding)
        return self.set(option, encoded)
    raise TypeError(f"strings only, not {type(optval)}: {optval!r}")

setsockopt_unicode = setsockopt_string = set_string

447 

def get_string(self, option: int, encoding='utf-8') -> str:
    """Get the value of a socket option as a decoded string.

    See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : int
        The option to retrieve.
    encoding : str
        The encoding used to decode the option's bytes value.

    Returns
    -------
    optval : str
        The value of the option as a unicode string.

    Raises
    ------
    TypeError
        If the option's declared type is not bytes.
    """
    declared_type = SocketOption(option)._opt_type
    if declared_type == _OptType.bytes:
        raw_value = cast(bytes, self.get(option))
        return raw_value.decode(encoding)
    raise TypeError(f"option {option} will not return a string to be decoded")

getsockopt_unicode = getsockopt_string = get_string

468 

def bind_to_random_port(
    self: _SocketType,
    addr: str,
    min_port: int = 49152,
    max_port: int = 65536,
    max_tries: int = 100,
) -> int:
    """Bind this socket to a random port in a range.

    If the port range is unspecified, the system will choose the port.

    Parameters
    ----------
    addr : str
        The address string without the port to pass to ``Socket.bind()``.
    min_port : int, optional
        The minimum port in the range of ports to try (inclusive).
    max_port : int, optional
        The maximum port in the range of ports to try (exclusive).
    max_tries : int, optional
        The maximum number of bind attempts to make.

    Returns
    -------
    port : int
        The port the socket was bound to.

    Raises
    ------
    ZMQBindError
        if `max_tries` reached before successful bind
    """
    if (min_port, max_port) == (49152, 65536):
        # default range: bind to a wildcard port and read back which
        # port was picked from the socket's last endpoint
        self.bind(f"{addr}:*")
        endpoint = cast(bytes, self.last_endpoint).decode('ascii', 'replace')
        _host, port_text = endpoint.rsplit(':', 1)
        return int(port_text)

    for _attempt in range(max_tries):
        port = random.randrange(min_port, max_port)
        try:
            self.bind(f'{addr}:{port}')
        except ZMQError as err:
            code = err.errno
            # retry when the port is taken, or on win32 when access is denied
            retryable = code == zmq.EADDRINUSE or (
                sys.platform == 'win32' and code == errno.EACCES
            )
            if not retryable:
                raise
        else:
            return port
    raise ZMQBindError("Could not bind socket to random port.")

524 

def get_hwm(self) -> int:
    """Get the High Water Mark.

    On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM
    """
    try:
        value = self.get(zmq.SNDHWM)
    except zmq.ZMQError:
        have_sndhwm = False
    else:
        have_sndhwm = True

    if not have_sndhwm:
        # SNDHWM could not be read: fall back on RCVHWM
        value = self.get(zmq.RCVHWM)
    return cast(int, value)

537 

def set_hwm(self, value: int) -> None:
    """Set the High Water Mark.

    On libzmq ≥ 3, this sets both SNDHWM and RCVHWM


    .. warning::

        New values only take effect for subsequent socket
        bind/connects.

    Both attributes are always attempted. If either assignment fails,
    the most recent exception is raised once both have been tried.
    """
    last_error = None
    for attr in ("sndhwm", "rcvhwm"):
        try:
            setattr(self, attr, value)
        except Exception as err:
            last_error = err

    if last_error:
        raise last_error

561 

562 hwm = property( 

563 get_hwm, 

564 set_hwm, 

565 None, 

566 """Property for High Water Mark. 

567 

568 Setting hwm sets both SNDHWM and RCVHWM as appropriate. 

569 It gets SNDHWM if available, otherwise RCVHWM. 

570 """, 

571 ) 

572 

573 # ------------------------------------------------------------------------- 

574 # Sending and receiving messages 

575 # ------------------------------------------------------------------------- 

576 

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    *,
    track: Literal[True],
    routing_id: int | None = ...,
    group: str | None = ...,
) -> zmq.MessageTracker: ...

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    *,
    track: Literal[False],
    routing_id: int | None = ...,
    group: str | None = ...,
) -> None: ...

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    *,
    copy: bool = ...,
    routing_id: int | None = ...,
    group: str | None = ...,
) -> None: ...

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    track: bool = ...,
    routing_id: int | None = ...,
    group: str | None = ...,
) -> zmq.MessageTracker | None: ...

def send(
    self,
    data: Any,
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    routing_id: int | None = None,
    group: str | None = None,
) -> zmq.MessageTracker | None:
    """Send a single zmq message frame on this socket.

    This queues the message to be sent by the IO thread at a later time.

    With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
    otherwise, this waits until space is available.
    See :class:`Poller` for more general non-blocking I/O.

    Parameters
    ----------
    data : bytes, Frame, memoryview
        The content of the message. This can be any object that provides
        the Python buffer API (i.e. `memoryview(data)` can be called).
    flags : int
        0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
    copy : bool
        Should the message be sent in a copying or non-copying manner.
    track : bool
        Should the message be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)
    routing_id : int
        For use with SERVER sockets
    group : str
        For use with RADIO sockets

    Returns
    -------
    None : if `copy` or not track
        None if message was sent, raises an exception otherwise.
    MessageTracker : if track and not copy
        a MessageTracker object, whose `done` property will
        be False until the send is completed.

    Raises
    ------
    TypeError
        If a unicode object is passed
    ValueError
        If `track=True`, but an untracked Frame is passed.
    ZMQError
        If the send does not succeed for any reason (including
        if NOBLOCK is set and the outgoing queue is full).


    .. versionchanged:: 17.0

        DRAFT support for routing_id and group arguments.
    """
    if routing_id is not None or group is not None:
        # routing_id / group are attributes of a Frame, so anything that
        # is not already a Frame is wrapped in one first
        if not isinstance(data, zmq.Frame):
            data = zmq.Frame(
                data,
                track=track,
                copy=copy or None,
                copy_threshold=self.copy_threshold,
            )
        if routing_id is not None:
            data.routing_id = routing_id
        if group is not None:
            data.group = group
    return super().send(data, flags=flags, copy=copy, track=track)

699 

def send_multipart(
    self,
    msg_parts: Sequence,
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    **kwargs,
):
    """Send a sequence of buffers as a multipart message.

    The zmq.SNDMORE flag is added to all msg parts before the last.

    Parameters
    ----------
    msg_parts : iterable
        The objects to send as a multipart message. Each element
        can be any sendable object (Frame, bytes, buffer-providers).
        Anything that is not a list or tuple is copied into a list
        first, so generators and other one-shot iterables work.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
        SNDMORE is added automatically for frames before the last.
    copy : bool, optional
        Should the frame(s) be sent in a copying or non-copying manner.
        If copy=False, frames smaller than self.copy_threshold bytes
        will be copied anyway.
    track : bool, optional
        Should the frame(s) be tracked for notification that ZMQ has
        finished with it (ignored if copy=True).
    **kwargs
        Accepted but not used by this implementation.

    Returns
    -------
    None : if copy or not track
    MessageTracker : if track and not copy
        a MessageTracker object, whose `done` property will
        be False until the last send is completed.

    Raises
    ------
    TypeError
        If a part is not a Frame, bytes or memoryview and does not
        support the buffer interface. Nothing is sent in that case.
    IndexError
        If ``msg_parts`` is empty.
    """
    # The parts are walked once for validation and then sliced/indexed
    # for sending, which a one-shot or unsliceable iterable cannot
    # survive; take a snapshot unless it is already a list or tuple.
    if not isinstance(msg_parts, (list, tuple)):
        msg_parts = list(msg_parts)

    # typecheck parts before sending:
    for i, msg in enumerate(msg_parts):
        if isinstance(msg, (zmq.Frame, bytes, memoryview)):
            continue
        try:
            memoryview(msg)
        except Exception:
            rmsg = repr(msg)
            if len(rmsg) > 32:
                rmsg = rmsg[:32] + '...'
            raise TypeError(
                f"Frame {i} ({rmsg}) does not support the buffer interface."
            )
    for msg in msg_parts[:-1]:
        self.send(msg, zmq.SNDMORE | flags, copy=copy, track=track)
    # Send the last part without the extra SNDMORE flag.
    return self.send(msg_parts[-1], flags, copy=copy, track=track)

752 

@overload
def recv_multipart(
    self, flags: int = ..., *, copy: Literal[True], track: bool = ...
) -> list[bytes]: ...

@overload
def recv_multipart(
    self, flags: int = ..., *, copy: Literal[False], track: bool = ...
) -> list[zmq.Frame]: ...

@overload
def recv_multipart(self, flags: int = ..., *, track: bool = ...) -> list[bytes]: ...

@overload
def recv_multipart(
    self, flags: int = 0, copy: bool = True, track: bool = False
) -> list[zmq.Frame] | list[bytes]: ...

def recv_multipart(
    self, flags: int = 0, copy: bool = True, track: bool = False
) -> list[zmq.Frame] | list[bytes]:
    """Receive a multipart message as a list of bytes or Frame objects

    Parameters
    ----------
    flags : int, optional
        Any valid flags for :func:`Socket.recv`.
    copy : bool, optional
        Should the message frame(s) be received in a copying or non-copying manner?
        If False a Frame object is returned for each part, if True a copy of
        the bytes is made for each frame.
    track : bool, optional
        Should the message frame(s) be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)

    Returns
    -------
    msg_parts : list
        A list of frames in the multipart message; either Frames or bytes,
        depending on `copy`.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    frames = []
    while True:
        frames.append(self.recv(flags, copy=copy, track=track))
        # RCVMORE says whether another frame of this message follows
        if not self.getsockopt(zmq.RCVMORE):
            break
    # cast List[Union] to Union[List];
    # the type checker cannot tell that the element type follows `copy`
    return cast(Union[List[zmq.Frame], List[bytes]], frames)

807 

def _deserialize(
    self,
    recvd: bytes,
    load: Callable[[bytes], Any],
) -> Any:
    """Deserialize a received message

    Override in subclass (e.g. Futures) if recvd is not the raw bytes.

    The default implementation expects bytes and returns the deserialized message immediately.

    Parameters
    ----------
    recvd:
        The object returned by self.recv
    load: callable
        Callable that deserializes bytes

    Returns
    -------
    The result of ``load(recvd)``.
    """
    return load(recvd)

829 

def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs):
    """Send a message with a custom serialization function.

    .. versionadded:: 17

    Parameters
    ----------
    msg : The message to be sent. Can be any object serializable by `serialize`.
    serialize : callable
        The serialization function to use.
        serialize(msg) should return an iterable of sendable message frames
        (e.g. bytes objects), which will be passed to send_multipart.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
    copy : bool, optional
        Whether to copy the frames.
    **kwargs
        Forwarded to ``send_multipart``.

    Returns
    -------
    Whatever ``send_multipart`` returns.
    """
    return self.send_multipart(serialize(msg), flags=flags, copy=copy, **kwargs)

850 

def recv_serialized(self, deserialize, flags=0, copy=True):
    """Receive a message with a custom deserialization function.

    .. versionadded:: 17

    Parameters
    ----------
    deserialize : callable
        The deserialization function to use.
        deserialize will be called with one argument: the list of frames
        returned by recv_multipart() and can return any object.
    flags : int, optional
        Any valid flags for :func:`Socket.recv`.
    copy : bool, optional
        Whether to recv bytes or Frame objects.

    Returns
    -------
    obj : object
        The object returned by the deserialization function.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    return self._deserialize(
        self.recv_multipart(flags=flags, copy=copy),
        deserialize,
    )

879 

def send_string(
    self,
    u: str,
    flags: int = 0,
    copy: bool = True,
    encoding: str = 'utf-8',
    **kwargs,
) -> zmq.MessageTracker | None:
    """Send a Python unicode string as a message with an encoding.

    0MQ communicates with raw bytes, so you must encode/decode
    text (str) around 0MQ.

    Parameters
    ----------
    u : str
        The unicode string to send.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
    copy : bool, optional
        Forwarded to :func:`Socket.send`.
    encoding : str
        The encoding to be used
    **kwargs
        Forwarded to :func:`Socket.send`.

    Returns
    -------
    Whatever :func:`Socket.send` returns for the encoded bytes
    (annotated there as ``MessageTracker | None``).

    Raises
    ------
    TypeError
        If ``u`` is not a ``str``.
    """
    if not isinstance(u, str):
        raise TypeError("str objects only")
    return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)

send_unicode = send_string

907 

def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:
    """Receive a unicode string, as sent by send_string.

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.
    encoding : str
        The encoding to be used

    Returns
    -------
    s : str
        The Python unicode string that arrives as encoded bytes.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`Socket.recv` might fail
    """

    def _decode(buf):
        return buf.decode(encoding)

    received = self.recv(flags=flags)
    return self._deserialize(received, _decode)

recv_unicode = recv_string

932 

def send_pyobj(
    self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs
) -> zmq.MessageTracker | None:
    """
    Send a Python object as a message using pickle to serialize.

    .. warning::

        Never deserialize an untrusted message with pickle,
        which can involve arbitrary code execution.
        Make sure to authenticate the sources of messages
        before unpickling them, e.g. with transport-level security
        (e.g. CURVE, ZAP, or IPC permissions)
        or signed messages.

    Parameters
    ----------
    obj : Python object
        The Python object to send.
    flags : int
        Any valid flags for :func:`Socket.send`.
    protocol : int
        The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL
        where defined, and pickle.HIGHEST_PROTOCOL elsewhere.
    **kwargs
        Forwarded to :func:`Socket.send`.

    Returns
    -------
    Whatever :func:`Socket.send` returns for the pickled bytes
    (annotated there as ``MessageTracker | None``).
    """
    msg = pickle.dumps(obj, protocol)
    return self.send(msg, flags=flags, **kwargs)

960 

def recv_pyobj(self, flags: int = 0) -> Any:
    """
    Receive a message and deserialize it with UNSAFE pickle.

    .. warning::

        Never deserialize an untrusted message with pickle,
        which can involve arbitrary code execution.
        Make sure to authenticate the sources of messages
        before unpickling them, e.g. with transport-level security
        (such as CURVE or IPC permissions)
        or authenticating messages themselves before deserializing.

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    # pickle.loads runs on whatever arrives; see the warning above
    return self._deserialize(self.recv(flags), pickle.loads)

991 

def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
    """Send a Python object as a message using json to serialize.

    Keyword arguments are passed on to json.dumps, except for
    ``routing_id`` and ``group``, which are passed to
    :func:`Socket.send` instead.

    Parameters
    ----------
    obj : Python object
        The Python object to send
    flags : int
        Any valid flags for :func:`Socket.send`
    """
    # pull the send-only options out so the serializer never sees them
    send_kwargs = {
        key: kwargs.pop(key) for key in ('routing_id', 'group') if key in kwargs
    }
    payload = jsonapi.dumps(obj, **kwargs)
    return self.send(payload, flags=flags, **send_kwargs)

1010 

def recv_json(self, flags: int = 0, **kwargs) -> _JSONType:
    """Receive a message and deserialize it as json.

    Keyword arguments are passed on to json.loads

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """

    def _load(buf):
        return jsonapi.loads(buf, **kwargs)

    received = self.recv(flags)
    return self._deserialize(received, _load)

1033 

1034 _poller_class = Poller 

1035 

def poll(self, timeout: int | None = None, flags: int = zmq.POLLIN) -> int:
    """Poll the socket for events.

    See :class:`Poller` to wait for multiple sockets at once.

    Parameters
    ----------
    timeout : int
        The timeout (in milliseconds) to wait for an event. If unspecified
        (or specified None), will wait forever for an event.
    flags : int
        default: POLLIN.
        POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.

    Returns
    -------
    event_mask : int
        The poll event mask (POLLIN, POLLOUT),
        0 if the timeout was reached without an event.

    Raises
    ------
    ZMQError
        With ``zmq.ENOTSUP`` if the socket is closed.
    """
    if self.closed:
        raise ZMQError(zmq.ENOTSUP)

    poller = self._poller_class()
    poller.register(self, flags)
    ready = dict(poller.poll(timeout))
    # no entry for this socket means no event: report 0
    return ready.get(self, 0)

1065 

def get_monitor_socket(
    self: _SocketType, events: int | None = None, addr: str | None = None
) -> _SocketType:
    """Return a connected PAIR socket ready to receive the event notifications.

    A monitor socket created by an earlier call is returned again as
    long as it has not been closed.

    .. versionadded:: libzmq-4.0
    .. versionadded:: 14.0

    Parameters
    ----------
    events : int
        default: `zmq.EVENT_ALL`
        The bitmask defining which events are wanted.
    addr : str
        The optional endpoint for the monitoring sockets.

    Returns
    -------
    socket : zmq.Socket
        The PAIR socket, connected and ready to receive messages.

    Raises
    ------
    NotImplementedError
        If the libzmq version is older than 4.
    """
    # safe-guard, method only available on libzmq >= 4
    if zmq.zmq_version_info() < (4,):
        raise NotImplementedError(
            f"get_monitor_socket requires libzmq >= 4, have {zmq.zmq_version()}"
        )

    existing = self._monitor_socket
    if existing:
        if not existing.closed:
            # already monitoring: hand back the live socket
            return existing
        # forget the closed one and build a replacement below
        self._monitor_socket = None

    # default endpoint name is derived from the internal fd
    endpoint = f"inproc://monitor.s-{self.FD}" if addr is None else addr
    event_mask = zmq.EVENT_ALL if events is None else events

    # attach monitoring, then connect a fresh PAIR socket to it
    self.monitor(endpoint, event_mask)
    pair = self.context.socket(zmq.PAIR)
    self._monitor_socket = pair
    pair.connect(endpoint)
    return pair

1112 

def disable_monitor(self) -> None:
    """Shutdown the PAIR socket (created using get_monitor_socket)
    that is serving socket events.

    This drops this socket's reference to the monitor socket and calls
    ``self.monitor(None, 0)``. The monitor socket object is not closed
    by this method.

    .. versionadded:: 14.4
    """
    # forget the cached monitor socket so get_monitor_socket builds a new one
    self._monitor_socket = None
    self.monitor(None, 0)

1121 

1122 

# Alias for the bytes-parameterised socket type, the same ``Socket[bytes]``
# used for ``self`` in the ``Socket.__init__`` annotations.
SyncSocket: TypeAlias = Socket[bytes]

# public names exported by this module
__all__ = ['Socket', 'SyncSocket']