Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/sugar/socket.py: 32%
326 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1"""0MQ Socket pure Python methods."""
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
7import errno
8import pickle
9import random
10import sys
11from typing import (
12 Any,
13 Callable,
14 Dict,
15 Generic,
16 List,
17 Optional,
18 Sequence,
19 Type,
20 TypeVar,
21 Union,
22 cast,
23 overload,
24)
25from warnings import warn
27import zmq
28from zmq._typing import Literal
29from zmq.backend import Socket as SocketBase
30from zmq.error import ZMQBindError, ZMQError
31from zmq.utils import jsonapi
32from zmq.utils.interop import cast_int_addr
34from ..constants import SocketOption, SocketType, _OptType
35from .attrsettr import AttributeSetter
36from .poll import Poller
# Use pickle's default protocol when the interpreter exposes one; otherwise
# fall back to the highest protocol it supports.
DEFAULT_PROTOCOL = getattr(pickle, "DEFAULT_PROTOCOL", pickle.HIGHEST_PROTOCOL)
T = TypeVar("T", bound="Socket")


class _SocketContext(Generic[T]):
    """Context manager that undoes a bind or connect on exit.

    Entering the context yields the wrapped socket.  Leaving it calls
    ``unbind(addr)`` or ``disconnect(addr)`` depending on ``kind``, unless
    the socket is already closed, in which case nothing is done.
    """

    socket: T
    kind: str
    addr: str

    def __init__(self: "_SocketContext[T]", socket: T, kind: str, addr: str):
        assert kind in {"bind", "connect"}
        self.socket, self.kind, self.addr = socket, kind, addr

    def __repr__(self):
        return f"<SocketContext({self.kind}={self.addr!r})>"

    def __enter__(self: "_SocketContext[T]") -> T:
        return self.socket

    def __exit__(self, *args):
        sock = self.socket
        if sock.closed:
            # nothing left to undo on a closed socket
            return
        if self.kind == "bind":
            sock.unbind(self.addr)
        elif self.kind == "connect":
            sock.disconnect(self.addr)
74ST = TypeVar("ST")
77class Socket(SocketBase, AttributeSetter, Generic[ST]):
78 """The ZMQ socket object
80 To create a Socket, first create a Context::
82 ctx = zmq.Context.instance()
84 then call ``ctx.socket(socket_type)``::
86 s = ctx.socket(zmq.ROUTER)
88 .. versionadded:: 25
90 Sockets can now be shadowed by passing another Socket.
91 This helps in creating an async copy of a sync socket or vice versa::
93 s = zmq.Socket(async_socket)
95 Which previously had to be::
97 s = zmq.Socket.shadow(async_socket.underlying)
98 """
100 _shadow = False
101 _shadow_obj = None
102 _monitor_socket = None
103 _type_name = 'UNKNOWN'
@overload
def __init__(
    self: "Socket[bytes]",
    ctx_or_socket: "zmq.Context",
    socket_type: int,
    *,
    copy_threshold: Optional[int] = None,
):
    ...


@overload
def __init__(
    self: "Socket[bytes]",
    *,
    shadow: Union["Socket", int],
    copy_threshold: Optional[int] = None,
):
    ...


@overload
def __init__(
    self: "Socket[bytes]",
    ctx_or_socket: "Socket",
):
    ...


def __init__(
    self: "Socket[bytes]",
    ctx_or_socket: Optional[Union["zmq.Context", "Socket"]] = None,
    socket_type: int = 0,
    *,
    shadow: Union["Socket", int] = 0,
    copy_threshold: Optional[int] = None,
):
    """Create a socket on a context, or shadow an existing socket.

    Parameters
    ----------
    ctx_or_socket : zmq.Context or zmq.Socket, optional
        The owning context, or another Socket to shadow when passed
        positionally.
    socket_type : int
        The socket type, forwarded to the backend constructor.
    shadow : Socket or int, optional
        A Socket (or integer address) to shadow instead of creating a
        new socket.
    copy_threshold : int, optional
        Forwarded unchanged to the backend constructor.
    """
    if isinstance(ctx_or_socket, zmq.Socket):
        # positional form Socket(other_socket): treat it as shadow=other_socket
        shadow, ctx_or_socket = ctx_or_socket, None

    shadow_address: int = 0
    is_shadow = bool(shadow)
    self._shadow = is_shadow
    if is_shadow:
        # keep the shadowed object referenced for as long as we exist
        self._shadow_obj = shadow
        if not isinstance(shadow, int):
            # prefer the object's ``underlying`` address when it exposes one
            shadow = cast(int, getattr(shadow, "underlying", shadow))
        shadow_address = cast_int_addr(shadow)

    super().__init__(
        ctx_or_socket,
        socket_type,
        shadow=shadow_address,
        copy_threshold=copy_threshold,
    )

    # Best effort: record the socket type for ``repr``; failure to query it
    # is deliberately ignored.
    try:
        socket_type = cast(int, self.get(zmq.TYPE))
    except Exception:
        return

    try:
        stype = SocketType(socket_type)
    except ValueError:
        # not a known SocketType member: fall back to the raw number
        self._type_name = str(socket_type)
    else:
        self.__dict__["type"] = stype
        self._type_name = stype.name
def __del__(self):
    """Close a still-open, non-shadow socket, emitting a ResourceWarning."""
    if self._shadow or self.closed:
        return
    # ``warn`` can already be None while the process is tearing down
    if warn is not None:
        warn(
            f"Unclosed socket {self}",
            ResourceWarning,
            stacklevel=2,
            source=self,
        )
    self.close()
_repr_cls = "zmq.Socket"


def __repr__(self):
    """Return ``<cls(zmq.TYPE) at 0x...>``, with `` closed`` appended if closed."""
    klass = self.__class__
    # Only a ``_repr_cls`` declared directly on this exact class counts;
    # an inherited one is ignored in favour of the dotted class path.
    label = klass.__dict__.get("_repr_cls", None)
    if label is None:
        label = f"{klass.__module__}.{klass.__name__}"

    suffix = ' closed' if self._closed else ''

    return f"<{label}(zmq.{self._type_name}) at {hex(id(self))}{suffix}>"
203 # socket as context manager:
def __enter__(self: T) -> T:
    """Enter a ``with`` block; the socket itself is the context value.

    .. versionadded:: 14.4
    """
    return self
def __exit__(self, *args, **kwargs):
    """Leave a ``with`` block by closing the socket; exception info is ignored."""
    self.close()
214 # -------------------------------------------------------------------------
215 # Socket creation
216 # -------------------------------------------------------------------------
def __copy__(self: T, memo=None) -> T:
    """Return a shadow of this socket, built from ``self.underlying``."""
    klass = self.__class__
    return klass.shadow(self.underlying)


# deepcopy shares the same implementation (``memo`` is accepted and unused)
__deepcopy__ = __copy__
@classmethod
def shadow(cls: Type[T], address: Union[int, "zmq.Socket"]) -> T:
    """Create a socket of this class that shadows an existing libzmq socket.

    Parameters
    ----------
    address : int or zmq.Socket
        A zmq.Socket, or an integer (or FFI pointer) giving the address
        of the libzmq socket.

    .. versionadded:: 14.1

    .. versionadded:: 25
        `zmq.Socket` objects are accepted, not only integer addresses.
    """
    return cls(shadow=address)
def close(self, linger=None) -> None:
    """Close the socket.

    When ``linger`` is given, the LINGER sockopt is set before closing.

    Note: closing a zmq Socket may not close the underlying sockets while
    messages are still undelivered.  They are closed only once every
    message has been delivered or discarded by reaching the socket's
    LINGER timeout (default: forever).

    Call this to close the socket by hand.  Otherwise the socket is
    closed when it is garbage collected, in which case you may see a
    ResourceWarning about the unclosed socket.
    """
    ctx = self.context
    if ctx:
        # drop this socket from the owning context's bookkeeping first
        ctx._rm_socket(self)
    super().close(linger=linger)
260 # -------------------------------------------------------------------------
261 # Connect/Bind context managers
262 # -------------------------------------------------------------------------
def _connect_cm(self: T, addr: str) -> _SocketContext[T]:
    """Build the context manager that disconnects from ``addr`` on exit.

    .. versionadded:: 20.0
    """
    return _SocketContext(socket=self, kind='connect', addr=addr)
def _bind_cm(self: T, addr: str) -> _SocketContext[T]:
    """Build the context manager that unbinds from ``addr`` on exit.

    .. versionadded:: 20.0
    """
    return _SocketContext(socket=self, kind='bind', addr=addr)
def bind(self: T, addr: str) -> _SocketContext[T]:
    """s.bind(addr)

    Bind the socket to an address so that it listens there.  Peers reach
    this socket with ``Socket.connect(addr)``.

    .. versionadded:: 20.0
        Can be used as a context manager.

    Parameters
    ----------
    addr : str
        The address string, of the form 'protocol://interface:port',
        for example 'tcp://127.0.0.1:5555'. Protocols supported include
        tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
        encoded to utf-8 first.

    Returns
    -------
    A context manager which calls unbind on exit.

    Raises
    ------
    ZMQError
        If the bind fails; the address is appended to the error text.
    """
    try:
        super().bind(addr)
    except ZMQError as err:
        # make the failing address visible in the error message
        err.strerror += f" (addr={addr!r})"
        raise
    else:
        return self._bind_cm(addr)
def connect(self: T, addr: str) -> _SocketContext[T]:
    """s.connect(addr)

    Connect to a remote 0MQ socket.

    .. versionadded:: 20.0
        Can be used as a context manager.

    Parameters
    ----------
    addr : str
        The address string, of the form 'protocol://interface:port',
        for example 'tcp://127.0.0.1:5555'. Protocols supported are
        tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
        encoded to utf-8 first.

    Returns
    -------
    A context manager which calls disconnect on exit.

    Raises
    ------
    ZMQError
        If the connect fails; the address is appended to the error text.
    """
    try:
        super().connect(addr)
    except ZMQError as err:
        # make the failing address visible in the error message
        err.strerror += f" (addr={addr!r})"
        raise
    else:
        return self._connect_cm(addr)
334 # -------------------------------------------------------------------------
335 # Deprecated aliases
336 # -------------------------------------------------------------------------
@property
def socket_type(self) -> int:
    """Deprecated alias for ``Socket.type``.

    Emits a DeprecationWarning and returns ``self.type``.
    """
    # stacklevel=2 attributes the warning to the code reading the property,
    # not to this module, so users can locate the deprecated access.
    warn(
        "Socket.socket_type is deprecated, use Socket.type",
        DeprecationWarning,
        stacklevel=2,
    )
    return cast(int, self.type)
343 # -------------------------------------------------------------------------
344 # Hooks for sockopt completion
345 # -------------------------------------------------------------------------
def __dir__(self):
    """List the class attributes followed by every SocketOption member name."""
    return [*dir(self.__class__), *SocketOption.__members__]
352 # -------------------------------------------------------------------------
353 # Getting/Setting options
354 # -------------------------------------------------------------------------
355 setsockopt = SocketBase.set
356 getsockopt = SocketBase.get
def __setattr__(self, key, value):
    """Allow ``sock.subscribe = topic`` / ``sock.unsubscribe = topic``.

    Those two names are also methods, so assignments to them (matched
    case-insensitively) are routed to the SUBSCRIBE / UNSUBSCRIBE socket
    options, encoding ``str`` values as utf8.  Names already present in
    the instance ``__dict__`` are set directly; everything else goes to
    the parent implementation.
    """
    if key in self.__dict__:
        object.__setattr__(self, key, value)
        return

    lowered = key.lower()
    if lowered not in ('subscribe', 'unsubscribe'):
        super().__setattr__(key, value)
        return

    if isinstance(value, str):
        value = value.encode('utf8')
    option = zmq.SUBSCRIBE if lowered == 'subscribe' else zmq.UNSUBSCRIBE
    self.set(option, value)
def fileno(self) -> int:
    """Return the edge-triggered file descriptor of this socket (``self.FD``).

    The descriptor is read-only and edge-triggered for both read and
    write events on this socket.  Every available event must be consumed
    once one is detected, otherwise the read event will not fire again.

    .. versionadded:: 17.0
    """
    fd = self.FD
    return fd
def subscribe(self, topic: Union[str, bytes]) -> None:
    """Subscribe to ``topic``; ``str`` topics are encoded as utf8.

    Only for SUB sockets.

    .. versionadded:: 15.3
    """
    payload = topic.encode('utf8') if isinstance(topic, str) else topic
    self.set(zmq.SUBSCRIBE, payload)
def unsubscribe(self, topic: Union[str, bytes]) -> None:
    """Unsubscribe from ``topic``; ``str`` topics are encoded as utf8.

    Only for SUB sockets.

    .. versionadded:: 15.3
    """
    payload = topic.encode('utf8') if isinstance(topic, str) else topic
    self.set(zmq.UNSUBSCRIBE, payload)
def set_string(self, option: int, optval: str, encoding='utf-8') -> None:
    """Set a socket option from a ``str`` value.

    A thin wrapper around ``set`` that encodes the value explicitly, to
    avoid encoding ambiguity.  See the 0MQ documentation for details on
    specific options.

    Parameters
    ----------
    option : int
        The name of the option to set. Can be any of: SUBSCRIBE,
        UNSUBSCRIBE, IDENTITY
    optval : str
        The value of the option to set.
    encoding : str
        The encoding to be used, default is utf8

    Raises
    ------
    TypeError
        If ``optval`` is not a ``str``.
    """
    if isinstance(optval, str):
        encoded = optval.encode(encoding)
        return self.set(option, encoded)
    raise TypeError(f"strings only, not {type(optval)}: {optval!r}")


setsockopt_unicode = setsockopt_string = set_string
def get_string(self, option: int, encoding='utf-8') -> str:
    """Read a bytes-valued socket option and decode it to ``str``.

    See the 0MQ documentation for details on specific options.

    Parameters
    ----------
    option : int
        The option to retrieve.
    encoding : str
        The encoding used to decode the raw value.

    Returns
    -------
    optval : str
        The value of the option as a unicode string.

    Raises
    ------
    TypeError
        If the option's declared type is not bytes.
    """
    opt_type = SocketOption(option)._opt_type
    if opt_type != _OptType.bytes:
        raise TypeError(f"option {option} will not return a string to be decoded")
    raw = cast(bytes, self.get(option))
    return raw.decode(encoding)


getsockopt_unicode = getsockopt_string = get_string
def bind_to_random_port(
    self: T,
    addr: str,
    min_port: int = 49152,
    max_port: int = 65536,
    max_tries: int = 100,
) -> int:
    """Bind this socket to a random port in a range.

    If the port range is unspecified, the system will choose the port.

    Parameters
    ----------
    addr : str
        The address string without the port to pass to ``Socket.bind()``.
    min_port : int, optional
        The minimum port in the range of ports to try (inclusive).
    max_port : int, optional
        The maximum port in the range of ports to try (exclusive).
    max_tries : int, optional
        The maximum number of bind attempts to make.

    Returns
    -------
    port : int
        The port the socket was bound to.

    Raises
    ------
    ZMQBindError
        if `max_tries` reached before successful bind
    """
    if zmq.zmq_version_info() >= (3, 2) and (min_port, max_port) == (
        49152,
        65536,
    ):
        # LAST_ENDPOINT is available and the caller kept the default range:
        # bind to the wildcard port and read back the port that was chosen
        self.bind("%s:*" % addr)
        endpoint = cast(bytes, self.last_endpoint).decode('ascii', 'replace')
        _host, port_text = endpoint.rsplit(':', 1)
        return int(port_text)

    for _attempt in range(max_tries):
        try:
            port = random.randrange(min_port, max_port)
            self.bind(f'{addr}:{port}')
        except ZMQError as exc:
            code = exc.errno
            in_use = code == zmq.EADDRINUSE
            # on Windows an EACCES is also treated as "try another port"
            denied = sys.platform == 'win32' and code == errno.EACCES
            if not (in_use or denied):
                raise
        else:
            return port
    raise ZMQBindError("Could not bind socket to random port.")
def get_hwm(self) -> int:
    """Get the High Water Mark.

    On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM.
    On older libzmq the single HWM option is read.
    """
    if zmq.zmq_version_info()[0] < 3:
        return cast(int, self.get(zmq.HWM))

    try:
        return cast(int, self.get(zmq.SNDHWM))
    except zmq.ZMQError:
        # SNDHWM could not be read: fall back to RCVHWM below
        pass

    return cast(int, self.get(zmq.RCVHWM))
def set_hwm(self, value: int) -> None:
    """Set the High Water Mark.

    On libzmq ≥ 3, this sets both SNDHWM and RCVHWM.  Both are always
    attempted; if either fails, the most recent exception is raised
    afterwards.

    .. warning::

        New values only take effect for subsequent socket
        bind/connects.
    """
    if zmq.zmq_version_info()[0] < 3:
        self.set(zmq.HWM, value)
        return

    failure = None
    for attr in ("sndhwm", "rcvhwm"):
        try:
            setattr(self, attr, value)
        except Exception as exc:
            # remember the error but still try the other direction
            failure = exc

    if failure:
        raise failure


hwm = property(
    fget=get_hwm,
    fset=set_hwm,
    fdel=None,
    doc="""Property for High Water Mark.

    Setting hwm sets both SNDHWM and RCVHWM as appropriate.
    It gets SNDHWM if available, otherwise RCVHWM.
    """,
)
567 # -------------------------------------------------------------------------
568 # Sending and receiving messages
569 # -------------------------------------------------------------------------
@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    *,
    track: Literal[True],
    routing_id: Optional[int] = ...,
    group: Optional[str] = ...,
) -> "zmq.MessageTracker":
    ...


@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    *,
    track: Literal[False],
    routing_id: Optional[int] = ...,
    group: Optional[str] = ...,
) -> None:
    ...


@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    *,
    copy: bool = ...,
    routing_id: Optional[int] = ...,
    group: Optional[str] = ...,
) -> None:
    ...


@overload
def send(
    self,
    data: Any,
    flags: int = ...,
    copy: bool = ...,
    track: bool = ...,
    routing_id: Optional[int] = ...,
    group: Optional[str] = ...,
) -> Optional["zmq.MessageTracker"]:
    ...


def send(
    self,
    data: Any,
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    routing_id: Optional[int] = None,
    group: Optional[str] = None,
) -> Optional["zmq.MessageTracker"]:
    """Send a single zmq message frame on this socket.

    The message is queued and sent later by the IO thread.

    With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
    otherwise, this waits until space is available.
    See :class:`Poller` for more general non-blocking I/O.

    Parameters
    ----------
    data : bytes, Frame, memoryview
        The content of the message. This can be any object that provides
        the Python buffer API (i.e. `memoryview(data)` can be called).
    flags : int
        0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
    copy : bool
        Should the message be sent in a copying or non-copying manner.
    track : bool
        Should the message be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)
    routing_id : int
        For use with SERVER sockets
    group : str
        For use with RADIO sockets

    Returns
    -------
    None : if `copy` or not track
        None if message was sent, raises an exception otherwise.
    MessageTracker : if track and not copy
        a MessageTracker object, whose `pending` property will
        be True until the send is completed.

    Raises
    ------
    TypeError
        If a unicode object is passed
    ValueError
        If `track=True`, but an untracked Frame is passed.
    ZMQError
        If the send does not succeed for any reason (including
        if NOBLOCK is set and the outgoing queue is full).

    .. versionchanged:: 17.0

        DRAFT support for routing_id and group arguments.
    """
    needs_frame = routing_id is not None or group is not None
    if needs_frame and not isinstance(data, zmq.Frame):
        # routing_id / group are set as Frame attributes, so wrap raw data
        data = zmq.Frame(
            data,
            track=track,
            copy=copy or None,
            copy_threshold=self.copy_threshold,
        )
    if routing_id is not None:
        data.routing_id = routing_id
    if group is not None:
        data.group = group
    return super().send(data, flags=flags, copy=copy, track=track)
def send_multipart(
    self,
    msg_parts: Sequence,
    flags: int = 0,
    copy: bool = True,
    track: bool = False,
    **kwargs,
):
    """Send a sequence of buffers as a multipart message.

    The zmq.SNDMORE flag is added to all msg parts before the last.

    Parameters
    ----------
    msg_parts : iterable
        The objects to send as a multipart message. Each element
        can be any sendable object (Frame, bytes, buffer-providers).
        Any iterable is accepted, including one-shot iterables such as
        generators.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
        SNDMORE is added automatically for frames before the last.
    copy : bool, optional
        Should the frame(s) be sent in a copying or non-copying manner.
        If copy=False, frames smaller than self.copy_threshold bytes
        will be copied anyway.
    track : bool, optional
        Should the frame(s) be tracked for notification that ZMQ has
        finished with it (ignored if copy=True).

    Returns
    -------
    None : if copy or not track
    MessageTracker : if track and not copy
        a MessageTracker object, whose `pending` property will
        be True until the last send is completed.

    Raises
    ------
    TypeError
        If a part does not support the buffer interface.  Nothing is
        sent in that case, because all parts are checked first.
    IndexError
        If ``msg_parts`` is empty.
    """
    # Materialise once: the parts are walked twice (validation, then send)
    # and sliced, which a generator or other one-shot iterable cannot do.
    parts = list(msg_parts)

    # typecheck parts before sending:
    for i, msg in enumerate(parts):
        if isinstance(msg, (zmq.Frame, bytes, memoryview)):
            continue
        try:
            memoryview(msg)
        except Exception:
            rmsg = repr(msg)
            if len(rmsg) > 32:
                rmsg = rmsg[:32] + '...'
            raise TypeError(
                "Frame %i (%s) does not support the buffer interface."
                % (
                    i,
                    rmsg,
                )
            )
    for msg in parts[:-1]:
        self.send(msg, zmq.SNDMORE | flags, copy=copy, track=track)
    # Send the last part without the extra SNDMORE flag.
    return self.send(parts[-1], flags, copy=copy, track=track)
@overload
def recv_multipart(
    self, flags: int = ..., *, copy: Literal[True], track: bool = ...
) -> List[bytes]:
    ...


@overload
def recv_multipart(
    self, flags: int = ..., *, copy: Literal[False], track: bool = ...
) -> List[zmq.Frame]:
    ...


@overload
def recv_multipart(self, flags: int = ..., *, track: bool = ...) -> List[bytes]:
    ...


@overload
def recv_multipart(
    self, flags: int = 0, copy: bool = True, track: bool = False
) -> Union[List[zmq.Frame], List[bytes]]:
    ...


def recv_multipart(
    self, flags: int = 0, copy: bool = True, track: bool = False
) -> Union[List[zmq.Frame], List[bytes]]:
    """Receive a multipart message as a list of bytes or Frame objects

    Parameters
    ----------
    flags : int, optional
        Any valid flags for :func:`Socket.recv`.
    copy : bool, optional
        Should the message frame(s) be received in a copying or non-copying manner?
        If False a Frame object is returned for each part, if True a copy of
        the bytes is made for each frame.
    track : bool, optional
        Should the message frame(s) be tracked for notification that ZMQ has
        finished with it? (ignored if copy=True)

    Returns
    -------
    msg_parts : list
        A list of frames in the multipart message; either Frames or bytes,
        depending on `copy`.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    frames = []
    more = True
    # always receive one frame, then keep going while RCVMORE is set
    while more:
        frames.append(self.recv(flags, copy=copy, track=track))
        more = self.getsockopt(zmq.RCVMORE)
    # cast List[Union] to Union[List]; the element type follows `copy`,
    # which the type checker cannot see through the loop
    return cast(Union[List[zmq.Frame], List[bytes]], frames)
def _deserialize(
    self,
    recvd: bytes,
    load: Callable[[bytes], Any],
) -> Any:
    """Turn a received message into an object by applying ``load``.

    Subclasses (e.g. Futures) override this when ``recvd`` is not the raw
    bytes.  This default implementation applies ``load`` immediately.

    Parameters
    ----------
    recvd:
        The object returned by self.recv
    load: callable
        Callable that deserializes bytes
    """
    result = load(recvd)
    return result
def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs):
    """Send a message with a custom serialization function.

    .. versionadded:: 17

    Parameters
    ----------
    msg : The message to be sent. Can be any object serializable by `serialize`.
    serialize : callable
        The serialization function to use.
        serialize(msg) should return an iterable of sendable message frames
        (e.g. bytes objects), which will be passed to send_multipart.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
    copy : bool, optional
        Whether to copy the frames.

    Extra keyword arguments are forwarded to ``send_multipart``.
    """
    return self.send_multipart(serialize(msg), flags=flags, copy=copy, **kwargs)
def recv_serialized(self, deserialize, flags=0, copy=True):
    """Receive a message with a custom deserialization function.

    .. versionadded:: 17

    Parameters
    ----------
    deserialize : callable
        The deserialization function to use.
        deserialize will be called with one argument: the list of frames
        returned by recv_multipart() and can return any object.
    flags : int, optional
        Any valid flags for :func:`Socket.recv`.
    copy : bool, optional
        Whether to recv bytes or Frame objects.

    Returns
    -------
    obj : object
        The object returned by the deserialization function.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    return self._deserialize(
        self.recv_multipart(flags=flags, copy=copy),
        deserialize,
    )
def send_string(
    self,
    u: str,
    flags: int = 0,
    copy: bool = True,
    encoding: str = 'utf-8',
    **kwargs,
) -> Optional["zmq.MessageTracker"]:
    """Send a Python unicode string as a message with an encoding.

    0MQ communicates with raw bytes, so you must encode/decode
    text (str) around 0MQ.

    Parameters
    ----------
    u : str
        The unicode string to send.
    flags : int, optional
        Any valid flags for :func:`Socket.send`.
    copy : bool, optional
        Forwarded to :func:`Socket.send`.
    encoding : str [default: 'utf-8']
        The encoding to be used

    Extra keyword arguments are forwarded to :func:`Socket.send`.

    Returns
    -------
    Whatever :func:`Socket.send` returns: None, or a MessageTracker when
    tracking a non-copying send.

    Raises
    ------
    TypeError
        If ``u`` is not a ``str``.
    """
    if not isinstance(u, str):
        raise TypeError("str objects only")
    return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)


send_unicode = send_string
def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:
    """Receive a unicode string, as sent by send_string.

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.
    encoding : str [default: 'utf-8']
        The encoding to be used

    Returns
    -------
    s : str
        The Python unicode string that arrives as encoded bytes.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """

    def _decode(buf):
        return buf.decode(encoding)

    return self._deserialize(self.recv(flags=flags), _decode)


recv_unicode = recv_string
def send_pyobj(
    self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs
) -> Optional["zmq.MessageTracker"]:
    """Send a Python object as a message using pickle to serialize.

    Parameters
    ----------
    obj : Python object
        The Python object to send.
    flags : int
        Any valid flags for :func:`Socket.send`.
    protocol : int
        The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL
        where defined, and pickle.HIGHEST_PROTOCOL elsewhere.

    Extra keyword arguments are forwarded to :func:`Socket.send`.

    Returns
    -------
    Whatever :func:`Socket.send` returns: None, or a MessageTracker when
    tracking a non-copying send.
    """
    msg = pickle.dumps(obj, protocol)
    return self.send(msg, flags=flags, **kwargs)
def recv_pyobj(self, flags: int = 0) -> Any:
    """Receive a Python object as a message using pickle to serialize.

    .. warning::

        The received bytes are passed to ``pickle.loads``.  Unpickling
        untrusted input is unsafe, so use this only with trusted peers.

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """
    payload = self.recv(flags)
    return self._deserialize(payload, pickle.loads)
def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
    """Send a Python object as a message using json to serialize.

    ``routing_id`` and ``group`` keyword arguments are passed to
    :func:`Socket.send`; all other keyword arguments are passed on to
    json.dumps

    Parameters
    ----------
    obj : Python object
        The Python object to send
    flags : int
        Any valid flags for :func:`Socket.send`
    """
    # split off the keywords meant for send(); the rest belong to dumps()
    send_kwargs = {
        key: kwargs.pop(key) for key in ('routing_id', 'group') if key in kwargs
    }
    payload = jsonapi.dumps(obj, **kwargs)
    return self.send(payload, flags=flags, **send_kwargs)
def recv_json(self, flags: int = 0, **kwargs) -> Union[List, str, int, float, Dict]:
    """Receive a Python object as a message using json to serialize.

    Keyword arguments are passed on to json.loads

    Parameters
    ----------
    flags : int
        Any valid flags for :func:`Socket.recv`.

    Returns
    -------
    obj : Python object
        The Python object that arrives as a message.

    Raises
    ------
    ZMQError
        for any of the reasons :func:`~Socket.recv` might fail
    """

    def _load(buf):
        return jsonapi.loads(buf, **kwargs)

    payload = self.recv(flags)
    return self._deserialize(payload, _load)
1020 _poller_class = Poller
def poll(self, timeout=None, flags=zmq.POLLIN) -> int:
    """Poll the socket for events.
    See :class:`Poller` to wait for multiple sockets at once.

    Parameters
    ----------
    timeout : int [default: None]
        The timeout (in milliseconds) to wait for an event. If unspecified
        (or specified None), will wait forever for an event.
    flags : int [default: POLLIN]
        POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.

    Returns
    -------
    event_mask : int
        The poll event mask (POLLIN, POLLOUT),
        0 if the timeout was reached without an event.

    Raises
    ------
    ZMQError
        With ENOTSUP if the socket is closed.
    """
    if self.closed:
        raise ZMQError(zmq.ENOTSUP)

    poller = self._poller_class()
    poller.register(self, flags)
    events = dict(poller.poll(timeout))
    # this socket's event bitfield, or 0 when nothing fired
    return events.get(self, 0)
def get_monitor_socket(
    self: T, events: Optional[int] = None, addr: Optional[str] = None
) -> T:
    """Return a connected PAIR socket ready to receive the event notifications.

    An existing, still-open monitor socket is reused instead of creating
    a new one.

    .. versionadded:: libzmq-4.0
    .. versionadded:: 14.0

    Parameters
    ----------
    events : int [default: ZMQ_EVENT_ALL]
        The bitmask defining which events are wanted.
    addr : string [default: None]
        The optional endpoint for the monitoring sockets.

    Returns
    -------
    socket : (PAIR)
        The socket is already connected and ready to receive messages.

    Raises
    ------
    NotImplementedError
        If libzmq is older than version 4.
    """
    # safe-guard, method only available on libzmq >= 4
    if zmq.zmq_version_info() < (4,):
        raise NotImplementedError(
            "get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version()
        )

    existing = self._monitor_socket
    if existing:
        if not existing.closed:
            # already monitoring: hand back the live socket
            return existing
        # stale closed monitor: forget it and build a fresh one
        self._monitor_socket = None

    if addr is None:
        # derive an endpoint name from the socket's FD
        addr = f"inproc://monitor.s-{self.FD}"
    if events is None:
        events = zmq.EVENT_ALL

    # attach the monitor, then connect a new PAIR socket to its endpoint
    self.monitor(addr, events)
    self._monitor_socket = self.context.socket(zmq.PAIR)
    self._monitor_socket.connect(addr)
    return self._monitor_socket
def disable_monitor(self) -> None:
    """Stop serving socket events to the monitor PAIR socket.

    Drops the reference to the socket created by get_monitor_socket and
    calls ``monitor(None, 0)``.

    .. versionadded:: 14.4
    """
    self._monitor_socket = None
    self.monitor(None, 0)
1106__all__ = ['Socket']