Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/util/_concurrency_py3k.py: 32%
74 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# util/_concurrency_py3k.py
2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
8import asyncio
9import sys
10from typing import Any
11from typing import Callable
12from typing import Coroutine
14import greenlet
16from . import compat
17from .langhelpers import memoized_property
18from .. import exc
# Feature probe: newer greenlet releases expose a ``gr_context`` attribute
# on greenlet objects, populated with the current context at creation time.
# Record once, at import, whether the installed greenlet has it.
# Refs: https://github.com/python-greenlet/greenlet/pull/198
_has_gr_context = hasattr(greenlet.getcurrent(), "gr_context")
def is_exit_exception(e):
    """Return True if ``e`` should be treated as an "exit" exception.

    An exit exception is either one of the asyncio timeout / cancellation
    errors, or anything that does not derive from :class:`Exception`
    (i.e. a bare :class:`BaseException` such as ``KeyboardInterrupt``).

    :param e: The exception instance to classify.
    :return: ``True`` for exit exceptions, ``False`` otherwise.
    """
    # asyncio.CancelledError may itself be a BaseException subclass, in
    # which case the second test would already catch it; checking it
    # explicitly keeps the result the same either way.
    if isinstance(e, (asyncio.TimeoutError, asyncio.CancelledError)):
        return True
    return not isinstance(e, Exception)
# The implementation follows the snaury gist:
# https://gist.github.com/snaury/202bf4f22c41ca34e56297bae5f33fef
# Background discussion:
# https://github.com/python-greenlet/greenlet/issues/173


class _AsyncIoGreenlet(greenlet.greenlet):
    """Greenlet that remembers the "driver" greenlet which spawned it.

    The driver is passed to the base greenlet constructor as its second
    argument and is also stored on ``self.driver`` so that code running
    inside this greenlet can switch back to it.
    """

    def __init__(self, fn, driver):
        super().__init__(fn, driver)
        self.driver = driver
        # when the installed greenlet exposes ``gr_context``, run with the
        # same context object as the driver greenlet.
        if _has_gr_context:
            self.gr_context = driver.gr_context
def await_only(awaitable: Coroutine) -> Any:
    """Awaits an async function in a sync method.

    The sync method must be inside a :func:`greenlet_spawn` context.
    :func:`await_only` calls cannot be nested.

    :param awaitable: The coroutine to call.
    :return: The result of ``awaitable``, as handed back by the driver
     greenlet when it switches back into this greenlet.
    :raises sqlalchemy.exc.MissingGreenlet: if the current greenlet is
     not an ``_AsyncIoGreenlet``, i.e. :func:`greenlet_spawn` is not in
     effect.  In that case ``awaitable`` is closed before raising.

    """
    # this is called in the context greenlet while running fn
    current = greenlet.getcurrent()
    if not isinstance(current, _AsyncIoGreenlet):
        # The coroutine will never be awaited on this path; close it so
        # it is released deterministically and Python does not emit a
        # "coroutine ... was never awaited" RuntimeWarning.
        if asyncio.iscoroutine(awaitable):
            awaitable.close()
        raise exc.MissingGreenlet(
            "greenlet_spawn has not been called; can't call await_only() "
            "here. Was IO attempted in an unexpected place?"
        )

    # returns the control to the driver greenlet passing it
    # a coroutine to run. Once the awaitable is done, the driver greenlet
    # switches back to this greenlet with the result of awaitable that is
    # then returned to the caller (or raised as error)
    return current.driver.switch(awaitable)
def await_fallback(awaitable: Coroutine) -> Any:
    """Awaits an async function in a sync method.

    The sync method must be inside a :func:`greenlet_spawn` context.
    :func:`await_fallback` calls cannot be nested.

    Unlike :func:`await_only`, when no :func:`greenlet_spawn` context is
    present and the event loop is not running, the awaitable is run to
    completion directly on the event loop.

    :param awaitable: The coroutine to call.
    :return: The result of ``awaitable``.
    :raises sqlalchemy.exc.MissingGreenlet: if there is no
     :func:`greenlet_spawn` context and the event loop is already
     running.  In that case ``awaitable`` is closed before raising.

    """
    # this is called in the context greenlet while running fn
    current = greenlet.getcurrent()
    if not isinstance(current, _AsyncIoGreenlet):
        loop = get_event_loop()
        if loop.is_running():
            # The coroutine can neither be handed to a driver greenlet
            # nor run on the (already running) loop; close it so it does
            # not leak and trigger a "never awaited" RuntimeWarning.
            if asyncio.iscoroutine(awaitable):
                awaitable.close()
            raise exc.MissingGreenlet(
                "greenlet_spawn has not been called and asyncio event "
                "loop is already running; can't call await_fallback() here. "
                "Was IO attempted in an unexpected place?"
            )
        return loop.run_until_complete(awaitable)

    # hand the awaitable to the driver greenlet; its result comes back as
    # the return value of switch()
    return current.driver.switch(awaitable)
async def greenlet_spawn(
    fn: Callable, *args, _require_await=False, **kwargs
) -> Any:
    """Run the sync callable ``fn`` inside a freshly created greenlet.

    While running, ``fn`` may call :func:`await_only` to hand an awaitable
    back to this coroutine, which awaits it and feeds the outcome back
    into the greenlet.

    :param fn: The sync callable to call.
    :param args: Positional arguments to pass to the ``fn`` callable.
    :param _require_await: When true, raise ``AwaitRequired`` if ``fn``
     finished without ever switching back to this coroutine.
    :param kwargs: Keyword arguments to pass to the ``fn`` callable.
    :return: The value returned by ``fn``.
    """

    child = _AsyncIoGreenlet(fn, greenlet.getcurrent())
    did_switch = False
    try:
        # Start fn synchronously inside the child greenlet.  If it comes
        # back while the child is still alive, ``pending`` is an awaitable
        # handed over by await_only; once the child is dead, ``pending``
        # is the return value of fn.
        pending = child.switch(*args, **kwargs)
        while not child.dead:
            did_switch = True
            try:
                # await on behalf of the child greenlet
                outcome = await pending
            except BaseException:
                # re-raise inside the child so that fn sees the failure
                # at its await_only call and can proceed with its own
                # error handling.
                pending = child.throw(*sys.exc_info())
                continue
            # deliver the awaited value back into the child
            pending = child.switch(outcome)
    finally:
        # drop the reference to the driver so that the gc does not need
        # to resolve a cycle
        del child.driver

    if _require_await and not did_switch:
        raise exc.AwaitRequired(
            "The current operation required an async execution but none was "
            "detected. This will usually happen when using a non compatible "
            "DBAPI driver. Please ensure that an async DBAPI is used."
        )
    return pending
class AsyncAdaptedLock:
    """Sync-style context manager wrapping an :class:`asyncio.Lock`.

    Entering acquires the lock through :func:`await_fallback`; exiting
    releases it.
    """

    @memoized_property
    def mutex(self):
        # No ``await`` happens while the lock is being created, so
        # coroutines should not be able to race each other here.
        return asyncio.Lock()

    def __enter__(self):
        # The mutex is created by the first caller on attribute access;
        # only the acquisition itself is awaited.
        acquiring = self.mutex.acquire()
        await_fallback(acquiring)
        return self

    def __exit__(self, *arg, **kw):
        self.mutex.release()
def _util_async_run_coroutine_function(fn, *args, **kwargs):
    """Run coroutine function ``fn`` to completion; test suite / util only.

    Raises ``Exception`` if the event loop is already running.
    """

    event_loop = get_event_loop()
    if event_loop.is_running():
        raise Exception(
            "for async run coroutine we expect that no greenlet or event "
            "loop is running when we start out"
        )

    # fn is only called once the loop is known to be idle
    coro = fn(*args, **kwargs)
    return event_loop.run_until_complete(coro)
def _util_async_run(fn, *args, **kwargs):
    """Run sync callable ``fn`` via greenlet_spawn; test suite / util only.

    If the event loop is already running, ``fn`` is called directly; this
    is expected to happen only from within an ``_AsyncIoGreenlet``.
    """

    event_loop = get_event_loop()
    if event_loop.is_running():
        # allow for a wrapped test function to call another
        assert isinstance(greenlet.getcurrent(), _AsyncIoGreenlet)
        return fn(*args, **kwargs)

    spawned = greenlet_spawn(fn, *args, **kwargs)
    return event_loop.run_until_complete(spawned)
def get_event_loop():
    """Return an event loop; vendored ``asyncio.get_event_loop()``.

    Python 3.10 deprecates the standalone ``asyncio.get_event_loop()``,
    so when ``compat.py37`` is set the running loop is preferred, with
    the event loop policy's loop as the fallback.

    """
    if not compat.py37:
        # older interpreters: plain asyncio.get_event_loop()
        return asyncio.get_event_loop()

    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        # no loop is running; ask the policy for one
        return asyncio.get_event_loop_policy().get_event_loop()