Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/error.py: 42%
76 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1"""0MQ Error classes and functions."""
3# Copyright (C) PyZMQ Developers
4# Distributed under the terms of the Modified BSD License.
6from errno import EINTR
7from typing import Optional, Tuple, Union
10class ZMQBaseError(Exception):
11 """Base exception class for 0MQ errors in Python."""
14class ZMQError(ZMQBaseError):
15 """Wrap an errno style error.
17 Parameters
18 ----------
19 errno : int
20 The ZMQ errno or None. If None, then ``zmq_errno()`` is called and
21 used.
22 msg : string
23 Description of the error or None.
24 """
26 errno: Optional[int] = None
28 def __init__(self, errno: Optional[int] = None, msg: Optional[str] = None):
29 """Wrap an errno style error.
31 Parameters
32 ----------
33 errno : int
34 The ZMQ errno or None. If None, then ``zmq_errno()`` is called and
35 used.
36 msg : string
37 Description of the error or None.
38 """
39 from zmq.backend import strerror, zmq_errno
41 if errno is None:
42 errno = zmq_errno()
43 if isinstance(errno, int):
44 self.errno = errno
45 if msg is None:
46 self.strerror = strerror(errno)
47 else:
48 self.strerror = msg
49 else:
50 if msg is None:
51 self.strerror = str(errno)
52 else:
53 self.strerror = msg
54 # flush signals, because there could be a SIGINT
55 # waiting to pounce, resulting in uncaught exceptions.
56 # Doing this here means getting SIGINT during a blocking
57 # libzmq call will raise a *catchable* KeyboardInterrupt
58 # PyErr_CheckSignals()
60 def __str__(self) -> str:
61 return self.strerror
63 def __repr__(self) -> str:
64 return f"{self.__class__.__name__}('{str(self)}')"
67class ZMQBindError(ZMQBaseError):
68 """An error for ``Socket.bind_to_random_port()``.
70 See Also
71 --------
72 .Socket.bind_to_random_port
73 """
76class NotDone(ZMQBaseError):
77 """Raised when timeout is reached while waiting for 0MQ to finish with a Message
79 See Also
80 --------
81 .MessageTracker.wait : object for tracking when ZeroMQ is done
82 """
85class ContextTerminated(ZMQError):
86 """Wrapper for zmq.ETERM
88 .. versionadded:: 13.0
89 """
91 def __init__(self, errno="ignored", msg="ignored"):
92 from zmq import ETERM
94 super().__init__(ETERM)
97class Again(ZMQError):
98 """Wrapper for zmq.EAGAIN
100 .. versionadded:: 13.0
101 """
103 def __init__(self, errno="ignored", msg="ignored"):
104 from zmq import EAGAIN
106 super().__init__(EAGAIN)
109class InterruptedSystemCall(ZMQError, InterruptedError):
110 """Wrapper for EINTR
112 This exception should be caught internally in pyzmq
113 to retry system calls, and not propagate to the user.
115 .. versionadded:: 14.7
116 """
118 errno = EINTR
120 def __init__(self, errno="ignored", msg="ignored"):
121 super().__init__(EINTR)
123 def __str__(self):
124 s = super().__str__()
125 return s + ": This call should have been retried. Please report this to pyzmq."
128def _check_rc(rc, errno=None, error_without_errno=True):
129 """internal utility for checking zmq return condition
131 and raising the appropriate Exception class
132 """
133 if rc == -1:
134 if errno is None:
135 from zmq.backend import zmq_errno
137 errno = zmq_errno()
138 if errno == 0 and not error_without_errno:
139 return
140 from zmq import EAGAIN, ETERM
142 if errno == EINTR:
143 raise InterruptedSystemCall(errno)
144 elif errno == EAGAIN:
145 raise Again(errno)
146 elif errno == ETERM:
147 raise ContextTerminated(errno)
148 else:
149 raise ZMQError(errno)
152_zmq_version_info = None
153_zmq_version = None
156class ZMQVersionError(NotImplementedError):
157 """Raised when a feature is not provided by the linked version of libzmq.
159 .. versionadded:: 14.2
160 """
162 min_version = None
164 def __init__(self, min_version: str, msg: str = "Feature"):
165 global _zmq_version
166 if _zmq_version is None:
167 from zmq import zmq_version
169 _zmq_version = zmq_version()
170 self.msg = msg
171 self.min_version = min_version
172 self.version = _zmq_version
174 def __repr__(self):
175 return "ZMQVersionError('%s')" % str(self)
177 def __str__(self):
178 return "{} requires libzmq >= {}, have {}".format(
179 self.msg,
180 self.min_version,
181 self.version,
182 )
185def _check_version(
186 min_version_info: Union[Tuple[int], Tuple[int, int], Tuple[int, int, int]],
187 msg: str = "Feature",
188):
189 """Check for libzmq
191 raises ZMQVersionError if current zmq version is not at least min_version
193 min_version_info is a tuple of integers, and will be compared against zmq.zmq_version_info().
194 """
195 global _zmq_version_info
196 if _zmq_version_info is None:
197 from zmq import zmq_version_info
199 _zmq_version_info = zmq_version_info()
200 if _zmq_version_info < min_version_info:
201 min_version = ".".join(str(v) for v in min_version_info)
202 raise ZMQVersionError(min_version, msg)
205__all__ = [
206 "ZMQBaseError",
207 "ZMQBindError",
208 "ZMQError",
209 "NotDone",
210 "ContextTerminated",
211 "InterruptedSystemCall",
212 "Again",
213 "ZMQVersionError",
214]