Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/asyncio/queues.py: 29%

129 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') 

2 

3import collections 

4import heapq 

5import warnings 

6 

7from . import events 

8from . import locks 

9 

10 

11class QueueEmpty(Exception): 

12 """Raised when Queue.get_nowait() is called on an empty Queue.""" 

13 pass 

14 

15 

16class QueueFull(Exception): 

17 """Raised when the Queue.put_nowait() method is called on a full Queue.""" 

18 pass 

19 

20 

21class Queue: 

22 """A queue, useful for coordinating producer and consumer coroutines. 

23 

24 If maxsize is less than or equal to zero, the queue size is infinite. If it 

25 is an integer greater than 0, then "await put()" will block when the 

26 queue reaches maxsize, until an item is removed by get(). 

27 

28 Unlike the standard library Queue, you can reliably know this Queue's size 

29 with qsize(), since your single-threaded asyncio application won't be 

30 interrupted between calling qsize() and doing an operation on the Queue. 

31 """ 

32 

33 def __init__(self, maxsize=0, *, loop=None): 

34 if loop is None: 

35 self._loop = events.get_event_loop() 

36 else: 

37 self._loop = loop 

38 warnings.warn("The loop argument is deprecated since Python 3.8, " 

39 "and scheduled for removal in Python 3.10.", 

40 DeprecationWarning, stacklevel=2) 

41 self._maxsize = maxsize 

42 

43 # Futures. 

44 self._getters = collections.deque() 

45 # Futures. 

46 self._putters = collections.deque() 

47 self._unfinished_tasks = 0 

48 self._finished = locks.Event(loop=loop) 

49 self._finished.set() 

50 self._init(maxsize) 

51 

52 # These three are overridable in subclasses. 

53 

54 def _init(self, maxsize): 

55 self._queue = collections.deque() 

56 

57 def _get(self): 

58 return self._queue.popleft() 

59 

60 def _put(self, item): 

61 self._queue.append(item) 

62 

63 # End of the overridable methods. 

64 

65 def _wakeup_next(self, waiters): 

66 # Wake up the next waiter (if any) that isn't cancelled. 

67 while waiters: 

68 waiter = waiters.popleft() 

69 if not waiter.done(): 

70 waiter.set_result(None) 

71 break 

72 

73 def __repr__(self): 

74 return f'<{type(self).__name__} at {id(self):#x} {self._format()}>' 

75 

76 def __str__(self): 

77 return f'<{type(self).__name__} {self._format()}>' 

78 

79 def _format(self): 

80 result = f'maxsize={self._maxsize!r}' 

81 if getattr(self, '_queue', None): 

82 result += f' _queue={list(self._queue)!r}' 

83 if self._getters: 

84 result += f' _getters[{len(self._getters)}]' 

85 if self._putters: 

86 result += f' _putters[{len(self._putters)}]' 

87 if self._unfinished_tasks: 

88 result += f' tasks={self._unfinished_tasks}' 

89 return result 

90 

91 def qsize(self): 

92 """Number of items in the queue.""" 

93 return len(self._queue) 

94 

95 @property 

96 def maxsize(self): 

97 """Number of items allowed in the queue.""" 

98 return self._maxsize 

99 

100 def empty(self): 

101 """Return True if the queue is empty, False otherwise.""" 

102 return not self._queue 

103 

104 def full(self): 

105 """Return True if there are maxsize items in the queue. 

106 

107 Note: if the Queue was initialized with maxsize=0 (the default), 

108 then full() is never True. 

109 """ 

110 if self._maxsize <= 0: 

111 return False 

112 else: 

113 return self.qsize() >= self._maxsize 

114 

115 async def put(self, item): 

116 """Put an item into the queue. 

117 

118 Put an item into the queue. If the queue is full, wait until a free 

119 slot is available before adding item. 

120 """ 

121 while self.full(): 

122 putter = self._loop.create_future() 

123 self._putters.append(putter) 

124 try: 

125 await putter 

126 except: 

127 putter.cancel() # Just in case putter is not done yet. 

128 try: 

129 # Clean self._putters from canceled putters. 

130 self._putters.remove(putter) 

131 except ValueError: 

132 # The putter could be removed from self._putters by a 

133 # previous get_nowait call. 

134 pass 

135 if not self.full() and not putter.cancelled(): 

136 # We were woken up by get_nowait(), but can't take 

137 # the call. Wake up the next in line. 

138 self._wakeup_next(self._putters) 

139 raise 

140 return self.put_nowait(item) 

141 

142 def put_nowait(self, item): 

143 """Put an item into the queue without blocking. 

144 

145 If no free slot is immediately available, raise QueueFull. 

146 """ 

147 if self.full(): 

148 raise QueueFull 

149 self._put(item) 

150 self._unfinished_tasks += 1 

151 self._finished.clear() 

152 self._wakeup_next(self._getters) 

153 

154 async def get(self): 

155 """Remove and return an item from the queue. 

156 

157 If queue is empty, wait until an item is available. 

158 """ 

159 while self.empty(): 

160 getter = self._loop.create_future() 

161 self._getters.append(getter) 

162 try: 

163 await getter 

164 except: 

165 getter.cancel() # Just in case getter is not done yet. 

166 try: 

167 # Clean self._getters from canceled getters. 

168 self._getters.remove(getter) 

169 except ValueError: 

170 # The getter could be removed from self._getters by a 

171 # previous put_nowait call. 

172 pass 

173 if not self.empty() and not getter.cancelled(): 

174 # We were woken up by put_nowait(), but can't take 

175 # the call. Wake up the next in line. 

176 self._wakeup_next(self._getters) 

177 raise 

178 return self.get_nowait() 

179 

180 def get_nowait(self): 

181 """Remove and return an item from the queue. 

182 

183 Return an item if one is immediately available, else raise QueueEmpty. 

184 """ 

185 if self.empty(): 

186 raise QueueEmpty 

187 item = self._get() 

188 self._wakeup_next(self._putters) 

189 return item 

190 

191 def task_done(self): 

192 """Indicate that a formerly enqueued task is complete. 

193 

194 Used by queue consumers. For each get() used to fetch a task, 

195 a subsequent call to task_done() tells the queue that the processing 

196 on the task is complete. 

197 

198 If a join() is currently blocking, it will resume when all items have 

199 been processed (meaning that a task_done() call was received for every 

200 item that had been put() into the queue). 

201 

202 Raises ValueError if called more times than there were items placed in 

203 the queue. 

204 """ 

205 if self._unfinished_tasks <= 0: 

206 raise ValueError('task_done() called too many times') 

207 self._unfinished_tasks -= 1 

208 if self._unfinished_tasks == 0: 

209 self._finished.set() 

210 

211 async def join(self): 

212 """Block until all items in the queue have been gotten and processed. 

213 

214 The count of unfinished tasks goes up whenever an item is added to the 

215 queue. The count goes down whenever a consumer calls task_done() to 

216 indicate that the item was retrieved and all work on it is complete. 

217 When the count of unfinished tasks drops to zero, join() unblocks. 

218 """ 

219 if self._unfinished_tasks > 0: 

220 await self._finished.wait() 

221 

222 

223class PriorityQueue(Queue): 

224 """A subclass of Queue; retrieves entries in priority order (lowest first). 

225 

226 Entries are typically tuples of the form: (priority number, data). 

227 """ 

228 

229 def _init(self, maxsize): 

230 self._queue = [] 

231 

232 def _put(self, item, heappush=heapq.heappush): 

233 heappush(self._queue, item) 

234 

235 def _get(self, heappop=heapq.heappop): 

236 return heappop(self._queue) 

237 

238 

239class LifoQueue(Queue): 

240 """A subclass of Queue that retrieves most recently added entries first.""" 

241 

242 def _init(self, maxsize): 

243 self._queue = [] 

244 

245 def _put(self, item): 

246 self._queue.append(item) 

247 

248 def _get(self): 

249 return self._queue.pop()