Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/util/concurrency.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

60 statements  

1# util/concurrency.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9from __future__ import annotations 

10 

11import asyncio # noqa 

12import typing 

13from typing import Any 

14from typing import Callable 

15from typing import Coroutine 

16from typing import TypeVar 

17 

# Probe for the optional ``greenlet`` dependency at import time.
# ``have_greenlet`` records whether the import succeeded and
# ``greenlet_error`` keeps the text of the ImportError when it did not.
have_greenlet = False
greenlet_error = None
try:
    import greenlet  # type: ignore[import-untyped,unused-ignore]  # noqa: F401,E501
except ImportError as e:
    # keep the failure text so it can be appended to the error raised by
    # the fallback stubs defined at the bottom of this module
    greenlet_error = str(e)
    pass
else:
    have_greenlet = True
    # the ``X as X`` form marks each name as an explicit re-export from
    # this module
    from ._concurrency_py3k import await_only as await_only
    from ._concurrency_py3k import await_fallback as await_fallback
    from ._concurrency_py3k import in_greenlet as in_greenlet
    from ._concurrency_py3k import greenlet_spawn as greenlet_spawn
    from ._concurrency_py3k import is_exit_exception as is_exit_exception
    from ._concurrency_py3k import AsyncAdaptedLock as AsyncAdaptedLock
    from ._concurrency_py3k import _Runner

# generic result type used in the ``_AsyncUtil`` method signatures
_T = TypeVar("_T")

36 

37 

class _AsyncUtil:
    """Asyncio util for test suite/ util only

    Wraps a ``_Runner`` that is created only when the module-level
    ``have_greenlet`` flag is true.  ``run_in_greenlet()`` and ``close()``
    check that flag themselves; ``run()`` does not, so it needs the
    ``runner`` attribute to exist.
    """

    def __init__(self) -> None:
        # without greenlet there is nothing to construct; ``runner`` is
        # simply left unset
        if not have_greenlet:
            return
        self.runner = _Runner()

    def run(
        self,
        fn: Callable[..., Coroutine[Any, Any, _T]],
        *args: Any,
        **kwargs: Any,
    ) -> _T:
        """Call ``fn(*args, **kwargs)`` and run the resulting coroutine
        on the runner, returning its result.
        """
        # resolve the bound method first so that a missing ``runner``
        # fails before ``fn`` is invoked
        run_on_loop = self.runner.run
        return run_on_loop(fn(*args, **kwargs))

    def run_in_greenlet(
        self, fn: Callable[..., _T], *args: Any, **kwargs: Any
    ) -> _T:
        """Run a sync function inside a greenlet; nested calls supported.

        ``fn`` is called directly when greenlet is unavailable or when the
        runner's loop is already running (the nested case); otherwise it
        is handed to ``greenlet_spawn()`` and driven by the runner.
        """
        # ``or`` short-circuits, so ``self.runner`` is only touched when
        # greenlet is available
        if not have_greenlet or self.runner.get_loop().is_running():
            return fn(*args, **kwargs)
        return self.runner.run(greenlet_spawn(fn, *args, **kwargs))

    def close(self) -> None:
        """Close the runner, if one was created."""
        if not have_greenlet:
            return
        self.runner.close()

69 

70 

if not typing.TYPE_CHECKING and not have_greenlet:
    # Runtime-only fallbacks, defined when ``greenlet`` could not be
    # imported.  They stand in for the names otherwise imported from
    # ``._concurrency_py3k`` so that those names exist in this module
    # either way.

    def _not_implemented():
        """Raise ``ValueError`` explaining that greenlet is required.

        The text of the original ImportError (``greenlet_error``) is
        appended to the message when it is available.
        """
        # this conditional is to prevent pylance from considering
        # greenlet_spawn() etc as "no return" and dimming out code below it
        if have_greenlet:
            return None

        # Build the optional suffix on its own.  A conditional expression
        # binds more loosely than ``%``, so writing the whole message as
        # ``"..." " %s" % err if err else ""`` yields an empty message
        # whenever ``err`` is empty.
        detail = (" %s" % greenlet_error) if greenlet_error else ""
        raise ValueError(
            "the greenlet library is required to use this function." + detail
        )

    def is_exit_exception(e):  # noqa: F811
        """Return True when ``e`` is not an ``Exception`` instance."""
        return not isinstance(e, Exception)

    def await_only(thing):  # type: ignore  # noqa: F811
        """Unavailable without greenlet; raises ``ValueError``."""
        _not_implemented()

    def await_fallback(thing):  # type: ignore  # noqa: F811
        """Return ``thing`` unchanged."""
        return thing

    def in_greenlet():  # type: ignore  # noqa: F811
        """Unavailable without greenlet; raises ``ValueError``."""
        _not_implemented()

    def greenlet_spawn(fn, *args, **kw):  # type: ignore  # noqa: F811
        """Unavailable without greenlet; raises ``ValueError``."""
        _not_implemented()

    def AsyncAdaptedLock(*args, **kw):  # type: ignore  # noqa: F811
        """Unavailable without greenlet; raises ``ValueError``."""
        _not_implemented()

    def _util_async_run(fn, *arg, **kw):  # type: ignore  # noqa: F811
        """Call ``fn(*arg, **kw)`` directly and return its result."""
        return fn(*arg, **kw)

    def _util_async_run_coroutine_function(fn, *arg, **kw):  # type: ignore  # noqa: F811,E501
        """Unavailable without greenlet; raises ``ValueError``."""
        _not_implemented()