Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipyparallel/_async.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

61 statements  

1"""Async utilities""" 

2 

3import asyncio 

4import concurrent.futures 

5import inspect 

6import threading 

7from functools import partial 

8 

9from tornado.ioloop import IOLoop 

10 

11from ipyparallel.util import _OutputProducingThread as Thread 

12 

13 

14def _asyncio_run(coro): 

15 """Like asyncio.run, but works when there's no event loop""" 

16 # for now: using tornado for broader compatibility with FDs, 

17 # e.g. when using the only partially functional default 

18 # Proactor on windows 

19 loop = IOLoop(make_current=False) 

20 try: 

21 return loop.run_sync(lambda: asyncio.ensure_future(coro)) 

22 finally: 

23 loop.close() 

24 

25 

26class AsyncFirst: 

27 """Wrapper class that defines synchronous `_sync` method wrappers 

28 

29 around async-native methods. 

30 

31 Every coroutine method automatically gets an `_sync` alias 

32 that runs it synchronously. 

33 """ 

34 

35 _async_thread = None 

36 

37 def _thread_main(self): 

38 loop = self._thread_loop = IOLoop(make_current=False) 

39 loop.add_callback(self._loop_started.set) 

40 loop.start() 

41 

42 def _in_thread(self, async_f, *args, **kwargs): 

43 """Run an async function in a background thread""" 

44 if self._async_thread is None: 

45 self._loop_started = threading.Event() 

46 self._async_thread = Thread(target=self._thread_main, daemon=True) 

47 self._async_thread.start() 

48 self._loop_started.wait(timeout=5) 

49 

50 future = concurrent.futures.Future() 

51 

52 async def thread_callback(): 

53 try: 

54 future.set_result(await async_f(*args, **kwargs)) 

55 except Exception as e: 

56 future.set_exception(e) 

57 

58 self._thread_loop.add_callback(thread_callback) 

59 return future.result() 

60 

61 def _synchronize(self, async_f, *args, **kwargs): 

62 """Run a method synchronously 

63 

64 Uses asyncio.run if asyncio is not running, 

65 otherwise puts it in a background thread 

66 """ 

67 try: 

68 loop = asyncio.get_running_loop() 

69 except RuntimeError: 

70 # not in a running loop 

71 loop = None 

72 if loop: 

73 return self._in_thread(async_f, *args, **kwargs) 

74 else: 

75 return _asyncio_run(async_f(*args, **kwargs)) 

76 

77 def __getattr__(self, name): 

78 if name.endswith("_sync"): 

79 # lazily define `_sync` method wrappers for coroutine methods 

80 async_name = name[:-5] 

81 async_method = super().__getattribute__(async_name) 

82 if not inspect.iscoroutinefunction(async_method): 

83 raise AttributeError(async_name) 

84 return partial(self._synchronize, async_method) 

85 return super().__getattribute__(name) 

86 

87 def __dir__(self): 

88 attrs = super().__dir__() 

89 seen = set() 

90 for cls in self.__class__.mro(): 

91 for name, value in cls.__dict__.items(): 

92 if name in seen: 

93 continue 

94 seen.add(name) 

95 if inspect.iscoroutinefunction(value): 

96 async_name = name + "_sync" 

97 attrs.append(async_name) 

98 return attrs