Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/externals/loky/reusable_executor.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

100 statements  

1############################################################################### 

2# Reusable ProcessPoolExecutor 

3# 

4# author: Thomas Moreau and Olivier Grisel 

5# 

6import time 

7import warnings 

8import threading 

9import multiprocessing as mp 

10 

11from .process_executor import ProcessPoolExecutor, EXTRA_QUEUED_CALLS 

12from .backend.context import cpu_count 

13from .backend import get_context 

14 

15__all__ = ["get_reusable_executor"] 

16 

# Module-wide state backing the singleton executor.
# A single re-entrant lock guards the id counter below as well as the
# creation / replacement of the singleton; it is also the lock handed to
# every executor instance as its submit/resize lock.
_executor_lock = threading.RLock()
# Id that will be given to the next executor instance to be created.
_next_executor_id = 0
# Current singleton instance and the constructor keyword arguments it was
# built with (both None until an executor has been created).
_executor = _executor_kwargs = None

22 

23 

def _get_next_executor_id():
    """Allocate and return the id for the next executor instance.

    Ids are unique and grow monotonically across successive calls; the
    counter is read and incremented while holding ``_executor_lock``.
    They exist to help debugging and testing of the automated creation of
    executor instances.
    """
    global _next_executor_id
    with _executor_lock:
        # Read the current value and bump the counter in one step.
        allocated_id, _next_executor_id = (
            _next_executor_id,
            _next_executor_id + 1,
        )
    return allocated_id

35 

36 

def get_reusable_executor(
    max_workers=None,
    context=None,
    timeout=10,
    kill_workers=False,
    reuse="auto",
    job_reducers=None,
    result_reducers=None,
    initializer=None,
    initargs=(),
    env=None,
):
    """Return the shared reusable executor, creating it when necessary.

    A new instance is started when none exists yet or when the previous
    one was left in a broken state. When the existing instance does not
    have the requested number of workers, it is resized before being
    returned.

    Sharing one singleton avoids paying, on every use, the cost of
    starting worker processes and importing common python packages in
    them.

    Parameters
    ----------
    max_workers : int or None
        Maximum number of tasks that can run in parallel in the worker
        processes. Defaults to the number of CPUs on the host.
    context : multiprocessing context, str or None
        Context (or name of the context) used to start the workers.
    timeout : float
        Idle time, in seconds, after which workers shut themselves down to
        release system resources. Workers are respawned when new tasks are
        submitted so that ``max_workers`` of them are available again. A
        value around 100 times the time needed to spawn a process and
        import packages in it (on the order of 100ms) keeps the spawning
        overhead negligible.
    kill_workers : bool
        When True, previously spawned jobs may be forcibly interrupted in
        order to obtain a new executor instance built with new constructor
        argument values.
    reuse : bool or "auto"
        Whether the existing instance may be reused.
    job_reducers, result_reducers : dict or None
        Custom reducers controlling how tasks and results sent to / from
        the executor are pickled.
    initializer : callable or None
        When given, called first in every newly spawned process with
        ``initargs`` as arguments.
    initargs : tuple
        Arguments passed to ``initializer``.
    env : dict or None
        Child processes start with a copy of the environment variables of
        the main process. A dict ``{ENV: VAL}``, where ``ENV`` and ``VAL``
        are strings, overrides variable ``ENV`` with value ``VAL`` in the
        children before any module is loaded there. This only works with
        the ``loky`` context.

    Returns
    -------
    The executor instance; the "was it reused" flag computed by
    ``_ReusablePoolExecutor.get_reusable_executor`` is discarded.
    """
    forwarded_args = dict(
        max_workers=max_workers,
        context=context,
        timeout=timeout,
        kill_workers=kill_workers,
        reuse=reuse,
        job_reducers=job_reducers,
        result_reducers=result_reducers,
        initializer=initializer,
        initargs=initargs,
        env=env,
    )
    shared_executor, _was_reused = (
        _ReusablePoolExecutor.get_reusable_executor(**forwarded_args)
    )
    return shared_executor

102 

103 

class _ReusablePoolExecutor(ProcessPoolExecutor):
    """Process pool executor that can be kept as a resizable singleton.

    On top of ``ProcessPoolExecutor`` this class adds:

    * an ``executor_id`` attribute identifying the instance,
    * a lock serialising ``submit`` and ``_resize`` calls,
    * ``get_reusable_executor``, which creates, reuses or replaces the
      module-level singleton stored in ``_executor``.
    """

    def __init__(
        self,
        submit_resize_lock,
        max_workers=None,
        context=None,
        timeout=None,
        executor_id=0,
        job_reducers=None,
        result_reducers=None,
        initializer=None,
        initargs=(),
        env=None,
    ):
        """Build the executor and record its id and submit/resize lock.

        ``submit_resize_lock`` is held for the duration of every ``submit``
        and ``_resize`` call; every other argument except ``executor_id``
        is forwarded unchanged to ``ProcessPoolExecutor.__init__``.
        """
        super().__init__(
            max_workers=max_workers,
            context=context,
            timeout=timeout,
            job_reducers=job_reducers,
            result_reducers=result_reducers,
            initializer=initializer,
            initargs=initargs,
            env=env,
        )
        self.executor_id = executor_id
        self._submit_resize_lock = submit_resize_lock

    @classmethod
    def get_reusable_executor(
        cls,
        max_workers=None,
        context=None,
        timeout=10,
        kill_workers=False,
        reuse="auto",
        job_reducers=None,
        result_reducers=None,
        initializer=None,
        initargs=(),
        env=None,
    ):
        """Return ``(executor, is_reused)`` for the module-level singleton.

        The existing singleton is resized and returned when it can be
        reused. Otherwise it is shut down (``kill_workers`` is forwarded to
        ``shutdown``) and a new instance is created.

        ``reuse="auto"`` means "reuse only if the constructor keyword
        arguments are equal to the ones the singleton was created with".
        When ``max_workers`` is None, it defaults to the current executor's
        ``_max_workers`` if ``reuse is True`` and an executor exists, and to
        ``cpu_count()`` otherwise.

        Raises
        ------
        ValueError
            If ``max_workers`` is not strictly positive, or if ``context``
            uses the ``"fork"`` start method.
        """
        global _executor, _executor_kwargs
        with _executor_lock:
            executor = _executor

            if max_workers is None:
                if reuse is True and executor is not None:
                    max_workers = executor._max_workers
                else:
                    max_workers = cpu_count()
            elif max_workers <= 0:
                raise ValueError(
                    f"max_workers must be greater than 0, got {max_workers}."
                )

            if isinstance(context, str):
                context = get_context(context)
            if context is not None and context.get_start_method() == "fork":
                raise ValueError(
                    "Cannot use reusable executor with the 'fork' context"
                )

            # Constructor arguments; also what reuse="auto" compares against
            # the arguments of the current singleton.
            kwargs = dict(
                context=context,
                timeout=timeout,
                job_reducers=job_reducers,
                result_reducers=result_reducers,
                initializer=initializer,
                initargs=initargs,
                env=env,
            )
            if executor is None:
                is_reused = False
                mp.util.debug(
                    f"Create a executor with max_workers={max_workers}."
                )
                executor_id = _get_next_executor_id()
                _executor_kwargs = kwargs
                # The module-level lock doubles as the instance's
                # submit/resize lock.
                _executor = executor = cls(
                    _executor_lock,
                    max_workers=max_workers,
                    executor_id=executor_id,
                    **kwargs,
                )
            else:
                if reuse == "auto":
                    reuse = kwargs == _executor_kwargs
                if (
                    executor._flags.broken
                    or executor._flags.shutdown
                    or not reuse
                    or executor.queue_size < max_workers
                ):
                    if executor._flags.broken:
                        reason = "broken"
                    elif executor._flags.shutdown:
                        reason = "shutdown"
                    elif executor.queue_size < max_workers:
                        # Do not reuse the executor if the queue size is too
                        # small as this would lead to limited parallelism.
                        reason = "queue size is too small"
                    else:
                        reason = "arguments have changed"
                    mp.util.debug(
                        "Creating a new executor with max_workers="
                        f"{max_workers} as the previous instance cannot be "
                        f"reused ({reason})."
                    )
                    executor.shutdown(wait=True, kill_workers=kill_workers)
                    _executor = executor = _executor_kwargs = None
                    # Recursive call to build a new instance; the singleton
                    # is now None so the creation branch above is taken.
                    return cls.get_reusable_executor(
                        max_workers=max_workers, **kwargs
                    )
                else:
                    mp.util.debug(
                        "Reusing existing executor with "
                        f"max_workers={executor._max_workers}."
                    )
                    is_reused = True
                    executor._resize(max_workers)

        return executor, is_reused

    def submit(self, fn, *args, **kwargs):
        """Submit ``fn(*args, **kwargs)`` while holding the resize lock.

        Taking ``_submit_resize_lock`` keeps a submission from running
        concurrently with ``_resize``. Returns whatever the parent
        ``submit`` returns.
        """
        with self._submit_resize_lock:
            return super().submit(fn, *args, **kwargs)

    def _resize(self, max_workers):
        """Change the number of workers of the pool to ``max_workers``.

        Pending jobs are waited for first. Surplus workers are then asked
        to stop by putting one ``None`` per surplus worker on the call
        queue, and missing workers are spawned via
        ``_adjust_process_count``.

        Raises
        ------
        ValueError
            If ``max_workers`` is None.
        """
        with self._submit_resize_lock:
            if max_workers is None:
                raise ValueError("Trying to resize with max_workers=None")
            elif max_workers == self._max_workers:
                return

            if self._executor_manager_thread is None:
                # If the executor_manager_thread has not been started
                # then no processes have been spawned and we can just
                # update _max_workers and return
                self._max_workers = max_workers
                return

            self._wait_job_completion()

            # Some process might have returned due to timeout so check how many
            # children are still alive. Use the _process_management_lock to
            # ensure that no process are spawned or timeout during the resize.
            with self._processes_management_lock:
                processes = list(self._processes.values())
                nb_children_alive = sum(p.is_alive() for p in processes)
                self._max_workers = max_workers
                # One None per surplus worker (no-op when growing the pool).
                for _ in range(max_workers, nb_children_alive):
                    self._call_queue.put(None)
            while (
                len(self._processes) > max_workers and not self._flags.broken
            ):
                time.sleep(1e-3)

            self._adjust_process_count()
            processes = list(self._processes.values())
            # Wait until every worker in the snapshot is alive. Stop waiting
            # once the executor is flagged as broken, like the loop above:
            # without this guard a worker that died right after being
            # spawned would keep this loop spinning forever while holding
            # the shared lock.
            # NOTE(review): relies on a dead worker eventually leading to
            # _flags.broken being set elsewhere -- confirm.
            while (
                not all(p.is_alive() for p in processes)
                and not self._flags.broken
            ):
                time.sleep(1e-3)

    def _wait_job_completion(self):
        """Wait for the cache to be empty before resizing the pool.

        Emits a ``UserWarning`` (and a debug message) when work items are
        still pending, then polls until ``_pending_work_items`` is empty.
        """
        # Issue a warning to the user about the bad effect of this usage.
        if self._pending_work_items:
            warnings.warn(
                "Trying to resize an executor with running jobs: "
                "waiting for jobs completion before resizing.",
                UserWarning,
            )
            mp.util.debug(
                f"Executor {self.executor_id} waiting for jobs completion "
                "before resizing"
            )
        # Wait for the completion of the jobs
        while self._pending_work_items:
            time.sleep(1e-3)

    def _setup_queues(self, job_reducers, result_reducers):
        """Set ``queue_size`` and delegate the queue creation to the parent.

        The size is ``2 * max(cpu_count(), _max_workers)`` plus
        ``EXTRA_QUEUED_CALLS``.
        """
        # As this executor can be resized, use a large queue size to avoid
        # underestimating capacity and introducing overhead
        # Also handle the case where the user set max_workers to a value larger
        # than cpu_count(), to avoid limiting the number of parallel jobs.

        min_queue_size = max(cpu_count(), self._max_workers)
        self.queue_size = 2 * min_queue_size + EXTRA_QUEUED_CALLS
        super()._setup_queues(
            job_reducers, result_reducers, queue_size=self.queue_size
        )