Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/socket.py: 33%
318 statements
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-10 06:20 +0000
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-10 06:20 +0000
1"""0MQ Socket pure Python methods."""
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
7import errno
8import pickle
9import random
10import sys
11from typing import (
12 Any,
13 Callable,
14 Dict,
15 Generic,
16 List,
17 Optional,
18 Sequence,
19 Type,
20 TypeVar,
21 Union,
22 cast,
23 overload,
24)
25from warnings import warn
27import zmq
28from zmq._typing import Literal
29from zmq.backend import Socket as SocketBase
30from zmq.error import ZMQBindError, ZMQError
31from zmq.utils import jsonapi
32from zmq.utils.interop import cast_int_addr
34from ..constants import SocketOption, SocketType, _OptType
35from .attrsettr import AttributeSetter
36from .poll import Poller
# Use pickle's own default protocol when the module defines one; otherwise
# fall back to the highest protocol this interpreter supports.
DEFAULT_PROTOCOL = getattr(pickle, "DEFAULT_PROTOCOL", pickle.HIGHEST_PROTOCOL)
43T = TypeVar("T", bound="Socket")
46class _SocketContext(Generic[T]):
47 """Context Manager for socket bind/unbind"""
49 socket: T
50 kind: str
51 addr: str
53 def __repr__(self):
54 return f"<SocketContext({self.kind}={self.addr!r})>"
56 def __init__(self: "_SocketContext[T]", socket: T, kind: str, addr: str):
57 assert kind in {"bind", "connect"}
58 self.socket = socket
59 self.kind = kind
60 self.addr = addr
62 def __enter__(self: "_SocketContext[T]") -> T:
63 return self.socket
65 def __exit__(self, *args):
66 if self.socket.closed:
67 return
68 if self.kind == "bind":
69 self.socket.unbind(self.addr)
70 elif self.kind == "connect":
71 self.socket.disconnect(self.addr)
74ST = TypeVar("ST")
77class Socket(SocketBase, AttributeSetter, Generic[ST]):
78 """The ZMQ socket object
80 To create a Socket, first create a Context::
82 ctx = zmq.Context.instance()
84 then call ``ctx.socket(socket_type)``::
86 s = ctx.socket(zmq.ROUTER)
88 .. versionadded:: 25
90 Sockets can now be shadowed by passing another Socket.
91 This helps in creating an async copy of a sync socket or vice versa::
93 s = zmq.Socket(async_socket)
95 Which previously had to be::
97 s = zmq.Socket.shadow(async_socket.underlying)
98 """
100 _shadow = False
101 _shadow_obj = None
102 _monitor_socket = None
103 _type_name = 'UNKNOWN'
@overload
def __init__(
    self: "Socket[bytes]",
    ctx_or_socket: "zmq.Context",
    socket_type: int,
    *,
    copy_threshold: Optional[int] = None,
):
    ...


@overload
def __init__(
    self: "Socket[bytes]",
    *,
    shadow: Union["Socket", int],
    copy_threshold: Optional[int] = None,
):
    ...


@overload
def __init__(
    self: "Socket[bytes]",
    ctx_or_socket: "Socket",
):
    ...


def __init__(
    self: "Socket[bytes]",
    ctx_or_socket: Optional[Union["zmq.Context", "Socket"]] = None,
    socket_type: int = 0,
    *,
    shadow: Union["Socket", int] = 0,
    copy_threshold: Optional[int] = None,
):
    """Create a socket from a context, or shadow an existing socket.

    Parameters
    ----------
    ctx_or_socket : zmq.Context or zmq.Socket, optional
        A context to create the socket in.  If a ``zmq.Socket`` is given
        positionally it is treated as the `shadow` argument instead.
    socket_type : int
        The socket type passed through to the base class.
    shadow : zmq.Socket or int
        A socket object, or an integer address, to shadow.  A falsy value
        means "do not shadow".
    copy_threshold : int, optional
        Passed through to the base class unchanged.
    """
    # positional Socket(other_socket) is shorthand for shadow=other_socket
    if isinstance(ctx_or_socket, zmq.Socket):
        shadow, ctx_or_socket = ctx_or_socket, None

    shadow_address: int = 0
    if not shadow:
        self._shadow = False
    else:
        self._shadow = True
        # keep the shadowed object referenced for as long as we exist
        self._shadow_obj = shadow
        if not isinstance(shadow, int):
            # objects exposing `.underlying` are unwrapped to that value;
            # anything else is handed to cast_int_addr as-is
            shadow = cast(int, getattr(shadow, "underlying", shadow))
        shadow_address = cast_int_addr(shadow)

    super().__init__(
        ctx_or_socket,
        socket_type,
        shadow=shadow_address,
        copy_threshold=copy_threshold,
    )

    # Best effort: record the socket type for repr(); any failure to read
    # the TYPE option leaves the class-level defaults in place.
    try:
        socket_type = cast(int, self.get(zmq.TYPE))
    except Exception:
        return

    try:
        stype = SocketType(socket_type)
    except ValueError:
        # not a known SocketType member: fall back to the raw number
        self._type_name = str(socket_type)
    else:
        self.__dict__["type"] = stype
        self._type_name = stype.name
def __del__(self):
    """Close the socket on garbage collection, warning if it was left open.

    Shadow sockets and already-closed sockets are left untouched.
    """
    if self._shadow or self.closed:
        return
    # `warn` can already have been set to None during process teardown
    if warn is not None:
        warn(
            f"Unclosed socket {self}",
            ResourceWarning,
            stacklevel=2,
            source=self,
        )
    self.close()
190 _repr_cls = "zmq.Socket"
192 def __repr__(self):
193 cls = self.__class__
194 # look up _repr_cls on exact class, not inherited
195 _repr_cls = cls.__dict__.get("_repr_cls", None)
196 if _repr_cls is None:
197 _repr_cls = f"{cls.__module__}.{cls.__name__}"
199 closed = ' closed' if self._closed else ''
201 return f"<{_repr_cls}(zmq.{self._type_name}) at {hex(id(self))}{closed}>"
203 # socket as context manager:
def __enter__(self: T) -> T:
    """Enter a ``with`` block; the socket itself is the managed object.

    .. versionadded:: 14.4
    """
    return self
211 def __exit__(self, *args, **kwargs):
212 self.close()
214 # -------------------------------------------------------------------------
215 # Socket creation
216 # -------------------------------------------------------------------------
def __copy__(self: T, memo=None) -> T:
    """Return a shadow of this socket built from its ``underlying`` value.

    `memo` is accepted so the same function can serve as ``__deepcopy__``;
    it is not used.
    """
    klass = self.__class__
    return klass.shadow(self.underlying)


# deep copies are shadow copies too
__deepcopy__ = __copy__
@classmethod
def shadow(cls: Type[T], address: Union[int, "zmq.Socket"]) -> T:
    """Create an instance of this class that shadows an existing socket.

    Parameters
    ----------
    address : int or zmq.Socket
        A zmq.Socket, or an integer (or FFI pointer) representing the
        address of the libzmq socket.

    Returns
    -------
    A new instance of `cls` constructed with ``shadow=address``.

    .. versionadded:: 14.1

    .. versionadded:: 25
        Support for shadowing `zmq.Socket` objects,
        instead of just integer addresses.
    """
    return cls(shadow=address)
def close(self, linger=None) -> None:
    """Close the socket, deregistering it from its context first.

    Parameters
    ----------
    linger : optional
        If specified, the LINGER sockopt will be set prior to closing.

    Notes
    -----
    Closing a zmq Socket may not close the underlying sockets if there are
    undelivered messages.  Only after all messages are delivered or
    discarded by reaching the socket's LINGER timeout (default: forever)
    will the underlying sockets be closed.

    If this is never called by hand, the socket is closed when it is
    garbage collected, in which case you may see a ResourceWarning about
    the unclosed socket.
    """
    if self.context:
        # let the owning context forget about this socket
        self.context._rm_socket(self)
    super().close(linger=linger)
260 # -------------------------------------------------------------------------
261 # Connect/Bind context managers
262 # -------------------------------------------------------------------------
def _connect_cm(self: T, addr: str) -> _SocketContext[T]:
    """Build the context manager that disconnects `addr` on exit.

    .. versionadded:: 20.0
    """
    return _SocketContext(self, 'connect', addr)
def _bind_cm(self: T, addr: str) -> _SocketContext[T]:
    """Build the context manager that unbinds `addr` on exit.

    .. versionadded:: 20.0
    """
    return _SocketContext(self, 'bind', addr)
def bind(self: T, addr: str) -> _SocketContext[T]:
    """s.bind(addr)

    Bind the socket to an address so that peers can reach it with
    ``Socket.connect(addr)``.

    Parameters
    ----------
    addr : str
        The address string. This has the form 'protocol://interface:port',
        for example 'tcp://127.0.0.1:5555'. Protocols supported include
        tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
        encoded to utf-8 first.

    Returns
    -------
    A context manager which will call unbind on exit.

    .. versionadded:: 20.0
        Can be used as a context manager.
    """
    super().bind(addr)
    return self._bind_cm(addr)
def connect(self: T, addr: str) -> _SocketContext[T]:
    """s.connect(addr)

    Connect to a remote 0MQ socket.

    Parameters
    ----------
    addr : str
        The address string. This has the form 'protocol://interface:port',
        for example 'tcp://127.0.0.1:5555'. Protocols supported are
        tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
        encoded to utf-8 first.

    Returns
    -------
    A context manager which will call disconnect on exit.

    .. versionadded:: 20.0
        Can be used as a context manager.
    """
    super().connect(addr)
    return self._connect_cm(addr)
326 # -------------------------------------------------------------------------
327 # Deprecated aliases
328 # -------------------------------------------------------------------------
@property
def socket_type(self) -> int:
    """Deprecated alias for ``Socket.type``.

    Emits a DeprecationWarning on every access and returns ``self.type``.
    """
    # stacklevel=2 attributes the warning to the code that accessed the
    # property, so users see the location they need to change.
    warn(
        "Socket.socket_type is deprecated, use Socket.type",
        DeprecationWarning,
        stacklevel=2,
    )
    return cast(int, self.type)
335 # -------------------------------------------------------------------------
336 # Hooks for sockopt completion
337 # -------------------------------------------------------------------------
def __dir__(self):
    """List the class's attributes followed by every SocketOption name.

    This makes socket option names show up in attribute completion.
    """
    return [*dir(self.__class__), *SocketOption.__members__]
344 # -------------------------------------------------------------------------
345 # Getting/Setting options
346 # -------------------------------------------------------------------------
# traditional names for the base class's option accessors
setsockopt = SocketBase.set
getsockopt = SocketBase.get


def __setattr__(self, key, value):
    """Override to allow setting zmq.[UN]SUBSCRIBE even though we have a subscribe method

    Keys already present in the instance ``__dict__`` are stored directly.
    ``subscribe`` / ``unsubscribe`` (case-insensitive) are turned into the
    corresponding socket option, with str values encoded as utf8.
    Everything else is delegated to the parent class.
    """
    if key in self.__dict__:
        object.__setattr__(self, key, value)
        return

    lowered = key.lower()
    if lowered not in ('subscribe', 'unsubscribe'):
        super().__setattr__(key, value)
        return

    if isinstance(value, str):
        value = value.encode('utf8')
    option = zmq.SUBSCRIBE if lowered == 'subscribe' else zmq.UNSUBSCRIBE
    self.set(option, value)
def fileno(self) -> int:
    """Return edge-triggered file descriptor for this socket.

    The value is whatever the socket's ``FD`` attribute yields.  It is a
    read-only edge-triggered file descriptor for both read and write events
    on this socket, so all available events must be consumed when an event
    is detected; otherwise the read event will not trigger again.

    .. versionadded:: 17.0
    """
    return self.FD
def subscribe(self, topic: Union[str, bytes]) -> None:
    """Subscribe to a topic

    Only for SUB sockets.  A str topic is encoded as utf8 before the
    SUBSCRIBE option is set.

    .. versionadded:: 15.3
    """
    payload = topic.encode('utf8') if isinstance(topic, str) else topic
    self.set(zmq.SUBSCRIBE, payload)
def unsubscribe(self, topic: Union[str, bytes]) -> None:
    """Unsubscribe from a topic

    Only for SUB sockets.  A str topic is encoded as utf8 before the
    UNSUBSCRIBE option is set.

    .. versionadded:: 15.3
    """
    payload = topic.encode('utf8') if isinstance(topic, str) else topic
    self.set(zmq.UNSUBSCRIBE, payload)
def set_string(self, option: int, optval: str, encoding='utf-8') -> None:
    """Set socket options with a unicode object.

    This is simply a wrapper for setsockopt to protect from encoding ambiguity.

    See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : int
        The name of the option to set. Can be any of: SUBSCRIBE,
        UNSUBSCRIBE, IDENTITY
    optval : str
        The value of the option to set.
    encoding : str
        The encoding to be used, default is utf8

    Raises
    ------
    TypeError
        If `optval` is not a str.
    """
    if isinstance(optval, str):
        encoded = optval.encode(encoding)
        return self.set(option, encoded)
    raise TypeError(f"strings only, not {type(optval)}: {optval!r}")


setsockopt_unicode = setsockopt_string = set_string
def get_string(self, option: int, encoding='utf-8') -> str:
    """Get the value of a socket option.

    See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : int
        The option to retrieve.
    encoding : str
        The encoding used to decode the option's bytes value.

    Returns
    -------
    optval : str
        The value of the option as a unicode string.

    Raises
    ------
    TypeError
        If the option's declared type is not bytes.
    """
    returns_bytes = SocketOption(option)._opt_type == _OptType.bytes
    if not returns_bytes:
        raise TypeError(f"option {option} will not return a string to be decoded")
    raw = cast(bytes, self.get(option))
    return raw.decode(encoding)


getsockopt_unicode = getsockopt_string = get_string
def bind_to_random_port(
    self: T,
    addr: str,
    min_port: int = 49152,
    max_port: int = 65536,
    max_tries: int = 100,
) -> int:
    """Bind this socket to a random port in a range.

    If the port range is unspecified, the system will choose the port.

    Parameters
    ----------
    addr : str
        The address string without the port to pass to ``Socket.bind()``.
    min_port : int, optional
        The minimum port in the range of ports to try (inclusive).
    max_port : int, optional
        The maximum port in the range of ports to try (exclusive).
    max_tries : int, optional
        The maximum number of bind attempts to make.

    Returns
    -------
    port : int
        The port the socket was bound to.

    Raises
    ------
    ZMQBindError
        if `max_tries` reached before successful bind
    """
    if (
        zmq.zmq_version_info() >= (3, 2)
        and min_port == 49152
        and max_port == 65536
    ):
        # LAST_ENDPOINT is available and the caller kept the default range:
        # bind to the wildcard port and read back what was chosen.
        self.bind("%s:*" % addr)
        endpoint = cast(bytes, self.last_endpoint).decode('ascii', 'replace')
        _, port_text = endpoint.rsplit(':', 1)
        return int(port_text)

    for _attempt in range(max_tries):
        try:
            port = random.randrange(min_port, max_port)
            self.bind(f'{addr}:{port}')
        except ZMQError as exception:
            code = exception.errno
            # address already in use, or (on Windows) access denied:
            # try another port; anything else is a real failure
            retry = code == zmq.EADDRINUSE or (
                sys.platform == 'win32' and code == errno.EACCES
            )
            if not retry:
                raise
        else:
            return port
    raise ZMQBindError("Could not bind socket to random port.")
def get_hwm(self) -> int:
    """Get the High Water Mark.

    On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM.
    On older libzmq the single HWM option is read instead.
    """
    if zmq.zmq_version_info()[0] < 3:
        return cast(int, self.get(zmq.HWM))

    # prefer the send-side mark; fall through to receive-side on ZMQError
    try:
        return cast(int, self.get(zmq.SNDHWM))
    except zmq.ZMQError:
        pass

    return cast(int, self.get(zmq.RCVHWM))
def set_hwm(self, value: int) -> None:
    """Set the High Water Mark.

    On libzmq ≥ 3, this sets both SNDHWM and RCVHWM.  Both assignments are
    always attempted; if either fails, the most recent exception is raised
    once both have been tried.

    .. warning::

        New values only take effect for subsequent socket
        bind/connects.
    """
    if zmq.zmq_version_info()[0] < 3:
        self.set(zmq.HWM, value)
        return

    failure = None
    for attr in ("sndhwm", "rcvhwm"):
        try:
            setattr(self, attr, value)
        except Exception as e:
            # remember the error but still attempt the other direction
            failure = e

    if failure:
        raise failure


hwm = property(
    fget=get_hwm,
    fset=set_hwm,
    fdel=None,
    doc="""Property for High Water Mark.

    Setting hwm sets both SNDHWM and RCVHWM as appropriate.
    It gets SNDHWM if available, otherwise RCVHWM.
    """,
)
559 # -------------------------------------------------------------------------
560 # Sending and receiving messages
561 # -------------------------------------------------------------------------
@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    *,
    track: Literal[True],
    routing_id: Optional[int] = ...,
    group: Optional[str] = ...,
) -> "zmq.MessageTracker":
    ...


@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    *,
    track: Literal[False],
    routing_id: Optional[int] = ...,
    group: Optional[str] = ...,
) -> None:
    ...


@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    *,
    copy: bool = ...,
    routing_id: Optional[int] = ...,
    group: Optional[str] = ...,
) -> None:
    ...


@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    track: bool = ...,
    routing_id: Optional[int] = ...,
    group: Optional[str] = ...,
) -> Optional["zmq.MessageTracker"]:
    ...


def send(
    self,
    data: Any,
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    routing_id: Optional[int] = None,
    group: Optional[str] = None,
) -> Optional["zmq.MessageTracker"]:
    """Send a single zmq message frame on this socket.

    This queues the message to be sent by the IO thread at a later time.

    With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
    otherwise, this waits until space is available.
    See :class:`Poller` for more general non-blocking I/O.

    Parameters
    ----------
    data : bytes, Frame, memoryview
        The content of the message. This can be any object that provides
        the Python buffer API (i.e. `memoryview(data)` can be called).
    flags : int
        0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
    copy : bool
        Should the message be sent in a copying or non-copying manner.
    track : bool
        Should the message be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)
    routing_id : int
        For use with SERVER sockets
    group : str
        For use with RADIO sockets

    Returns
    -------
    None : if `copy` or not track
        None if message was sent, raises an exception otherwise.
    MessageTracker : if track and not copy
        a MessageTracker object, whose `pending` property will
        be True until the send is completed.

    Raises
    ------
    TypeError
        If a unicode object is passed
    ValueError
        If `track=True`, but an untracked Frame is passed.
    ZMQError
        If the send does not succeed for any reason (including
        if NOBLOCK is set and the outgoing queue is full).


    .. versionchanged:: 17.0

        DRAFT support for routing_id and group arguments.
    """
    if routing_id is not None or group is not None:
        # routing_id / group are attributes of a Frame, so raw data has to
        # be wrapped (once) before they can be attached
        if not isinstance(data, zmq.Frame):
            data = zmq.Frame(
                data,
                track=track,
                copy=copy or None,
                copy_threshold=self.copy_threshold,
            )
        if routing_id is not None:
            data.routing_id = routing_id
        if group is not None:
            data.group = group
    return super().send(data, flags=flags, copy=copy, track=track)
def send_multipart(
    self,
    msg_parts: Sequence,
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    **kwargs,
):
    """Send a sequence of buffers as a multipart message.

    The zmq.SNDMORE flag is added to all msg parts before the last.
    Every part is type-checked before anything is sent.

    Parameters
    ----------
    msg_parts : iterable
        A sequence of objects to send as a multipart message. Each element
        can be any sendable object (Frame, bytes, buffer-providers)
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
        SNDMORE is added automatically for frames before the last.
    copy : bool, optional
        Should the frame(s) be sent in a copying or non-copying manner.
        If copy=False, frames smaller than self.copy_threshold bytes
        will be copied anyway.
    track : bool, optional
        Should the frame(s) be tracked for notification that ZMQ has
        finished with it (ignored if copy=True).
    **kwargs
        Accepted but not used by this implementation.

    Returns
    -------
    None : if copy or not track
    MessageTracker : if track and not copy
        a MessageTracker object, whose `pending` property will
        be True until the last send is completed.

    Raises
    ------
    TypeError
        If a part does not support the buffer interface.
    """
    # validate every part up front so a bad frame cannot leave a
    # half-sent multipart message behind
    for index, part in enumerate(msg_parts):
        if isinstance(part, (zmq.Frame, bytes, memoryview)):
            continue
        try:
            memoryview(part)
        except Exception:
            shown = repr(part)
            if len(shown) > 32:
                shown = shown[:32] + '...'
            raise TypeError(
                "Frame %i (%s) does not support the buffer interface."
                % (
                    index,
                    shown,
                )
            )

    more_flags = zmq.SNDMORE | flags
    for part in msg_parts[:-1]:
        self.send(part, more_flags, copy=copy, track=track)
    # the final frame goes out without SNDMORE and provides the return value
    return self.send(msg_parts[-1], flags, copy=copy, track=track)
@overload
def recv_multipart(
    self, flags: int = ..., *, copy: Literal[True], track: bool = ...
) -> List[bytes]:
    ...


@overload
def recv_multipart(
    self, flags: int = ..., *, copy: Literal[False], track: bool = ...
) -> List[zmq.Frame]:
    ...


@overload
def recv_multipart(self, flags: int = ..., *, track: bool = ...) -> List[bytes]:
    ...


@overload
def recv_multipart(
    self, flags: int = 0, copy: bool = True, track: bool = False
) -> Union[List[zmq.Frame], List[bytes]]:
    ...


def recv_multipart(
    self, flags: int = 0, copy: bool = True, track: bool = False
) -> Union[List[zmq.Frame], List[bytes]]:
    """Receive a multipart message as a list of bytes or Frame objects

    Parameters
    ----------
    flags : int, optional
        Any valid flags for :func:`Socket.recv`.
    copy : bool, optional
        Should the message frame(s) be received in a copying or non-copying manner?
        If False a Frame object is returned for each part, if True a copy of
        the bytes is made for each frame.
    track : bool, optional
        Should the message frame(s) be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)

    Returns
    -------
    msg_parts : list
        A list of frames in the multipart message; either Frames or bytes,
        depending on `copy`.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    # a message always has at least one frame; RCVMORE says whether
    # further frames of the same message are waiting
    frames = [self.recv(flags, copy=copy, track=track)]
    while self.getsockopt(zmq.RCVMORE):
        frames.append(self.recv(flags, copy=copy, track=track))
    # cast List[Union] to Union[List]; the element type follows `copy`
    return cast(Union[List[zmq.Frame], List[bytes]], frames)
806 def _deserialize(
807 self,
808 recvd: bytes,
809 load: Callable[[bytes], Any],
810 ) -> Any:
811 """Deserialize a received message
813 Override in subclass (e.g. Futures) if recvd is not the raw bytes.
815 The default implementation expects bytes and returns the deserialized message immediately.
817 Parameters
818 ----------
820 load: callable
821 Callable that deserializes bytes
822 recvd:
823 The object returned by self.recv
825 """
826 return load(recvd)
def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs):
    """Send a message with a custom serialization function.

    .. versionadded:: 17

    Parameters
    ----------
    msg : The message to be sent. Can be any object serializable by `serialize`.
    serialize : callable
        The serialization function to use.
        serialize(msg) should return an iterable of sendable message frames
        (e.g. bytes objects), which will be passed to send_multipart.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
    copy : bool, optional
        Whether to copy the frames.
    **kwargs
        Forwarded to send_multipart.

    Returns
    -------
    Whatever send_multipart returns.
    """
    return self.send_multipart(serialize(msg), flags=flags, copy=copy, **kwargs)
def recv_serialized(self, deserialize, flags=0, copy=True):
    """Receive a message with a custom deserialization function.

    .. versionadded:: 17

    Parameters
    ----------
    deserialize : callable
        The deserialization function to use.
        deserialize will be called with one argument: the list of frames
        returned by recv_multipart() and can return any object.
    flags : int, optional
        Any valid flags for :func:`Socket.recv`.
    copy : bool, optional
        Whether to recv bytes or Frame objects.

    Returns
    -------
    obj : object
        The object returned by the deserialization function.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    received = self.recv_multipart(flags=flags, copy=copy)
    # go through the _deserialize hook so subclasses can defer the call
    return self._deserialize(received, deserialize)
def send_string(
    self,
    u: str,
    flags: int = 0,
    copy: bool = True,
    encoding: str = 'utf-8',
    **kwargs,
) -> Optional["zmq.MessageTracker"]:
    """Send a Python unicode string as a message with an encoding.

    0MQ communicates with raw bytes, so you must encode/decode
    text (str) around 0MQ.

    Parameters
    ----------
    u : str
        The unicode string to send.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
    copy : bool, optional
        Passed through to :func:`Socket.send`.
    encoding : str [default: 'utf-8']
        The encoding to be used
    **kwargs
        Any further keyword arguments are passed through to
        :func:`Socket.send`.

    Returns
    -------
    Whatever :func:`Socket.send` returns for the encoded bytes.

    Raises
    ------
    TypeError
        If `u` is not a str.
    """
    if not isinstance(u, str):
        raise TypeError("str objects only")
    # the result comes straight from send(), hence the MessageTracker
    # (not Frame) return annotation
    return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)


send_unicode = send_string
def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:
    """Receive a unicode string, as sent by send_string.

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.
    encoding : str [default: 'utf-8']
        The encoding to be used

    Returns
    -------
    s : str
        The Python unicode string that arrives as encoded bytes.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """

    def _decode(buf):
        return buf.decode(encoding)

    received = self.recv(flags=flags)
    return self._deserialize(received, _decode)


recv_unicode = recv_string
def send_pyobj(
    self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs
) -> Optional["zmq.MessageTracker"]:
    """Send a Python object as a message using pickle to serialize.

    Parameters
    ----------
    obj : Python object
        The Python object to send.
    flags : int
        Any valid flags for :func:`Socket.send`.
    protocol : int
        The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL
        where defined, and pickle.HIGHEST_PROTOCOL elsewhere.
    **kwargs
        Any further keyword arguments are passed through to
        :func:`Socket.send`.

    Returns
    -------
    Whatever :func:`Socket.send` returns for the pickled bytes.
    """
    payload = pickle.dumps(obj, protocol)
    # the result comes straight from send(), hence the MessageTracker
    # (not Frame) return annotation
    return self.send(payload, flags=flags, **kwargs)
def recv_pyobj(self, flags: int = 0) -> Any:
    """Receive a Python object as a message using pickle to serialize.

    .. warning::

        The received bytes are passed to ``pickle.loads``.  Unpickling can
        execute arbitrary code, so only use this with peers you trust.

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    received = self.recv(flags)
    return self._deserialize(received, pickle.loads)
def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
    """Send a Python object as a message using json to serialize.

    Keyword arguments are passed on to json.dumps, except ``routing_id``
    and ``group``, which are passed to :func:`Socket.send` instead.

    Parameters
    ----------
    obj : Python object
        The Python object to send
    flags : int
        Any valid flags for :func:`Socket.send`
    """
    # split off the keywords that belong to send() rather than the encoder
    send_kwargs = {
        key: kwargs.pop(key) for key in ('routing_id', 'group') if key in kwargs
    }
    payload = jsonapi.dumps(obj, **kwargs)
    return self.send(payload, flags=flags, **send_kwargs)
def recv_json(self, flags: int = 0, **kwargs) -> Union[List, str, int, float, Dict]:
    """Receive a Python object as a message using json to serialize.

    Keyword arguments are passed on to json.loads

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """

    def _load(buf):
        return jsonapi.loads(buf, **kwargs)

    received = self.recv(flags)
    return self._deserialize(received, _load)
# poller implementation used by poll(); a class attribute so it can be swapped
_poller_class = Poller


def poll(self, timeout=None, flags=zmq.POLLIN) -> int:
    """Poll the socket for events.
    See :class:`Poller` to wait for multiple sockets at once.

    Parameters
    ----------
    timeout : int [default: None]
        The timeout (in milliseconds) to wait for an event. If unspecified
        (or specified None), will wait forever for an event.
    flags : int [default: POLLIN]
        POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.

    Returns
    -------
    event_mask : int
        The poll event mask (POLLIN, POLLOUT),
        0 if the timeout was reached without an event.

    Raises
    ------
    ZMQError
        With ENOTSUP if the socket is closed.
    """

    if self.closed:
        raise ZMQError(zmq.ENOTSUP)

    poller = self._poller_class()
    poller.register(self, flags)
    ready = dict(poller.poll(timeout))
    # no entry for this socket means no events: report 0
    return ready.get(self, 0)
def get_monitor_socket(
    self: T, events: Optional[int] = None, addr: Optional[str] = None
) -> T:
    """Return a connected PAIR socket ready to receive the event notifications.

    If a monitor socket was created earlier and is still open, that same
    socket is returned and `events` / `addr` are ignored.

    .. versionadded:: libzmq-4.0
    .. versionadded:: 14.0

    Parameters
    ----------
    events : int [default: ZMQ_EVENT_ALL]
        The bitmask defining which events are wanted.
    addr : string [default: None]
        The optional endpoint for the monitoring sockets.

    Returns
    -------
    socket : (PAIR)
        The socket is already connected and ready to receive messages.

    Raises
    ------
    NotImplementedError
        If libzmq is older than version 4.
    """
    # safe-guard, method only available on libzmq >= 4
    if zmq.zmq_version_info() < (4,):
        raise NotImplementedError(
            "get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version()
        )

    # reuse a live monitor socket; forget one that has been closed
    if self._monitor_socket:
        if not self._monitor_socket.closed:
            return self._monitor_socket
        self._monitor_socket = None

    if addr is None:
        # create endpoint name from internal fd
        addr = f"inproc://monitor.s-{self.FD}"
    if events is None:
        # use all events
        events = zmq.EVENT_ALL

    # start monitoring, then connect a fresh PAIR socket to the endpoint
    self.monitor(addr, events)
    self._monitor_socket = self.context.socket(zmq.PAIR)
    self._monitor_socket.connect(addr)
    return self._monitor_socket
def disable_monitor(self) -> None:
    """Shutdown the PAIR socket (created using get_monitor_socket)
    that is serving socket events.

    Drops this socket's reference to the monitor socket, then calls
    ``self.monitor(None, 0)``.

    .. versionadded:: 14.4
    """
    self._monitor_socket = None
    self.monitor(None, 0)
1098__all__ = ['Socket']