###############################################################################
# Reusable ProcessPoolExecutor
#
# author: Thomas Moreau and Olivier Grisel
#
import time
import warnings
import threading
import multiprocessing as mp

from .process_executor import ProcessPoolExecutor, EXTRA_QUEUED_CALLS
from .backend.context import cpu_count
from .backend import get_context

__all__ = ["get_reusable_executor"]

# Singleton executor and id management
_executor_lock = threading.RLock()
_next_executor_id = 0
_executor = None
_executor_kwargs = None


def _get_next_executor_id():
    """Ensure that each successive executor instance has a unique, monotonic id.

    The purpose of this monotonic id is to help debug and test automated
    instance creation.
    """
    global _next_executor_id
    with _executor_lock:
        executor_id = _next_executor_id
        _next_executor_id += 1
        return executor_id


def get_reusable_executor(
    max_workers=None,
    context=None,
    timeout=10,
    kill_workers=False,
    reuse="auto",
    job_reducers=None,
    result_reducers=None,
    initializer=None,
    initargs=(),
    env=None,
):
49 """Return the current ReusableExectutor instance.
50
51 Start a new instance if it has not been started already or if the previous
52 instance was left in a broken state.
53
54 If the previous instance does not have the requested number of workers, the
55 executor is dynamically resized to adjust the number of workers prior to
56 returning.
57
58 Reusing a singleton instance spares the overhead of starting new worker
59 processes and importing common python packages each time.
60
61 ``max_workers`` controls the maximum number of tasks that can be running in
62 parallel in worker processes. By default this is set to the number of
63 CPUs on the host.
64
65 Setting ``timeout`` (in seconds) makes idle workers automatically shutdown
66 so as to release system resources. New workers are respawn upon submission
67 of new tasks so that ``max_workers`` are available to accept the newly
68 submitted tasks. Setting ``timeout`` to around 100 times the time required
69 to spawn new processes and import packages in them (on the order of 100ms)
70 ensures that the overhead of spawning workers is negligible.
71
72 Setting ``kill_workers=True`` makes it possible to forcibly interrupt
73 previously spawned jobs to get a new instance of the reusable executor
74 with new constructor argument values.
75
76 The ``job_reducers`` and ``result_reducers`` are used to customize the
77 pickling of tasks and results send to the executor.
78
79 When provided, the ``initializer`` is run first in newly spawned
80 processes with argument ``initargs``.
81
82 The environment variable in the child process are a copy of the values in
83 the main process. One can provide a dict ``{ENV: VAL}`` where ``ENV`` and
84 ``VAL`` are string literals to overwrite the environment variable ``ENV``
85 in the child processes to value ``VAL``. The environment variables are set
86 in the children before any module is loaded. This only works with the
87 ``loky`` context.
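
    A minimal usage sketch (``slow_square`` is a hypothetical function used
    only for illustration)::

        from loky import get_reusable_executor

        def slow_square(x):
            return x**2

        executor = get_reusable_executor(max_workers=4, timeout=2)
        results = list(executor.map(slow_square, range(8)))

        # Calling again with compatible arguments returns the same instance,
        # so the worker processes are reused.
        assert get_reusable_executor(max_workers=4, timeout=2) is executor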
88 """
89 _executor, _ = _ReusablePoolExecutor.get_reusable_executor(
90 max_workers=max_workers,
91 context=context,
92 timeout=timeout,
93 kill_workers=kill_workers,
94 reuse=reuse,
95 job_reducers=job_reducers,
96 result_reducers=result_reducers,
97 initializer=initializer,
98 initargs=initargs,
99 env=env,
100 )
101 return _executor
102
103
class _ReusablePoolExecutor(ProcessPoolExecutor):
    def __init__(
        self,
        submit_resize_lock,
        max_workers=None,
        context=None,
        timeout=None,
        executor_id=0,
        job_reducers=None,
        result_reducers=None,
        initializer=None,
        initargs=(),
        env=None,
    ):
        super().__init__(
            max_workers=max_workers,
            context=context,
            timeout=timeout,
            job_reducers=job_reducers,
            result_reducers=result_reducers,
            initializer=initializer,
            initargs=initargs,
            env=env,
        )
        self.executor_id = executor_id
        self._submit_resize_lock = submit_resize_lock

    @classmethod
    def get_reusable_executor(
        cls,
        max_workers=None,
        context=None,
        timeout=10,
        kill_workers=False,
        reuse="auto",
        job_reducers=None,
        result_reducers=None,
        initializer=None,
        initargs=(),
        env=None,
    ):
        with _executor_lock:
            global _executor, _executor_kwargs
            executor = _executor

            if max_workers is None:
                if reuse is True and executor is not None:
                    max_workers = executor._max_workers
                else:
                    max_workers = cpu_count()
            elif max_workers <= 0:
                raise ValueError(
                    f"max_workers must be greater than 0, got {max_workers}."
                )

            if isinstance(context, str):
                context = get_context(context)
            if context is not None and context.get_start_method() == "fork":
                raise ValueError(
                    "Cannot use reusable executor with the 'fork' context"
                )

            kwargs = dict(
                context=context,
                timeout=timeout,
                job_reducers=job_reducers,
                result_reducers=result_reducers,
                initializer=initializer,
                initargs=initargs,
                env=env,
            )
            if executor is None:
                is_reused = False
                mp.util.debug(
                    f"Create an executor with max_workers={max_workers}."
                )
                executor_id = _get_next_executor_id()
                _executor_kwargs = kwargs
                _executor = executor = cls(
                    _executor_lock,
                    max_workers=max_workers,
                    executor_id=executor_id,
                    **kwargs,
                )
            else:
                if reuse == "auto":
                    reuse = kwargs == _executor_kwargs
                if (
                    executor._flags.broken
                    or executor._flags.shutdown
                    or not reuse
                    or executor.queue_size < max_workers
                ):
                    if executor._flags.broken:
                        reason = "broken"
                    elif executor._flags.shutdown:
                        reason = "shutdown"
                    elif executor.queue_size < max_workers:
                        # Do not reuse the executor if the queue size is too
                        # small as this would lead to limited parallelism.
                        reason = "queue size is too small"
                    else:
                        reason = "arguments have changed"
                    mp.util.debug(
                        "Creating a new executor with max_workers="
                        f"{max_workers} as the previous instance cannot be "
                        f"reused ({reason})."
                    )
                    executor.shutdown(wait=True, kill_workers=kill_workers)
                    _executor = executor = _executor_kwargs = None
                    # Recursive call to build a new instance
                    return cls.get_reusable_executor(
                        max_workers=max_workers, **kwargs
                    )
                else:
                    mp.util.debug(
                        "Reusing existing executor with "
                        f"max_workers={executor._max_workers}."
                    )
                    is_reused = True
                    executor._resize(max_workers)

            return executor, is_reused

    def submit(self, fn, *args, **kwargs):
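        # Hold the shared lock so that a task is never enqueued while
        # _resize() is adding or removing workers (and vice versa).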
        with self._submit_resize_lock:
            return super().submit(fn, *args, **kwargs)

    def _resize(self, max_workers):
        with self._submit_resize_lock:
            if max_workers is None:
                raise ValueError("Trying to resize with max_workers=None")
            elif max_workers == self._max_workers:
                return

            if self._executor_manager_thread is None:
                # If the executor_manager_thread has not been started
                # then no processes have been spawned and we can just
                # update _max_workers and return
                self._max_workers = max_workers
                return

            self._wait_job_completion()

            # Some processes might have exited due to timeout, so check how
            # many children are still alive. Use _processes_management_lock
            # to ensure that no process is spawned or times out during the
            # resize.
            with self._processes_management_lock:
                processes = list(self._processes.values())
                nb_children_alive = sum(p.is_alive() for p in processes)
                self._max_workers = max_workers
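                # Shrinking: send one shutdown sentinel (None) per excess
                # worker; a worker exits its loop when it reads None from
                # the call queue.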
                for _ in range(max_workers, nb_children_alive):
                    self._call_queue.put(None)
            while (
                len(self._processes) > max_workers and not self._flags.broken
            ):
                time.sleep(1e-3)

            self._adjust_process_count()
            processes = list(self._processes.values())
            while not all(p.is_alive() for p in processes):
                time.sleep(1e-3)

    def _wait_job_completion(self):
        """Wait for the cache to be empty before resizing the pool."""
        # Warn the user that resizing with pending jobs blocks until they
        # complete.
        if self._pending_work_items:
            warnings.warn(
                "Trying to resize an executor with running jobs: "
                "waiting for job completion before resizing.",
                UserWarning,
            )
            mp.util.debug(
                f"Executor {self.executor_id} waiting for job completion "
                "before resizing"
            )
        # Wait for the completion of the jobs
        while self._pending_work_items:
            time.sleep(1e-3)

    def _setup_queues(self, job_reducers, result_reducers):
        # As this executor can be resized, use a large queue size to avoid
        # underestimating capacity and introducing overhead.
        # Also handle the case where the user sets max_workers to a value
        # larger than cpu_count(), to avoid limiting the number of parallel
        # jobs.

        min_queue_size = max(cpu_count(), self._max_workers)
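        # EXTRA_QUEUED_CALLS adds the small extra slack that the underlying
        # ProcessPoolExecutor reserves in its call queue.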
        self.queue_size = 2 * min_queue_size + EXTRA_QUEUED_CALLS
        super()._setup_queues(
            job_reducers, result_reducers, queue_size=self.queue_size
        )