Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipyparallel/client/futures.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

57 statements  

1"""Future-related utils""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5import sys 

6from concurrent.futures import Future 

7from threading import Event 

8 

9from tornado.log import app_log 

10 

11 

12class MessageFuture(Future): 

13 """Future class to wrap async messages""" 

14 

15 def __init__(self, msg_id, header=None, *, track=False): 

16 super().__init__() 

17 self.msg_id = msg_id 

18 self.header = header or {"msg_type": "unknown_request"} 

19 self._evt = Event() 

20 self.track = track 

21 self._tracker = None 

22 self.tracker = Future() 

23 self.iopub_callbacks = [] 

24 if not track: 

25 self.tracker.set_result(None) 

26 self.add_done_callback(lambda f: self._evt.set()) 

27 

28 def wait(self, timeout=None): 

29 if not self.done(): 

30 return self._evt.wait(timeout) 

31 return True 

32 

33 

34# The following are from tornado 5.0b1 

35# avoids hang using gen.multi_future on asyncio, 

36# because Futures cannot be created in another thread 

37 

38 

39def future_set_result_unless_cancelled(future, value): 

40 """Set the given ``value`` as the `Future`'s result, if not cancelled. 

41 

42 Avoids asyncio.InvalidStateError when calling set_result() on 

43 a cancelled `asyncio.Future`. 

44 

45 .. versionadded:: 5.0 

46 """ 

47 if not future.cancelled(): 

48 future.set_result(value) 

49 

50 

51def future_set_exc_info(future, exc_info): 

52 """Set the given ``exc_info`` as the `Future`'s exception. 

53 

54 Understands both `asyncio.Future` and Tornado's extensions to 

55 enable better tracebacks on Python 2. 

56 

57 .. versionadded:: 5.0 

58 """ 

59 if hasattr(future, 'set_exc_info'): 

60 # Tornado's Future 

61 future.set_exc_info(exc_info) 

62 else: 

63 # asyncio.Future 

64 future.set_exception(exc_info[1]) 

65 

66 

67def future_add_done_callback(future, callback): 

68 """Arrange to call ``callback`` when ``future`` is complete. 

69 

70 ``callback`` is invoked with one argument, the ``future``. 

71 

72 If ``future`` is already done, ``callback`` is invoked immediately. 

73 This may differ from the behavior of ``Future.add_done_callback``, 

74 which makes no such guarantee. 

75 

76 .. versionadded:: 5.0 

77 """ 

78 if future.done(): 

79 callback(future) 

80 else: 

81 future.add_done_callback(callback) 

82 

83 

84def multi_future(children): 

85 """Wait for multiple asynchronous futures in parallel. 

86 

87 This function is similar to `multi`, but does not support 

88 `YieldPoints <YieldPoint>`. 

89 

90 .. versionadded:: 4.0 

91 """ 

92 unfinished_children = set(children) 

93 

94 future = Future() 

95 if not children: 

96 future_set_result_unless_cancelled(future, []) 

97 

98 def callback(f): 

99 unfinished_children.remove(f) 

100 if not unfinished_children: 

101 result_list = [] 

102 for f in children: 

103 try: 

104 result_list.append(f.result()) 

105 except Exception as e: 

106 if future.done(): 

107 app_log.error( 

108 "Multiple exceptions in yield list", exc_info=True 

109 ) 

110 else: 

111 future_set_exc_info(future, sys.exc_info()) 

112 if not future.done(): 

113 future_set_result_unless_cancelled(future, result_list) 

114 

115 listening = set() 

116 for f in children: 

117 if f not in listening: 

118 listening.add(f) 

119 future_add_done_callback(f, callback) 

120 return future