Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/error.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

79 statements  

1"""0MQ Error classes and functions.""" 

2 

3# Copyright (C) PyZMQ Developers 

4# Distributed under the terms of the Modified BSD License. 

5from __future__ import annotations 

6 

7from errno import EINTR 

8 

9 

class ZMQBaseError(Exception):
    """Base exception class for 0MQ errors in Python.

    It adds no behaviour of its own; it is the common base of the 0MQ
    exception classes defined in this module.
    """

12 

13 

class ZMQError(ZMQBaseError):
    """Wrap an errno style error.

    Parameters
    ----------
    errno : int
        The ZMQ errno or None.  If None, then ``zmq_errno()`` is called and
        used.
    msg : str
        Description of the error or None.
    """

    # Class-level default; only overwritten when an integer errno is known.
    errno: int | None = None
    strerror: str

    def __init__(self, errno: int | None = None, msg: str | None = None):
        """Wrap an errno style error.

        Parameters
        ----------
        errno : int
            The ZMQ errno or None.  If None, then ``zmq_errno()`` is called and
            used.
        msg : string
            Description of the error or None.  When None, the text is
            looked up with ``strerror(errno)`` for an integer errno, or
            taken from ``str(errno)`` otherwise.
        """
        # Resolved at call time rather than at module import.
        from zmq.backend import strerror, zmq_errno

        if errno is None:
            errno = zmq_errno()

        if isinstance(errno, int):
            self.errno = errno
            self.strerror = strerror(errno) if msg is None else msg
        else:
            # A non-integer ``errno`` is not stored: ``self.errno`` keeps
            # the class default and the value only serves as fallback text.
            self.strerror = str(errno) if msg is None else msg
        # flush signals, because there could be a SIGINT
        # waiting to pounce, resulting in uncaught exceptions.
        # Doing this here means getting SIGINT during a blocking
        # libzmq call will raise a *catchable* KeyboardInterrupt
        # PyErr_CheckSignals()

    def __str__(self) -> str:
        """Return the error description stored in ``strerror``."""
        return self.strerror

    def __repr__(self) -> str:
        """Return ``ClassName('message')`` using the runtime class name."""
        return f"{type(self).__name__}('{self}')"

66 

67 

class ZMQBindError(ZMQBaseError):
    """An error for ``Socket.bind_to_random_port()``.

    See Also
    --------
    .Socket.bind_to_random_port
    """

75 

76 

class NotDone(ZMQBaseError):
    """Raised when timeout is reached while waiting for 0MQ to finish with a Message

    See Also
    --------
    .MessageTracker.wait : object for tracking when ZeroMQ is done
    """

84 

85 

class ContextTerminated(ZMQError):
    """Wrapper for zmq.ETERM

    .. versionadded:: 13.0
    """

    def __init__(self, errno="ignored", msg="ignored"):
        """Initialise the error with ``zmq.ETERM``.

        Both ``errno`` and ``msg`` are accepted but never used: the parent
        initialiser is always called with ``ETERM`` and no message.
        """
        # Resolved at call time rather than at module import.
        from zmq import ETERM

        super().__init__(ETERM)

96 

97 

class Again(ZMQError):
    """Wrapper for zmq.EAGAIN

    .. versionadded:: 13.0
    """

    def __init__(self, errno="ignored", msg="ignored"):
        """Initialise the error with ``zmq.EAGAIN``.

        Both ``errno`` and ``msg`` are accepted but never used: the parent
        initialiser is always called with ``EAGAIN`` and no message.
        """
        # Resolved at call time rather than at module import.
        from zmq import EAGAIN

        super().__init__(EAGAIN)

108 

109 

class InterruptedSystemCall(ZMQError, InterruptedError):
    """Wrapper for EINTR

    This exception should be caught internally in pyzmq
    to retry system calls, and not propagate to the user.

    .. versionadded:: 14.7
    """

    # Fixed at class level: this exception always represents EINTR.
    errno = EINTR
    strerror: str

    def __init__(self, errno="ignored", msg="ignored"):
        """Initialise the error with ``EINTR``.

        Both ``errno`` and ``msg`` are accepted but never used.
        """
        super().__init__(EINTR)

    def __str__(self):
        """Return the parent's message followed by a request to report it."""
        base_message = super().__str__()
        return (
            base_message
            + ": This call should have been retried. Please report this to pyzmq."
        )

128 

129 

def _check_rc(rc, errno=None, error_without_errno=True):
    """internal utility for checking zmq return condition

    and raising the appropriate Exception class

    Parameters
    ----------
    rc
        Return code to inspect.  Only ``-1`` is treated as a failure; any
        other value makes the function return without doing anything.
    errno
        Error number to report.  If None, it is fetched with
        ``zmq_errno()``.
    error_without_errno : bool
        When false, a failure whose errno is ``0`` is silently ignored.

    Raises
    ------
    InterruptedSystemCall
        If errno equals ``EINTR``.
    Again
        If errno equals ``zmq.EAGAIN``.
    ContextTerminated
        If errno equals ``zmq.ETERM``.
    ZMQError
        For any other errno.
    """
    if rc != -1:
        return

    if errno is None:
        from zmq.backend import zmq_errno

        errno = zmq_errno()

    if errno == 0 and not error_without_errno:
        return

    # Imported only once an exception is definitely going to be raised.
    from zmq import EAGAIN, ETERM

    if errno == EINTR:
        raise InterruptedSystemCall(errno)
    if errno == EAGAIN:
        raise Again(errno)
    if errno == ETERM:
        raise ContextTerminated(errno)
    raise ZMQError(errno)

152 

153 

# Module-level caches for the libzmq version, filled in on first use:
# ``_zmq_version_info`` by ``_check_version`` and ``_zmq_version`` by
# ``ZMQVersionError.__init__``.
_zmq_version_info = None
_zmq_version = None


class ZMQVersionError(NotImplementedError):
    """Raised when a feature is not provided by the linked version of libzmq.

    .. versionadded:: 14.2
    """

    min_version = None

    def __init__(self, min_version: str, msg: str = "Feature"):
        """Record the required version and the feature description.

        Parameters
        ----------
        min_version : str
            Minimum libzmq version required, as shown in the message.
        msg : str
            Name of the feature that needs that version.
        """
        global _zmq_version
        if _zmq_version is None:
            # Look the version up once and reuse it for later instances.
            from zmq import zmq_version

            _zmq_version = zmq_version()
        self.msg = msg
        self.min_version = min_version
        self.version = _zmq_version

    def __repr__(self):
        """Return ``ClassName('message')`` using the runtime class name."""
        return f"{type(self).__name__}('{self}')"

    def __str__(self):
        """Return the message naming the feature and both versions."""
        return f"{self.msg} requires libzmq >= {self.min_version}, have {self.version}"

181 

182 

def _check_version(
    min_version_info: tuple[int] | tuple[int, int] | tuple[int, int, int],
    msg: str = "Feature",
) -> None:
    """Check for libzmq

    raises ZMQVersionError if current zmq version is not at least min_version

    min_version_info is a tuple of integers, and will be compared against zmq.zmq_version_info().

    Parameters
    ----------
    min_version_info : tuple of int
        Minimum required version, one to three components.
    msg : str
        Feature name passed on to ``ZMQVersionError``.

    Raises
    ------
    ZMQVersionError
        If the cached ``zmq_version_info()`` compares lower than
        ``min_version_info``.
    """
    global _zmq_version_info
    if _zmq_version_info is None:
        # Look the version up once and reuse it for later checks.
        from zmq import zmq_version_info

        _zmq_version_info = zmq_version_info()

    if _zmq_version_info < min_version_info:
        # Render the tuple as a dotted version string for the message.
        min_version = ".".join(map(str, min_version_info))
        raise ZMQVersionError(min_version, msg)

201 

202 

# Names exported by ``from <module> import *``; the underscore-prefixed
# helpers in this module are intentionally not listed.
__all__ = [
    "ZMQBaseError",
    "ZMQBindError",
    "ZMQError",
    "NotDone",
    "ContextTerminated",
    "InterruptedSystemCall",
    "Again",
    "ZMQVersionError",
]