Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/mock/backports.py: 7%

56 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:45 +0000

1import sys 

2 

3 

4if sys.version_info[:2] < (3, 8): 

5 

6 import asyncio, functools 

7 from asyncio.coroutines import _is_coroutine 

8 from inspect import ismethod, isfunction, CO_COROUTINE 

9 from unittest import TestCase 

10 

def _unwrap_partial(func):
    """Return the callable underneath any ``functools.partial`` wrappers.

    ``func`` is returned unchanged when it is not a ``functools.partial``
    instance.
    """
    # A partial's ``.func`` may itself be a partial, so keep peeling until
    # something that is not a partial is reached.
    while isinstance(func, functools.partial):
        func = func.func
    return func

15 

def _has_code_flag(f, flag):
    """Return true if ``f`` is a function (or a method or functools.partial
    wrapper wrapping a function) whose code object has the given ``flag``
    set in its flags.

    ``False`` is returned for anything that, once unwrapped, is not a plain
    Python function.
    """
    # Bound methods carry the real function on ``__func__``.
    while ismethod(f):
        f = f.__func__
    # Partials are peeled only after methods, so a partial wrapping a bound
    # method is left as a method and reported as False below.
    f = _unwrap_partial(f)
    if not isfunction(f):
        return False
    return bool(f.__code__.co_flags & flag)

26 

def iscoroutinefunction(obj):
    """Return true if the object is a coroutine function.

    Coroutine functions are defined with "async def" syntax.  An object is
    also accepted when its ``_is_coroutine`` attribute is the marker object
    imported from ``asyncio.coroutines``.
    """
    return (
        # Compiled from ``async def``: the code object carries CO_COROUTINE.
        _has_code_flag(obj, CO_COROUTINE) or
        # Identity check against asyncio's own marker object
        # (presumably set on generator-based coroutines -- TODO confirm).
        getattr(obj, '_is_coroutine', None) is _is_coroutine
    )

36 

37 

class IsolatedAsyncioTestCase(TestCase):
    """``TestCase`` whose ``run()`` executes with a private asyncio event loop.

    ``run()`` creates a new event loop, installs it as the current loop,
    delegates to ``TestCase.run()`` and finally shuts the loop down and
    closes it again.
    """

    def __init__(self, methodName='runTest'):
        super().__init__(methodName)
        # All three are populated by _setupAsyncioLoop() for the duration
        # of a single run().
        self._asyncioTestLoop = None
        self._asyncioCallsQueue = None
        self._asyncioCallsTask = None

    async def _asyncioLoopRunner(self, fut):
        """Background task that waits for the shutdown sentinel.

        Creates the calls queue, resolves ``fut`` to signal that the queue
        exists, then waits for a single item.  The only item ever queued by
        this class is the ``None`` sentinel sent during tear-down.
        """
        self._asyncioCallsQueue = queue = asyncio.Queue()
        fut.set_result(None)
        query = await queue.get()
        queue.task_done()
        assert query is None
        # Returning here (instead of waiting on the queue again) lets the
        # task finish, so no pending task is left when the loop is closed.

    def _setupAsyncioLoop(self):
        """Create the event loop, make it current and start the runner task."""
        assert self._asyncioTestLoop is None
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.set_debug(True)
        self._asyncioTestLoop = loop
        fut = loop.create_future()
        self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
        # Spin the loop until the runner has created the calls queue.
        loop.run_until_complete(fut)

    def _tearDownAsyncioLoop(self):
        """Stop the runner task, shut down async generators, close the loop."""
        assert self._asyncioTestLoop is not None
        loop = self._asyncioTestLoop
        self._asyncioTestLoop = None
        try:
            # Send the shutdown sentinel and wait until the runner has
            # consumed it.
            self._asyncioCallsQueue.put_nowait(None)
            loop.run_until_complete(self._asyncioCallsQueue.join())
            # shutdown asyncgens
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Always detach and close the loop, even if shutdown failed.
            asyncio.set_event_loop(None)
            loop.close()

    def run(self, result=None):
        """Run the test inside a freshly created event loop.

        Returns whatever ``TestCase.run()`` returns.
        """
        self._setupAsyncioLoop()
        try:
            return super().run(result)
        finally:
            self._tearDownAsyncioLoop()

83 

84 

85else: 

86 

87 from asyncio import iscoroutinefunction 

88 from unittest import IsolatedAsyncioTestCase 

89