Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/zmq/error.py: 42%

76 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

"""0MQ Error classes and functions."""

2 

# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.

5 

from errno import EINTR
from typing import Optional, Tuple, Union

8 

9 

class ZMQBaseError(Exception):
    """Base exception class for 0MQ errors in Python.

    It adds no behaviour of its own on top of :class:`Exception`; it exists
    so that exceptions deriving from it can be caught with a single
    ``except ZMQBaseError`` clause.
    """

12 

13 

class ZMQError(ZMQBaseError):
    """Wrap an errno style error.

    Parameters
    ----------
    errno : int, optional
        The ZMQ errno or None. If None, then ``zmq_errno()`` is called and
        its result is used.
    msg : str, optional
        Description of the error or None. If None, the description is
        ``strerror(errno)`` when ``errno`` is an int and ``str(errno)``
        otherwise.

    Attributes
    ----------
    errno : int or None
        The wrapped error number. Stays at the class default ``None`` when
        the value passed as ``errno`` is not an int.
    strerror : str
        The description returned by ``str(exc)``.
    """

    errno: Optional[int] = None

    def __init__(self, errno: Optional[int] = None, msg: Optional[str] = None):
        """Wrap an errno style error.

        Parameters
        ----------
        errno : int, optional
            The ZMQ errno or None. If None, then ``zmq_errno()`` is called and
            its result is used.
        msg : str, optional
            Description of the error or None. If None, a description is
            derived from ``errno``.
        """
        # Imported at call time rather than at module import time
        # (presumably to avoid a circular import with the zmq package;
        # confirm before hoisting).
        from zmq.backend import strerror, zmq_errno

        if errno is None:
            errno = zmq_errno()

        errno_is_int = isinstance(errno, int)
        if errno_is_int:
            # Only integer error numbers are recorded on the instance.
            self.errno = errno

        if msg is not None:
            # An explicit message always wins over a derived one.
            self.strerror = msg
        elif errno_is_int:
            self.strerror = strerror(errno)
        else:
            # Non-integer "errno" (e.g. a message passed positionally):
            # use its string form as the description.
            self.strerror = str(errno)

        # NOTE: flushing pending signals here (PyErr_CheckSignals()) is
        # disabled. It was meant to make a SIGINT received during a blocking
        # libzmq call surface as a *catchable* KeyboardInterrupt instead of
        # an uncaught exception.

    def __str__(self) -> str:
        """Return the error description."""
        return self.strerror

    def __repr__(self) -> str:
        """Return ``ClassName('description')``."""
        return f"{self.__class__.__name__}('{self!s}')"

65 

66 

class ZMQBindError(ZMQBaseError):
    """An error for ``Socket.bind_to_random_port()``.

    See Also
    --------
    .Socket.bind_to_random_port
    """

74 

75 

class NotDone(ZMQBaseError):
    """Raised when timeout is reached while waiting for 0MQ to finish with a Message.

    See Also
    --------
    .MessageTracker.wait : object for tracking when ZeroMQ is done
    """

83 

84 

class ContextTerminated(ZMQError):
    """Wrapper for zmq.ETERM.

    .. versionadded:: 13.0
    """

    def __init__(self, errno="ignored", msg="ignored"):
        """Initialise the error with ``zmq.ETERM``.

        Both ``errno`` and ``msg`` are accepted so the signature matches
        :class:`ZMQError`, but their values are ignored: the error number is
        always ``ETERM`` and the message is derived from it.
        """
        # Imported at call time rather than at module import time.
        from zmq import ETERM

        super().__init__(ETERM)

95 

96 

class Again(ZMQError):
    """Wrapper for zmq.EAGAIN.

    .. versionadded:: 13.0
    """

    def __init__(self, errno="ignored", msg="ignored"):
        """Initialise the error with ``zmq.EAGAIN``.

        Both ``errno`` and ``msg`` are accepted so the signature matches
        :class:`ZMQError`, but their values are ignored: the error number is
        always ``EAGAIN`` and the message is derived from it.
        """
        # Imported at call time rather than at module import time.
        from zmq import EAGAIN

        super().__init__(EAGAIN)

107 

108 

class InterruptedSystemCall(ZMQError, InterruptedError):
    """Wrapper for EINTR.

    This exception should be caught internally in pyzmq
    to retry system calls, and not propagate to the user.

    .. versionadded:: 14.7
    """

    # Class-level default, so the attribute is EINTR even before __init__ runs.
    errno = EINTR

    def __init__(self, errno="ignored", msg="ignored"):
        """Initialise the error with ``EINTR``.

        ``errno`` and ``msg`` are accepted for signature compatibility with
        :class:`ZMQError`, but their values are ignored.
        """
        super().__init__(EINTR)

    def __str__(self):
        """Return the base description plus a request to report the error."""
        description = super().__str__()
        return (
            description
            + ": This call should have been retried. Please report this to pyzmq."
        )

126 

127 

128def _check_rc(rc, errno=None, error_without_errno=True): 

129 """internal utility for checking zmq return condition 

130 

131 and raising the appropriate Exception class 

132 """ 

133 if rc == -1: 

134 if errno is None: 

135 from zmq.backend import zmq_errno 

136 

137 errno = zmq_errno() 

138 if errno == 0 and not error_without_errno: 

139 return 

140 from zmq import EAGAIN, ETERM 

141 

142 if errno == EINTR: 

143 raise InterruptedSystemCall(errno) 

144 elif errno == EAGAIN: 

145 raise Again(errno) 

146 elif errno == ETERM: 

147 raise ContextTerminated(errno) 

148 else: 

149 raise ZMQError(errno) 

150 

151 

# Module-level caches for the libzmq version. Both start as None and are
# filled in on first use: ``_zmq_version_info`` by ``_check_version()`` and
# ``_zmq_version`` by ``ZMQVersionError.__init__``.
_zmq_version_info = None
_zmq_version = None

154 

155 

class ZMQVersionError(NotImplementedError):
    """Raised when a feature is not provided by the linked version of libzmq.

    .. versionadded:: 14.2

    Parameters
    ----------
    min_version : str
        The minimum libzmq version that provides the feature.
    msg : str, optional
        Name or description of the feature; used as the start of the message.

    Attributes
    ----------
    min_version : str
        The ``min_version`` passed to the constructor.
    msg : str
        The ``msg`` passed to the constructor.
    version : str
        The result of ``zmq.zmq_version()``, cached at module level after the
        first lookup.
    """

    min_version = None

    def __init__(self, min_version: str, msg: str = "Feature"):
        """Record the required version and look up the linked libzmq version."""
        # Forward the constructor arguments to the base class so that
        # ``self.args`` mirrors them. Copying and pickling rebuild an
        # exception by calling ``cls(*self.args)``; with empty args that call
        # would fail because ``min_version`` is required.
        super().__init__(min_version, msg)

        global _zmq_version
        if _zmq_version is None:
            # Imported at call time rather than at module import time; the
            # result is cached in the module-level ``_zmq_version``.
            from zmq import zmq_version

            _zmq_version = zmq_version()
        self.msg = msg
        self.min_version = min_version
        self.version = _zmq_version

    def __repr__(self):
        """Return ``ZMQVersionError('<message>')``."""
        return f"ZMQVersionError('{self!s}')"

    def __str__(self):
        """Return ``'<msg> requires libzmq >= <min_version>, have <version>'``."""
        return f"{self.msg} requires libzmq >= {self.min_version}, have {self.version}"

183 

184 

185def _check_version( 

186 min_version_info: Union[Tuple[int], Tuple[int, int], Tuple[int, int, int]], 

187 msg: str = "Feature", 

188): 

189 """Check for libzmq 

190 

191 raises ZMQVersionError if current zmq version is not at least min_version 

192 

193 min_version_info is a tuple of integers, and will be compared against zmq.zmq_version_info(). 

194 """ 

195 global _zmq_version_info 

196 if _zmq_version_info is None: 

197 from zmq import zmq_version_info 

198 

199 _zmq_version_info = zmq_version_info() 

200 if _zmq_version_info < min_version_info: 

201 min_version = ".".join(str(v) for v in min_version_info) 

202 raise ZMQVersionError(min_version, msg) 

203 

204 

# Public API of this module; the underscore-prefixed helpers are not exported.
__all__ = [
    "ZMQBaseError",
    "ZMQBindError",
    "ZMQError",
    "NotDone",
    "ContextTerminated",
    "InterruptedSystemCall",
    "Again",
    "ZMQVersionError",
]