Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py: 32%

74 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# util/_concurrency_py3k.py 

2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8import asyncio 

9import sys 

10from typing import Any 

11from typing import Callable 

12from typing import Coroutine 

13 

14import greenlet 

15 

16from . import compat 

17from .langhelpers import memoized_property 

18from .. import exc 

19 

# Feature flag: True when the installed greenlet exposes the
# ``gr_context`` attribute on greenlet objects.  When it does,
# _AsyncIoGreenlet copies the driver greenlet's context onto each new
# greenlet it creates.
# Refs: https://github.com/python-greenlet/greenlet/pull/198
_has_gr_context = hasattr(greenlet.getcurrent(), "gr_context")

24 

25 

def is_exit_exception(e):
    """Return True if ``e`` counts as an "exit" style exception.

    An object qualifies when it is an ``asyncio.TimeoutError`` or an
    ``asyncio.CancelledError``, or when it is not an instance of
    ``Exception`` at all (for example ``KeyboardInterrupt`` or
    ``SystemExit``).

    :param e: the object, normally an exception instance, to classify.
    :return: ``True`` for exit exceptions, ``False`` otherwise.
    """
    # asyncio.TimeoutError is a regular Exception subclass, so it must be
    # checked by name; where asyncio.CancelledError already derives from
    # BaseException it would be caught by the final test regardless.
    if isinstance(e, (asyncio.TimeoutError, asyncio.CancelledError)):
        return True
    return not isinstance(e, Exception)

32 

33 

34# implementation based on snaury gist at 

35# https://gist.github.com/snaury/202bf4f22c41ca34e56297bae5f33fef 

36# Issue for context: https://github.com/python-greenlet/greenlet/issues/173 

37 

38 

class _AsyncIoGreenlet(greenlet.greenlet):
    """Greenlet that keeps a reference to the greenlet which spawned it.

    The spawning ("driver") greenlet is handed to the ``greenlet``
    constructor as its second positional argument and is also stored as
    ``self.driver``, which is what :func:`await_only` and
    :func:`await_fallback` switch back to.
    """

    def __init__(self, fn, driver):
        super().__init__(fn, driver)
        self.driver = driver
        # only touch gr_context on greenlet versions that provide it
        if _has_gr_context:
            self.gr_context = driver.gr_context

45 

46 

def await_only(awaitable: Coroutine) -> Any:
    """Awaits an async function in a sync method.

    The sync method must be inside a :func:`greenlet_spawn` context.
    :func:`await_only` calls cannot be nested.

    :param awaitable: The coroutine to call.
    :return: the value the driver greenlet switches back with, i.e. the
     result of ``awaitable``.
    :raises MissingGreenlet: if the current greenlet was not created by
     :func:`greenlet_spawn`.  In that case ``awaitable`` is closed first
     if it is a coroutine object.

    """
    # this is called in the context greenlet while running fn
    current = greenlet.getcurrent()
    if not isinstance(current, _AsyncIoGreenlet):
        # The coroutine will never be awaited on this path; close it so
        # that Python does not emit a "coroutine ... was never awaited"
        # RuntimeWarning on top of the MissingGreenlet error.
        if asyncio.iscoroutine(awaitable):
            awaitable.close()
        raise exc.MissingGreenlet(
            "greenlet_spawn has not been called; can't call await_only() "
            "here. Was IO attempted in an unexpected place?"
        )

    # returns the control to the driver greenlet passing it
    # a coroutine to run. Once the awaitable is done, the driver greenlet
    # switches back to this greenlet with the result of awaitable that is
    # then returned to the caller (or raised as error)
    return current.driver.switch(awaitable)

69 

70 

def await_fallback(awaitable: Coroutine) -> Any:
    """Awaits an async function in a sync method.

    The sync method must be inside a :func:`greenlet_spawn` context.
    :func:`await_fallback` calls cannot be nested.

    Unlike :func:`await_only`, when no :func:`greenlet_spawn` context is
    present and the event loop is not running, the awaitable is run to
    completion directly on the event loop.

    :param awaitable: The coroutine to call.
    :return: the result of ``awaitable``.
    :raises MissingGreenlet: if there is no :func:`greenlet_spawn`
     context and the event loop is already running.  In that case
     ``awaitable`` is closed first if it is a coroutine object.

    """
    # this is called in the context greenlet while running fn
    current = greenlet.getcurrent()
    if not isinstance(current, _AsyncIoGreenlet):
        loop = get_event_loop()
        if loop.is_running():
            # The coroutine will never be awaited on this path; close it
            # so that Python does not emit a "coroutine ... was never
            # awaited" RuntimeWarning on top of the MissingGreenlet error.
            if asyncio.iscoroutine(awaitable):
                awaitable.close()
            raise exc.MissingGreenlet(
                "greenlet_spawn has not been called and asyncio event "
                "loop is already running; can't call await_fallback() here. "
                "Was IO attempted in an unexpected place?"
            )
        return loop.run_until_complete(awaitable)

    # hand the awaitable to the driver greenlet and resume with its result
    return current.driver.switch(awaitable)

93 

94 

async def greenlet_spawn(
    fn: Callable, *args, _require_await=False, **kwargs
) -> Any:
    """Run the synchronous callable ``fn`` inside a fresh greenlet.

    While running, ``fn`` may call :func:`await_only` to hand an
    awaitable back to this coroutine, which awaits it and resumes ``fn``
    with the outcome.

    :param fn: The sync callable to call.
    :param \\*args: Positional arguments to pass to the ``fn`` callable.
    :param \\*\\*kwargs: Keyword arguments to pass to the ``fn`` callable.
    :param _require_await: when true, raise ``AwaitRequired`` if ``fn``
     finished without ever handing an awaitable back.
    :return: the value returned by ``fn``.
    """

    child = _AsyncIoGreenlet(fn, greenlet.getcurrent())
    awaited_something = False
    try:
        # Start fn in the child greenlet.  If it switches back before
        # finishing, the child is still alive and ``outcome`` is the
        # awaitable it wants run; once the child is dead, ``outcome`` is
        # fn's return value.
        outcome = child.switch(*args, **kwargs)
        while not child.dead:
            awaited_something = True
            try:
                # run the awaitable handed over by the child
                resolved = await outcome
            except BaseException:
                # re-raise the failure inside the child greenlet so that
                # fn sees it at its await point and can handle it
                outcome = child.throw(*sys.exc_info())
            else:
                # resume the child with the awaited value
                outcome = child.switch(resolved)
    finally:
        # drop the back-reference so no reference cycle is left for gc
        del child.driver

    if _require_await and not awaited_something:
        raise exc.AwaitRequired(
            "The current operation required an async execution but none was "
            "detected. This will usually happen when using a non compatible "
            "DBAPI driver. Please ensure that an async DBAPI is used."
        )
    return outcome

139 

140 

class AsyncAdaptedLock:
    """Synchronous context manager backed by an ``asyncio.Lock``.

    Entering acquires the lock through :func:`await_fallback`; leaving
    releases it.
    """

    @memoized_property
    def mutex(self):
        """The underlying ``asyncio.Lock``, created on first access."""
        # No await happens while the lock is being created, so
        # coroutines should not be able to race each other here.
        return asyncio.Lock()

    def __enter__(self):
        # Accessing self.mutex creates the lock if needed; only the
        # acquire itself is awaited.
        pending_acquire = self.mutex.acquire()
        await_fallback(pending_acquire)
        return self

    def __exit__(self, *arg, **kw):
        self.mutex.release()

156 

157 

def _util_async_run_coroutine_function(fn, *args, **kwargs):
    """for test suite/ util only

    Call the coroutine function ``fn`` with the given arguments and run
    the resulting coroutine to completion on the event loop.  The loop
    must not already be running.
    """

    loop = get_event_loop()
    if not loop.is_running():
        return loop.run_until_complete(fn(*args, **kwargs))

    raise Exception(
        "for async run coroutine we expect that no greenlet or event "
        "loop is running when we start out"
    )

168 

169 

def _util_async_run(fn, *args, **kwargs):
    """for test suite/ util only

    Run the sync callable ``fn`` under :func:`greenlet_spawn` on the
    event loop, or call it directly when the loop is already running.
    """

    loop = get_event_loop()
    if loop.is_running():
        # a wrapped test function is calling another one; we must
        # already be inside a greenlet created by greenlet_spawn
        assert isinstance(greenlet.getcurrent(), _AsyncIoGreenlet)
        return fn(*args, **kwargs)

    return loop.run_until_complete(greenlet_spawn(fn, *args, **kwargs))

180 

181 

def get_event_loop():
    """vendor asyncio.get_event_loop() for python 3.7 and above.

    Python 3.10 deprecates get_event_loop() as a standalone.

    :return: the running event loop when there is one; otherwise the
     loop provided by the current event loop policy.  When
     ``compat.py37`` is false, ``asyncio.get_event_loop()`` is used
     directly.

    """
    if not compat.py37:
        return asyncio.get_event_loop()

    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running.  Fall through rather than doing the policy
        # lookup inside this handler, so that an error raised by the
        # lookup is not implicitly chained to this unrelated
        # "no running event loop" RuntimeError.
        pass
    return asyncio.get_event_loop_policy().get_event_loop()