1"""Classes and functions for kernel related errors and exceptions.
2
3Inheritance diagram:
4
5.. inheritance-diagram:: ipyparallel.error
6 :parts: 3
7"""
8
9import builtins
10import sys
11import traceback
12
13__docformat__ = "restructuredtext en"
14
15
16class IPythonError(Exception):
17 """Base exception that all of our exceptions inherit from.
18
19 This can be raised by code that doesn't have any more specific
20 information."""
21
22 pass
23
24
25class KernelError(IPythonError):
26 pass
27
28
29class EngineError(KernelError):
30 pass
31
32
33class NoEnginesRegistered(KernelError):
34 """Exception for operations that require some engines, but none exist"""
35
36 def __str__(self):
37 return (
38 "This operation requires engines."
39 " Try client.wait_for_engines(n) to wait for engines to register."
40 )
41
42
43class TaskAborted(KernelError):
44 pass
45
46
47class TaskTimeout(KernelError):
48 pass
49
50
51# backward-compat: use builtin TimeoutError, but preserve `error.TimeoutError` import
52TimeoutError = builtins.TimeoutError
53
54
55class UnmetDependency(KernelError):
56 pass
57
58
59class ImpossibleDependency(UnmetDependency):
60 pass
61
62
63class DependencyTimeout(ImpossibleDependency):
64 pass
65
66
67class InvalidDependency(ImpossibleDependency):
68 pass
69
70
71class RemoteError(KernelError):
72 """Error raised elsewhere"""
73
74 ename = None
75 evalue = None
76 traceback = None
77 engine_info = None
78
79 def __init__(self, ename, evalue, traceback, engine_info=None):
80 self.ename = ename
81 self.evalue = evalue
82 self.traceback = traceback
83 self.engine_info = engine_info or {}
84 self.args = (ename, evalue)
85
86 def __repr__(self):
87 engineid = self.engine_info.get('engine_id', ' ')
88 return f"<{self.__class__.__name__}[{engineid}]:{self.ename}({self.evalue})>"
89
90 def __str__(self):
91 label = self._get_engine_str(self.engine_info)
92 engineid = self.engine_info.get('engine_id', ' ')
93 return f"{label} {self.ename}: {self.evalue}"
94
95 @staticmethod
96 def _get_engine_str(engine_info):
97 if not engine_info:
98 return '[Engine Exception]'
99 else:
100 return f"[{engine_info['engine_id']}:{engine_info['method']}]"
101
102 def render_traceback(self):
103 """render traceback to a list of lines"""
104 return [self._get_engine_str(self.engine_info)] + (
105 self.traceback or "No traceback available"
106 ).splitlines()
107
108 def _render_traceback_(self):
109 """Special method for custom tracebacks within IPython.
110
111 This will be called by IPython instead of displaying the local traceback.
112
113 It should return a traceback rendered as a list of lines.
114 """
115 return self.render_traceback()
116
117 def print_traceback(self, excid=None):
118 """print my traceback"""
119 print('\n'.join(self.render_traceback()))
120
121
122class TaskRejectError(KernelError):
123 """Exception to raise when a task should be rejected by an engine.
124
125 This exception can be used to allow a task running on an engine to test
126 if the engine (or the user's namespace on the engine) has the needed
127 task dependencies. If not, the task should raise this exception. For
128 the task to be retried on another engine, the task should be created
129 with the `retries` argument > 1.
130
131 The advantage of this approach over our older properties system is that
132 tasks have full access to the user's namespace on the engines and the
133 properties don't have to be managed or tested by the controller.
134 """
135
136
137class CompositeError(RemoteError):
138 """Error for representing possibly multiple errors on engines"""
139
140 tb_limit = 4 # limit on how many tracebacks to draw
141
142 def __init__(self, message, elist):
143 Exception.__init__(self, *(message, elist))
144 # Don't use pack_exception because it will conflict with the .message
145 # attribute that is being deprecated in 2.6 and beyond.
146 self.msg = message
147 self.elist = elist
148 self.args = [e[0] for e in elist]
149
150 def _get_traceback(self, ev):
151 try:
152 tb = ev._ipython_traceback_text
153 except AttributeError:
154 return 'No traceback available'
155 else:
156 return tb
157
158 def __str__(self):
159 s = str(self.msg)
160 for en, ev, etb, ei in self.elist[: self.tb_limit]:
161 engine_str = self._get_engine_str(ei)
162 s = s + '\n' + engine_str + en + ': ' + str(ev)
163 if len(self.elist) > self.tb_limit:
164 s = s + f'\n.... {len(self.elist) - self.tb_limit} more exceptions ...'
165 return s
166
167 def __repr__(self):
168 return f"CompositeError({len(self.elist)})"
169
170 def render_traceback(self, excid=None):
171 """render one or all of my tracebacks to a list of lines"""
172 lines = []
173 if excid is None:
174 for en, ev, etb, ei in self.elist[: self.tb_limit]:
175 lines.append(self._get_engine_str(ei) + ":")
176 lines.extend((etb or 'No traceback available').splitlines())
177 lines.append('')
178 if len(self.elist) > self.tb_limit:
179 lines.append(
180 f'... {len(self.elist) - self.tb_limit} more exceptions ...'
181 )
182 else:
183 try:
184 en, ev, etb, ei = self.elist[excid]
185 except Exception:
186 raise IndexError(f"an exception with index {excid} does not exist")
187 else:
188 lines.append(self._get_engine_str(ei) + ":")
189 lines.extend((etb or 'No traceback available').splitlines())
190
191 return lines
192
193 def print_traceback(self, excid=None):
194 print('\n'.join(self.render_traceback(excid)))
195
196 def raise_exception(self, excid=0):
197 try:
198 en, ev, etb, ei = self.elist[excid]
199 except Exception:
200 raise IndexError(f"an exception with index {excid} does not exist")
201 else:
202 raise RemoteError(en, ev, etb, ei)
203
204
205class AlreadyDisplayedError(RemoteError):
206 def __init__(self, original_error):
207 self.original_error = original_error
208 self.n = len(self.original_error.elist)
209
210 def __repr__(self):
211 return f"<{self.__class__.__name__}({self.n} errors)>"
212
213 def __str__(self):
214 return f"{self.n} errors"
215
216 def render_traceback(self):
217 """IPython special method, short-circuit traceback display
218
219 for raising when streaming output has already displayed errors
220 """
221 return [str(self)]
222
223
224def collect_exceptions(rdict_or_list, method='unspecified'):
225 """check a result dict for errors, and raise CompositeError if any exist.
226 Passthrough otherwise."""
227 elist = []
228 if isinstance(rdict_or_list, dict):
229 rlist = rdict_or_list.values()
230 else:
231 rlist = rdict_or_list
232 for r in rlist:
233 if isinstance(r, RemoteError):
234 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
235 # Sometimes we could have CompositeError in our list. Just take
236 # the errors out of them and put them in our new list. This
237 # has the effect of flattening lists of CompositeErrors into one
238 # CompositeError
239 if en == 'CompositeError':
240 for e in ev.elist:
241 elist.append(e)
242 else:
243 elist.append((en, ev, etb, ei))
244 if len(elist) == 0:
245 return rdict_or_list
246 else:
247 msg = f"one or more exceptions raised in: {method}"
248 err = CompositeError(msg, elist)
249 raise err
250
251
252def wrap_exception(engine_info={}):
253 etype, evalue, tb = sys.exc_info()
254 stb = traceback.format_exception(etype, evalue, tb)
255 exc_content = {
256 'status': 'error',
257 'traceback': stb,
258 'ename': etype.__name__,
259 'evalue': str(evalue),
260 'engine_info': engine_info,
261 }
262 return exc_content
263
264
265def unwrap_exception(content):
266 err = RemoteError(
267 content['ename'],
268 content['evalue'],
269 '\n'.join(content['traceback']),
270 content.get('engine_info', {}),
271 )
272 return err