1"""
2Backends for embarrassingly parallel code.
3"""
4
5import gc
6import os
7import warnings
8import threading
9import contextlib
10from abc import ABCMeta, abstractmethod
11
12from ._utils import (
13 _TracebackCapturingWrapper,
14 _retrieve_traceback_capturing_wrapped_call
15)
16
17from ._multiprocessing_helpers import mp
18
19if mp is not None:
20 from .pool import MemmappingPool
21 from multiprocessing.pool import ThreadPool
22 from .executor import get_memmapping_executor
23
24 # Import loky only if multiprocessing is present
25 from .externals.loky import process_executor, cpu_count
26 from .externals.loky.process_executor import ShutdownExecutorError
27
28
29class ParallelBackendBase(metaclass=ABCMeta):
30 """Helper abc which defines all methods a ParallelBackend must implement"""
31
32 supports_inner_max_num_threads = False
33 supports_retrieve_callback = False
34 default_n_jobs = 1
35
36 @property
37 def supports_return_generator(self):
38 return self.supports_retrieve_callback
39
40 @property
41 def supports_timeout(self):
42 return self.supports_retrieve_callback
43
44 nesting_level = None
45
46 def __init__(self, nesting_level=None, inner_max_num_threads=None,
47 **kwargs):
48 super().__init__(**kwargs)
49 self.nesting_level = nesting_level
50 self.inner_max_num_threads = inner_max_num_threads
51
52 MAX_NUM_THREADS_VARS = [
53 'OMP_NUM_THREADS', 'OPENBLAS_NUM_THREADS', 'MKL_NUM_THREADS',
54 'BLIS_NUM_THREADS', 'VECLIB_MAXIMUM_THREADS', 'NUMBA_NUM_THREADS',
55 'NUMEXPR_NUM_THREADS',
56 ]
57
58 TBB_ENABLE_IPC_VAR = "ENABLE_IPC"
59
60 @abstractmethod
61 def effective_n_jobs(self, n_jobs):
62 """Determine the number of jobs that can actually run in parallel
63
64 n_jobs is the number of workers requested by the callers. Passing
65 n_jobs=-1 means requesting all available workers for instance matching
66 the number of CPU cores on the worker host(s).
67
68 This method should return a guesstimate of the number of workers that
69 can actually perform work concurrently. The primary use case is to make
70 it possible for the caller to know in how many chunks to slice the
71 work.
72
73 In general working on larger data chunks is more efficient (less
74 scheduling overhead and better use of CPU cache prefetching heuristics)
75 as long as all the workers have enough work to do.
76 """
77
78 @abstractmethod
79 def apply_async(self, func, callback=None):
80 """Schedule a func to be run"""
81
82 def retrieve_result_callback(self, out):
83 """Called within the callback function passed in apply_async.
84
85 The argument of this function is the argument given to a callback in
86 the considered backend. It is supposed to return the outcome of a task
87 if it succeeded or raise the exception if it failed.
88 """
89
90 def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
91 **backend_args):
92 """Reconfigure the backend and return the number of workers.
93
94 This makes it possible to reuse an existing backend instance for
95 successive independent calls to Parallel with different parameters.
96 """
97 self.parallel = parallel
98 return self.effective_n_jobs(n_jobs)
99
100 def start_call(self):
101 """Call-back method called at the beginning of a Parallel call"""
102
103 def stop_call(self):
104 """Call-back method called at the end of a Parallel call"""
105
106 def terminate(self):
107 """Shutdown the workers and free the shared memory."""
108
109 def compute_batch_size(self):
110 """Determine the optimal batch size"""
111 return 1
112
113 def batch_completed(self, batch_size, duration):
114 """Callback indicate how long it took to run a batch"""
115
116 def get_exceptions(self):
117 """List of exception types to be captured."""
118 return []
119
    def abort_everything(self, ensure_ready=True):
        """Abort any running tasks

        This is called when an exception has been raised when executing a
        task and all the remaining tasks will be ignored and can therefore be
        aborted to spare computation resources.

        If ensure_ready is True, the backend should be left in an operating
        state as future tasks might be re-submitted via that same backend
        instance.

        If ensure_ready is False, the implementer of this method can decide
        to leave the backend in a closed / terminated state as no new tasks
        are expected to be submitted to this backend.

        Setting ensure_ready to False is an optimization that can be
        leveraged when aborting tasks via killing processes from a local
        process pool managed by the backend itself: if we expect no new
        tasks, there is no point in re-creating new workers.
        """
        # Does nothing by default: to be overridden in subclasses when
        # canceling tasks is possible.
        pass

    def get_nested_backend(self):
        """Backend instance to be used by nested Parallel calls.

        By default a thread-based backend is used for the first level of
        nesting. Beyond that, switch to the sequential backend to avoid
        spawning too many threads on the host.
        """
        nesting_level = getattr(self, 'nesting_level', 0) + 1
        if nesting_level > 1:
            return SequentialBackend(nesting_level=nesting_level), None
        else:
            return ThreadingBackend(nesting_level=nesting_level), None

    @contextlib.contextmanager
    def retrieval_context(self):
        """Context manager to manage an execution context.

        Calls to Parallel.retrieve will be made inside this context.

        By default, this does nothing. It may be useful for subclasses to
        handle nested parallelism. In particular, it may be required to avoid
        deadlocks when a backend manages a fixed number of workers and those
        workers may be asked to do nested Parallel calls: all the workers
        managed by the backend could be "busy" waiting for the nested calls
        to finish, while the backend has no free worker left to execute them.
        """
        yield
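
    # A hedged sketch of how a subclass might override retrieval_context to
    # free its worker slot while blocking on results (the helper names
    # _leave_worker_pool / _rejoin_worker_pool are hypothetical, not part of
    # this module; joblib's dask backend achieves a similar effect with
    # distributed's secede/rejoin):
    #
    #     @contextlib.contextmanager
    #     def retrieval_context(self):
    #         self._leave_worker_pool()       # hypothetical helper
    #         try:
    #             yield
    #         finally:
    #             self._rejoin_worker_pool()  # hypothetical helper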

    def _prepare_worker_env(self, n_jobs):
        """Return environment variables limiting threadpools in external libs.

        This function returns a dict containing environment variables to pass
        when creating a pool of worker processes. These environment variables
        limit the number of threads to `n_threads` for OpenMP, MKL, Accelerate
        and OpenBLAS libraries in the child processes.
        """
        explicit_n_threads = self.inner_max_num_threads
        default_n_threads = max(cpu_count() // n_jobs, 1)

        # Set the inner environment variables to self.inner_max_num_threads
        # if it is given. Else, default to cpu_count // n_jobs unless the
        # variable is already present in the parent process environment.
        env = {}
        for var in self.MAX_NUM_THREADS_VARS:
            if explicit_n_threads is None:
                var_value = os.environ.get(var, default_n_threads)
            else:
                var_value = explicit_n_threads

            env[var] = str(var_value)

        if self.TBB_ENABLE_IPC_VAR not in os.environ:
            # To avoid over-subscription when using TBB, let the TBB
            # schedulers use Inter Process Communication to coordinate:
            env[self.TBB_ENABLE_IPC_VAR] = "1"
        return env
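
    # For illustration: on a 16-core host with no explicit
    # inner_max_num_threads and none of these variables set in the parent
    # environment, _prepare_worker_env(n_jobs=4) would return
    # {'OMP_NUM_THREADS': '4', 'OPENBLAS_NUM_THREADS': '4', ...,
    #  'ENABLE_IPC': '1'}, so each of the 4 workers spreads its inner
    # threadpools over at most 16 // 4 == 4 threads.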

    @staticmethod
    def in_main_thread():
        return isinstance(threading.current_thread(), threading._MainThread)
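
# A minimal sketch of a third-party backend built on the abc above
# (illustrative only; EagerBackend is a hypothetical name and is not shipped
# with this module). It runs every batch eagerly in the calling thread and
# hands the result to the callback, which is enough to satisfy the Parallel
# machinery when supports_retrieve_callback is True:
#
#     class EagerBackend(ParallelBackendBase):
#         supports_retrieve_callback = True
#
#         def effective_n_jobs(self, n_jobs):
#             return 1
#
#         def apply_async(self, func, callback=None):
#             result = func()  # run the batch immediately
#             if callback is not None:
#                 callback(result)
#             return result
#
#         def retrieve_result_callback(self, out):
#             return out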


class SequentialBackend(ParallelBackendBase):
    """A ParallelBackend which will execute all batches sequentially.

    Does not use/create any threading objects, and hence has minimal
    overhead. Used when n_jobs == 1.
    """

    uses_threads = True
    supports_timeout = False
    supports_retrieve_callback = False
    supports_sharedmem = True

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        return 1

    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""
        raise RuntimeError("Should never be called for SequentialBackend.")

    def retrieve_result_callback(self, out):
        raise RuntimeError("Should never be called for SequentialBackend.")

    def get_nested_backend(self):
        # import is not top level to avoid cyclic import errors.
        from .parallel import get_active_backend

        # SequentialBackend should not change the nesting level, the default
        # backend or the number of jobs. Just return the current one.
        return get_active_backend()
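
    # Usage sketch: this backend is what a plain single-worker call
    # typically falls back to, e.g.
    #
    #     from joblib import Parallel, delayed
    #     Parallel(n_jobs=1)(delayed(abs)(-i) for i in range(3))
    #     # -> [0, 1, 2]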


class PoolManagerMixin(object):
    """A helper class for managing pool of workers."""

    _pool = None

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif n_jobs < 0:
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs
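
    # For example, with cpu_count() == 8: n_jobs=-1 gives
    # max(8 + 1 - 1, 1) == 8 workers and n_jobs=-2 gives
    # max(8 + 1 - 2, 1) == 7 workers (all cores but one).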

    def terminate(self):
        """Shutdown the process or thread pool"""
        if self._pool is not None:
            self._pool.close()
            self._pool.terminate()  # terminate does a join()
            self._pool = None

    def _get_pool(self):
        """Used by apply_async to make it possible to implement lazy init"""
        return self._pool

    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""
        # Here, we need a wrapper to avoid crashes on KeyboardInterrupt.
        # We also call the callback on error, to make sure the pool does not
        # wait on crashed jobs.
        return self._get_pool().apply_async(
            _TracebackCapturingWrapper(func), (),
            callback=callback, error_callback=callback
        )

    def retrieve_result_callback(self, out):
        """Mimic concurrent.futures results, raising an error if needed."""
        return _retrieve_traceback_capturing_wrapped_call(out)

    def abort_everything(self, ensure_ready=True):
        """Shutdown the pool and restart a new one with the same parameters"""
        self.terminate()
        if ensure_ready:
            self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel,
                           **self.parallel._backend_args)


class AutoBatchingMixin(object):
    """A helper class for automagically batching jobs."""

    # In seconds, should be big enough to hide multiprocessing dispatching
    # overhead.
    # This setting was found by running benchmarks/bench_auto_batching.py
    # with various parameters on various platforms.
    MIN_IDEAL_BATCH_DURATION = .2

    # Should not be too high to avoid stragglers: long jobs running alone
    # on a single worker while other workers have no work to process any more.
    MAX_IDEAL_BATCH_DURATION = 2

    # Batching counters default values
    _DEFAULT_EFFECTIVE_BATCH_SIZE = 1
    _DEFAULT_SMOOTHED_BATCH_DURATION = 0.0

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION

    def compute_batch_size(self):
        """Determine the optimal batch size"""
        old_batch_size = self._effective_batch_size
        batch_duration = self._smoothed_batch_duration
        if (batch_duration > 0 and
                batch_duration < self.MIN_IDEAL_BATCH_DURATION):
            # The current batch size is too small: the duration of the
            # processing of a batch of tasks is not large enough to hide
            # the scheduling overhead.
            ideal_batch_size = int(old_batch_size *
                                   self.MIN_IDEAL_BATCH_DURATION /
                                   batch_duration)
            # Multiply by two to limit oscillations between min and max.
            ideal_batch_size *= 2

            # Don't increase the batch size too fast to limit huge batch
            # sizes potentially leading to starving workers.
            batch_size = min(2 * old_batch_size, ideal_batch_size)

            batch_size = max(batch_size, 1)

            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    f"Batch computation too fast ({batch_duration}s.) "
                    f"Setting batch_size={batch_size}."
                )
        elif (batch_duration > self.MAX_IDEAL_BATCH_DURATION and
                old_batch_size >= 2):
            # The current batch size is too big. If we schedule overly long
            # running batches some CPUs might wait with nothing left to do
            # while a couple of CPUs are left processing a few long running
            # batches. Better reduce the batch size a bit to limit the
            # likelihood of scheduling such stragglers.

            # Decrease the batch size quickly to limit potential starving.
            ideal_batch_size = int(
                old_batch_size * self.MIN_IDEAL_BATCH_DURATION / batch_duration
            )
            # Multiply by two to limit oscillations between min and max.
            batch_size = max(2 * ideal_batch_size, 1)
            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    f"Batch computation too slow ({batch_duration}s.) "
                    f"Setting batch_size={batch_size}."
                )
        else:
            # No batch size adjustment
            batch_size = old_batch_size

        if batch_size != old_batch_size:
            # Reset estimation of the smoothed mean batch duration: this
            # estimate is updated in the multiprocessing apply_async
            # CallBack as long as the batch_size is constant. Therefore
            # we need to reset the estimate whenever we re-tune the batch
            # size.
            self._smoothed_batch_duration = \
                self._DEFAULT_SMOOTHED_BATCH_DURATION

        return batch_size

    def batch_completed(self, batch_size, duration):
        """Callback to indicate how long it took to run a batch"""
        if batch_size == self._effective_batch_size:
            # Update the smoothed streaming estimate of the duration of a
            # batch from dispatch to completion
            old_duration = self._smoothed_batch_duration
            if old_duration == self._DEFAULT_SMOOTHED_BATCH_DURATION:
                # First record of duration for this batch size after the last
                # reset.
                new_duration = duration
            else:
                # Update the exponentially weighted average of the duration
                # of batches for the current effective size.
                new_duration = 0.8 * old_duration + 0.2 * duration
            self._smoothed_batch_duration = new_duration

    def reset_batch_stats(self):
        """Reset batch statistics to default values.

        This avoids interference with future jobs.
        """
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION
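
    # Worked example of the heuristic above (hypothetical timings): with
    # _effective_batch_size == 1 and a smoothed batch duration of 0.05s
    # (below MIN_IDEAL_BATCH_DURATION == 0.2), compute_batch_size() derives
    # ideal_batch_size = int(1 * 0.2 / 0.05) * 2 == 8, capped by the growth
    # limit min(2 * 1, 8) == 2; the next batches thus hold 2 tasks each and
    # the smoothed duration estimate is reset to be re-learned at that size.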


class ThreadingBackend(PoolManagerMixin, ParallelBackendBase):
    """A ParallelBackend which will use a thread pool to execute batches in.

    This is a low-overhead backend but it suffers from the Python Global
    Interpreter Lock if the called function relies a lot on Python objects.
    It is mostly useful when the execution bottleneck is a compiled extension
    that explicitly releases the GIL (for instance a Cython loop wrapped in a
    "with nogil" block or an expensive call to a library such as NumPy).

    The actual thread pool is lazily initialized: its construction is delayed
    to the first call to apply_async.

    ThreadingBackend is used as the default backend for nested calls.
    """

    supports_retrieve_callback = True
    uses_threads = True
    supports_sharedmem = True

    def configure(self, n_jobs=1, parallel=None, **backend_args):
        """Build a process or thread pool and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            # Avoid unnecessary overhead and use the sequential backend
            # instead.
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))
        self.parallel = parallel
        self._n_jobs = n_jobs
        return n_jobs

    def _get_pool(self):
        """Lazily initialize the thread pool

        The actual pool of worker threads is only initialized at the first
        call to apply_async.
        """
        if self._pool is None:
            self._pool = ThreadPool(self._n_jobs)
        return self._pool
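
    # Usage sketch: this backend is registered in joblib under the name
    # "threading", e.g.
    #
    #     from joblib import Parallel, delayed
    #     import numpy as np
    #     Parallel(n_jobs=4, backend="threading")(
    #         delayed(np.linalg.norm)(np.ones((100, 100))) for _ in range(8)
    #     )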


class MultiprocessingBackend(PoolManagerMixin, AutoBatchingMixin,
                             ParallelBackendBase):
    """A ParallelBackend which will use a multiprocessing.Pool.

    Will introduce some communication and memory overhead when exchanging
    input and output data with the worker Python processes. However, it does
    not suffer from the Python Global Interpreter Lock.
    """

    supports_retrieve_callback = True
    supports_return_generator = False

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel.

        This also checks if we are attempting to create a nested parallel
        loop.
        """
        if mp is None:
            return 1

        if mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                if inside_dask_worker():
                    msg = (
                        "Inside a Dask worker with daemon=True, "
                        "setting n_jobs=1.\nPossible work-arounds:\n"
                        "- dask.config.set("
                        "{'distributed.worker.daemon': False})\n"
                        "- set the environment variable "
                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
                        "before creating your Dask cluster."
                    )
                else:
                    msg = (
                        'Multiprocessing-backed parallel loops '
                        'cannot be nested, setting n_jobs=1'
                    )
                warnings.warn(msg, stacklevel=3)
            return 1

        if process_executor._CURRENT_DEPTH > 0:
            # Mixing loky and multiprocessing in nested loops is not
            # supported
            if n_jobs != 1:
                warnings.warn(
                    'Multiprocessing-backed parallel loops cannot be nested'
                    ' below loky, setting n_jobs=1',
                    stacklevel=3)
            return 1

        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    'Multiprocessing-backed parallel loops cannot be nested'
                    ' below threads, setting n_jobs=1',
                    stacklevel=3)
            return 1

        return super(MultiprocessingBackend, self).effective_n_jobs(n_jobs)

    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  **memmappingpool_args):
        """Build a process or thread pool and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))

        # Make sure to free as much memory as possible before forking
        gc.collect()
        self._pool = MemmappingPool(n_jobs, **memmappingpool_args)
        self.parallel = parallel
        return n_jobs

    def terminate(self):
        """Shutdown the process or thread pool"""
        super(MultiprocessingBackend, self).terminate()
        self.reset_batch_stats()
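
    # Usage sketch: this backend is registered in joblib under the name
    # "multiprocessing", e.g.
    #
    #     from joblib import Parallel, delayed
    #     Parallel(n_jobs=2, backend="multiprocessing")(
    #         delayed(pow)(i, 2) for i in range(10)
    #     )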


class LokyBackend(AutoBatchingMixin, ParallelBackendBase):
    """Managing pool of workers with loky instead of multiprocessing."""

    supports_retrieve_callback = True
    supports_inner_max_num_threads = True

    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  idle_worker_timeout=300, **memmappingexecutor_args):
        """Build a process executor and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))

        self._workers = get_memmapping_executor(
            n_jobs, timeout=idle_worker_timeout,
            env=self._prepare_worker_env(n_jobs=n_jobs),
            context_id=parallel._id, **memmappingexecutor_args)
        self.parallel = parallel
        return n_jobs

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                if inside_dask_worker():
                    msg = (
                        "Inside a Dask worker with daemon=True, "
                        "setting n_jobs=1.\nPossible work-arounds:\n"
                        "- dask.config.set("
                        "{'distributed.worker.daemon': False})\n"
                        "- set the environment variable "
                        "DASK_DISTRIBUTED__WORKER__DAEMON=False\n"
                        "before creating your Dask cluster."
                    )
                else:
                    msg = (
                        'Loky-backed parallel loops cannot be called in a'
                        ' multiprocessing child process, setting n_jobs=1'
                    )
                warnings.warn(msg, stacklevel=3)

            return 1
        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    'Loky-backed parallel loops cannot be nested below '
                    'threads, setting n_jobs=1',
                    stacklevel=3)
            return 1
        elif n_jobs < 0:
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs

    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""
        future = self._workers.submit(func)
        if callback is not None:
            future.add_done_callback(callback)
        return future

    def retrieve_result_callback(self, out):
        try:
            return out.result()
        except ShutdownExecutorError:
            raise RuntimeError(
                "The executor underlying Parallel has been shutdown. "
                "This is likely due to the garbage collection of a previous "
                "generator from a call to Parallel with return_as='generator'."
                " Make sure the generator is not garbage collected when "
                "submitting a new job or that it is first properly exhausted."
            )
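
    # The ShutdownExecutorError branch above typically corresponds to a
    # pattern like the following (illustrative sketch):
    #
    #     gen = Parallel(n_jobs=2, return_as="generator")(...)
    #     del gen                    # triggers cleanup of the executor
    #     Parallel(n_jobs=2)(...)    # new call may observe the shutdown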

    def terminate(self):
        if self._workers is not None:
            # Don't terminate the workers as we want to reuse them in later
            # calls, but cleanup the temporary resources that the Parallel
            # call created. This 'hack' requires a private, low-level
            # operation.
            self._workers._temp_folder_manager._clean_temporary_resources(
                context_id=self.parallel._id, force=False
            )
            self._workers = None

        self.reset_batch_stats()

    def abort_everything(self, ensure_ready=True):
        """Shutdown the workers and restart new ones with the same parameters
        """
        self._workers.terminate(kill_workers=True)
        self._workers = None

        if ensure_ready:
            self.configure(n_jobs=self.parallel.n_jobs,
                           parallel=self.parallel)
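
    # Usage sketch: loky is joblib's default process-based backend, so it is
    # what a plain multi-worker call resolves to, e.g.
    #
    #     from joblib import Parallel, delayed
    #     Parallel(n_jobs=2)(delayed(pow)(i, 2) for i in range(10))
    #     # equivalent to passing backend="loky" explicitly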


class FallbackToBackend(Exception):
    """Raised when configuration should fall back to another backend"""

    def __init__(self, backend):
        self.backend = backend


def inside_dask_worker():
    """Check whether the current function is executed inside a Dask worker.
    """
    # This function cannot be in joblib._dask because there would be a
    # circular import:
    # _dask imports _parallel_backends that imports _dask ...
    try:
        from distributed import get_worker
    except ImportError:
        return False

    try:
        get_worker()
        return True
    except ValueError:
        return False