1"""Async utilities"""
2
3import asyncio
4import concurrent.futures
5import inspect
6import threading
7from functools import partial
8
9from tornado.ioloop import IOLoop
10
11from ipyparallel.util import _OutputProducingThread as Thread
12
13
def _asyncio_run(coro):
    """Run *coro* to completion on a private IOLoop and return its result.

    Like asyncio.run, but works when there's no event loop.
    The loop is created for this call only and is always closed afterwards,
    whether the coroutine returned or raised.
    """
    # for now: tornado's IOLoop is used for broader compatibility with FDs,
    # e.g. when using the only partially functional default
    # Proactor on windows
    io_loop = IOLoop(make_current=False)

    def schedule():
        # invoked by run_sync; wraps the coroutine in a future it can wait on
        return asyncio.ensure_future(coro)

    try:
        result = io_loop.run_sync(schedule)
    finally:
        io_loop.close()
    return result
24
25
class AsyncFirst:
    """Base class that defines synchronous `_sync` method wrappers around
    async-native methods.

    Every coroutine method `name` automatically gets a `name_sync` alias
    that runs it synchronously and returns its result.
    The aliases are created lazily by `__getattr__` and listed by `__dir__`.
    """

    # background thread hosting an event loop; created on first use by _in_thread
    _async_thread = None

    def _thread_main(self):
        """Thread target: create an IOLoop, publish it as `_thread_loop`, run it."""
        loop = self._thread_loop = IOLoop(make_current=False)
        # the first callback the loop runs sets the event that _in_thread waits on
        loop.add_callback(self._loop_started.set)
        loop.start()

    def _in_thread(self, async_f, *args, **kwargs):
        """Run an async function in a background thread

        Starts the background loop thread on first use, schedules
        ``async_f(*args, **kwargs)`` on that loop and blocks the calling
        thread until it finishes.

        Returns the value produced by ``async_f``.
        Re-raises in the calling thread whatever ``async_f`` raised.
        Raises RuntimeError if the background loop is not available.
        """
        if self._async_thread is None:
            self._loop_started = threading.Event()
            self._async_thread = Thread(target=self._thread_main, daemon=True)
            self._async_thread.start()
            self._loop_started.wait(timeout=5)

        # The wait above may time out. Fail with a clear message if the loop
        # was never created, rather than with an AttributeError on the lookup.
        thread_loop = getattr(self, "_thread_loop", None)
        if thread_loop is None:
            raise RuntimeError("background event loop thread failed to start")

        future = concurrent.futures.Future()

        async def thread_callback():
            try:
                future.set_result(await async_f(*args, **kwargs))
            except BaseException as e:
                # BaseException rather than Exception: asyncio.CancelledError
                # is not an Exception subclass, and an unresolved future would
                # leave the caller blocked forever in future.result() below
                future.set_exception(e)

        thread_loop.add_callback(thread_callback)
        return future.result()

    def _synchronize(self, async_f, *args, **kwargs):
        """Run a method synchronously

        Uses _asyncio_run if asyncio is not running in this thread,
        otherwise puts it in a background thread.

        Returns the result of ``async_f(*args, **kwargs)``.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # not in a running loop
            in_running_loop = False
        else:
            in_running_loop = True

        # dispatch outside the try/except so that errors raised by async_f
        # are not chained onto the "no running loop" RuntimeError
        if in_running_loop:
            return self._in_thread(async_f, *args, **kwargs)
        return _asyncio_run(async_f(*args, **kwargs))

    def __getattr__(self, name):
        """Resolve `<method>_sync` lazily for coroutine methods.

        Only called when normal attribute lookup fails.
        Returns a partial of `_synchronize` bound to the coroutine method.
        Raises AttributeError naming the requested attribute when `name`
        does not map onto a coroutine method.
        """
        suffix = "_sync"
        if name.endswith(suffix):
            # lazily define `_sync` method wrappers for coroutine methods
            async_name = name[: -len(suffix)]
            try:
                async_method = super().__getattribute__(async_name)
            except AttributeError:
                async_method = None
            if not inspect.iscoroutinefunction(async_method):
                # report the name that was asked for, not the derived async name
                raise AttributeError(
                    f"{type(self).__name__!r} object has no attribute {name!r}"
                )
            return partial(self._synchronize, async_method)
        # not a `_sync` alias: raises the standard AttributeError
        return super().__getattribute__(name)

    def __dir__(self):
        """List attributes, including the lazily-defined `_sync` aliases."""
        attrs = list(super().__dir__())
        listed = set(attrs)
        seen = set()
        # walk the MRO so only the most-derived definition of each name counts
        for cls in type(self).mro():
            for name, value in cls.__dict__.items():
                if name in seen:
                    continue
                seen.add(name)
                if inspect.iscoroutinefunction(value):
                    sync_name = name + "_sync"
                    # don't duplicate an alias that is already listed,
                    # e.g. one defined explicitly on the class
                    if sync_name not in listed:
                        listed.add(sync_name)
                        attrs.append(sync_name)
        return attrs