1"""Future-related utils"""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5import sys
6from concurrent.futures import Future
7from threading import Event
8
9from tornado.log import app_log
10
11
class MessageFuture(Future):
    """Future class to wrap async messages.

    In addition to the regular ``Future`` API, an instance carries the id
    and header of the message it stands for, a secondary ``tracker``
    future, and a list of ``iopub_callbacks``.

    Parameters
    ----------
    msg_id
        Identifier of the message; stored as ``self.msg_id``.
    header : dict, optional
        Header of the request.  A falsy value is replaced by
        ``{"msg_type": "unknown_request"}``.
    track : bool, keyword-only
        Stored as ``self.track``.  When false, ``self.tracker`` is resolved
        with ``None`` right away; when true it is left pending for someone
        else to resolve.
    """

    def __init__(self, msg_id, header=None, *, track=False):
        super().__init__()
        self.msg_id = msg_id
        self.header = header or {"msg_type": "unknown_request"}
        # Set from a done-callback so that wait() can block on it.
        self._evt = Event()
        self.track = track
        # Initialised to None; not read anywhere inside this class.
        self._tracker = None
        self.tracker = Future()
        self.iopub_callbacks = []
        if not track:
            # Nothing to track: mark the tracker as finished immediately.
            self.tracker.set_result(None)
        self.add_done_callback(lambda f: self._evt.set())

    def wait(self, timeout=None):
        """Block until the future is done or ``timeout`` seconds elapse.

        Returns ``True`` if the future is done, otherwise the result of
        waiting on the internal event (``False`` when the timeout expired
        first).  ``timeout=None`` waits indefinitely.
        """
        if self.done():
            return True
        return self._evt.wait(timeout)
32
33
# The following helpers are taken from tornado 5.0b1.
# They avoid a hang when using gen.multi_future on asyncio,
# because Futures cannot be created in another thread.
37
38
def future_set_result_unless_cancelled(future, value):
    """Set the given ``value`` as the `Future`'s result, if not cancelled.

    Avoids asyncio.InvalidStateError when calling set_result() on
    a cancelled `asyncio.Future`.

    :param future: object exposing ``cancelled()`` and ``set_result()``.
    :param value: result to store on ``future``.

    .. versionadded:: 5.0
    """
    if future.cancelled():
        # A cancelled future must be left untouched.
        return
    future.set_result(value)
49
50
def future_set_exc_info(future, exc_info):
    """Set the given ``exc_info`` as the `Future`'s exception.

    Understands both `asyncio.Future` and Tornado's extensions to
    enable better tracebacks on Python 2.

    :param future: the future to fail.  If it has a ``set_exc_info``
        method, that is called with the whole ``exc_info``; otherwise
        ``set_exception`` is called with the exception instance only.
    :param exc_info: a ``(type, value, traceback)`` triple as returned by
        ``sys.exc_info()``; only index 1 is used in the fallback path.

    .. versionadded:: 5.0
    """
    if hasattr(future, 'set_exc_info'):
        # Tornado's Future
        future.set_exc_info(exc_info)
    else:
        # asyncio.Future
        future.set_exception(exc_info[1])
65
66
def future_add_done_callback(future, callback):
    """Arrange to call ``callback`` when ``future`` is complete.

    ``callback`` is invoked with one argument, the ``future``.

    If ``future`` is already done, ``callback`` is invoked immediately.
    This may differ from the behavior of ``Future.add_done_callback``,
    which makes no such guarantee.

    :param future: object exposing ``done()`` and ``add_done_callback()``.
    :param callback: callable taking the future as its only argument.

    .. versionadded:: 5.0
    """
    if future.done():
        # Already finished: run the callback synchronously, right here.
        callback(future)
    else:
        future.add_done_callback(callback)
82
83
def multi_future(children):
    """Wait for multiple asynchronous futures in parallel.

    Adapted from tornado's ``multi_future``; it is similar to tornado's
    `multi`, but does not support `YieldPoints <YieldPoint>`.

    :param children: iterable of futures.  It is materialised into a list
        up front, so one-shot iterables such as generators are supported.
    :returns: a new ``Future``.  Once every child is done it is resolved
        with the list of child results, in the order of ``children``.  If
        a child raised, the first such exception (in ``children`` order)
        is set on the returned future and any further ones are logged.
        An empty ``children`` resolves the future with ``[]`` immediately.

    .. versionadded:: 4.0
    """
    # Materialise once: ``children`` is iterated several times below, and
    # a generator would be exhausted by the first pass, leaving the
    # returned future unresolved forever.
    children = list(children)
    unfinished_children = set(children)

    future = Future()
    if not children:
        future_set_result_unless_cancelled(future, [])

    def callback(done_child):
        unfinished_children.remove(done_child)
        if unfinished_children:
            # Still waiting for other children.
            return
        result_list = []
        for child in children:
            try:
                result_list.append(child.result())
            except Exception:
                if future.done():
                    # Only the first failure can be reported through the
                    # future; later ones are just logged.
                    app_log.error(
                        "Multiple exceptions in yield list", exc_info=True
                    )
                else:
                    future_set_exc_info(future, sys.exc_info())
        if not future.done():
            future_set_result_unless_cancelled(future, result_list)

    # The same future may appear more than once in ``children``; register
    # the callback only once per distinct future, since
    # ``unfinished_children`` holds each future a single time.
    listening = set()
    for child in children:
        if child not in listening:
            listening.add(child)
            future_add_done_callback(child, callback)
    return future