1# util/concurrency.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9from __future__ import annotations
10
11import asyncio # noqa
12import typing
13from typing import Any
14from typing import Callable
15from typing import Coroutine
16from typing import TypeVar
17
18have_greenlet = False
19greenlet_error = None
20try:
21 import greenlet # type: ignore[import-untyped,unused-ignore] # noqa: F401,E501
22except ImportError as e:
23 greenlet_error = str(e)
24 pass
25else:
26 have_greenlet = True
27 from ._concurrency_py3k import await_only as await_only
28 from ._concurrency_py3k import await_fallback as await_fallback
29 from ._concurrency_py3k import in_greenlet as in_greenlet
30 from ._concurrency_py3k import greenlet_spawn as greenlet_spawn
31 from ._concurrency_py3k import is_exit_exception as is_exit_exception
32 from ._concurrency_py3k import AsyncAdaptedLock as AsyncAdaptedLock
33 from ._concurrency_py3k import _Runner
34
35_T = TypeVar("_T")
36
37
38class _AsyncUtil:
39 """Asyncio util for test suite/ util only"""
40
41 def __init__(self) -> None:
42 if have_greenlet:
43 self.runner = _Runner()
44
45 def run(
46 self,
47 fn: Callable[..., Coroutine[Any, Any, _T]],
48 *args: Any,
49 **kwargs: Any,
50 ) -> _T:
51 """Run coroutine on the loop"""
52 return self.runner.run(fn(*args, **kwargs))
53
54 def run_in_greenlet(
55 self, fn: Callable[..., _T], *args: Any, **kwargs: Any
56 ) -> _T:
57 """Run sync function in greenlet. Support nested calls"""
58 if have_greenlet:
59 if self.runner.get_loop().is_running():
60 return fn(*args, **kwargs)
61 else:
62 return self.runner.run(greenlet_spawn(fn, *args, **kwargs))
63 else:
64 return fn(*args, **kwargs)
65
66 def close(self) -> None:
67 if have_greenlet:
68 self.runner.close()
69
70
71if not typing.TYPE_CHECKING and not have_greenlet:
72
73 def _not_implemented():
74 # this conditional is to prevent pylance from considering
75 # greenlet_spawn() etc as "no return" and dimming out code below it
76 if have_greenlet:
77 return None
78
79 raise ValueError(
80 "the greenlet library is required to use this function."
81 " %s" % greenlet_error
82 if greenlet_error
83 else ""
84 )
85
86 def is_exit_exception(e): # noqa: F811
87 return not isinstance(e, Exception)
88
89 def await_only(thing): # type: ignore # noqa: F811
90 _not_implemented()
91
92 def await_fallback(thing): # type: ignore # noqa: F811
93 return thing
94
95 def in_greenlet(): # type: ignore # noqa: F811
96 _not_implemented()
97
98 def greenlet_spawn(fn, *args, **kw): # type: ignore # noqa: F811
99 _not_implemented()
100
101 def AsyncAdaptedLock(*args, **kw): # type: ignore # noqa: F811
102 _not_implemented()
103
104 def _util_async_run(fn, *arg, **kw): # type: ignore # noqa: F811
105 return fn(*arg, **kw)
106
107 def _util_async_run_coroutine_function(fn, *arg, **kw): # type: ignore # noqa: F811,E501
108 _not_implemented()