Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prompt_toolkit/eventloop/async_generator.py: 29%

51 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1""" 

2Implementation for async generators. 

3""" 

4from __future__ import annotations 

5 

6from asyncio import get_running_loop 

7from contextlib import asynccontextmanager 

8from queue import Empty, Full, Queue 

9from typing import Any, AsyncGenerator, Callable, Iterable, TypeVar 

10 

11from .utils import run_in_executor_with_context 

12 

# Public API of this module.
__all__ = [
    "aclosing",
    "generator_to_async_generator",
]

17 

# Type variable for the async generator handed to `aclosing` (any yield type).
_T_Generator = TypeVar("_T_Generator", bound=AsyncGenerator[Any, None])

19 

20 

@asynccontextmanager
async def aclosing(
    thing: _T_Generator,
) -> AsyncGenerator[_T_Generator, None]:
    """
    Async context manager that guarantees ``thing.aclose()`` is awaited on
    exit, whether the body completes normally or raises.

    Similar to `contextlib.aclosing`, in Python 3.10.
    """
    try:
        yield thing
    finally:
        # Always close the wrapped async generator, even on exceptions.
        await thing.aclose()

30 

31 

# By default, choose a buffer size that's a good balance between having enough
# throughput, but not consuming too much memory. We use this to consume a sync
# generator of completions as an async generator. If the queue size is very
# small (like 1), consuming the completions goes really slow (when there are a
# lot of items). If the queue size would be unlimited or too big, this can
# cause overconsumption of memory, and cause CPU time spent producing items
# that are no longer needed (if the consumption of the async generator stops at
# some point). We need a fixed size in order to get some back pressure from the
# async consumer to the sync producer. We choose 1000 by default here. If we
# have around 50k completions, measurements show that 1000 is still
# significantly faster than a buffer of 100.
DEFAULT_BUFFER_SIZE: int = 1000

# Item type yielded by the iterable passed to `generator_to_async_generator`.
_T = TypeVar("_T")

46 

47 

class _Done:
    """Sentinel pushed onto the queue to signal that the producer finished."""

50 

51 

async def generator_to_async_generator(
    get_iterable: Callable[[], Iterable[_T]],
    buffer_size: int = DEFAULT_BUFFER_SIZE,
) -> AsyncGenerator[_T, None]:
    """
    Turn a generator or iterable into an async generator.

    This works by running the generator in a background thread.

    :param get_iterable: Function that returns a generator or iterable when
        called.
    :param buffer_size: Size of the queue between the async consumer and the
        synchronous generator that produces items.
    """
    # Flag read by the producer thread; set to True when the async consumer is
    # done, so the thread stops producing and exits.
    quitting = False
    # NOTE: We are limiting the queue size in order to have back-pressure.
    q: Queue[_T | _Done] = Queue(maxsize=buffer_size)
    loop = get_running_loop()

    def runner() -> None:
        """
        Consume the generator in background thread.
        When items are received, they'll be pushed to the queue.
        """
        try:
            for item in get_iterable():
                # When this async generator was cancelled (closed), stop this
                # thread.
                if quitting:
                    return

                # Put with a 1 second timeout rather than blocking forever, so
                # that the `quitting` flag is re-checked periodically and the
                # thread can exit even when the queue stays full.
                while True:
                    try:
                        q.put(item, timeout=1)
                    except Full:
                        if quitting:
                            return
                        continue
                    else:
                        break

        finally:
            # Always signal completion, retrying with the same timeout/flag
            # pattern as above so a full queue cannot wedge this thread.
            while True:
                try:
                    q.put(_Done(), timeout=1)
                except Full:
                    if quitting:
                        return
                    continue
                else:
                    break

    # Start background thread.
    runner_f = run_in_executor_with_context(runner)

    try:
        while True:
            # Prefer a non-blocking get; only fall back to a blocking `q.get`
            # (run in an executor so the event loop is not blocked) when the
            # queue is momentarily empty.
            try:
                item = q.get_nowait()
            except Empty:
                item = await loop.run_in_executor(None, q.get)
            if isinstance(item, _Done):
                break
            else:
                yield item
    finally:
        # When this async generator is closed (GeneratorExit exception), stop
        # the background thread as well — we don't need its items anymore.
        quitting = True

        # Wait for the background thread to finish. (should happen right after
        # the last item is yielded).
        await runner_f