Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/asyncio/queues.py: 30%

131 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') 

2 

3import collections 

4import heapq 

5import warnings 

6 

7from . import events 

8from . import locks 

9 

10 

class QueueEmpty(Exception):
    """Raised when Queue.get_nowait() is called on an empty Queue."""

14 

15 

class QueueFull(Exception):
    """Raised when the Queue.put_nowait() method is called on a full Queue."""

19 

20 

class Queue:
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "await put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create a queue that holds at most *maxsize* items.

        A *maxsize* less than or equal to zero means the queue is unbounded.
        Passing *loop* explicitly emits a DeprecationWarning; when it is
        omitted the loop returned by events.get_event_loop() is used.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        self._maxsize = maxsize

        # Futures awaited by coroutines blocked in get(), oldest first.
        self._getters = collections.deque()
        # Futures awaited by coroutines blocked in put(), oldest first.
        self._putters = collections.deque()
        # Incremented by put_nowait(), decremented by task_done().
        self._unfinished_tasks = 0
        # Set while there are no unfinished tasks; join() waits on it.
        self._finished = locks.Event(loop=loop)
        self._finished.set()
        self._init(maxsize)

    # These three are overridable in subclasses.

    def _init(self, maxsize):
        """Create the underlying container (a deque giving FIFO order)."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the next item from the container."""
        return self._queue.popleft()

    def _put(self, item):
        """Store *item* in the container."""
        self._queue.append(item)

    # End of the overridable methods.

    def _wakeup_next(self, waiters):
        """Resolve the first future in *waiters* that is not already done.

        Futures that are already done are popped and discarded along the way.
        """
        # Wake up the next waiter (if any) that isn't cancelled.
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    def __repr__(self):
        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'

    def __str__(self):
        return f'<{type(self).__name__} {self._format()}>'

    def __class_getitem__(cls, type):
        """Return the class itself so that ``Queue[T]`` can be written."""
        return cls

    def _format(self):
        """Build the state summary shared by __repr__ and __str__.

        Only non-empty/non-zero parts are included after ``maxsize``.
        """
        result = f'maxsize={self._maxsize!r}'
        # getattr() guards against _queue not having been created yet.
        if getattr(self, '_queue', None):
            result += f' _queue={list(self._queue)!r}'
        if self._getters:
            result += f' _getters[{len(self._getters)}]'
        if self._putters:
            result += f' _putters[{len(self._putters)}]'
        if self._unfinished_tasks:
            result += f' tasks={self._unfinished_tasks}'
        return result

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    async def put(self, item):
        """Put an item into the queue.

        If the queue is full, wait until a free slot is available before
        adding item.
        """
        while self.full():
            putter = self._loop.create_future()
            self._putters.append(putter)
            try:
                await putter
            except:
                # Bare except is deliberate: the clean-up below must run for
                # whatever the await raised, and the exception is re-raised.
                putter.cancel()  # Just in case putter is not done yet.
                try:
                    # Clean self._putters from canceled putters.
                    self._putters.remove(putter)
                except ValueError:
                    # The putter could be removed from self._putters by a
                    # previous get_nowait call.
                    pass
                if not self.full() and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        self._unfinished_tasks += 1
        self._finished.clear()
        self._wakeup_next(self._getters)

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
                await getter
            except:
                # Bare except is deliberate: the clean-up below must run for
                # whatever the await raised, and the exception is re-raised.
                getter.cancel()  # Just in case getter is not done yet.
                try:
                    # Clean self._getters from canceled getters.
                    self._getters.remove(getter)
                except ValueError:
                    # The getter could be removed from self._getters by a
                    # previous put_nowait call.
                    pass
                if not self.empty() and not getter.cancelled():
                    # We were woken up by put_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        if self.empty():
            raise QueueEmpty
        item = self._get()
        self._wakeup_next(self._putters)
        return item

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    async def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            await self._finished.wait()

224 

225 

class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        """Create the list used as heap storage (*maxsize* is not used here)."""
        self._queue = []

    def _put(self, item, heappush=heapq.heappush):
        """Push *item* onto the heap held in self._queue."""
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        """Pop and return the smallest item from the heap."""
        return heappop(self._queue)

240 

241 

class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        """Create the list used as a stack (*maxsize* is not used here)."""
        self._queue = []

    def _put(self, item):
        """Append *item* to the end of the list."""
        self._queue.append(item)

    def _get(self):
        """Remove and return the last item of the list."""
        return self._queue.pop()