Coverage for /pythoncovmergedfiles/medio/medio/usr/lib/python3.9/concurrent/futures/thread.py: 25%

136 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:05 +0000

1# Copyright 2009 Brian Quinlan. All Rights Reserved. 

2# Licensed to PSF under a Contributor Agreement. 

3 

4"""Implements ThreadPoolExecutor.""" 

5 

6__author__ = 'Brian Quinlan (brian@sweetapp.com)' 

7 

8from concurrent.futures import _base 

9import itertools 

10import queue 

11import threading 

12import types 

13import weakref 

14import os 

15 

16 

17_threads_queues = weakref.WeakKeyDictionary() 

18_shutdown = False 

19# Lock that ensures that new workers are not created while the interpreter is 

20# shutting down. Must be held while mutating _threads_queues and _shutdown. 

21_global_shutdown_lock = threading.Lock() 

22 

23def _python_exit(): 

24 global _shutdown 

25 with _global_shutdown_lock: 

26 _shutdown = True 

27 items = list(_threads_queues.items()) 

28 for t, q in items: 

29 q.put(None) 

30 for t, q in items: 

31 t.join() 

32 

33# Register for `_python_exit()` to be called just before joining all 

34# non-daemon threads. This is used instead of `atexit.register()` for 

35# compatibility with subinterpreters, which no longer support daemon threads. 

36# See bpo-39812 for context. 

37threading._register_atexit(_python_exit) 

38 

39 

40class _WorkItem(object): 

41 def __init__(self, future, fn, args, kwargs): 

42 self.future = future 

43 self.fn = fn 

44 self.args = args 

45 self.kwargs = kwargs 

46 

47 def run(self): 

48 if not self.future.set_running_or_notify_cancel(): 

49 return 

50 

51 try: 

52 result = self.fn(*self.args, **self.kwargs) 

53 except BaseException as exc: 

54 self.future.set_exception(exc) 

55 # Break a reference cycle with the exception 'exc' 

56 self = None 

57 else: 

58 self.future.set_result(result) 

59 

60 __class_getitem__ = classmethod(types.GenericAlias) 

61 

62 

63def _worker(executor_reference, work_queue, initializer, initargs): 

64 if initializer is not None: 

65 try: 

66 initializer(*initargs) 

67 except BaseException: 

68 _base.LOGGER.critical('Exception in initializer:', exc_info=True) 

69 executor = executor_reference() 

70 if executor is not None: 

71 executor._initializer_failed() 

72 return 

73 try: 

74 while True: 

75 work_item = work_queue.get(block=True) 

76 if work_item is not None: 

77 work_item.run() 

78 # Delete references to object. See issue16284 

79 del work_item 

80 

81 # attempt to increment idle count 

82 executor = executor_reference() 

83 if executor is not None: 

84 executor._idle_semaphore.release() 

85 del executor 

86 continue 

87 

88 executor = executor_reference() 

89 # Exit if: 

90 # - The interpreter is shutting down OR 

91 # - The executor that owns the worker has been collected OR 

92 # - The executor that owns the worker has been shutdown. 

93 if _shutdown or executor is None or executor._shutdown: 

94 # Flag the executor as shutting down as early as possible if it 

95 # is not gc-ed yet. 

96 if executor is not None: 

97 executor._shutdown = True 

98 # Notice other workers 

99 work_queue.put(None) 

100 return 

101 del executor 

102 except BaseException: 

103 _base.LOGGER.critical('Exception in worker', exc_info=True) 

104 

105 

106class BrokenThreadPool(_base.BrokenExecutor): 

107 """ 

108 Raised when a worker thread in a ThreadPoolExecutor failed initializing. 

109 """ 

110 

111 

112class ThreadPoolExecutor(_base.Executor): 

113 

114 # Used to assign unique thread names when thread_name_prefix is not supplied. 

115 _counter = itertools.count().__next__ 

116 

117 def __init__(self, max_workers=None, thread_name_prefix='', 

118 initializer=None, initargs=()): 

119 """Initializes a new ThreadPoolExecutor instance. 

120 

121 Args: 

122 max_workers: The maximum number of threads that can be used to 

123 execute the given calls. 

124 thread_name_prefix: An optional name prefix to give our threads. 

125 initializer: A callable used to initialize worker threads. 

126 initargs: A tuple of arguments to pass to the initializer. 

127 """ 

128 if max_workers is None: 

129 # ThreadPoolExecutor is often used to: 

130 # * CPU bound task which releases GIL 

131 # * I/O bound task (which releases GIL, of course) 

132 # 

133 # We use cpu_count + 4 for both types of tasks. 

134 # But we limit it to 32 to avoid consuming surprisingly large resource 

135 # on many core machine. 

136 max_workers = min(32, (os.cpu_count() or 1) + 4) 

137 if max_workers <= 0: 

138 raise ValueError("max_workers must be greater than 0") 

139 

140 if initializer is not None and not callable(initializer): 

141 raise TypeError("initializer must be a callable") 

142 

143 self._max_workers = max_workers 

144 self._work_queue = queue.SimpleQueue() 

145 self._idle_semaphore = threading.Semaphore(0) 

146 self._threads = set() 

147 self._broken = False 

148 self._shutdown = False 

149 self._shutdown_lock = threading.Lock() 

150 self._thread_name_prefix = (thread_name_prefix or 

151 ("ThreadPoolExecutor-%d" % self._counter())) 

152 self._initializer = initializer 

153 self._initargs = initargs 

154 

155 def submit(self, fn, /, *args, **kwargs): 

156 with self._shutdown_lock, _global_shutdown_lock: 

157 if self._broken: 

158 raise BrokenThreadPool(self._broken) 

159 

160 if self._shutdown: 

161 raise RuntimeError('cannot schedule new futures after shutdown') 

162 if _shutdown: 

163 raise RuntimeError('cannot schedule new futures after ' 

164 'interpreter shutdown') 

165 

166 f = _base.Future() 

167 w = _WorkItem(f, fn, args, kwargs) 

168 

169 self._work_queue.put(w) 

170 self._adjust_thread_count() 

171 return f 

172 submit.__doc__ = _base.Executor.submit.__doc__ 

173 

174 def _adjust_thread_count(self): 

175 # if idle threads are available, don't spin new threads 

176 if self._idle_semaphore.acquire(timeout=0): 

177 return 

178 

179 # When the executor gets lost, the weakref callback will wake up 

180 # the worker threads. 

181 def weakref_cb(_, q=self._work_queue): 

182 q.put(None) 

183 

184 num_threads = len(self._threads) 

185 if num_threads < self._max_workers: 

186 thread_name = '%s_%d' % (self._thread_name_prefix or self, 

187 num_threads) 

188 t = threading.Thread(name=thread_name, target=_worker, 

189 args=(weakref.ref(self, weakref_cb), 

190 self._work_queue, 

191 self._initializer, 

192 self._initargs)) 

193 t.start() 

194 self._threads.add(t) 

195 _threads_queues[t] = self._work_queue 

196 

197 def _initializer_failed(self): 

198 with self._shutdown_lock: 

199 self._broken = ('A thread initializer failed, the thread pool ' 

200 'is not usable anymore') 

201 # Drain work queue and mark pending futures failed 

202 while True: 

203 try: 

204 work_item = self._work_queue.get_nowait() 

205 except queue.Empty: 

206 break 

207 if work_item is not None: 

208 work_item.future.set_exception(BrokenThreadPool(self._broken)) 

209 

210 def shutdown(self, wait=True, *, cancel_futures=False): 

211 with self._shutdown_lock: 

212 self._shutdown = True 

213 if cancel_futures: 

214 # Drain all work items from the queue, and then cancel their 

215 # associated futures. 

216 while True: 

217 try: 

218 work_item = self._work_queue.get_nowait() 

219 except queue.Empty: 

220 break 

221 if work_item is not None: 

222 work_item.future.cancel() 

223 

224 # Send a wake-up to prevent threads calling 

225 # _work_queue.get(block=True) from permanently blocking. 

226 self._work_queue.put(None) 

227 if wait: 

228 for t in self._threads: 

229 t.join() 

230 shutdown.__doc__ = _base.Executor.shutdown.__doc__