Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/externals/loky/reusable_executor.py: 23% (96 statements)


###############################################################################
# Reusable ProcessPoolExecutor
#
# author: Thomas Moreau and Olivier Grisel
#
import time
import warnings
import threading
import multiprocessing as mp

from .process_executor import ProcessPoolExecutor, EXTRA_QUEUED_CALLS
from .backend.context import cpu_count
from .backend import get_context

__all__ = ["get_reusable_executor"]

# Singleton executor and id management
_executor_lock = threading.RLock()
_next_executor_id = 0
_executor = None
_executor_kwargs = None


def _get_next_executor_id():
    """Ensure that each successive executor instance has a unique, monotonic id.

    The purpose of this monotonic id is to help debug and test automated
    instance creation.
    """
    global _next_executor_id
    with _executor_lock:
        executor_id = _next_executor_id
        _next_executor_id += 1
        return executor_id


def get_reusable_executor(
    max_workers=None,
    context=None,
    timeout=10,
    kill_workers=False,
    reuse="auto",
    job_reducers=None,
    result_reducers=None,
    initializer=None,
    initargs=(),
    env=None,
):
    """Return the current ReusableExecutor instance.

    Start a new instance if it has not been started already or if the previous
    instance was left in a broken state.

    If the previous instance does not have the requested number of workers,
    the executor is dynamically resized to adjust the number of workers prior
    to returning.

    Reusing a singleton instance spares the overhead of starting new worker
    processes and importing common python packages each time.

    ``max_workers`` controls the maximum number of tasks that can be running
    in parallel in worker processes. By default this is set to the number of
    CPUs on the host.

    Setting ``timeout`` (in seconds) makes idle workers automatically shut
    down so as to release system resources. New workers are respawned upon
    submission of new tasks so that ``max_workers`` are available to accept
    the newly submitted tasks. Setting ``timeout`` to around 100 times the
    time required to spawn new processes and import packages in them (on the
    order of 100ms) ensures that the overhead of spawning workers is
    negligible.

    Setting ``kill_workers=True`` makes it possible to forcibly interrupt
    previously spawned jobs to get a new instance of the reusable executor
    with new constructor argument values.

    The ``job_reducers`` and ``result_reducers`` are used to customize the
    pickling of tasks and results sent to the executor.

    When provided, the ``initializer`` is run first in newly spawned
    processes with argument ``initargs``.

    The environment variables in the child processes are a copy of the values
    in the main process. One can provide a dict ``{ENV: VAL}`` where ``ENV``
    and ``VAL`` are string literals to overwrite the environment variable
    ``ENV`` in the child processes to value ``VAL``. The environment
    variables are set in the children before any module is loaded. This only
    works with the ``loky`` context.
    """
    _executor, _ = _ReusablePoolExecutor.get_reusable_executor(
        max_workers=max_workers,
        context=context,
        timeout=timeout,
        kill_workers=kill_workers,
        reuse=reuse,
        job_reducers=job_reducers,
        result_reducers=result_reducers,
        initializer=initializer,
        initargs=initargs,
        env=env,
    )
    return _executor
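
# Illustrative usage sketch (not part of the original module), assuming the
# public package layout where this function is re-exported as
# ``loky.get_reusable_executor``:
#
#     from loky import get_reusable_executor
#
#     # The first call spawns the workers; later identical calls reuse them.
#     executor = get_reusable_executor(max_workers=4, timeout=2)
#     results = list(executor.map(int, ["1", "2", "3"]))
#
#     # ``env`` overrides are applied in the children before any module is
#     # imported (loky context only).
#     executor = get_reusable_executor(
#         max_workers=4, env={"OMP_NUM_THREADS": "1"}
#     )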


class _ReusablePoolExecutor(ProcessPoolExecutor):
    def __init__(
        self,
        submit_resize_lock,
        max_workers=None,
        context=None,
        timeout=None,
        executor_id=0,
        job_reducers=None,
        result_reducers=None,
        initializer=None,
        initargs=(),
        env=None,
    ):
        super().__init__(
            max_workers=max_workers,
            context=context,
            timeout=timeout,
            job_reducers=job_reducers,
            result_reducers=result_reducers,
            initializer=initializer,
            initargs=initargs,
            env=env,
        )
        self.executor_id = executor_id
        self._submit_resize_lock = submit_resize_lock

    @classmethod
    def get_reusable_executor(
        cls,
        max_workers=None,
        context=None,
        timeout=10,
        kill_workers=False,
        reuse="auto",
        job_reducers=None,
        result_reducers=None,
        initializer=None,
        initargs=(),
        env=None,
    ):
        with _executor_lock:
            global _executor, _executor_kwargs
            executor = _executor

            if max_workers is None:
                if reuse is True and executor is not None:
                    max_workers = executor._max_workers
                else:
                    max_workers = cpu_count()
            elif max_workers <= 0:
                raise ValueError(
                    f"max_workers must be greater than 0, got {max_workers}."
                )

            if isinstance(context, str):
                context = get_context(context)
            if context is not None and context.get_start_method() == "fork":
                raise ValueError(
                    "Cannot use reusable executor with the 'fork' context"
                )

            kwargs = dict(
                context=context,
                timeout=timeout,
                job_reducers=job_reducers,
                result_reducers=result_reducers,
                initializer=initializer,
                initargs=initargs,
                env=env,
            )
            if executor is None:
                is_reused = False
                mp.util.debug(
                    f"Create an executor with max_workers={max_workers}."
                )
                executor_id = _get_next_executor_id()
                _executor_kwargs = kwargs
                _executor = executor = cls(
                    _executor_lock,
                    max_workers=max_workers,
                    executor_id=executor_id,
                    **kwargs,
                )
            else:
                if reuse == "auto":
                    reuse = kwargs == _executor_kwargs
                if (
                    executor._flags.broken
                    or executor._flags.shutdown
                    or not reuse
                ):
                    if executor._flags.broken:
                        reason = "broken"
                    elif executor._flags.shutdown:
                        reason = "shutdown"
                    else:
                        reason = "arguments have changed"
                    mp.util.debug(
                        "Creating a new executor with max_workers="
                        f"{max_workers} as the previous instance cannot be "
                        f"reused ({reason})."
                    )
                    executor.shutdown(wait=True, kill_workers=kill_workers)
                    _executor = executor = _executor_kwargs = None
                    # Recursive call to build a new instance
                    return cls.get_reusable_executor(
                        max_workers=max_workers, **kwargs
                    )
                else:
                    mp.util.debug(
                        "Reusing existing executor with "
                        f"max_workers={executor._max_workers}."
                    )
                    is_reused = True
                    executor._resize(max_workers)

        return executor, is_reused
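
    # Illustrative sketch (hypothetical session) of the reuse decision above:
    # ``reuse=True`` keeps the previous worker count when ``max_workers`` is
    # not given, and the 'fork' start method is rejected outright.
    #
    #     executor = get_reusable_executor(max_workers=2)
    #     assert get_reusable_executor(reuse=True) is executor
    #     get_reusable_executor(context="fork")  # raises ValueError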

    def submit(self, fn, *args, **kwargs):
        with self._submit_resize_lock:
            return super().submit(fn, *args, **kwargs)

    def _resize(self, max_workers):
        with self._submit_resize_lock:
            if max_workers is None:
                raise ValueError("Trying to resize with max_workers=None")
            elif max_workers == self._max_workers:
                return

            if self._executor_manager_thread is None:
                # If the executor_manager_thread has not been started,
                # no processes have been spawned yet and we can simply
                # update _max_workers and return.
                self._max_workers = max_workers
                return

            self._wait_job_completion()

            # Some processes might have exited due to the timeout, so check
            # how many children are still alive. Use the
            # _processes_management_lock to ensure that no process is spawned
            # or times out during the resize.
            with self._processes_management_lock:
                processes = list(self._processes.values())
                nb_children_alive = sum(p.is_alive() for p in processes)
                self._max_workers = max_workers
                for _ in range(max_workers, nb_children_alive):
                    self._call_queue.put(None)
            while (
                len(self._processes) > max_workers and not self._flags.broken
            ):
                time.sleep(1e-3)

            self._adjust_process_count()
            processes = list(self._processes.values())
            while not all(p.is_alive() for p in processes):
                time.sleep(1e-3)
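
    # Illustrative resize sketch: requesting a different ``max_workers`` on
    # the existing singleton goes through ``_resize`` rather than a restart;
    # if jobs are still pending, a UserWarning is emitted and the resize
    # blocks until they complete (see ``_wait_job_completion`` below).
    #
    #     executor = get_reusable_executor(max_workers=2)
    #     executor = get_reusable_executor(max_workers=4)  # same, resized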

    def _wait_job_completion(self):
        """Wait for the cache to be empty before resizing the pool."""
        # Issue a warning to the user about the bad effect of this usage.
        if self._pending_work_items:
            warnings.warn(
                "Trying to resize an executor with running jobs: "
                "waiting for jobs completion before resizing.",
                UserWarning,
            )
            mp.util.debug(
                f"Executor {self.executor_id} waiting for jobs completion "
                "before resizing"
            )
        # Wait for the completion of the jobs
        while self._pending_work_items:
            time.sleep(1e-3)

    def _setup_queues(self, job_reducers, result_reducers):
        # As this executor can be resized, use a large queue size to avoid
        # underestimating capacity and introducing overhead
        queue_size = 2 * cpu_count() + EXTRA_QUEUED_CALLS
        super()._setup_queues(
            job_reducers, result_reducers, queue_size=queue_size
        )