1# util/concurrency.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""asyncio-related concurrency functions."""
10
11from __future__ import annotations
12
13import asyncio # noqa
14import typing
15from typing import Any
16from typing import Callable
17from typing import Coroutine
18from typing import TypeVar
19
# Module-level flags describing whether the optional ``greenlet`` package
# could be imported.  ``greenlet_error`` stays ``None`` unless the import
# fails, in which case it holds the text of the ImportError.
have_greenlet = False
greenlet_error = None
try:
    import greenlet  # type: ignore[import-untyped,unused-ignore]  # noqa: F401,E501
except ImportError as e:
    # Keep the failure text; the no-greenlet fallback further down this
    # module includes it in the error it raises.
    greenlet_error = str(e)
    pass
else:
    # greenlet imported successfully: flip the flag and re-export the
    # names provided by ``._concurrency_py3k``.  The ``X as X`` form marks
    # each name as an explicit re-export.
    have_greenlet = True
    from ._concurrency_py3k import await_only as await_only
    from ._concurrency_py3k import await_fallback as await_fallback
    from ._concurrency_py3k import in_greenlet as in_greenlet
    from ._concurrency_py3k import greenlet_spawn as greenlet_spawn
    from ._concurrency_py3k import is_exit_exception as is_exit_exception
    from ._concurrency_py3k import AsyncAdaptedLock as AsyncAdaptedLock
    from ._concurrency_py3k import _Runner

# Generic type variable used by the ``_AsyncUtil`` method annotations.
_T = TypeVar("_T")
38
39
class _AsyncUtil:
    """Asyncio util for test suite/ util only.

    Holds a ``_Runner`` in ``self.runner``.  The runner is only created
    when ``have_greenlet`` is true; otherwise the attribute is never set.
    """

    def __init__(self) -> None:
        if not have_greenlet:
            # No greenlet available: leave ``runner`` unset.
            return
        self.runner = _Runner()

    def run(
        self,
        fn: Callable[..., Coroutine[Any, Any, _T]],
        *args: Any,
        **kwargs: Any,
    ) -> _T:
        """Run coroutine on the loop.

        ``fn`` is called with the given arguments and the object it
        returns is handed to ``self.runner.run()``, whose result is
        returned.
        """
        # Resolve the runner method before calling ``fn`` so a missing
        # runner is reported before any coroutine object is created.
        execute = self.runner.run
        return execute(fn(*args, **kwargs))

    def run_in_greenlet(
        self, fn: Callable[..., _T], *args: Any, **kwargs: Any
    ) -> _T:
        """Run sync function in greenlet. Support nested calls.

        ``fn`` is invoked directly when greenlet is unavailable or when
        the runner's loop is already running (the nested case).
        Otherwise it is wrapped with ``greenlet_spawn()`` and executed
        through the runner.
        """
        if not have_greenlet or self.runner.get_loop().is_running():
            return fn(*args, **kwargs)

        execute = self.runner.run
        return execute(greenlet_spawn(fn, *args, **kwargs))

    def close(self) -> None:
        """Close the runner when greenlet is available; else do nothing."""
        if not have_greenlet:
            return
        self.runner.close()
71
72
if not typing.TYPE_CHECKING and not have_greenlet:
    # Runtime-only fallbacks, defined when greenlet could not be imported.
    # The branch is skipped under TYPE_CHECKING, so type checkers never see
    # these untyped definitions.

    def _not_implemented():
        """Raise ``ValueError`` stating that greenlet is required.

        The text stored in ``greenlet_error`` is appended to the message
        when it is non-empty.  Returns ``None`` without raising if
        ``have_greenlet`` is true.
        """
        # this conditional is to prevent pylance from considering
        # greenlet_spawn() etc as "no return" and dimming out code below it
        if have_greenlet:
            return None

        # Build the message in two steps.  Writing it as a single
        # ``"..." % err if err else ""`` expression applies the conditional
        # to the whole formatted string, which yields an empty message
        # whenever ``greenlet_error`` is falsy.
        message = "the greenlet library is required to use this function."
        if greenlet_error:
            message = "%s %s" % (message, greenlet_error)
        raise ValueError(message)

    def is_exit_exception(e):  # noqa: F811
        """Return True if ``e`` is not an instance of ``Exception``."""
        return not isinstance(e, Exception)

    def await_only(thing):  # type: ignore  # noqa: F811
        """Fallback for ``await_only``; raises because greenlet is missing."""
        _not_implemented()

    def await_fallback(thing):  # type: ignore  # noqa: F811
        """Fallback for ``await_fallback``; returns ``thing`` unchanged."""
        return thing

    def in_greenlet():  # type: ignore  # noqa: F811
        """Fallback for ``in_greenlet``; raises because greenlet is missing."""
        _not_implemented()

    def greenlet_spawn(fn, *args, **kw):  # type: ignore  # noqa: F811
        """Fallback for ``greenlet_spawn``; raises because greenlet is
        missing."""
        _not_implemented()

    def AsyncAdaptedLock(*args, **kw):  # type: ignore  # noqa: F811
        """Fallback for ``AsyncAdaptedLock``; raises because greenlet is
        missing."""
        _not_implemented()

    def _util_async_run(fn, *arg, **kw):  # type: ignore  # noqa: F811
        """Call ``fn(*arg, **kw)`` directly and return its result."""
        return fn(*arg, **kw)

    def _util_async_run_coroutine_function(fn, *arg, **kw):  # type: ignore  # noqa: F811,E501
        """Fallback that raises because greenlet is missing."""
        _not_implemented()