Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/sugar/socket.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""0MQ Socket pure Python methods."""
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
6from __future__ import annotations
8import errno
9import pickle
10import random
11import sys
12from typing import (
13 Any,
14 Callable,
15 Generic,
16 List,
17 Literal,
18 Sequence,
19 TypeVar,
20 Union,
21 cast,
22 overload,
23)
24from warnings import warn
26import zmq
27from zmq._typing import TypeAlias
28from zmq.backend import Socket as SocketBase
29from zmq.error import ZMQBindError, ZMQError
30from zmq.utils import jsonapi
31from zmq.utils.interop import cast_int_addr
33from ..constants import SocketOption, SocketType, _OptType
34from .attrsettr import AttributeSetter
35from .poll import Poller
37try:
38 DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL
39except AttributeError:
40 DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL
42_SocketType = TypeVar("_SocketType", bound="Socket")
44_JSONType: TypeAlias = "int | str | bool | list[_JSONType] | dict[str, _JSONType]"
47class _SocketContext(Generic[_SocketType]):
48 """Context Manager for socket bind/unbind"""
50 socket: _SocketType
51 kind: str
52 addr: str
54 def __repr__(self):
55 return f"<SocketContext({self.kind}={self.addr!r})>"
57 def __init__(
58 self: _SocketContext[_SocketType], socket: _SocketType, kind: str, addr: str
59 ):
60 assert kind in {"bind", "connect"}
61 self.socket = socket
62 self.kind = kind
63 self.addr = addr
65 def __enter__(self: _SocketContext[_SocketType]) -> _SocketType:
66 return self.socket
68 def __exit__(self, *args):
69 if self.socket.closed:
70 return
71 if self.kind == "bind":
72 self.socket.unbind(self.addr)
73 elif self.kind == "connect":
74 self.socket.disconnect(self.addr)
77SocketReturnType = TypeVar("SocketReturnType")
80class Socket(SocketBase, AttributeSetter, Generic[SocketReturnType]):
81 """The ZMQ socket object
83 To create a Socket, first create a Context::
85 ctx = zmq.Context.instance()
87 then call ``ctx.socket(socket_type)``::
89 s = ctx.socket(zmq.ROUTER)
91 .. versionadded:: 25
93 Sockets can now be shadowed by passing another Socket.
94 This helps in creating an async copy of a sync socket or vice versa::
96 s = zmq.Socket(async_socket)
98 Which previously had to be::
100 s = zmq.Socket.shadow(async_socket.underlying)
101 """
103 _shadow = False
104 _shadow_obj = None
105 _monitor_socket = None
106 _type_name = 'UNKNOWN'
108 @overload
109 def __init__(
110 self: Socket[bytes],
111 ctx_or_socket: zmq.Context,
112 socket_type: int,
113 *,
114 copy_threshold: int | None = None,
115 ): ...
117 @overload
118 def __init__(
119 self: Socket[bytes],
120 *,
121 shadow: Socket | int,
122 copy_threshold: int | None = None,
123 ): ...
125 @overload
126 def __init__(
127 self: Socket[bytes],
128 ctx_or_socket: Socket,
129 ): ...
131 def __init__(
132 self: Socket[bytes],
133 ctx_or_socket: zmq.Context | Socket | None = None,
134 socket_type: int = 0,
135 *,
136 shadow: Socket | int = 0,
137 copy_threshold: int | None = None,
138 ):
139 shadow_context: zmq.Context | None = None
140 if isinstance(ctx_or_socket, zmq.Socket):
141 # positional Socket(other_socket)
142 shadow = ctx_or_socket
143 ctx_or_socket = None
145 shadow_address: int = 0
147 if shadow:
148 self._shadow = True
149 # hold a reference to the shadow object
150 self._shadow_obj = shadow
151 if not isinstance(shadow, int):
152 if isinstance(shadow, zmq.Socket):
153 shadow_context = shadow.context
154 try:
155 shadow = cast(int, shadow.underlying)
156 except AttributeError:
157 pass
158 shadow_address = cast_int_addr(shadow)
159 else:
160 self._shadow = False
162 super().__init__(
163 ctx_or_socket,
164 socket_type,
165 shadow=shadow_address,
166 copy_threshold=copy_threshold,
167 )
168 if self._shadow_obj and shadow_context:
169 # keep self.context reference if shadowing a Socket object
170 self.context = shadow_context
172 try:
173 socket_type = cast(int, self.get(zmq.TYPE))
174 except Exception:
175 pass
176 else:
177 try:
178 self.__dict__["type"] = stype = SocketType(socket_type)
179 except ValueError:
180 self._type_name = str(socket_type)
181 else:
182 self._type_name = stype.name
184 def __del__(self):
185 if not self._shadow and not self.closed:
186 if warn is not None:
187 # warn can be None during process teardown
188 warn(
189 f"Unclosed socket {self}",
190 ResourceWarning,
191 stacklevel=2,
192 source=self,
193 )
194 self.close()
196 _repr_cls = "zmq.Socket"
198 def __repr__(self):
199 cls = self.__class__
200 # look up _repr_cls on exact class, not inherited
201 _repr_cls = cls.__dict__.get("_repr_cls", None)
202 if _repr_cls is None:
203 _repr_cls = f"{cls.__module__}.{cls.__name__}"
205 closed = ' closed' if self._closed else ''
207 return f"<{_repr_cls}(zmq.{self._type_name}) at {hex(id(self))}{closed}>"
209 # socket as context manager:
210 def __enter__(self: _SocketType) -> _SocketType:
211 """Sockets are context managers
213 .. versionadded:: 14.4
214 """
215 return self
217 def __exit__(self, *args, **kwargs):
218 self.close()
220 # -------------------------------------------------------------------------
221 # Socket creation
222 # -------------------------------------------------------------------------
224 def __copy__(self: _SocketType, memo=None) -> _SocketType:
225 """Copying a Socket creates a shadow copy"""
226 return self.__class__.shadow(self.underlying)
228 __deepcopy__ = __copy__
230 @classmethod
231 def shadow(cls: type[_SocketType], address: int | zmq.Socket) -> _SocketType:
232 """Shadow an existing libzmq socket
234 address is a zmq.Socket or an integer (or FFI pointer)
235 representing the address of the libzmq socket.
237 .. versionadded:: 14.1
239 .. versionadded:: 25
240 Support for shadowing `zmq.Socket` objects,
241 instead of just integer addresses.
242 """
243 return cls(shadow=address)
245 def close(self, linger=None) -> None:
246 """
247 Close the socket.
249 If linger is specified, LINGER sockopt will be set prior to closing.
251 Note: closing a zmq Socket may not close the underlying sockets
252 if there are undelivered messages.
253 Only after all messages are delivered or discarded by reaching the socket's LINGER timeout
254 (default: forever)
255 will the underlying sockets be closed.
257 This can be called to close the socket by hand. If this is not
258 called, the socket will automatically be closed when it is
259 garbage collected,
260 in which case you may see a ResourceWarning about the unclosed socket.
261 """
262 if self.context:
263 self.context._rm_socket(self)
264 super().close(linger=linger)
266 # -------------------------------------------------------------------------
267 # Connect/Bind context managers
268 # -------------------------------------------------------------------------
270 def _connect_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
271 """Context manager to disconnect on exit
273 .. versionadded:: 20.0
274 """
275 return _SocketContext(self, 'connect', addr)
277 def _bind_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
278 """Context manager to unbind on exit
280 .. versionadded:: 20.0
281 """
282 try:
283 # retrieve last_endpoint
284 # to support binding on random ports via
285 # `socket.bind('tcp://127.0.0.1:0')`
286 addr = cast(bytes, self.get(zmq.LAST_ENDPOINT)).decode("utf8")
287 except (AttributeError, ZMQError, UnicodeDecodeError):
288 pass
289 return _SocketContext(self, 'bind', addr)
291 def bind(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
292 """s.bind(addr)
294 Bind the socket to an address.
296 This causes the socket to listen on a network port. Sockets on the
297 other side of this connection will use ``Socket.connect(addr)`` to
298 connect to this socket.
300 Returns a context manager which will call unbind on exit.
302 .. versionadded:: 20.0
303 Can be used as a context manager.
305 .. versionadded:: 26.0
306 binding to port 0 can be used as a context manager
307 for binding to a random port.
308 The URL can be retrieved as `socket.last_endpoint`.
310 Parameters
311 ----------
312 addr : str
313 The address string. This has the form 'protocol://interface:port',
314 for example 'tcp://127.0.0.1:5555'. Protocols supported include
315 tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
316 encoded to utf-8 first.
318 """
319 try:
320 super().bind(addr)
321 except ZMQError as e:
322 e.strerror += f" (addr={addr!r})"
323 raise
324 return self._bind_cm(addr)
326 def connect(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
327 """s.connect(addr)
329 Connect to a remote 0MQ socket.
331 Returns a context manager which will call disconnect on exit.
333 .. versionadded:: 20.0
334 Can be used as a context manager.
336 Parameters
337 ----------
338 addr : str
339 The address string. This has the form 'protocol://interface:port',
340 for example 'tcp://127.0.0.1:5555'. Protocols supported are
341 tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
342 encoded to utf-8 first.
344 """
345 try:
346 super().connect(addr)
347 except ZMQError as e:
348 e.strerror += f" (addr={addr!r})"
349 raise
350 return self._connect_cm(addr)
352 # -------------------------------------------------------------------------
353 # Deprecated aliases
354 # -------------------------------------------------------------------------
356 @property
357 def socket_type(self) -> int:
358 warn("Socket.socket_type is deprecated, use Socket.type", DeprecationWarning)
359 return cast(int, self.type)
361 # -------------------------------------------------------------------------
362 # Hooks for sockopt completion
363 # -------------------------------------------------------------------------
365 def __dir__(self):
366 keys = dir(self.__class__)
367 keys.extend(SocketOption.__members__)
368 return keys
370 # -------------------------------------------------------------------------
371 # Getting/Setting options
372 # -------------------------------------------------------------------------
373 setsockopt = SocketBase.set
374 getsockopt = SocketBase.get
376 def __setattr__(self, key, value):
377 """Override to allow setting zmq.[UN]SUBSCRIBE even though we have a subscribe method"""
378 if key in self.__dict__:
379 object.__setattr__(self, key, value)
380 return
381 _key = key.lower()
382 if _key in ('subscribe', 'unsubscribe'):
383 if isinstance(value, str):
384 value = value.encode('utf8')
385 if _key == 'subscribe':
386 self.set(zmq.SUBSCRIBE, value)
387 else:
388 self.set(zmq.UNSUBSCRIBE, value)
389 return
390 super().__setattr__(key, value)
392 def fileno(self) -> int:
393 """Return edge-triggered file descriptor for this socket.
395 This is a read-only edge-triggered file descriptor for both read and write events on this socket.
396 It is important that all available events be consumed when an event is detected,
397 otherwise the read event will not trigger again.
399 .. versionadded:: 17.0
400 """
401 return self.FD
403 def subscribe(self, topic: str | bytes) -> None:
404 """Subscribe to a topic
406 Only for SUB sockets.
408 .. versionadded:: 15.3
409 """
410 if isinstance(topic, str):
411 topic = topic.encode('utf8')
412 self.set(zmq.SUBSCRIBE, topic)
414 def unsubscribe(self, topic: str | bytes) -> None:
415 """Unsubscribe from a topic
417 Only for SUB sockets.
419 .. versionadded:: 15.3
420 """
421 if isinstance(topic, str):
422 topic = topic.encode('utf8')
423 self.set(zmq.UNSUBSCRIBE, topic)
425 def set_string(self, option: int, optval: str, encoding='utf-8') -> None:
426 """Set socket options with a unicode object.
428 This is simply a wrapper for setsockopt to protect from encoding ambiguity.
430 See the 0MQ documentation for details on specific options.
432 Parameters
433 ----------
434 option : int
435 The name of the option to set. Can be any of: SUBSCRIBE,
436 UNSUBSCRIBE, IDENTITY
437 optval : str
438 The value of the option to set.
439 encoding : str
440 The encoding to be used, default is utf8
441 """
442 if not isinstance(optval, str):
443 raise TypeError(f"strings only, not {type(optval)}: {optval!r}")
444 return self.set(option, optval.encode(encoding))
446 setsockopt_unicode = setsockopt_string = set_string
448 def get_string(self, option: int, encoding='utf-8') -> str:
449 """Get the value of a socket option.
451 See the 0MQ documentation for details on specific options.
453 Parameters
454 ----------
455 option : int
456 The option to retrieve.
458 Returns
459 -------
460 optval : str
461 The value of the option as a unicode string.
462 """
463 if SocketOption(option)._opt_type != _OptType.bytes:
464 raise TypeError(f"option {option} will not return a string to be decoded")
465 return cast(bytes, self.get(option)).decode(encoding)
467 getsockopt_unicode = getsockopt_string = get_string
469 def bind_to_random_port(
470 self: _SocketType,
471 addr: str,
472 min_port: int = 49152,
473 max_port: int = 65536,
474 max_tries: int = 100,
475 ) -> int:
476 """Bind this socket to a random port in a range.
478 If the port range is unspecified, the system will choose the port.
480 Parameters
481 ----------
482 addr : str
483 The address string without the port to pass to ``Socket.bind()``.
484 min_port : int, optional
485 The minimum port in the range of ports to try (inclusive).
486 max_port : int, optional
487 The maximum port in the range of ports to try (exclusive).
488 max_tries : int, optional
489 The maximum number of bind attempts to make.
491 Returns
492 -------
493 port : int
494 The port the socket was bound to.
496 Raises
497 ------
498 ZMQBindError
499 if `max_tries` reached before successful bind
500 """
501 if min_port == 49152 and max_port == 65536:
502 # if LAST_ENDPOINT is supported, and min_port / max_port weren't specified,
503 # we can bind to port 0 and let the OS do the work
504 self.bind(f"{addr}:*")
505 url = cast(bytes, self.last_endpoint).decode('ascii', 'replace')
506 _, port_s = url.rsplit(':', 1)
507 return int(port_s)
509 for i in range(max_tries):
510 try:
511 port = random.randrange(min_port, max_port)
512 self.bind(f'{addr}:{port}')
513 except ZMQError as exception:
514 en = exception.errno
515 if en == zmq.EADDRINUSE:
516 continue
517 elif sys.platform == 'win32' and en == errno.EACCES:
518 continue
519 else:
520 raise
521 else:
522 return port
523 raise ZMQBindError("Could not bind socket to random port.")
525 def get_hwm(self) -> int:
526 """Get the High Water Mark.
528 On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM
529 """
530 # return sndhwm, fallback on rcvhwm
531 try:
532 return cast(int, self.get(zmq.SNDHWM))
533 except zmq.ZMQError:
534 pass
536 return cast(int, self.get(zmq.RCVHWM))
538 def set_hwm(self, value: int) -> None:
539 """Set the High Water Mark.
541 On libzmq ≥ 3, this sets both SNDHWM and RCVHWM
544 .. warning::
546 New values only take effect for subsequent socket
547 bind/connects.
548 """
549 raised = None
550 try:
551 self.sndhwm = value
552 except Exception as e:
553 raised = e
554 try:
555 self.rcvhwm = value
556 except Exception as e:
557 raised = e
559 if raised:
560 raise raised
562 hwm = property(
563 get_hwm,
564 set_hwm,
565 None,
566 """Property for High Water Mark.
568 Setting hwm sets both SNDHWM and RCVHWM as appropriate.
569 It gets SNDHWM if available, otherwise RCVHWM.
570 """,
571 )
573 # -------------------------------------------------------------------------
574 # Sending and receiving messages
575 # -------------------------------------------------------------------------
577 @overload
578 def send(
579 self,
580 data: Any,
581 flags: int = ...,
582 copy: bool = ...,
583 *,
584 track: Literal[True],
585 routing_id: int | None = ...,
586 group: str | None = ...,
587 ) -> zmq.MessageTracker: ...
589 @overload
590 def send(
591 self,
592 data: Any,
593 flags: int = ...,
594 copy: bool = ...,
595 *,
596 track: Literal[False],
597 routing_id: int | None = ...,
598 group: str | None = ...,
599 ) -> None: ...
601 @overload
602 def send(
603 self,
604 data: Any,
605 flags: int = ...,
606 *,
607 copy: bool = ...,
608 routing_id: int | None = ...,
609 group: str | None = ...,
610 ) -> None: ...
612 @overload
613 def send(
614 self,
615 data: Any,
616 flags: int = ...,
617 copy: bool = ...,
618 track: bool = ...,
619 routing_id: int | None = ...,
620 group: str | None = ...,
621 ) -> zmq.MessageTracker | None: ...
623 def send(
624 self,
625 data: Any,
626 flags: int = 0,
627 copy: bool = True,
628 track: bool = False,
629 routing_id: int | None = None,
630 group: str | None = None,
631 ) -> zmq.MessageTracker | None:
632 """Send a single zmq message frame on this socket.
634 This queues the message to be sent by the IO thread at a later time.
636 With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
637 otherwise, this waits until space is available.
638 See :class:`Poller` for more general non-blocking I/O.
640 Parameters
641 ----------
642 data : bytes, Frame, memoryview
643 The content of the message. This can be any object that provides
644 the Python buffer API (i.e. `memoryview(data)` can be called).
645 flags : int
646 0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
647 copy : bool
648 Should the message be sent in a copying or non-copying manner.
649 track : bool
650 Should the message be tracked for notification that ZMQ has
651 finished with it? (ignored if copy=True)
652 routing_id : int
653 For use with SERVER sockets
654 group : str
655 For use with RADIO sockets
657 Returns
658 -------
659 None : if `copy` or not track
660 None if message was sent, raises an exception otherwise.
661 MessageTracker : if track and not copy
662 a MessageTracker object, whose `done` property will
663 be False until the send is completed.
665 Raises
666 ------
667 TypeError
668 If a unicode object is passed
669 ValueError
670 If `track=True`, but an untracked Frame is passed.
671 ZMQError
672 If the send does not succeed for any reason (including
673 if NOBLOCK is set and the outgoing queue is full).
676 .. versionchanged:: 17.0
678 DRAFT support for routing_id and group arguments.
679 """
680 if routing_id is not None:
681 if not isinstance(data, zmq.Frame):
682 data = zmq.Frame(
683 data,
684 track=track,
685 copy=copy or None,
686 copy_threshold=self.copy_threshold,
687 )
688 data.routing_id = routing_id
689 if group is not None:
690 if not isinstance(data, zmq.Frame):
691 data = zmq.Frame(
692 data,
693 track=track,
694 copy=copy or None,
695 copy_threshold=self.copy_threshold,
696 )
697 data.group = group
698 return super().send(data, flags=flags, copy=copy, track=track)
700 def send_multipart(
701 self,
702 msg_parts: Sequence,
703 flags: int = 0,
704 copy: bool = True,
705 track: bool = False,
706 **kwargs,
707 ):
708 """Send a sequence of buffers as a multipart message.
710 The zmq.SNDMORE flag is added to all msg parts before the last.
712 Parameters
713 ----------
714 msg_parts : iterable
715 A sequence of objects to send as a multipart message. Each element
716 can be any sendable object (Frame, bytes, buffer-providers)
717 flags : int, optional
718 Any valid flags for :func:`Socket.send`.
719 SNDMORE is added automatically for frames before the last.
720 copy : bool, optional
721 Should the frame(s) be sent in a copying or non-copying manner.
722 If copy=False, frames smaller than self.copy_threshold bytes
723 will be copied anyway.
724 track : bool, optional
725 Should the frame(s) be tracked for notification that ZMQ has
726 finished with it (ignored if copy=True).
728 Returns
729 -------
730 None : if copy or not track
731 MessageTracker : if track and not copy
732 a MessageTracker object, whose `done` property will
733 be False until the last send is completed.
734 """
735 # typecheck parts before sending:
736 for i, msg in enumerate(msg_parts):
737 if isinstance(msg, (zmq.Frame, bytes, memoryview)):
738 continue
739 try:
740 memoryview(msg)
741 except Exception:
742 rmsg = repr(msg)
743 if len(rmsg) > 32:
744 rmsg = rmsg[:32] + '...'
745 raise TypeError(
746 f"Frame {i} ({rmsg}) does not support the buffer interface."
747 )
748 for msg in msg_parts[:-1]:
749 self.send(msg, zmq.SNDMORE | flags, copy=copy, track=track)
750 # Send the last part without the extra SNDMORE flag.
751 return self.send(msg_parts[-1], flags, copy=copy, track=track)
753 @overload
754 def recv_multipart(
755 self, flags: int = ..., *, copy: Literal[True], track: bool = ...
756 ) -> list[bytes]: ...
758 @overload
759 def recv_multipart(
760 self, flags: int = ..., *, copy: Literal[False], track: bool = ...
761 ) -> list[zmq.Frame]: ...
763 @overload
764 def recv_multipart(self, flags: int = ..., *, track: bool = ...) -> list[bytes]: ...
766 @overload
767 def recv_multipart(
768 self, flags: int = 0, copy: bool = True, track: bool = False
769 ) -> list[zmq.Frame] | list[bytes]: ...
771 def recv_multipart(
772 self, flags: int = 0, copy: bool = True, track: bool = False
773 ) -> list[zmq.Frame] | list[bytes]:
774 """Receive a multipart message as a list of bytes or Frame objects
776 Parameters
777 ----------
778 flags : int, optional
779 Any valid flags for :func:`Socket.recv`.
780 copy : bool, optional
781 Should the message frame(s) be received in a copying or non-copying manner?
782 If False a Frame object is returned for each part, if True a copy of
783 the bytes is made for each frame.
784 track : bool, optional
785 Should the message frame(s) be tracked for notification that ZMQ has
786 finished with it? (ignored if copy=True)
788 Returns
789 -------
790 msg_parts : list
791 A list of frames in the multipart message; either Frames or bytes,
792 depending on `copy`.
794 Raises
795 ------
796 ZMQError
797 for any of the reasons :func:`~Socket.recv` might fail
798 """
799 parts = [self.recv(flags, copy=copy, track=track)]
800 # have first part already, only loop while more to receive
801 while self.getsockopt(zmq.RCVMORE):
802 part = self.recv(flags, copy=copy, track=track)
803 parts.append(part)
804 # cast List[Union] to Union[List]
805 # how do we get mypy to recognize that return type is invariant on `copy`?
806 return cast(Union[List[zmq.Frame], List[bytes]], parts)
808 def _deserialize(
809 self,
810 recvd: bytes,
811 load: Callable[[bytes], Any],
812 ) -> Any:
813 """Deserialize a received message
815 Override in subclass (e.g. Futures) if recvd is not the raw bytes.
817 The default implementation expects bytes and returns the deserialized message immediately.
819 Parameters
820 ----------
822 load: callable
823 Callable that deserializes bytes
824 recvd:
825 The object returned by self.recv
827 """
828 return load(recvd)
830 def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs):
831 """Send a message with a custom serialization function.
833 .. versionadded:: 17
835 Parameters
836 ----------
837 msg : The message to be sent. Can be any object serializable by `serialize`.
838 serialize : callable
839 The serialization function to use.
840 serialize(msg) should return an iterable of sendable message frames
841 (e.g. bytes objects), which will be passed to send_multipart.
842 flags : int, optional
843 Any valid flags for :func:`Socket.send`.
844 copy : bool, optional
845 Whether to copy the frames.
847 """
848 frames = serialize(msg)
849 return self.send_multipart(frames, flags=flags, copy=copy, **kwargs)
851 def recv_serialized(self, deserialize, flags=0, copy=True):
852 """Receive a message with a custom deserialization function.
854 .. versionadded:: 17
856 Parameters
857 ----------
858 deserialize : callable
859 The deserialization function to use.
860 deserialize will be called with one argument: the list of frames
861 returned by recv_multipart() and can return any object.
862 flags : int, optional
863 Any valid flags for :func:`Socket.recv`.
864 copy : bool, optional
865 Whether to recv bytes or Frame objects.
867 Returns
868 -------
869 obj : object
870 The object returned by the deserialization function.
872 Raises
873 ------
874 ZMQError
875 for any of the reasons :func:`~Socket.recv` might fail
876 """
877 frames = self.recv_multipart(flags=flags, copy=copy)
878 return self._deserialize(frames, deserialize)
880 def send_string(
881 self,
882 u: str,
883 flags: int = 0,
884 copy: bool = True,
885 encoding: str = 'utf-8',
886 **kwargs,
887 ) -> zmq.Frame | None:
888 """Send a Python unicode string as a message with an encoding.
890 0MQ communicates with raw bytes, so you must encode/decode
891 text (str) around 0MQ.
893 Parameters
894 ----------
895 u : str
896 The unicode string to send.
897 flags : int, optional
898 Any valid flags for :func:`Socket.send`.
899 encoding : str
900 The encoding to be used
901 """
902 if not isinstance(u, str):
903 raise TypeError("str objects only")
904 return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)
906 send_unicode = send_string
908 def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:
909 """Receive a unicode string, as sent by send_string.
911 Parameters
912 ----------
913 flags : int
914 Any valid flags for :func:`Socket.recv`.
915 encoding : str
916 The encoding to be used
918 Returns
919 -------
920 s : str
921 The Python unicode string that arrives as encoded bytes.
923 Raises
924 ------
925 ZMQError
926 for any of the reasons :func:`Socket.recv` might fail
927 """
928 msg = self.recv(flags=flags)
929 return self._deserialize(msg, lambda buf: buf.decode(encoding))
931 recv_unicode = recv_string
933 def send_pyobj(
934 self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs
935 ) -> zmq.Frame | None:
936 """
937 Send a Python object as a message using pickle to serialize.
939 .. warning::
941 Never deserialize an untrusted message with pickle,
942 which can involve arbitrary code execution.
943 Make sure to authenticate the sources of messages
944 before unpickling them, e.g. with transport-level security
945 (e.g. CURVE, ZAP, or IPC permissions)
946 or signed messages.
948 Parameters
949 ----------
950 obj : Python object
951 The Python object to send.
952 flags : int
953 Any valid flags for :func:`Socket.send`.
954 protocol : int
955 The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL
956 where defined, and pickle.HIGHEST_PROTOCOL elsewhere.
957 """
958 msg = pickle.dumps(obj, protocol)
959 return self.send(msg, flags=flags, **kwargs)
961 def recv_pyobj(self, flags: int = 0) -> Any:
962 """
963 Receive a Python object as a message using UNSAFE pickle to serialize.
965 .. warning::
967 Never deserialize an untrusted message with pickle,
968 which can involve arbitrary code execution.
969 Make sure to authenticate the sources of messages
970 before unpickling them, e.g. with transport-level security
971 (such as CURVE or IPC permissions)
972 or authenticating messages themselves before deserializing.
974 Parameters
975 ----------
976 flags : int
977 Any valid flags for :func:`Socket.recv`.
979 Returns
980 -------
981 obj : Python object
982 The Python object that arrives as a message.
984 Raises
985 ------
986 ZMQError
987 for any of the reasons :func:`~Socket.recv` might fail
988 """
989 msg = self.recv(flags)
990 return self._deserialize(msg, pickle.loads)
992 def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
993 """Send a Python object as a message using json to serialize.
995 Keyword arguments are passed on to json.dumps
997 Parameters
998 ----------
999 obj : Python object
1000 The Python object to send
1001 flags : int
1002 Any valid flags for :func:`Socket.send`
1003 """
1004 send_kwargs = {}
1005 for key in ('routing_id', 'group'):
1006 if key in kwargs:
1007 send_kwargs[key] = kwargs.pop(key)
1008 msg = jsonapi.dumps(obj, **kwargs)
1009 return self.send(msg, flags=flags, **send_kwargs)
1011 def recv_json(self, flags: int = 0, **kwargs) -> _JSONType:
1012 """Receive a Python object as a message using json to serialize.
1014 Keyword arguments are passed on to json.loads
1016 Parameters
1017 ----------
1018 flags : int
1019 Any valid flags for :func:`Socket.recv`.
1021 Returns
1022 -------
1023 obj : Python object
1024 The Python object that arrives as a message.
1026 Raises
1027 ------
1028 ZMQError
1029 for any of the reasons :func:`~Socket.recv` might fail
1030 """
1031 msg = self.recv(flags)
1032 return self._deserialize(msg, lambda buf: jsonapi.loads(buf, **kwargs))
1034 _poller_class = Poller
1036 def poll(self, timeout: int | None = None, flags: int = zmq.POLLIN) -> int:
1037 """Poll the socket for events.
1039 See :class:`Poller` to wait for multiple sockets at once.
1041 Parameters
1042 ----------
1043 timeout : int
1044 The timeout (in milliseconds) to wait for an event. If unspecified
1045 (or specified None), will wait forever for an event.
1046 flags : int
1047 default: POLLIN.
1048 POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.
1050 Returns
1051 -------
1052 event_mask : int
1053 The poll event mask (POLLIN, POLLOUT),
1054 0 if the timeout was reached without an event.
1055 """
1057 if self.closed:
1058 raise ZMQError(zmq.ENOTSUP)
1060 p = self._poller_class()
1061 p.register(self, flags)
1062 evts = dict(p.poll(timeout))
1063 # return 0 if no events, otherwise return event bitfield
1064 return evts.get(self, 0)
1066 def get_monitor_socket(
1067 self: _SocketType, events: int | None = None, addr: str | None = None
1068 ) -> _SocketType:
1069 """Return a connected PAIR socket ready to receive the event notifications.
1071 .. versionadded:: libzmq-4.0
1072 .. versionadded:: 14.0
1074 Parameters
1075 ----------
1076 events : int
1077 default: `zmq.EVENT_ALL`
1078 The bitmask defining which events are wanted.
1079 addr : str
1080 The optional endpoint for the monitoring sockets.
1082 Returns
1083 -------
1084 socket : zmq.Socket
1085 The PAIR socket, connected and ready to receive messages.
1086 """
1087 # safe-guard, method only available on libzmq >= 4
1088 if zmq.zmq_version_info() < (4,):
1089 raise NotImplementedError(
1090 f"get_monitor_socket requires libzmq >= 4, have {zmq.zmq_version()}"
1091 )
1093 # if already monitoring, return existing socket
1094 if self._monitor_socket:
1095 if self._monitor_socket.closed:
1096 self._monitor_socket = None
1097 else:
1098 return self._monitor_socket
1100 if addr is None:
1101 # create endpoint name from internal fd
1102 addr = f"inproc://monitor.s-{self.FD}"
1103 if events is None:
1104 # use all events
1105 events = zmq.EVENT_ALL
1106 # attach monitoring socket
1107 self.monitor(addr, events)
1108 # create new PAIR socket and connect it
1109 self._monitor_socket = self.context.socket(zmq.PAIR)
1110 self._monitor_socket.connect(addr)
1111 return self._monitor_socket
1113 def disable_monitor(self) -> None:
1114 """Shutdown the PAIR socket (created using get_monitor_socket)
1115 that is serving socket events.
1117 .. versionadded:: 14.4
1118 """
1119 self._monitor_socket = None
1120 self.monitor(None, 0)
1123SyncSocket: TypeAlias = Socket[bytes]
1125__all__ = ['Socket', 'SyncSocket']