Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/sugar/socket.py: 46%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""0MQ Socket pure Python methods."""
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
6from __future__ import annotations
8import errno
9import pickle
10import random
11import sys
12from collections.abc import Sequence
13from typing import (
14 Any,
15 Callable,
16 Generic,
17 Literal,
18 TypeVar,
19 Union,
20 cast,
21 overload,
22)
23from warnings import warn
25import zmq
26from zmq._typing import TypeAlias
27from zmq.backend import Socket as SocketBase
28from zmq.error import ZMQBindError, ZMQError
29from zmq.utils import jsonapi
30from zmq.utils.interop import cast_int_addr
32from ..constants import SocketOption, SocketType, _OptType
33from .attrsettr import AttributeSetter
34from .poll import Poller
# Default pickle protocol for send_pyobj: use pickle.DEFAULT_PROTOCOL when the
# pickle module defines it, otherwise fall back to pickle.HIGHEST_PROTOCOL.
try:
    DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL
except AttributeError:
    DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL

# TypeVar bound to Socket, used to annotate `self` so that methods returning
# the socket (or a context manager around it) keep the concrete subclass type.
_SocketType = TypeVar("_SocketType", bound="Socket")

# Recursive alias for the return type of recv_json.
# NOTE(review): float and None are not listed although JSON can produce them -- confirm intent.
_JSONType: TypeAlias = "int | str | bool | list[_JSONType] | dict[str, _JSONType]"
class _SocketContext(Generic[_SocketType]):
    """Context manager returned by bind/connect that undoes the call on exit.

    Entering yields the wrapped socket.  Leaving calls ``unbind(addr)`` for a
    ``"bind"`` context or ``disconnect(addr)`` for a ``"connect"`` context,
    unless the socket has already been closed.
    """

    socket: _SocketType
    kind: str
    addr: str

    def __init__(
        self: _SocketContext[_SocketType], socket: _SocketType, kind: str, addr: str
    ):
        # internal invariant: only these two kinds are ever created
        assert kind in {"bind", "connect"}
        self.socket = socket
        self.kind = kind
        self.addr = addr

    def __repr__(self):
        return f"<SocketContext({self.kind}={self.addr!r})>"

    def __enter__(self: _SocketContext[_SocketType]) -> _SocketType:
        return self.socket

    def __exit__(self, *args):
        sock = self.socket
        if not sock.closed:
            # nothing to undo on a closed socket
            if self.kind == "bind":
                sock.unbind(self.addr)
            elif self.kind == "connect":
                sock.disconnect(self.addr)
76SocketReturnType = TypeVar("SocketReturnType")
79class Socket(SocketBase, AttributeSetter, Generic[SocketReturnType]):
80 """The ZMQ socket object
82 To create a Socket, first create a Context::
84 ctx = zmq.Context.instance()
86 then call ``ctx.socket(socket_type)``::
88 s = ctx.socket(zmq.ROUTER)
90 .. versionadded:: 25
92 Sockets can now be shadowed by passing another Socket.
93 This helps in creating an async copy of a sync socket or vice versa::
95 s = zmq.Socket(async_socket)
97 Which previously had to be::
99 s = zmq.Socket.shadow(async_socket.underlying)
100 """
102 _shadow = False
103 _shadow_obj = None
104 _monitor_socket = None
105 _type_name = 'UNKNOWN'
# Overload 1: create a new socket of `socket_type` in a Context.
@overload
def __init__(
    self: Socket[bytes],
    ctx_or_socket: zmq.Context,
    socket_type: int,
    *,
    copy_threshold: int | None = None,
): ...

# Overload 2: shadow an existing socket given by keyword.
@overload
def __init__(
    self: Socket[bytes],
    *,
    shadow: Socket | int,
    copy_threshold: int | None = None,
): ...

# Overload 3: shadow an existing Socket passed positionally.
@overload
def __init__(
    self: Socket[bytes],
    ctx_or_socket: Socket,
): ...

def __init__(
    self: Socket[bytes],
    ctx_or_socket: zmq.Context | Socket | None = None,
    socket_type: int = 0,
    *,
    shadow: Socket | int = 0,
    copy_threshold: int | None = None,
):
    """Create a socket in a Context, or shadow an existing socket.

    Parameters
    ----------
    ctx_or_socket : zmq.Context or zmq.Socket, optional
        If this is a ``zmq.Socket`` it is treated as the object to shadow
        (same as passing ``shadow=``); otherwise it is forwarded to the
        base class constructor.
    socket_type : int
        Forwarded to the base class constructor.
    shadow : zmq.Socket or int
        Socket object, or address of a socket, to shadow. A falsy value
        (the default ``0``) means "not a shadow".
    copy_threshold : int, optional
        Forwarded to the base class constructor.
    """
    shadow_context: zmq.Context | None = None
    if isinstance(ctx_or_socket, zmq.Socket):
        # positional Socket(other_socket)
        shadow = ctx_or_socket
        ctx_or_socket = None

    shadow_address: int = 0

    if shadow:
        self._shadow = True
        # hold a reference to the shadow object
        self._shadow_obj = shadow
        if not isinstance(shadow, int):
            if isinstance(shadow, zmq.Socket):
                # remember the shadowed socket's context to restore it below
                shadow_context = shadow.context
            try:
                # objects exposing `.underlying` are reduced to that value
                shadow = cast(int, shadow.underlying)
            except AttributeError:
                # no `.underlying`: hand the object itself to cast_int_addr
                pass
        shadow_address = cast_int_addr(shadow)
    else:
        self._shadow = False

    super().__init__(
        ctx_or_socket,
        socket_type,
        shadow=shadow_address,
        copy_threshold=copy_threshold,
    )
    if self._shadow_obj and shadow_context:
        # keep self.context reference if shadowing a Socket object
        self.context = shadow_context

    # Cache the socket type: `type` goes straight into the instance dict and
    # `_type_name` (used by __repr__) gets the enum name, or the raw value as
    # a string when it is not a known SocketType.
    try:
        socket_type = cast(int, self.get(zmq.TYPE))
    except Exception:
        # best effort: leave `_type_name` at its class default
        pass
    else:
        try:
            self.__dict__["type"] = stype = SocketType(socket_type)
        except ValueError:
            self._type_name = str(socket_type)
        else:
            self._type_name = stype.name
183 def __del__(self):
184 if not self._shadow and not self.closed:
185 if warn is not None:
186 # warn can be None during process teardown
187 warn(
188 f"Unclosed socket {self}",
189 ResourceWarning,
190 stacklevel=2,
191 source=self,
192 )
193 self.close()
195 _repr_cls = "zmq.Socket"
197 def __repr__(self):
198 cls = self.__class__
199 # look up _repr_cls on exact class, not inherited
200 _repr_cls = cls.__dict__.get("_repr_cls", None)
201 if _repr_cls is None:
202 _repr_cls = f"{cls.__module__}.{cls.__name__}"
204 closed = ' closed' if self._closed else ''
206 return f"<{_repr_cls}(zmq.{self._type_name}) at {hex(id(self))}{closed}>"
208 # socket as context manager:
def __enter__(self: _SocketType) -> _SocketType:
    """Sockets are context managers

    Entering the ``with`` block yields the socket itself.

    .. versionadded:: 14.4
    """
    return self
def __exit__(self, *args, **kwargs):
    """Close the socket when leaving the ``with`` block; arguments are ignored."""
    self.close()
219 # -------------------------------------------------------------------------
220 # Socket creation
221 # -------------------------------------------------------------------------
223 def __copy__(self: _SocketType, memo=None) -> _SocketType:
224 """Copying a Socket creates a shadow copy"""
225 return self.__class__.shadow(self.underlying)
227 __deepcopy__ = __copy__
@classmethod
def shadow(cls: type[_SocketType], address: int | zmq.Socket) -> _SocketType:
    """Shadow an existing libzmq socket

    address is a zmq.Socket or an integer (or FFI pointer)
    representing the address of the libzmq socket.

    Equivalent to calling ``cls(shadow=address)``.

    .. versionadded:: 14.1

    .. versionadded:: 25
        Support for shadowing `zmq.Socket` objects,
        instead of just integer addresses.
    """
    return cls(shadow=address)
def close(self, linger=None) -> None:
    """
    Close the socket.

    If linger is specified, LINGER sockopt will be set prior to closing.

    Note: closing a zmq Socket may not close the underlying sockets
    if there are undelivered messages.
    Only after all messages are delivered or discarded by reaching the socket's LINGER timeout
    (default: forever)
    will the underlying sockets be closed.

    This can be called to close the socket by hand. If this is not
    called, the socket will automatically be closed when it is
    garbage collected,
    in which case you may see a ResourceWarning about the unclosed socket.

    Parameters
    ----------
    linger : optional
        Forwarded to the base class ``close``.
    """
    if self.context:
        # unregister from the owning context before closing
        self.context._rm_socket(self)
    super().close(linger=linger)
265 # -------------------------------------------------------------------------
266 # Connect/Bind context managers
267 # -------------------------------------------------------------------------
def _connect_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
    """Context manager to disconnect on exit

    Wraps this socket and ``addr`` in a ``_SocketContext`` of kind
    ``'connect'``; it does not itself connect.

    .. versionadded:: 20.0
    """
    return _SocketContext(self, 'connect', addr)
def _bind_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
    """Build the context manager that unbinds on exit.

    The address to unbind is taken from LAST_ENDPOINT when it can be read and
    decoded, so that wildcard binds such as ``tcp://127.0.0.1:0`` unbind the
    endpoint that was actually chosen; otherwise ``addr`` is used unchanged.

    .. versionadded:: 20.0
    """
    try:
        endpoint = cast(bytes, self.get(zmq.LAST_ENDPOINT))
        addr = endpoint.decode("utf8")
    except (AttributeError, ZMQError, UnicodeDecodeError):
        # keep the caller-supplied address
        pass
    return _SocketContext(self, 'bind', addr)
def bind(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
    """s.bind(addr)

    Bind the socket to an address so that peers can reach it with
    ``Socket.connect(addr)``.

    The return value is a context manager which unbinds on exit.

    .. versionadded:: 20.0
        Can be used as a context manager.

    .. versionadded:: 26.0
        binding to port 0 can be used as a context manager
        for binding to a random port.
        The URL can be retrieved as `socket.last_endpoint`.

    Parameters
    ----------
    addr : str
        The address string, of the form 'protocol://interface:port',
        for example 'tcp://127.0.0.1:5555'. Protocols supported include
        tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
        encoded to utf-8 first.

    Raises
    ------
    ZMQError
        Re-raised from the underlying bind, with the address appended to
        the error's ``strerror``.
    """
    try:
        super().bind(addr)
    except ZMQError as err:
        # make the failing address visible in the error text
        err.strerror += f" (addr={addr!r})"
        raise
    return self._bind_cm(addr)
def connect(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
    """s.connect(addr)

    Connect to a remote 0MQ socket.

    The return value is a context manager which disconnects on exit.

    .. versionadded:: 20.0
        Can be used as a context manager.

    Parameters
    ----------
    addr : str
        The address string, of the form 'protocol://interface:port',
        for example 'tcp://127.0.0.1:5555'. Protocols supported are
        tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
        encoded to utf-8 first.

    Raises
    ------
    ZMQError
        Re-raised from the underlying connect, with the address appended to
        the error's ``strerror``.
    """
    try:
        super().connect(addr)
    except ZMQError as err:
        # make the failing address visible in the error text
        err.strerror += f" (addr={addr!r})"
        raise
    return self._connect_cm(addr)
351 # -------------------------------------------------------------------------
352 # Deprecated aliases
353 # -------------------------------------------------------------------------
@property
def socket_type(self) -> int:
    """Deprecated alias for :attr:`Socket.type`.

    Emits a :class:`DeprecationWarning` on every access and returns
    ``self.type``.
    """
    # stacklevel=2 attributes the warning to the code reading the property,
    # so users see *their* line (and default filters keyed on the caller's
    # module can apply) instead of this module.
    warn(
        "Socket.socket_type is deprecated, use Socket.type",
        DeprecationWarning,
        stacklevel=2,
    )
    return cast(int, self.type)
360 # -------------------------------------------------------------------------
361 # Hooks for sockopt completion
362 # -------------------------------------------------------------------------
def __dir__(self):
    """List the class's attributes followed by every SocketOption member name.

    This is what makes socket option names show up in tab completion.
    """
    return [*dir(self.__class__), *SocketOption.__members__]
369 # -------------------------------------------------------------------------
370 # Getting/Setting options
371 # -------------------------------------------------------------------------
# setsockopt/getsockopt are aliases for the base class's set/get methods.
setsockopt = SocketBase.set
getsockopt = SocketBase.get
def __setattr__(self, key, value):
    """Route ``sock.subscribe = x`` / ``sock.unsubscribe = x`` to socket options.

    Needed because ``subscribe`` and ``unsubscribe`` are also methods, so
    plain attribute assignment would otherwise not reach zmq.[UN]SUBSCRIBE.
    Keys already present in the instance ``__dict__`` are assigned directly;
    everything else is delegated to the parent ``__setattr__``.
    """
    if key in self.__dict__:
        object.__setattr__(self, key, value)
        return

    lowered = key.lower()
    if lowered == 'subscribe':
        option = zmq.SUBSCRIBE
    elif lowered == 'unsubscribe':
        option = zmq.UNSUBSCRIBE
    else:
        super().__setattr__(key, value)
        return

    # str topics are sent as utf8 bytes
    if isinstance(value, str):
        value = value.encode('utf8')
    self.set(option, value)
def fileno(self) -> int:
    """Return edge-triggered file descriptor for this socket.

    This is a read-only edge-triggered file descriptor for both read and write events on this socket.
    It is important that all available events be consumed when an event is detected,
    otherwise the read event will not trigger again.

    The value returned is the socket's ``FD`` attribute.

    .. versionadded:: 17.0
    """
    return self.FD
def subscribe(self, topic: str | bytes) -> None:
    """Subscribe to a topic by setting the SUBSCRIBE option.

    Only for SUB sockets.

    Parameters
    ----------
    topic : str or bytes
        The topic prefix; a ``str`` is encoded as utf8 first.

    .. versionadded:: 15.3
    """
    payload = topic.encode('utf8') if isinstance(topic, str) else topic
    self.set(zmq.SUBSCRIBE, payload)
def unsubscribe(self, topic: str | bytes) -> None:
    """Unsubscribe from a topic by setting the UNSUBSCRIBE option.

    Only for SUB sockets.

    Parameters
    ----------
    topic : str or bytes
        The topic prefix; a ``str`` is encoded as utf8 first.

    .. versionadded:: 15.3
    """
    payload = topic.encode('utf8') if isinstance(topic, str) else topic
    self.set(zmq.UNSUBSCRIBE, payload)
def set_string(self, option: int, optval: str, encoding='utf-8') -> None:
    """Set a socket option from a text string.

    A thin wrapper around ``set`` that makes the text-to-bytes encoding
    explicit. See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : int
        The option to set. Can be any of: SUBSCRIBE,
        UNSUBSCRIBE, IDENTITY
    optval : str
        The value of the option to set.
    encoding : str
        The encoding applied to ``optval``, default is utf8

    Raises
    ------
    TypeError
        If ``optval`` is not a ``str``.
    """
    if isinstance(optval, str):
        return self.set(option, optval.encode(encoding))
    raise TypeError(f"strings only, not {type(optval)}: {optval!r}")

setsockopt_unicode = setsockopt_string = set_string
def get_string(self, option: int, encoding='utf-8') -> str:
    """Get the value of a bytes-typed socket option, decoded to text.

    See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : int
        The option to retrieve.
    encoding : str
        The encoding used to decode the raw value.

    Returns
    -------
    optval : str
        The value of the option as a unicode string.

    Raises
    ------
    TypeError
        If the option's declared type is not bytes.
    """
    declared_type = SocketOption(option)._opt_type
    if declared_type != _OptType.bytes:
        raise TypeError(f"option {option} will not return a string to be decoded")
    raw = cast(bytes, self.get(option))
    return raw.decode(encoding)

getsockopt_unicode = getsockopt_string = get_string
def bind_to_random_port(
    self: _SocketType,
    addr: str,
    min_port: int = 49152,
    max_port: int = 65536,
    max_tries: int = 100,
) -> int:
    """Bind this socket to a random port in a range.

    When both ``min_port`` and ``max_port`` are left at their defaults, the
    socket is bound to ``addr:*`` and the port is read back from
    ``last_endpoint``. Otherwise random ports in the range are tried.

    Parameters
    ----------
    addr : str
        The address string without the port to pass to ``Socket.bind()``.
    min_port : int, optional
        The minimum port in the range of ports to try (inclusive).
    max_port : int, optional
        The maximum port in the range of ports to try (exclusive).
    max_tries : int, optional
        The maximum number of bind attempts to make.

    Returns
    -------
    port : int
        The port the socket was bound to.

    Raises
    ------
    ZMQBindError
        if `max_tries` reached before successful bind
    """
    if min_port == 49152 and max_port == 65536:
        # default range: bind to a wildcard port, then read the chosen port
        # back from the endpoint URL
        self.bind(f"{addr}:*")
        endpoint = cast(bytes, self.last_endpoint).decode('ascii', 'replace')
        _, port_text = endpoint.rsplit(':', 1)
        return int(port_text)

    for _ in range(max_tries):
        port = random.randrange(min_port, max_port)
        try:
            self.bind(f'{addr}:{port}')
        except ZMQError as exc:
            code = exc.errno
            if code == zmq.EADDRINUSE:
                # port taken: try another one
                continue
            if sys.platform == 'win32' and code == errno.EACCES:
                # on Windows EACCES is also retried
                continue
            raise
        return port
    raise ZMQBindError("Could not bind socket to random port.")
def get_hwm(self) -> int:
    """Get the High Water Mark.

    On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM
    """
    # prefer sndhwm; rcvhwm is only consulted when reading sndhwm fails
    try:
        send_hwm = self.get(zmq.SNDHWM)
    except zmq.ZMQError:
        pass
    else:
        return cast(int, send_hwm)

    return cast(int, self.get(zmq.RCVHWM))
def set_hwm(self, value: int) -> None:
    """Set the High Water Mark.

    On libzmq ≥ 3, this sets both SNDHWM and RCVHWM

    Both assignments are always attempted; if either fails, the most recent
    exception is raised once both have been tried.

    .. warning::

        New values only take effect for subsequent socket
        bind/connects.
    """
    last_error = None
    for attr in ("sndhwm", "rcvhwm"):
        try:
            setattr(self, attr, value)
        except Exception as exc:
            last_error = exc

    if last_error:
        raise last_error
# Read/write `hwm` attribute backed by get_hwm/set_hwm; no deleter is defined.
hwm = property(
    get_hwm,
    set_hwm,
    None,
    """Property for High Water Mark.

    Setting hwm sets both SNDHWM and RCVHWM as appropriate.
    It gets SNDHWM if available, otherwise RCVHWM.
    """,
)
572 # -------------------------------------------------------------------------
573 # Sending and receiving messages
574 # -------------------------------------------------------------------------
@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    *,
    track: Literal[True],
    routing_id: int | None = ...,
    group: str | None = ...,
) -> zmq.MessageTracker: ...

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    *,
    track: Literal[False],
    routing_id: int | None = ...,
    group: str | None = ...,
) -> None: ...

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    *,
    copy: bool = ...,
    routing_id: int | None = ...,
    group: str | None = ...,
) -> None: ...

@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    track: bool = ...,
    routing_id: int | None = ...,
    group: str | None = ...,
) -> zmq.MessageTracker | None: ...

def send(
    self,
    data: Any,
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    routing_id: int | None = None,
    group: str | None = None,
) -> zmq.MessageTracker | None:
    """Send a single zmq message frame on this socket.

    The message is queued for the IO thread to send later.

    With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
    otherwise, this waits until space is available.
    See :class:`Poller` for more general non-blocking I/O.

    Parameters
    ----------
    data : bytes, Frame, memoryview
        The content of the message. This can be any object that provides
        the Python buffer API (i.e. `memoryview(data)` can be called).
    flags : int
        0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
    copy : bool
        Should the message be sent in a copying or non-copying manner.
    track : bool
        Should the message be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)
    routing_id : int
        For use with SERVER sockets
    group : str
        For use with RADIO sockets

    Returns
    -------
    None : if `copy` or not track
        None if message was sent, raises an exception otherwise.
    MessageTracker : if track and not copy
        a MessageTracker object, whose `done` property will
        be False until the send is completed.

    Raises
    ------
    TypeError
        If a unicode object is passed
    ValueError
        If `track=True`, but an untracked Frame is passed.
    ZMQError
        If the send does not succeed for any reason (including
        if NOBLOCK is set and the outgoing queue is full).


    .. versionchanged:: 17.0

        DRAFT support for routing_id and group arguments.
    """
    if routing_id is not None or group is not None:
        # routing_id/group live on the Frame, so wrap raw data exactly once
        if not isinstance(data, zmq.Frame):
            data = zmq.Frame(
                data,
                track=track,
                copy=copy or None,
                copy_threshold=self.copy_threshold,
            )
        if routing_id is not None:
            data.routing_id = routing_id
        if group is not None:
            data.group = group
    return super().send(data, flags=flags, copy=copy, track=track)
def send_multipart(
    self,
    msg_parts: Sequence,
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    **kwargs,
):
    """Send a sequence of buffers as a multipart message.

    Every part except the last is sent with zmq.SNDMORE added to ``flags``.
    All parts are type-checked before anything is sent.

    Parameters
    ----------
    msg_parts : iterable
        A sequence of objects to send as a multipart message. Each element
        can be any sendable object (Frame, bytes, buffer-providers)
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
        SNDMORE is added automatically for frames before the last.
    copy : bool, optional
        Should the frame(s) be sent in a copying or non-copying manner.
        If copy=False, frames smaller than self.copy_threshold bytes
        will be copied anyway.
    track : bool, optional
        Should the frame(s) be tracked for notification that ZMQ has
        finished with it (ignored if copy=True).

    Returns
    -------
    None : if copy or not track
    MessageTracker : if track and not copy
        a MessageTracker object, whose `done` property will
        be False until the last send is completed.

    Raises
    ------
    TypeError
        If a part is neither a Frame, bytes nor memoryview and cannot be
        wrapped in a ``memoryview``.
    """
    # validate everything up front so a bad part cannot leave a
    # half-sent multipart message behind
    for index, part in enumerate(msg_parts):
        if not isinstance(part, (zmq.Frame, bytes, memoryview)):
            try:
                memoryview(part)
            except Exception:
                shown = repr(part)
                if len(shown) > 32:
                    shown = shown[:32] + '...'
                raise TypeError(
                    f"Frame {index} ({shown}) does not support the buffer interface."
                )

    for part in msg_parts[:-1]:
        self.send(part, zmq.SNDMORE | flags, copy=copy, track=track)
    # the final part goes out without SNDMORE, which completes the message
    return self.send(msg_parts[-1], flags, copy=copy, track=track)
@overload
def recv_multipart(
    self, flags: int = ..., *, copy: Literal[True], track: bool = ...
) -> list[bytes]: ...

@overload
def recv_multipart(
    self, flags: int = ..., *, copy: Literal[False], track: bool = ...
) -> list[zmq.Frame]: ...

@overload
def recv_multipart(self, flags: int = ..., *, track: bool = ...) -> list[bytes]: ...

@overload
def recv_multipart(
    self, flags: int = 0, copy: bool = True, track: bool = False
) -> list[zmq.Frame] | list[bytes]: ...

def recv_multipart(
    self, flags: int = 0, copy: bool = True, track: bool = False
) -> list[zmq.Frame] | list[bytes]:
    """Receive a multipart message as a list of bytes or Frame objects

    Parameters
    ----------
    flags : int, optional
        Any valid flags for :func:`Socket.recv`.
    copy : bool, optional
        Should the message frame(s) be received in a copying or non-copying manner?
        If False a Frame object is returned for each part, if True a copy of
        the bytes is made for each frame.
    track : bool, optional
        Should the message frame(s) be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)

    Returns
    -------
    msg_parts : list
        A list of frames in the multipart message; either Frames or bytes,
        depending on `copy`.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    # a message always has at least one frame; keep receiving while the
    # RCVMORE option reports more frames
    frames = [self.recv(flags, copy=copy, track=track)]
    while self.getsockopt(zmq.RCVMORE):
        frames.append(self.recv(flags, copy=copy, track=track))
    # cast List[Union] to Union[List]; the element type follows `copy`
    return cast(Union[list[zmq.Frame], list[bytes]], frames)
def _deserialize(
    self,
    recvd: bytes,
    load: Callable[[bytes], Any],
) -> Any:
    """Deserialize a received message

    Override in subclass (e.g. Futures) if recvd is not the raw bytes.

    The default implementation expects bytes and returns the deserialized message immediately.

    Parameters
    ----------

    recvd:
        The object returned by self.recv
    load: callable
        Callable that deserializes bytes

    Returns
    -------
    The result of ``load(recvd)``.
    """
    return load(recvd)
def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs):
    """Send a message with a custom serialization function.

    .. versionadded:: 17

    Parameters
    ----------
    msg : The message to be sent. Can be any object serializable by `serialize`.
    serialize : callable
        The serialization function to use.
        serialize(msg) should return an iterable of sendable message frames
        (e.g. bytes objects), which will be passed to send_multipart.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
    copy : bool, optional
        Whether to copy the frames.
    **kwargs
        Forwarded unchanged to ``send_multipart``.

    Returns
    -------
    Whatever ``send_multipart`` returns.
    """
    return self.send_multipart(serialize(msg), flags=flags, copy=copy, **kwargs)
def recv_serialized(self, deserialize, flags=0, copy=True):
    """Receive a message with a custom deserialization function.

    .. versionadded:: 17

    Parameters
    ----------
    deserialize : callable
        The deserialization function to use.
        deserialize will be called with one argument: the list of frames
        returned by recv_multipart() and can return any object.
    flags : int, optional
        Any valid flags for :func:`Socket.recv`.
    copy : bool, optional
        Whether to recv bytes or Frame objects.

    Returns
    -------
    obj : object
        The object returned by the deserialization function.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    received = self.recv_multipart(flags=flags, copy=copy)
    # go through _deserialize so subclasses can change how the result is handled
    return self._deserialize(received, deserialize)
def send_string(
    self,
    u: str,
    flags: int = 0,
    copy: bool = True,
    encoding: str = 'utf-8',
    **kwargs,
) -> zmq.MessageTracker | None:
    """Send a Python unicode string as a message with an encoding.

    0MQ communicates with raw bytes, so you must encode/decode
    text (str) around 0MQ.

    Parameters
    ----------
    u : str
        The unicode string to send.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
    copy : bool, optional
        Forwarded to :func:`Socket.send`.
    encoding : str
        The encoding to be used
    **kwargs
        Forwarded unchanged to :func:`Socket.send`.

    Returns
    -------
    The return value of :func:`Socket.send`.

    Raises
    ------
    TypeError
        If ``u`` is not a ``str``.
    """
    if not isinstance(u, str):
        raise TypeError("str objects only")
    return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)

send_unicode = send_string
def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:
    """Receive a unicode string, as sent by send_string.

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.
    encoding : str
        The encoding to be used

    Returns
    -------
    s : str
        The Python unicode string that arrives as encoded bytes.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`Socket.recv` might fail
    """

    def _decode(buf):
        return buf.decode(encoding)

    received = self.recv(flags=flags)
    # go through _deserialize so subclasses can change how the result is handled
    return self._deserialize(received, _decode)

recv_unicode = recv_string
def send_pyobj(
    self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs
) -> zmq.MessageTracker | None:
    """
    Send a Python object as a message using pickle to serialize.

    .. warning::

        Never deserialize an untrusted message with pickle,
        which can involve arbitrary code execution.
        Make sure to authenticate the sources of messages
        before unpickling them, e.g. with transport-level security
        (e.g. CURVE, ZAP, or IPC permissions)
        or signed messages.

    Parameters
    ----------
    obj : Python object
        The Python object to send.
    flags : int
        Any valid flags for :func:`Socket.send`.
    protocol : int
        The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL
        where defined, and pickle.HIGHEST_PROTOCOL elsewhere.
    **kwargs
        Forwarded unchanged to :func:`Socket.send`.

    Returns
    -------
    The return value of :func:`Socket.send`.
    """
    msg = pickle.dumps(obj, protocol)
    return self.send(msg, flags=flags, **kwargs)
def recv_pyobj(self, flags: int = 0) -> Any:
    """
    Receive a message and deserialize it with UNSAFE pickle.

    .. warning::

        Never deserialize an untrusted message with pickle,
        which can involve arbitrary code execution.
        Make sure to authenticate the sources of messages
        before unpickling them, e.g. with transport-level security
        (such as CURVE or IPC permissions)
        or authenticating messages themselves before deserializing.

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    pickled = self.recv(flags)
    # SECURITY: pickle.loads runs on whatever the peer sent; see the warning above
    return self._deserialize(pickled, pickle.loads)
def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
    """Send a Python object as a message using json to serialize.

    ``routing_id`` and ``group`` keyword arguments are passed to ``send``;
    all other keyword arguments are passed on to json.dumps

    Parameters
    ----------
    obj : Python object
        The Python object to send
    flags : int
        Any valid flags for :func:`Socket.send`
    """
    # pull out the send-only options before the rest reaches the JSON encoder
    send_kwargs = {
        key: kwargs.pop(key) for key in ('routing_id', 'group') if key in kwargs
    }
    payload = jsonapi.dumps(obj, **kwargs)
    return self.send(payload, flags=flags, **send_kwargs)
def recv_json(self, flags: int = 0, **kwargs) -> _JSONType:
    """Receive a Python object as a message using json to serialize.

    Keyword arguments are passed on to json.loads

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """

    def _load(buf):
        return jsonapi.loads(buf, **kwargs)

    received = self.recv(flags)
    # go through _deserialize so subclasses can change how the result is handled
    return self._deserialize(received, _load)
_poller_class = Poller

def poll(self, timeout: int | None = None, flags: int = zmq.POLLIN) -> int:
    """Poll the socket for events.

    See :class:`Poller` to wait for multiple sockets at once.

    Parameters
    ----------
    timeout : int
        The timeout (in milliseconds) to wait for an event. If unspecified
        (or specified None), will wait forever for an event.
    flags : int
        default: POLLIN.
        POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.

    Returns
    -------
    event_mask : int
        The poll event mask (POLLIN, POLLOUT),
        0 if the timeout was reached without an event.

    Raises
    ------
    ZMQError
        With ``zmq.ENOTSUP`` if the socket is closed.
    """
    if self.closed:
        raise ZMQError(zmq.ENOTSUP)

    # a throwaway poller watching only this socket
    poller = self._poller_class()
    poller.register(self, flags)
    ready = dict(poller.poll(timeout))
    # no entry for this socket means nothing happened before the timeout
    return ready.get(self, 0)
def get_monitor_socket(
    self: _SocketType, events: int | None = None, addr: str | None = None
) -> _SocketType:
    """Return a connected PAIR socket ready to receive the event notifications.

    A still-open monitor socket from an earlier call is returned as-is.

    .. versionadded:: libzmq-4.0
    .. versionadded:: 14.0

    Parameters
    ----------
    events : int
        default: `zmq.EVENT_ALL`
        The bitmask defining which events are wanted.
    addr : str
        The optional endpoint for the monitoring sockets.

    Returns
    -------
    socket : zmq.Socket
        The PAIR socket, connected and ready to receive messages.

    Raises
    ------
    NotImplementedError
        If the linked libzmq is older than version 4.
    """
    # safe-guard, method only available on libzmq >= 4
    if zmq.zmq_version_info() < (4,):
        raise NotImplementedError(
            f"get_monitor_socket requires libzmq >= 4, have {zmq.zmq_version()}"
        )

    # reuse the current monitor socket unless it has been closed
    current = self._monitor_socket
    if current:
        if not current.closed:
            return current
        self._monitor_socket = None

    if addr is None:
        # derive an endpoint name from the socket's FD
        addr = f"inproc://monitor.s-{self.FD}"
    if events is None:
        events = zmq.EVENT_ALL

    # start monitoring on the endpoint, then connect a fresh PAIR socket to it
    self.monitor(addr, events)
    self._monitor_socket = self.context.socket(zmq.PAIR)
    self._monitor_socket.connect(addr)
    return self._monitor_socket
def disable_monitor(self) -> None:
    """Shutdown the PAIR socket (created using get_monitor_socket)
    that is serving socket events.

    .. versionadded:: 14.4
    """
    # Only the reference is dropped here; this method does not call close()
    # on the monitor socket itself.
    self._monitor_socket = None
    # NOTE(review): monitor(None, 0) presumably turns monitoring off -- confirm
    # against the base class's `monitor`.
    self.monitor(None, 0)
# Alias for the Socket specialization used as `self` throughout Socket.__init__.
SyncSocket: TypeAlias = Socket[bytes]

# Names exported by `from ... import *`.
__all__ = ['Socket', 'SyncSocket']