1"""0MQ Socket pure Python methods."""
2
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
5
6from __future__ import annotations
7
8import errno
9import pickle
10import random
11import sys
12from typing import (
13 Any,
14 Callable,
15 Generic,
16 List,
17 Literal,
18 Sequence,
19 TypeVar,
20 Union,
21 cast,
22 overload,
23)
24from warnings import warn
25
26import zmq
27from zmq._typing import TypeAlias
28from zmq.backend import Socket as SocketBase
29from zmq.error import ZMQBindError, ZMQError
30from zmq.utils import jsonapi
31from zmq.utils.interop import cast_int_addr
32
33from ..constants import SocketOption, SocketType, _OptType
34from .attrsettr import AttributeSetter
35from .poll import Poller
36
37try:
38 DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL
39except AttributeError:
40 DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL
41
42_SocketType = TypeVar("_SocketType", bound="Socket")
43
44_JSONType: TypeAlias = "int | str | bool | list[_JSONType] | dict[str, _JSONType]"
45
46
47class _SocketContext(Generic[_SocketType]):
48 """Context Manager for socket bind/unbind"""
49
50 socket: _SocketType
51 kind: str
52 addr: str
53
54 def __repr__(self):
55 return f"<SocketContext({self.kind}={self.addr!r})>"
56
57 def __init__(
58 self: _SocketContext[_SocketType], socket: _SocketType, kind: str, addr: str
59 ):
60 assert kind in {"bind", "connect"}
61 self.socket = socket
62 self.kind = kind
63 self.addr = addr
64
65 def __enter__(self: _SocketContext[_SocketType]) -> _SocketType:
66 return self.socket
67
68 def __exit__(self, *args):
69 if self.socket.closed:
70 return
71 if self.kind == "bind":
72 self.socket.unbind(self.addr)
73 elif self.kind == "connect":
74 self.socket.disconnect(self.addr)
75
76
77SocketReturnType = TypeVar("SocketReturnType")
78
79
80class Socket(SocketBase, AttributeSetter, Generic[SocketReturnType]):
81 """The ZMQ socket object
82
83 To create a Socket, first create a Context::
84
85 ctx = zmq.Context.instance()
86
87 then call ``ctx.socket(socket_type)``::
88
89 s = ctx.socket(zmq.ROUTER)
90
91 .. versionadded:: 25
92
93 Sockets can now be shadowed by passing another Socket.
94 This helps in creating an async copy of a sync socket or vice versa::
95
96 s = zmq.Socket(async_socket)
97
98 Which previously had to be::
99
100 s = zmq.Socket.shadow(async_socket.underlying)
101 """
102
103 _shadow = False
104 _shadow_obj = None
105 _monitor_socket = None
106 _type_name = 'UNKNOWN'
107
108 @overload
109 def __init__(
110 self: Socket[bytes],
111 ctx_or_socket: zmq.Context,
112 socket_type: int,
113 *,
114 copy_threshold: int | None = None,
115 ): ...
116
117 @overload
118 def __init__(
119 self: Socket[bytes],
120 *,
121 shadow: Socket | int,
122 copy_threshold: int | None = None,
123 ): ...
124
125 @overload
126 def __init__(
127 self: Socket[bytes],
128 ctx_or_socket: Socket,
129 ): ...
130
131 def __init__(
132 self: Socket[bytes],
133 ctx_or_socket: zmq.Context | Socket | None = None,
134 socket_type: int = 0,
135 *,
136 shadow: Socket | int = 0,
137 copy_threshold: int | None = None,
138 ):
139 shadow_context: zmq.Context | None = None
140 if isinstance(ctx_or_socket, zmq.Socket):
141 # positional Socket(other_socket)
142 shadow = ctx_or_socket
143 ctx_or_socket = None
144
145 shadow_address: int = 0
146
147 if shadow:
148 self._shadow = True
149 # hold a reference to the shadow object
150 self._shadow_obj = shadow
151 if not isinstance(shadow, int):
152 if isinstance(shadow, zmq.Socket):
153 shadow_context = shadow.context
154 try:
155 shadow = cast(int, shadow.underlying)
156 except AttributeError:
157 pass
158 shadow_address = cast_int_addr(shadow)
159 else:
160 self._shadow = False
161
162 super().__init__(
163 ctx_or_socket,
164 socket_type,
165 shadow=shadow_address,
166 copy_threshold=copy_threshold,
167 )
168 if self._shadow_obj and shadow_context:
169 # keep self.context reference if shadowing a Socket object
170 self.context = shadow_context
171
172 try:
173 socket_type = cast(int, self.get(zmq.TYPE))
174 except Exception:
175 pass
176 else:
177 try:
178 self.__dict__["type"] = stype = SocketType(socket_type)
179 except ValueError:
180 self._type_name = str(socket_type)
181 else:
182 self._type_name = stype.name
183
184 def __del__(self):
185 if not self._shadow and not self.closed:
186 if warn is not None:
187 # warn can be None during process teardown
188 warn(
189 f"Unclosed socket {self}",
190 ResourceWarning,
191 stacklevel=2,
192 source=self,
193 )
194 self.close()
195
196 _repr_cls = "zmq.Socket"
197
198 def __repr__(self):
199 cls = self.__class__
200 # look up _repr_cls on exact class, not inherited
201 _repr_cls = cls.__dict__.get("_repr_cls", None)
202 if _repr_cls is None:
203 _repr_cls = f"{cls.__module__}.{cls.__name__}"
204
205 closed = ' closed' if self._closed else ''
206
207 return f"<{_repr_cls}(zmq.{self._type_name}) at {hex(id(self))}{closed}>"
208
209 # socket as context manager:
210 def __enter__(self: _SocketType) -> _SocketType:
211 """Sockets are context managers
212
213 .. versionadded:: 14.4
214 """
215 return self
216
217 def __exit__(self, *args, **kwargs):
218 self.close()
219
220 # -------------------------------------------------------------------------
221 # Socket creation
222 # -------------------------------------------------------------------------
223
224 def __copy__(self: _SocketType, memo=None) -> _SocketType:
225 """Copying a Socket creates a shadow copy"""
226 return self.__class__.shadow(self.underlying)
227
228 __deepcopy__ = __copy__
229
230 @classmethod
231 def shadow(cls: type[_SocketType], address: int | zmq.Socket) -> _SocketType:
232 """Shadow an existing libzmq socket
233
234 address is a zmq.Socket or an integer (or FFI pointer)
235 representing the address of the libzmq socket.
236
237 .. versionadded:: 14.1
238
239 .. versionadded:: 25
240 Support for shadowing `zmq.Socket` objects,
241 instead of just integer addresses.
242 """
243 return cls(shadow=address)
244
245 def close(self, linger=None) -> None:
246 """
247 Close the socket.
248
249 If linger is specified, LINGER sockopt will be set prior to closing.
250
251 Note: closing a zmq Socket may not close the underlying sockets
252 if there are undelivered messages.
253 Only after all messages are delivered or discarded by reaching the socket's LINGER timeout
254 (default: forever)
255 will the underlying sockets be closed.
256
257 This can be called to close the socket by hand. If this is not
258 called, the socket will automatically be closed when it is
259 garbage collected,
260 in which case you may see a ResourceWarning about the unclosed socket.
261 """
262 if self.context:
263 self.context._rm_socket(self)
264 super().close(linger=linger)
265
266 # -------------------------------------------------------------------------
267 # Connect/Bind context managers
268 # -------------------------------------------------------------------------
269
270 def _connect_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
271 """Context manager to disconnect on exit
272
273 .. versionadded:: 20.0
274 """
275 return _SocketContext(self, 'connect', addr)
276
277 def _bind_cm(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
278 """Context manager to unbind on exit
279
280 .. versionadded:: 20.0
281 """
282 try:
283 # retrieve last_endpoint
284 # to support binding on random ports via
285 # `socket.bind('tcp://127.0.0.1:0')`
286 addr = cast(bytes, self.get(zmq.LAST_ENDPOINT)).decode("utf8")
287 except (AttributeError, ZMQError, UnicodeDecodeError):
288 pass
289 return _SocketContext(self, 'bind', addr)
290
291 def bind(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
292 """s.bind(addr)
293
294 Bind the socket to an address.
295
296 This causes the socket to listen on a network port. Sockets on the
297 other side of this connection will use ``Socket.connect(addr)`` to
298 connect to this socket.
299
300 Returns a context manager which will call unbind on exit.
301
302 .. versionadded:: 20.0
303 Can be used as a context manager.
304
305 .. versionadded:: 26.0
306 binding to port 0 can be used as a context manager
307 for binding to a random port.
308 The URL can be retrieved as `socket.last_endpoint`.
309
310 Parameters
311 ----------
312 addr : str
313 The address string. This has the form 'protocol://interface:port',
314 for example 'tcp://127.0.0.1:5555'. Protocols supported include
315 tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is
316 encoded to utf-8 first.
317
318 """
319 try:
320 super().bind(addr)
321 except ZMQError as e:
322 e.strerror += f" (addr={addr!r})"
323 raise
324 return self._bind_cm(addr)
325
326 def connect(self: _SocketType, addr: str) -> _SocketContext[_SocketType]:
327 """s.connect(addr)
328
329 Connect to a remote 0MQ socket.
330
331 Returns a context manager which will call disconnect on exit.
332
333 .. versionadded:: 20.0
334 Can be used as a context manager.
335
336 Parameters
337 ----------
338 addr : str
339 The address string. This has the form 'protocol://interface:port',
340 for example 'tcp://127.0.0.1:5555'. Protocols supported are
341 tcp, udp, pgm, inproc and ipc. If the address is unicode, it is
342 encoded to utf-8 first.
343
344 """
345 try:
346 super().connect(addr)
347 except ZMQError as e:
348 e.strerror += f" (addr={addr!r})"
349 raise
350 return self._connect_cm(addr)
351
352 # -------------------------------------------------------------------------
353 # Deprecated aliases
354 # -------------------------------------------------------------------------
355
356 @property
357 def socket_type(self) -> int:
358 warn("Socket.socket_type is deprecated, use Socket.type", DeprecationWarning)
359 return cast(int, self.type)
360
361 # -------------------------------------------------------------------------
362 # Hooks for sockopt completion
363 # -------------------------------------------------------------------------
364
365 def __dir__(self):
366 keys = dir(self.__class__)
367 keys.extend(SocketOption.__members__)
368 return keys
369
370 # -------------------------------------------------------------------------
371 # Getting/Setting options
372 # -------------------------------------------------------------------------
373 setsockopt = SocketBase.set
374 getsockopt = SocketBase.get
375
376 def __setattr__(self, key, value):
377 """Override to allow setting zmq.[UN]SUBSCRIBE even though we have a subscribe method"""
378 if key in self.__dict__:
379 object.__setattr__(self, key, value)
380 return
381 _key = key.lower()
382 if _key in ('subscribe', 'unsubscribe'):
383 if isinstance(value, str):
384 value = value.encode('utf8')
385 if _key == 'subscribe':
386 self.set(zmq.SUBSCRIBE, value)
387 else:
388 self.set(zmq.UNSUBSCRIBE, value)
389 return
390 super().__setattr__(key, value)
391
392 def fileno(self) -> int:
393 """Return edge-triggered file descriptor for this socket.
394
395 This is a read-only edge-triggered file descriptor for both read and write events on this socket.
396 It is important that all available events be consumed when an event is detected,
397 otherwise the read event will not trigger again.
398
399 .. versionadded:: 17.0
400 """
401 return self.FD
402
403 def subscribe(self, topic: str | bytes) -> None:
404 """Subscribe to a topic
405
406 Only for SUB sockets.
407
408 .. versionadded:: 15.3
409 """
410 if isinstance(topic, str):
411 topic = topic.encode('utf8')
412 self.set(zmq.SUBSCRIBE, topic)
413
414 def unsubscribe(self, topic: str | bytes) -> None:
415 """Unsubscribe from a topic
416
417 Only for SUB sockets.
418
419 .. versionadded:: 15.3
420 """
421 if isinstance(topic, str):
422 topic = topic.encode('utf8')
423 self.set(zmq.UNSUBSCRIBE, topic)
424
425 def set_string(self, option: int, optval: str, encoding='utf-8') -> None:
426 """Set socket options with a unicode object.
427
428 This is simply a wrapper for setsockopt to protect from encoding ambiguity.
429
430 See the 0MQ documentation for details on specific options.
431
432 Parameters
433 ----------
434 option : int
435 The name of the option to set. Can be any of: SUBSCRIBE,
436 UNSUBSCRIBE, IDENTITY
437 optval : str
438 The value of the option to set.
439 encoding : str
440 The encoding to be used, default is utf8
441 """
442 if not isinstance(optval, str):
443 raise TypeError(f"strings only, not {type(optval)}: {optval!r}")
444 return self.set(option, optval.encode(encoding))
445
446 setsockopt_unicode = setsockopt_string = set_string
447
448 def get_string(self, option: int, encoding='utf-8') -> str:
449 """Get the value of a socket option.
450
451 See the 0MQ documentation for details on specific options.
452
453 Parameters
454 ----------
455 option : int
456 The option to retrieve.
457
458 Returns
459 -------
460 optval : str
461 The value of the option as a unicode string.
462 """
463 if SocketOption(option)._opt_type != _OptType.bytes:
464 raise TypeError(f"option {option} will not return a string to be decoded")
465 return cast(bytes, self.get(option)).decode(encoding)
466
467 getsockopt_unicode = getsockopt_string = get_string
468
469 def bind_to_random_port(
470 self: _SocketType,
471 addr: str,
472 min_port: int = 49152,
473 max_port: int = 65536,
474 max_tries: int = 100,
475 ) -> int:
476 """Bind this socket to a random port in a range.
477
478 If the port range is unspecified, the system will choose the port.
479
480 Parameters
481 ----------
482 addr : str
483 The address string without the port to pass to ``Socket.bind()``.
484 min_port : int, optional
485 The minimum port in the range of ports to try (inclusive).
486 max_port : int, optional
487 The maximum port in the range of ports to try (exclusive).
488 max_tries : int, optional
489 The maximum number of bind attempts to make.
490
491 Returns
492 -------
493 port : int
494 The port the socket was bound to.
495
496 Raises
497 ------
498 ZMQBindError
499 if `max_tries` reached before successful bind
500 """
501 if min_port == 49152 and max_port == 65536:
502 # if LAST_ENDPOINT is supported, and min_port / max_port weren't specified,
503 # we can bind to port 0 and let the OS do the work
504 self.bind(f"{addr}:*")
505 url = cast(bytes, self.last_endpoint).decode('ascii', 'replace')
506 _, port_s = url.rsplit(':', 1)
507 return int(port_s)
508
509 for i in range(max_tries):
510 try:
511 port = random.randrange(min_port, max_port)
512 self.bind(f'{addr}:{port}')
513 except ZMQError as exception:
514 en = exception.errno
515 if en == zmq.EADDRINUSE:
516 continue
517 elif sys.platform == 'win32' and en == errno.EACCES:
518 continue
519 else:
520 raise
521 else:
522 return port
523 raise ZMQBindError("Could not bind socket to random port.")
524
525 def get_hwm(self) -> int:
526 """Get the High Water Mark.
527
528 On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM
529 """
530 # return sndhwm, fallback on rcvhwm
531 try:
532 return cast(int, self.get(zmq.SNDHWM))
533 except zmq.ZMQError:
534 pass
535
536 return cast(int, self.get(zmq.RCVHWM))
537
538 def set_hwm(self, value: int) -> None:
539 """Set the High Water Mark.
540
541 On libzmq ≥ 3, this sets both SNDHWM and RCVHWM
542
543
544 .. warning::
545
546 New values only take effect for subsequent socket
547 bind/connects.
548 """
549 raised = None
550 try:
551 self.sndhwm = value
552 except Exception as e:
553 raised = e
554 try:
555 self.rcvhwm = value
556 except Exception as e:
557 raised = e
558
559 if raised:
560 raise raised
561
562 hwm = property(
563 get_hwm,
564 set_hwm,
565 None,
566 """Property for High Water Mark.
567
568 Setting hwm sets both SNDHWM and RCVHWM as appropriate.
569 It gets SNDHWM if available, otherwise RCVHWM.
570 """,
571 )
572
573 # -------------------------------------------------------------------------
574 # Sending and receiving messages
575 # -------------------------------------------------------------------------
576
577 @overload
578 def send(
579 self,
580 data: Any,
581 flags: int = ...,
582 copy: bool = ...,
583 *,
584 track: Literal[True],
585 routing_id: int | None = ...,
586 group: str | None = ...,
587 ) -> zmq.MessageTracker: ...
588
589 @overload
590 def send(
591 self,
592 data: Any,
593 flags: int = ...,
594 copy: bool = ...,
595 *,
596 track: Literal[False],
597 routing_id: int | None = ...,
598 group: str | None = ...,
599 ) -> None: ...
600
601 @overload
602 def send(
603 self,
604 data: Any,
605 flags: int = ...,
606 *,
607 copy: bool = ...,
608 routing_id: int | None = ...,
609 group: str | None = ...,
610 ) -> None: ...
611
612 @overload
613 def send(
614 self,
615 data: Any,
616 flags: int = ...,
617 copy: bool = ...,
618 track: bool = ...,
619 routing_id: int | None = ...,
620 group: str | None = ...,
621 ) -> zmq.MessageTracker | None: ...
622
623 def send(
624 self,
625 data: Any,
626 flags: int = 0,
627 copy: bool = True,
628 track: bool = False,
629 routing_id: int | None = None,
630 group: str | None = None,
631 ) -> zmq.MessageTracker | None:
632 """Send a single zmq message frame on this socket.
633
634 This queues the message to be sent by the IO thread at a later time.
635
636 With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
637 otherwise, this waits until space is available.
638 See :class:`Poller` for more general non-blocking I/O.
639
640 Parameters
641 ----------
642 data : bytes, Frame, memoryview
643 The content of the message. This can be any object that provides
644 the Python buffer API (i.e. `memoryview(data)` can be called).
645 flags : int
646 0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
647 copy : bool
648 Should the message be sent in a copying or non-copying manner.
649 track : bool
650 Should the message be tracked for notification that ZMQ has
651 finished with it? (ignored if copy=True)
652 routing_id : int
653 For use with SERVER sockets
654 group : str
655 For use with RADIO sockets
656
657 Returns
658 -------
659 None : if `copy` or not track
660 None if message was sent, raises an exception otherwise.
661 MessageTracker : if track and not copy
662 a MessageTracker object, whose `done` property will
663 be False until the send is completed.
664
665 Raises
666 ------
667 TypeError
668 If a unicode object is passed
669 ValueError
670 If `track=True`, but an untracked Frame is passed.
671 ZMQError
672 If the send does not succeed for any reason (including
673 if NOBLOCK is set and the outgoing queue is full).
674
675
676 .. versionchanged:: 17.0
677
678 DRAFT support for routing_id and group arguments.
679 """
680 if routing_id is not None:
681 if not isinstance(data, zmq.Frame):
682 data = zmq.Frame(
683 data,
684 track=track,
685 copy=copy or None,
686 copy_threshold=self.copy_threshold,
687 )
688 data.routing_id = routing_id
689 if group is not None:
690 if not isinstance(data, zmq.Frame):
691 data = zmq.Frame(
692 data,
693 track=track,
694 copy=copy or None,
695 copy_threshold=self.copy_threshold,
696 )
697 data.group = group
698 return super().send(data, flags=flags, copy=copy, track=track)
699
700 def send_multipart(
701 self,
702 msg_parts: Sequence,
703 flags: int = 0,
704 copy: bool = True,
705 track: bool = False,
706 **kwargs,
707 ):
708 """Send a sequence of buffers as a multipart message.
709
710 The zmq.SNDMORE flag is added to all msg parts before the last.
711
712 Parameters
713 ----------
714 msg_parts : iterable
715 A sequence of objects to send as a multipart message. Each element
716 can be any sendable object (Frame, bytes, buffer-providers)
717 flags : int, optional
718 Any valid flags for :func:`Socket.send`.
719 SNDMORE is added automatically for frames before the last.
720 copy : bool, optional
721 Should the frame(s) be sent in a copying or non-copying manner.
722 If copy=False, frames smaller than self.copy_threshold bytes
723 will be copied anyway.
724 track : bool, optional
725 Should the frame(s) be tracked for notification that ZMQ has
726 finished with it (ignored if copy=True).
727
728 Returns
729 -------
730 None : if copy or not track
731 MessageTracker : if track and not copy
732 a MessageTracker object, whose `done` property will
733 be False until the last send is completed.
734 """
735 # typecheck parts before sending:
736 for i, msg in enumerate(msg_parts):
737 if isinstance(msg, (zmq.Frame, bytes, memoryview)):
738 continue
739 try:
740 memoryview(msg)
741 except Exception:
742 rmsg = repr(msg)
743 if len(rmsg) > 32:
744 rmsg = rmsg[:32] + '...'
745 raise TypeError(
746 f"Frame {i} ({rmsg}) does not support the buffer interface."
747 )
748 for msg in msg_parts[:-1]:
749 self.send(msg, zmq.SNDMORE | flags, copy=copy, track=track)
750 # Send the last part without the extra SNDMORE flag.
751 return self.send(msg_parts[-1], flags, copy=copy, track=track)
752
753 @overload
754 def recv_multipart(
755 self, flags: int = ..., *, copy: Literal[True], track: bool = ...
756 ) -> list[bytes]: ...
757
758 @overload
759 def recv_multipart(
760 self, flags: int = ..., *, copy: Literal[False], track: bool = ...
761 ) -> list[zmq.Frame]: ...
762
763 @overload
764 def recv_multipart(self, flags: int = ..., *, track: bool = ...) -> list[bytes]: ...
765
766 @overload
767 def recv_multipart(
768 self, flags: int = 0, copy: bool = True, track: bool = False
769 ) -> list[zmq.Frame] | list[bytes]: ...
770
771 def recv_multipart(
772 self, flags: int = 0, copy: bool = True, track: bool = False
773 ) -> list[zmq.Frame] | list[bytes]:
774 """Receive a multipart message as a list of bytes or Frame objects
775
776 Parameters
777 ----------
778 flags : int, optional
779 Any valid flags for :func:`Socket.recv`.
780 copy : bool, optional
781 Should the message frame(s) be received in a copying or non-copying manner?
782 If False a Frame object is returned for each part, if True a copy of
783 the bytes is made for each frame.
784 track : bool, optional
785 Should the message frame(s) be tracked for notification that ZMQ has
786 finished with it? (ignored if copy=True)
787
788 Returns
789 -------
790 msg_parts : list
791 A list of frames in the multipart message; either Frames or bytes,
792 depending on `copy`.
793
794 Raises
795 ------
796 ZMQError
797 for any of the reasons :func:`~Socket.recv` might fail
798 """
799 parts = [self.recv(flags, copy=copy, track=track)]
800 # have first part already, only loop while more to receive
801 while self.getsockopt(zmq.RCVMORE):
802 part = self.recv(flags, copy=copy, track=track)
803 parts.append(part)
804 # cast List[Union] to Union[List]
805 # how do we get mypy to recognize that return type is invariant on `copy`?
806 return cast(Union[List[zmq.Frame], List[bytes]], parts)
807
808 def _deserialize(
809 self,
810 recvd: bytes,
811 load: Callable[[bytes], Any],
812 ) -> Any:
813 """Deserialize a received message
814
815 Override in subclass (e.g. Futures) if recvd is not the raw bytes.
816
817 The default implementation expects bytes and returns the deserialized message immediately.
818
819 Parameters
820 ----------
821
822 load: callable
823 Callable that deserializes bytes
824 recvd:
825 The object returned by self.recv
826
827 """
828 return load(recvd)
829
830 def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs):
831 """Send a message with a custom serialization function.
832
833 .. versionadded:: 17
834
835 Parameters
836 ----------
837 msg : The message to be sent. Can be any object serializable by `serialize`.
838 serialize : callable
839 The serialization function to use.
840 serialize(msg) should return an iterable of sendable message frames
841 (e.g. bytes objects), which will be passed to send_multipart.
842 flags : int, optional
843 Any valid flags for :func:`Socket.send`.
844 copy : bool, optional
845 Whether to copy the frames.
846
847 """
848 frames = serialize(msg)
849 return self.send_multipart(frames, flags=flags, copy=copy, **kwargs)
850
851 def recv_serialized(self, deserialize, flags=0, copy=True):
852 """Receive a message with a custom deserialization function.
853
854 .. versionadded:: 17
855
856 Parameters
857 ----------
858 deserialize : callable
859 The deserialization function to use.
860 deserialize will be called with one argument: the list of frames
861 returned by recv_multipart() and can return any object.
862 flags : int, optional
863 Any valid flags for :func:`Socket.recv`.
864 copy : bool, optional
865 Whether to recv bytes or Frame objects.
866
867 Returns
868 -------
869 obj : object
870 The object returned by the deserialization function.
871
872 Raises
873 ------
874 ZMQError
875 for any of the reasons :func:`~Socket.recv` might fail
876 """
877 frames = self.recv_multipart(flags=flags, copy=copy)
878 return self._deserialize(frames, deserialize)
879
880 def send_string(
881 self,
882 u: str,
883 flags: int = 0,
884 copy: bool = True,
885 encoding: str = 'utf-8',
886 **kwargs,
887 ) -> zmq.Frame | None:
888 """Send a Python unicode string as a message with an encoding.
889
890 0MQ communicates with raw bytes, so you must encode/decode
891 text (str) around 0MQ.
892
893 Parameters
894 ----------
895 u : str
896 The unicode string to send.
897 flags : int, optional
898 Any valid flags for :func:`Socket.send`.
899 encoding : str
900 The encoding to be used
901 """
902 if not isinstance(u, str):
903 raise TypeError("str objects only")
904 return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)
905
906 send_unicode = send_string
907
908 def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:
909 """Receive a unicode string, as sent by send_string.
910
911 Parameters
912 ----------
913 flags : int
914 Any valid flags for :func:`Socket.recv`.
915 encoding : str
916 The encoding to be used
917
918 Returns
919 -------
920 s : str
921 The Python unicode string that arrives as encoded bytes.
922
923 Raises
924 ------
925 ZMQError
926 for any of the reasons :func:`Socket.recv` might fail
927 """
928 msg = self.recv(flags=flags)
929 return self._deserialize(msg, lambda buf: buf.decode(encoding))
930
931 recv_unicode = recv_string
932
933 def send_pyobj(
934 self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, **kwargs
935 ) -> zmq.Frame | None:
936 """
937 Send a Python object as a message using pickle to serialize.
938
939 .. warning::
940
941 Never deserialize an untrusted message with pickle,
942 which can involve arbitrary code execution.
943 Make sure to authenticate the sources of messages
944 before unpickling them, e.g. with transport-level security
945 (e.g. CURVE, ZAP, or IPC permissions)
946 or signed messages.
947
948 Parameters
949 ----------
950 obj : Python object
951 The Python object to send.
952 flags : int
953 Any valid flags for :func:`Socket.send`.
954 protocol : int
955 The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL
956 where defined, and pickle.HIGHEST_PROTOCOL elsewhere.
957 """
958 msg = pickle.dumps(obj, protocol)
959 return self.send(msg, flags=flags, **kwargs)
960
961 def recv_pyobj(self, flags: int = 0) -> Any:
962 """
963 Receive a Python object as a message using UNSAFE pickle to serialize.
964
965 .. warning::
966
967 Never deserialize an untrusted message with pickle,
968 which can involve arbitrary code execution.
969 Make sure to authenticate the sources of messages
970 before unpickling them, e.g. with transport-level security
971 (such as CURVE or IPC permissions)
972 or authenticating messages themselves before deserializing.
973
974 Parameters
975 ----------
976 flags : int
977 Any valid flags for :func:`Socket.recv`.
978
979 Returns
980 -------
981 obj : Python object
982 The Python object that arrives as a message.
983
984 Raises
985 ------
986 ZMQError
987 for any of the reasons :func:`~Socket.recv` might fail
988 """
989 msg = self.recv(flags)
990 return self._deserialize(msg, pickle.loads)
991
992 def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
993 """Send a Python object as a message using json to serialize.
994
995 Keyword arguments are passed on to json.dumps
996
997 Parameters
998 ----------
999 obj : Python object
1000 The Python object to send
1001 flags : int
1002 Any valid flags for :func:`Socket.send`
1003 """
1004 send_kwargs = {}
1005 for key in ('routing_id', 'group'):
1006 if key in kwargs:
1007 send_kwargs[key] = kwargs.pop(key)
1008 msg = jsonapi.dumps(obj, **kwargs)
1009 return self.send(msg, flags=flags, **send_kwargs)
1010
1011 def recv_json(self, flags: int = 0, **kwargs) -> _JSONType:
1012 """Receive a Python object as a message using json to serialize.
1013
1014 Keyword arguments are passed on to json.loads
1015
1016 Parameters
1017 ----------
1018 flags : int
1019 Any valid flags for :func:`Socket.recv`.
1020
1021 Returns
1022 -------
1023 obj : Python object
1024 The Python object that arrives as a message.
1025
1026 Raises
1027 ------
1028 ZMQError
1029 for any of the reasons :func:`~Socket.recv` might fail
1030 """
1031 msg = self.recv(flags)
1032 return self._deserialize(msg, lambda buf: jsonapi.loads(buf, **kwargs))
1033
1034 _poller_class = Poller
1035
1036 def poll(self, timeout: int | None = None, flags: int = zmq.POLLIN) -> int:
1037 """Poll the socket for events.
1038
1039 See :class:`Poller` to wait for multiple sockets at once.
1040
1041 Parameters
1042 ----------
1043 timeout : int
1044 The timeout (in milliseconds) to wait for an event. If unspecified
1045 (or specified None), will wait forever for an event.
1046 flags : int
1047 default: POLLIN.
1048 POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.
1049
1050 Returns
1051 -------
1052 event_mask : int
1053 The poll event mask (POLLIN, POLLOUT),
1054 0 if the timeout was reached without an event.
1055 """
1056
1057 if self.closed:
1058 raise ZMQError(zmq.ENOTSUP)
1059
1060 p = self._poller_class()
1061 p.register(self, flags)
1062 evts = dict(p.poll(timeout))
1063 # return 0 if no events, otherwise return event bitfield
1064 return evts.get(self, 0)
1065
1066 def get_monitor_socket(
1067 self: _SocketType, events: int | None = None, addr: str | None = None
1068 ) -> _SocketType:
1069 """Return a connected PAIR socket ready to receive the event notifications.
1070
1071 .. versionadded:: libzmq-4.0
1072 .. versionadded:: 14.0
1073
1074 Parameters
1075 ----------
1076 events : int
1077 default: `zmq.EVENT_ALL`
1078 The bitmask defining which events are wanted.
1079 addr : str
1080 The optional endpoint for the monitoring sockets.
1081
1082 Returns
1083 -------
1084 socket : zmq.Socket
1085 The PAIR socket, connected and ready to receive messages.
1086 """
1087 # safe-guard, method only available on libzmq >= 4
1088 if zmq.zmq_version_info() < (4,):
1089 raise NotImplementedError(
1090 f"get_monitor_socket requires libzmq >= 4, have {zmq.zmq_version()}"
1091 )
1092
1093 # if already monitoring, return existing socket
1094 if self._monitor_socket:
1095 if self._monitor_socket.closed:
1096 self._monitor_socket = None
1097 else:
1098 return self._monitor_socket
1099
1100 if addr is None:
1101 # create endpoint name from internal fd
1102 addr = f"inproc://monitor.s-{self.FD}"
1103 if events is None:
1104 # use all events
1105 events = zmq.EVENT_ALL
1106 # attach monitoring socket
1107 self.monitor(addr, events)
1108 # create new PAIR socket and connect it
1109 self._monitor_socket = self.context.socket(zmq.PAIR)
1110 self._monitor_socket.connect(addr)
1111 return self._monitor_socket
1112
1113 def disable_monitor(self) -> None:
1114 """Shutdown the PAIR socket (created using get_monitor_socket)
1115 that is serving socket events.
1116
1117 .. versionadded:: 14.4
1118 """
1119 self._monitor_socket = None
1120 self.monitor(None, 0)
1121
1122
1123SyncSocket: TypeAlias = Socket[bytes]
1124
1125__all__ = ['Socket', 'SyncSocket']