Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/socket.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""0MQ Socket pure Python methods."""
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
6from __future__ import annotations
8import errno
9import pickle
10import random
11import sys
12from typing import (
13 Any,
14 Callable,
15 Generic,
16 List,
17 Sequence,
18 TypeVar,
19 Union,
20 cast,
21 overload,
22)
23from warnings import warn
25import zmq
26from zmq._typing import Literal, TypeAlias
27from zmq.backend import Socket as SocketBase
28from zmq.error import ZMQBindError, ZMQError
29from zmq.utils import jsonapi
30from zmq.utils.interop import cast_int_addr
32from ..constants import SocketOption, SocketType, _OptType
33from .attrsettr import AttributeSetter
34from .poll import Poller
36try:
37 DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL
38except AttributeError:
39 DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL
41_SocketType = TypeVar("_SocketType", bound="Socket")
44class _SocketContext(Generic[_SocketType]):
45 """Context Manager for socket bind/unbind"""
47 socket: _SocketType
48 kind: str
49 addr: str
51 def __repr__(self):
52 return f"<SocketContext({self.kind}={self.addr!r})>"
54 def __init__(
55 self: _SocketContext[_SocketType], socket: _SocketType, kind: str, addr: str
56 ):
57 assert kind in {"bind", "connect"}
58 self.socket = socket
59 self.kind = kind
60 self.addr = addr
62 def __enter__(self: _SocketContext[_SocketType]) -> _SocketType:
63 return self.socket
65 def __exit__(self, *args):
66 if self.socket.closed:
67 return
68 if self.kind == "bind":
69 self.socket.unbind(self.addr)
70 elif self.kind == "connect":
71 self.socket.disconnect(self.addr)
74SocketReturnType = TypeVar("SocketReturnType")
77class Socket(SocketBase, AttributeSetter, Generic[SocketReturnType]):
78 """The ZMQ socket object
80 To create a Socket, first create a Context::
82 ctx = zmq.Context.instance()
84 then call ``ctx.socket(socket_type)``::
86 s = ctx.socket(zmq.ROUTER)
88 .. versionadded:: 25
90 Sockets can now be shadowed by passing another Socket.
91 This helps in creating an async copy of a sync socket or vice versa::
93 s = zmq.Socket(async_socket)
95 Which previously had to be::
97 s = zmq.Socket.shadow(async_socket.underlying)
98 """
100 _shadow = False
101 _shadow_obj = None
102 _monitor_socket = None
103 _type_name = 'UNKNOWN'
105 @overload
106 def __init__(
107 self: Socket[bytes],
108 ctx_or_socket: zmq.Context,
109 socket_type: int,
110 *,
111 copy_threshold: int | None = None,
112 ): ...
114 @overload
115 def __init__(
116 self: Socket[bytes],
117 *,
118 shadow: Socket | int,
119 copy_threshold: int | None = None,
120 ): ...
122 @overload
123 def __init__(
124 self: Socket[bytes],
125 ctx_or_socket: Socket,
126 ): ...
128 def __init__(
129 self: Socket[bytes],
130 ctx_or_socket: zmq.Context | Socket | None = None,
131 socket_type: int = 0,
132 *,
133 shadow: Socket | int = 0,
134 copy_threshold: int | None = None,
135 ):
136 if isinstance(ctx_or_socket, zmq.Socket):
137 # positional Socket(other_socket)
138 shadow = ctx_or_socket
139 ctx_or_socket = None
141 shadow_address: int = 0
143 if shadow:
144 self._shadow = True
145 # hold a reference to the shadow object
146 self._shadow_obj = shadow
147 if not isinstance(shadow, int):
148 try:
149 shadow = cast(int, shadow.underlying)
150 except AttributeError:
151 pass
152 shadow_address = cast_int_addr(shadow)
153 else:
154 self._shadow = False
156 super().__init__(
157 ctx_or_socket,
158 socket_type,
159 shadow=shadow_address,
160 copy_threshold=copy_threshold,
161 )
163 try:
164 socket_type = cast(int, self.get(zmq.TYPE))
165 except Exception:
166 pass
167 else:
168 try:
169 self.__dict__["type"] = stype = SocketType(socket_type)
170 except ValueError:
171 self._type_name = str(socket_type)
172 else:
173 self._type_name = stype.name
175 def __del__(self):
176 if not self._shadow and not self.closed:
177 if warn is not None:
178 # warn can be None during process teardown
179 warn(
180 f"Unclosed socket {self}",
181 ResourceWarning,
182 stacklevel=2,
183 source=self,
184 )
185 self.close()
187 _repr_cls = "zmq.Socket"
189 def __repr__(self):
190 cls = self.__class__
191 # look up _repr_cls on exact class, not inherited
192 _repr_cls = cls.__dict__.get("_repr_cls", None)
193 if _repr_cls is None:
194 _repr_cls = f"{cls.__module__}.{cls.__name__}"
196 closed = ' closed' if self._closed else ''
198 return f"<{_repr_cls}(zmq.{self._type_name}) at {hex(id(self))}{closed}>"
200 # socket as context manager:
201 def __enter__(self: _SocketType) -> _SocketType:
202 """Sockets are context managers
204 .. versionadded:: 14.4
205 """
206 return self
208 def __exit__(self, *args, **kwargs):
209 self.close()
211 # -------------------------------------------------------------------------
212 # Socket creation
213 # -------------------------------------------------------------------------
215 def __copy__(self: _SocketType, memo=None) -> _SocketType:
216 """Copying a Socket creates a shadow copy"""
217 return self.__class__.shadow(self.underlying)
219 __deepcopy__ = __copy__
221 @classmethod
222 def shadow(cls: type[_SocketType], address: int | zmq.Socket) -> _SocketType:
223 """Shadow an existing libzmq socket
225 address is a zmq.Socket or an integer (or FFI pointer)
226 representing the address of the libzmq socket.
228 .. versionadded:: 14.1
230 .. versionadded:: 25
231 Support for shadowing `zmq.Socket` objects,
232 instead of just integer addresses.
233 """
234 return cls(shadow=address)
236 def close(self, linger=None) -> None:
237 """
238 Close the socket.
240 If linger is specified, LINGER sockopt will be set prior to closing.
242 Note: closing a zmq Socket may not close the underlying sockets
243 if there are undelivered messages.
244 Only after all messages are delivered or discarded by reaching the socket's LINGER timeout
245 (default: forever)
246 will the underlying sockets be closed.
248 This can be called to close the socket by hand. If this is not
249 called, the socket will automatically be closed when it is
250 garbage collected,
251 in which case you may see a ResourceWarning about the unclosed socket.
252 """
253 if self.context:
254 self.context._rm_socket(self)
255 super().close(linger=linger)
257 # -------------------------------------------------------------------------
258 # Connect/Bind context managers
259 # -------------------------------------------------------------------------
261 def _connect_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
262 """Context manager to disconnect on exit
264 .. versionadded:: 20.0
265 """
266 return _SocketContext(self, 'connect', addr)
268 def _bind_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
269 """Context manager to unbind on exit
271 .. versionadded:: 20.0
272 """
273 try:
274 # retrieve last_endpoint
275 # to support binding on random ports via
276 # `socket.bind('tcp://127.0.0.1:0')`
277 addr = cast(bytes, self.get(zmq.LAST_ENDPOINT)).decode("utf8")
278 except (AttributeError, ZMQError, UnicodeDecodeError):
279 pass
280 return _SocketContext(self, 'bind', addr)
282 def bind(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
283 """s.bind(addr)
285 Bind the socket to an address.
287 This causes the socket to listen on a network port. Sockets on the
288 other side of this connection will use ``Socket.connect(addr)`` to
289 connect to this socket.
291 Returns a context manager which will call unbind on exit.
293 .. versionadded:: 20.0
294 Can be used as a context manager.
296 .. versionadded:: 26.0
297 binding to port 0 can be used as a context manager
298 for binding to a random port.
299 The URL can be retrieved as `socket.last_endpoint`.
301 Parameters
302 ----------
303 addr : str
304 The address string. This has the form 'protocol://interface:port',
305 for example 'tcp://127.0.0.1:5555'. Protocols supported include
306 tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
307 encoded to utf-8 first.
309 """
310 try:
311 super().bind(addr)
312 except ZMQError as e:
313 e.strerror += f" (addr={addr!r})"
314 raise
315 return self._bind_cm(addr)
317 def connect(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
318 """s.connect(addr)
320 Connect to a remote 0MQ socket.
322 Returns a context manager which will call disconnect on exit.
324 .. versionadded:: 20.0
325 Can be used as a context manager.
327 Parameters
328 ----------
329 addr : str
330 The address string. This has the form 'protocol://interface:port',
331 for example 'tcp://127.0.0.1:5555'. Protocols supported are
332 tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
333 encoded to utf-8 first.
335 """
336 try:
337 super().connect(addr)
338 except ZMQError as e:
339 e.strerror += f" (addr={addr!r})"
340 raise
341 return self._connect_cm(addr)
343 # -------------------------------------------------------------------------
344 # Deprecated aliases
345 # -------------------------------------------------------------------------
347 @property
348 def socket_type(self) -> int:
349 warn("Socket.socket_type is deprecated, use Socket.type", DeprecationWarning)
350 return cast(int, self.type)
352 # -------------------------------------------------------------------------
353 # Hooks for sockopt completion
354 # -------------------------------------------------------------------------
356 def __dir__(self):
357 keys = dir(self.__class__)
358 keys.extend(SocketOption.__members__)
359 return keys
361 # -------------------------------------------------------------------------
362 # Getting/Setting options
363 # -------------------------------------------------------------------------
364 setsockopt = SocketBase.set
365 getsockopt = SocketBase.get
367 def __setattr__(self, key, value):
368 """Override to allow setting zmq.[UN]SUBSCRIBE even though we have a subscribe method"""
369 if key in self.__dict__:
370 object.__setattr__(self, key, value)
371 return
372 _key = key.lower()
373 if _key in ('subscribe', 'unsubscribe'):
374 if isinstance(value, str):
375 value = value.encode('utf8')
376 if _key == 'subscribe':
377 self.set(zmq.SUBSCRIBE, value)
378 else:
379 self.set(zmq.UNSUBSCRIBE, value)
380 return
381 super().__setattr__(key, value)
383 def fileno(self) -> int:
384 """Return edge-triggered file descriptor for this socket.
386 This is a read-only edge-triggered file descriptor for both read and write events on this socket.
387 It is important that all available events be consumed when an event is detected,
388 otherwise the read event will not trigger again.
390 .. versionadded:: 17.0
391 """
392 return self.FD
394 def subscribe(self, topic: str | bytes) -> None:
395 """Subscribe to a topic
397 Only for SUB sockets.
399 .. versionadded:: 15.3
400 """
401 if isinstance(topic, str):
402 topic = topic.encode('utf8')
403 self.set(zmq.SUBSCRIBE, topic)
405 def unsubscribe(self, topic: str | bytes) -> None:
406 """Unsubscribe from a topic
408 Only for SUB sockets.
410 .. versionadded:: 15.3
411 """
412 if isinstance(topic, str):
413 topic = topic.encode('utf8')
414 self.set(zmq.UNSUBSCRIBE, topic)
416 def set_string(self, option: int, optval: str, encoding='utf-8') -> None:
417 """Set socket options with a unicode object.
419 This is simply a wrapper for setsockopt to protect from encoding ambiguity.
421 See the 0MQ documentation for details on specific options.
423 Parameters
424 ----------
425 option : int
426 The name of the option to set. Can be any of: SUBSCRIBE,
427 UNSUBSCRIBE, IDENTITY
428 optval : str
429 The value of the option to set.
430 encoding : str
431 The encoding to be used, default is utf8
432 """
433 if not isinstance(optval, str):
434 raise TypeError(f"strings only, not {type(optval)}: {optval!r}")
435 return self.set(option, optval.encode(encoding))
437 setsockopt_unicode = setsockopt_string = set_string
439 def get_string(self, option: int, encoding='utf-8') -> str:
440 """Get the value of a socket option.
442 See the 0MQ documentation for details on specific options.
444 Parameters
445 ----------
446 option : int
447 The option to retrieve.
449 Returns
450 -------
451 optval : str
452 The value of the option as a unicode string.
453 """
454 if SocketOption(option)._opt_type != _OptType.bytes:
455 raise TypeError(f"option {option} will not return a string to be decoded")
456 return cast(bytes, self.get(option)).decode(encoding)
458 getsockopt_unicode = getsockopt_string = get_string
460 def bind_to_random_port(
461 self: _SocketType,
462 addr: str,
463 min_port: int = 49152,
464 max_port: int = 65536,
465 max_tries: int = 100,
466 ) -> int:
467 """Bind this socket to a random port in a range.
469 If the port range is unspecified, the system will choose the port.
471 Parameters
472 ----------
473 addr : str
474 The address string without the port to pass to ``Socket.bind()``.
475 min_port : int, optional
476 The minimum port in the range of ports to try (inclusive).
477 max_port : int, optional
478 The maximum port in the range of ports to try (exclusive).
479 max_tries : int, optional
480 The maximum number of bind attempts to make.
482 Returns
483 -------
484 port : int
485 The port the socket was bound to.
487 Raises
488 ------
489 ZMQBindError
490 if `max_tries` reached before successful bind
491 """
492 if (
493 (zmq.zmq_version_info() >= (3, 2))
494 and min_port == 49152
495 and max_port == 65536
496 ):
497 # if LAST_ENDPOINT is supported, and min_port / max_port weren't specified,
498 # we can bind to port 0 and let the OS do the work
499 self.bind("%s:*" % addr)
500 url = cast(bytes, self.last_endpoint).decode('ascii', 'replace')
501 _, port_s = url.rsplit(':', 1)
502 return int(port_s)
504 for i in range(max_tries):
505 try:
506 port = random.randrange(min_port, max_port)
507 self.bind(f'{addr}:{port}')
508 except ZMQError as exception:
509 en = exception.errno
510 if en == zmq.EADDRINUSE:
511 continue
512 elif sys.platform == 'win32' and en == errno.EACCES:
513 continue
514 else:
515 raise
516 else:
517 return port
518 raise ZMQBindError("Could not bind socket to random port.")
520 def get_hwm(self) -> int:
521 """Get the High Water Mark.
523 On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM
524 """
525 major = zmq.zmq_version_info()[0]
526 if major >= 3:
527 # return sndhwm, fallback on rcvhwm
528 try:
529 return cast(int, self.get(zmq.SNDHWM))
530 except zmq.ZMQError:
531 pass
533 return cast(int, self.get(zmq.RCVHWM))
534 else:
535 return cast(int, self.get(zmq.HWM))
537 def set_hwm(self, value: int) -> None:
538 """Set the High Water Mark.
540 On libzmq ≥ 3, this sets both SNDHWM and RCVHWM
543 .. warning::
545 New values only take effect for subsequent socket
546 bind/connects.
547 """
548 major = zmq.zmq_version_info()[0]
549 if major >= 3:
550 raised = None
551 try:
552 self.sndhwm = value
553 except Exception as e:
554 raised = e
555 try:
556 self.rcvhwm = value
557 except Exception as e:
558 raised = e
560 if raised:
561 raise raised
562 else:
563 self.set(zmq.HWM, value)
565 hwm = property(
566 get_hwm,
567 set_hwm,
568 None,
569 """Property for High Water Mark.
571 Setting hwm sets both SNDHWM and RCVHWM as appropriate.
572 It gets SNDHWM if available, otherwise RCVHWM.
573 """,
574 )
576 # -------------------------------------------------------------------------
577 # Sending and receiving messages
578 # -------------------------------------------------------------------------
580 @overload
581 def send(
582 self,
583 data: Any,
584 flags: int = ...,
585 copy: bool = ...,
586 *,
587 track: Literal[True],
588 routing_id: int | None = ...,
589 group: str | None = ...,
590 ) -> zmq.MessageTracker: ...
592 @overload
593 def send(
594 self,
595 data: Any,
596 flags: int = ...,
597 copy: bool = ...,
598 *,
599 track: Literal[False],
600 routing_id: int | None = ...,
601 group: str | None = ...,
602 ) -> None: ...
604 @overload
605 def send(
606 self,
607 data: Any,
608 flags: int = ...,
609 *,
610 copy: bool = ...,
611 routing_id: int | None = ...,
612 group: str | None = ...,
613 ) -> None: ...
615 @overload
616 def send(
617 self,
618 data: Any,
619 flags: int = ...,
620 copy: bool = ...,
621 track: bool = ...,
622 routing_id: int | None = ...,
623 group: str | None = ...,
624 ) -> zmq.MessageTracker | None: ...
626 def send(
627 self,
628 data: Any,
629 flags: int = 0,
630 copy: bool = True,
631 track: bool = False,
632 routing_id: int | None = None,
633 group: str | None = None,
634 ) -> zmq.MessageTracker | None:
635 """Send a single zmq message frame on this socket.
637 This queues the message to be sent by the IO thread at a later time.
639 With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
640 otherwise, this waits until space is available.
641 See :class:`Poller` for more general non-blocking I/O.
643 Parameters
644 ----------
645 data : bytes, Frame, memoryview
646 The content of the message. This can be any object that provides
647 the Python buffer API (i.e. `memoryview(data)` can be called).
648 flags : int
649 0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
650 copy : bool
651 Should the message be sent in a copying or non-copying manner.
652 track : bool
653 Should the message be tracked for notification that ZMQ has
654 finished with it? (ignored if copy=True)
655 routing_id : int
656 For use with SERVER sockets
657 group : str
658 For use with RADIO sockets
660 Returns
661 -------
662 None : if `copy` or not track
663 None if message was sent, raises an exception otherwise.
664 MessageTracker : if track and not copy
665 a MessageTracker object, whose `done` property will
666 be False until the send is completed.
668 Raises
669 ------
670 TypeError
671 If a unicode object is passed
672 ValueError
673 If `track=True`, but an untracked Frame is passed.
674 ZMQError
675 If the send does not succeed for any reason (including
676 if NOBLOCK is set and the outgoing queue is full).
679 .. versionchanged:: 17.0
681 DRAFT support for routing_id and group arguments.
682 """
683 if routing_id is not None:
684 if not isinstance(data, zmq.Frame):
685 data = zmq.Frame(
686 data,
687 track=track,
688 copy=copy or None,
689 copy_threshold=self.copy_threshold,
690 )
691 data.routing_id = routing_id
692 if group is not None:
693 if not isinstance(data, zmq.Frame):
694 data = zmq.Frame(
695 data,
696 track=track,
697 copy=copy or None,
698 copy_threshold=self.copy_threshold,
699 )
700 data.group = group
701 return super().send(data, flags=flags, copy=copy, track=track)
703 def send_multipart(
704 self,
705 msg_parts: Sequence,
706 flags: int = 0,
707 copy: bool = True,
708 track: bool = False,
709 **kwargs,
710 ):
711 """Send a sequence of buffers as a multipart message.
713 The zmq.SNDMORE flag is added to all msg parts before the last.
715 Parameters
716 ----------
717 msg_parts : iterable
718 A sequence of objects to send as a multipart message. Each element
719 can be any sendable object (Frame, bytes, buffer-providers)
720 flags : int, optional
721 Any valid flags for :func:`Socket.send`.
722 SNDMORE is added automatically for frames before the last.
723 copy : bool, optional
724 Should the frame(s) be sent in a copying or non-copying manner.
725 If copy=False, frames smaller than self.copy_threshold bytes
726 will be copied anyway.
727 track : bool, optional
728 Should the frame(s) be tracked for notification that ZMQ has
729 finished with it (ignored if copy=True).
731 Returns
732 -------
733 None : if copy or not track
734 MessageTracker : if track and not copy
735 a MessageTracker object, whose `done` property will
736 be False until the last send is completed.
737 """
738 # typecheck parts before sending:
739 for i, msg in enumerate(msg_parts):
740 if isinstance(msg, (zmq.Frame, bytes, memoryview)):
741 continue
742 try:
743 memoryview(msg)
744 except Exception:
745 rmsg = repr(msg)
746 if len(rmsg) > 32:
747 rmsg = rmsg[:32] + '...'
748 raise TypeError(
749 "Frame %i (%s) does not support the buffer interface."
750 % (
751 i,
752 rmsg,
753 )
754 )
755 for msg in msg_parts[:-1]:
756 self.send(msg, zmq.SNDMORE | flags, copy=copy, track=track)
757 # Send the last part without the extra SNDMORE flag.
758 return self.send(msg_parts[-1], flags, copy=copy, track=track)
760 @overload
761 def recv_multipart(
762 self, flags: int = ..., *, copy: Literal[True], track: bool = ...
763 ) -> list[bytes]: ...
765 @overload
766 def recv_multipart(
767 self, flags: int = ..., *, copy: Literal[False], track: bool = ...
768 ) -> list[zmq.Frame]: ...
770 @overload
771 def recv_multipart(self, flags: int = ..., *, track: bool = ...) -> list[bytes]: ...
773 @overload
774 def recv_multipart(
775 self, flags: int = 0, copy: bool = True, track: bool = False
776 ) -> list[zmq.Frame] | list[bytes]: ...
778 def recv_multipart(
779 self, flags: int = 0, copy: bool = True, track: bool = False
780 ) -> list[zmq.Frame] | list[bytes]:
781 """Receive a multipart message as a list of bytes or Frame objects
783 Parameters
784 ----------
785 flags : int, optional
786 Any valid flags for :func:`Socket.recv`.
787 copy : bool, optional
788 Should the message frame(s) be received in a copying or non-copying manner?
789 If False a Frame object is returned for each part, if True a copy of
790 the bytes is made for each frame.
791 track : bool, optional
792 Should the message frame(s) be tracked for notification that ZMQ has
793 finished with it? (ignored if copy=True)
795 Returns
796 -------
797 msg_parts : list
798 A list of frames in the multipart message; either Frames or bytes,
799 depending on `copy`.
801 Raises
802 ------
803 ZMQError
804 for any of the reasons :func:`~Socket.recv` might fail
805 """
806 parts = [self.recv(flags, copy=copy, track=track)]
807 # have first part already, only loop while more to receive
808 while self.getsockopt(zmq.RCVMORE):
809 part = self.recv(flags, copy=copy, track=track)
810 parts.append(part)
811 # cast List[Union] to Union[List]
812 # how do we get mypy to recognize that return type is invariant on `copy`?
813 return cast(Union[List[zmq.Frame], List[bytes]], parts)
815 def _deserialize(
816 self,
817 recvd: bytes,
818 load: Callable[[bytes], Any],
819 ) -> Any:
820 """Deserialize a received message
822 Override in subclass (e.g. Futures) if recvd is not the raw bytes.
824 The default implementation expects bytes and returns the deserialized message immediately.
826 Parameters
827 ----------
829 load: callable
830 Callable that deserializes bytes
831 recvd:
832 The object returned by self.recv
834 """
835 return load(recvd)
837 def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs):
838 """Send a message with a custom serialization function.
840 .. versionadded:: 17
842 Parameters
843 ----------
844 msg : The message to be sent. Can be any object serializable by `serialize`.
845 serialize : callable
846 The serialization function to use.
847 serialize(msg) should return an iterable of sendable message frames
848 (e.g. bytes objects), which will be passed to send_multipart.
849 flags : int, optional
850 Any valid flags for :func:`Socket.send`.
851 copy : bool, optional
852 Whether to copy the frames.
854 """
855 frames = serialize(msg)
856 return self.send_multipart(frames, flags=flags, copy=copy, **kwargs)
858 def recv_serialized(self, deserialize, flags=0, copy=True):
859 """Receive a message with a custom deserialization function.
861 .. versionadded:: 17
863 Parameters
864 ----------
865 deserialize : callable
866 The deserialization function to use.
867 deserialize will be called with one argument: the list of frames
868 returned by recv_multipart() and can return any object.
869 flags : int, optional
870 Any valid flags for :func:`Socket.recv`.
871 copy : bool, optional
872 Whether to recv bytes or Frame objects.
874 Returns
875 -------
876 obj : object
877 The object returned by the deserialization function.
879 Raises
880 ------
881 ZMQError
882 for any of the reasons :func:`~Socket.recv` might fail
883 """
884 frames = self.recv_multipart(flags=flags, copy=copy)
885 return self._deserialize(frames, deserialize)
887 def send_string(
888 self,
889 u: str,
890 flags: int = 0,
891 copy: bool = True,
892 encoding: str = 'utf-8',
893 **kwargs,
894 ) -> zmq.Frame | None:
895 """Send a Python unicode string as a message with an encoding.
897 0MQ communicates with raw bytes, so you must encode/decode
898 text (str) around 0MQ.
900 Parameters
901 ----------
902 u : str
903 The unicode string to send.
904 flags : int, optional
905 Any valid flags for :func:`Socket.send`.
906 encoding : str
907 The encoding to be used
908 """
909 if not isinstance(u, str):
910 raise TypeError("str objects only")
911 return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)
913 send_unicode = send_string
915 def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:
916 """Receive a unicode string, as sent by send_string.
918 Parameters
919 ----------
920 flags : int
921 Any valid flags for :func:`Socket.recv`.
922 encoding : str
923 The encoding to be used
925 Returns
926 -------
927 s : str
928 The Python unicode string that arrives as encoded bytes.
930 Raises
931 ------
932 ZMQError
933 for any of the reasons :func:`Socket.recv` might fail
934 """
935 msg = self.recv(flags=flags)
936 return self._deserialize(msg, lambda buf: buf.decode(encoding))
938 recv_unicode = recv_string
940 def send_pyobj(
941 self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs
942 ) -> zmq.Frame | None:
943 """Send a Python object as a message using pickle to serialize.
945 Parameters
946 ----------
947 obj : Python object
948 The Python object to send.
949 flags : int
950 Any valid flags for :func:`Socket.send`.
951 protocol : int
952 The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL
953 where defined, and pickle.HIGHEST_PROTOCOL elsewhere.
954 """
955 msg = pickle.dumps(obj, protocol)
956 return self.send(msg, flags=flags, **kwargs)
958 def recv_pyobj(self, flags: int = 0) -> Any:
959 """Receive a Python object as a message using pickle to serialize.
961 Parameters
962 ----------
963 flags : int
964 Any valid flags for :func:`Socket.recv`.
966 Returns
967 -------
968 obj : Python object
969 The Python object that arrives as a message.
971 Raises
972 ------
973 ZMQError
974 for any of the reasons :func:`~Socket.recv` might fail
975 """
976 msg = self.recv(flags)
977 return self._deserialize(msg, pickle.loads)
979 def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
980 """Send a Python object as a message using json to serialize.
982 Keyword arguments are passed on to json.dumps
984 Parameters
985 ----------
986 obj : Python object
987 The Python object to send
988 flags : int
989 Any valid flags for :func:`Socket.send`
990 """
991 send_kwargs = {}
992 for key in ('routing_id', 'group'):
993 if key in kwargs:
994 send_kwargs[key] = kwargs.pop(key)
995 msg = jsonapi.dumps(obj, **kwargs)
996 return self.send(msg, flags=flags, **send_kwargs)
998 def recv_json(self, flags: int = 0, **kwargs) -> list | str | int | float | dict:
999 """Receive a Python object as a message using json to serialize.
1001 Keyword arguments are passed on to json.loads
1003 Parameters
1004 ----------
1005 flags : int
1006 Any valid flags for :func:`Socket.recv`.
1008 Returns
1009 -------
1010 obj : Python object
1011 The Python object that arrives as a message.
1013 Raises
1014 ------
1015 ZMQError
1016 for any of the reasons :func:`~Socket.recv` might fail
1017 """
1018 msg = self.recv(flags)
1019 return self._deserialize(msg, lambda buf: jsonapi.loads(buf, **kwargs))
1021 _poller_class = Poller
1023 def poll(self, timeout: int | None = None, flags: int = zmq.POLLIN) -> int:
1024 """Poll the socket for events.
1026 See :class:`Poller` to wait for multiple sockets at once.
1028 Parameters
1029 ----------
1030 timeout : int
1031 The timeout (in milliseconds) to wait for an event. If unspecified
1032 (or specified None), will wait forever for an event.
1033 flags : int
1034 default: POLLIN.
1035 POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.
1037 Returns
1038 -------
1039 event_mask : int
1040 The poll event mask (POLLIN, POLLOUT),
1041 0 if the timeout was reached without an event.
1042 """
1044 if self.closed:
1045 raise ZMQError(zmq.ENOTSUP)
1047 p = self._poller_class()
1048 p.register(self, flags)
1049 evts = dict(p.poll(timeout))
1050 # return 0 if no events, otherwise return event bitfield
1051 return evts.get(self, 0)
1053 def get_monitor_socket(
1054 self: _SocketType, events: int | None = None, addr: str | None = None
1055 ) -> _SocketType:
1056 """Return a connected PAIR socket ready to receive the event notifications.
1058 .. versionadded:: libzmq-4.0
1059 .. versionadded:: 14.0
1061 Parameters
1062 ----------
1063 events : int
1064 default: `zmq.EVENT_ALL`
1065 The bitmask defining which events are wanted.
1066 addr : str
1067 The optional endpoint for the monitoring sockets.
1069 Returns
1070 -------
1071 socket : zmq.Socket
1072 The PAIR socket, connected and ready to receive messages.
1073 """
1074 # safe-guard, method only available on libzmq >= 4
1075 if zmq.zmq_version_info() < (4,):
1076 raise NotImplementedError(
1077 "get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version()
1078 )
1080 # if already monitoring, return existing socket
1081 if self._monitor_socket:
1082 if self._monitor_socket.closed:
1083 self._monitor_socket = None
1084 else:
1085 return self._monitor_socket
1087 if addr is None:
1088 # create endpoint name from internal fd
1089 addr = f"inproc://monitor.s-{self.FD}"
1090 if events is None:
1091 # use all events
1092 events = zmq.EVENT_ALL
1093 # attach monitoring socket
1094 self.monitor(addr, events)
1095 # create new PAIR socket and connect it
1096 self._monitor_socket = self.context.socket(zmq.PAIR)
1097 self._monitor_socket.connect(addr)
1098 return self._monitor_socket
1100 def disable_monitor(self) -> None:
1101 """Shutdown the PAIR socket (created using get_monitor_socket)
1102 that is serving socket events.
1104 .. versionadded:: 14.4
1105 """
1106 self._monitor_socket = None
1107 self.monitor(None, 0)
1110SyncSocket: TypeAlias = Socket[bytes]
1112__all__ = ['Socket', 'SyncSocket']