Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/error.py: 44%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""0MQ Error classes and functions."""
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
7from errno import EINTR
10class ZMQBaseError(Exception):
11 """Base exception class for 0MQ errors in Python."""
14class ZMQError(ZMQBaseError):
15 """Wrap an errno style error.
17 Parameters
18 ----------
19 errno : int
20 The ZMQ errno or None. If None, then ``zmq_errno()`` is called and
21 used.
22 msg : str
23 Description of the error or None.
24 """
26 errno: int | None = None
27 strerror: str
29 def __init__(self, errno: int | None = None, msg: str | None = None):
30 """Wrap an errno style error.
32 Parameters
33 ----------
34 errno : int
35 The ZMQ errno or None. If None, then ``zmq_errno()`` is called and
36 used.
37 msg : string
38 Description of the error or None.
39 """
40 from zmq.backend import strerror, zmq_errno
42 if errno is None:
43 errno = zmq_errno()
44 if isinstance(errno, int):
45 self.errno = errno
46 if msg is None:
47 self.strerror = strerror(errno)
48 else:
49 self.strerror = msg
50 else:
51 if msg is None:
52 self.strerror = str(errno)
53 else:
54 self.strerror = msg
55 # flush signals, because there could be a SIGINT
56 # waiting to pounce, resulting in uncaught exceptions.
57 # Doing this here means getting SIGINT during a blocking
58 # libzmq call will raise a *catchable* KeyboardInterrupt
59 # PyErr_CheckSignals()
61 def __str__(self) -> str:
62 return self.strerror
64 def __repr__(self) -> str:
65 return f"{self.__class__.__name__}('{str(self)}')"
68class ZMQBindError(ZMQBaseError):
69 """An error for ``Socket.bind_to_random_port()``.
71 See Also
72 --------
73 .Socket.bind_to_random_port
74 """
77class NotDone(ZMQBaseError):
78 """Raised when timeout is reached while waiting for 0MQ to finish with a Message
80 See Also
81 --------
82 .MessageTracker.wait : object for tracking when ZeroMQ is done
83 """
86class ContextTerminated(ZMQError):
87 """Wrapper for zmq.ETERM
89 .. versionadded:: 13.0
90 """
92 def __init__(self, errno="ignored", msg="ignored"):
93 from zmq import ETERM
95 super().__init__(ETERM)
98class Again(ZMQError):
99 """Wrapper for zmq.EAGAIN
101 .. versionadded:: 13.0
102 """
104 def __init__(self, errno="ignored", msg="ignored"):
105 from zmq import EAGAIN
107 super().__init__(EAGAIN)
110class InterruptedSystemCall(ZMQError, InterruptedError):
111 """Wrapper for EINTR
113 This exception should be caught internally in pyzmq
114 to retry system calls, and not propagate to the user.
116 .. versionadded:: 14.7
117 """
119 errno = EINTR
120 strerror: str
122 def __init__(self, errno="ignored", msg="ignored"):
123 super().__init__(EINTR)
125 def __str__(self):
126 s = super().__str__()
127 return s + ": This call should have been retried. Please report this to pyzmq."
130def _check_rc(rc, errno=None, error_without_errno=True):
131 """internal utility for checking zmq return condition
133 and raising the appropriate Exception class
134 """
135 if rc == -1:
136 if errno is None:
137 from zmq.backend import zmq_errno
139 errno = zmq_errno()
140 if errno == 0 and not error_without_errno:
141 return
142 from zmq import EAGAIN, ETERM
144 if errno == EINTR:
145 raise InterruptedSystemCall(errno)
146 elif errno == EAGAIN:
147 raise Again(errno)
148 elif errno == ETERM:
149 raise ContextTerminated(errno)
150 else:
151 raise ZMQError(errno)
154_zmq_version_info = None
155_zmq_version = None
158class ZMQVersionError(NotImplementedError):
159 """Raised when a feature is not provided by the linked version of libzmq.
161 .. versionadded:: 14.2
162 """
164 min_version = None
166 def __init__(self, min_version: str, msg: str = "Feature"):
167 global _zmq_version
168 if _zmq_version is None:
169 from zmq import zmq_version
171 _zmq_version = zmq_version()
172 self.msg = msg
173 self.min_version = min_version
174 self.version = _zmq_version
176 def __repr__(self):
177 return f"ZMQVersionError('{str(self)}')"
179 def __str__(self):
180 return f"{self.msg} requires libzmq >= {self.min_version}, have {self.version}"
183def _check_version(
184 min_version_info: tuple[int] | tuple[int, int] | tuple[int, int, int],
185 msg: str = "Feature",
186):
187 """Check for libzmq
189 raises ZMQVersionError if current zmq version is not at least min_version
191 min_version_info is a tuple of integers, and will be compared against zmq.zmq_version_info().
192 """
193 global _zmq_version_info
194 if _zmq_version_info is None:
195 from zmq import zmq_version_info
197 _zmq_version_info = zmq_version_info()
198 if _zmq_version_info < min_version_info:
199 min_version = ".".join(str(v) for v in min_version_info)
200 raise ZMQVersionError(min_version, msg)
203__all__ = [
204 "ZMQBaseError",
205 "ZMQBindError",
206 "ZMQError",
207 "NotDone",
208 "ContextTerminated",
209 "InterruptedSystemCall",
210 "Again",
211 "ZMQVersionError",
212]