Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/executor.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Utility function to construct a loky.ReusableExecutor with custom pickler.
3This module provides efficient ways of working with data stored in
4shared memory with numpy.memmap arrays without inducing any memory
5copy between the parent and child processes.
6"""
7# Author: Thomas Moreau <thomas.moreau.2010@gmail.com>
8# Copyright: 2017, Thomas Moreau
9# License: BSD 3 clause
11from ._memmapping_reducer import TemporaryResourcesManager, get_memmapping_reducers
12from .externals.loky.reusable_executor import _ReusablePoolExecutor
# Arguments requested for the most recent executor (None until the first
# call). MemmappingExecutor.get_memmapping_executor compares the newly
# requested arguments against this value to decide whether to ask for reuse.
14_executor_args = None
def get_memmapping_executor(n_jobs, **kwargs):
    """Module-level shortcut for the ``MemmappingExecutor`` factory.

    ``n_jobs`` and every keyword argument are forwarded unchanged to
    ``MemmappingExecutor.get_memmapping_executor`` and its result is returned.
    """
    factory = MemmappingExecutor.get_memmapping_executor
    return factory(n_jobs, **kwargs)
class MemmappingExecutor(_ReusablePoolExecutor):
    """Reusable pool executor wired with memmapping reducers.

    Each instance carries a ``_temp_folder_manager`` that owns the temporary
    folder used by the reducers.
    """

    @classmethod
    def get_memmapping_executor(
        cls,
        n_jobs,
        timeout=300,
        initializer=None,
        initargs=(),
        env=None,
        temp_folder=None,
        context_id=None,
        **backend_args,
    ):
        """Factory for ReusableExecutor with automatic memmapping for large
        numpy arrays.

        ``timeout``, ``initializer``, ``initargs`` and ``env`` are forwarded
        to ``get_reusable_executor``; ``temp_folder`` is handed to the
        ``TemporaryResourcesManager``; ``backend_args`` are forwarded to
        ``get_memmapping_reducers``. When ``context_id`` is not None it is
        registered on the temp folder manager of the returned executor.
        """
        global _executor_args
        # Decide about reuse here rather than deferring the check to loky:
        # the reducers are objects that change at each call, so they cannot
        # be part of the comparison.
        requested_args = dict(backend_args)
        requested_args.update(env or {})
        requested_args.update(
            timeout=timeout, initializer=initializer, initargs=initargs
        )
        can_reuse = _executor_args is None or _executor_args == requested_args
        _executor_args = requested_args

        folder_manager = TemporaryResourcesManager(temp_folder)

        # The reducers look up the folder for temporary pickles through
        # folder_manager.resolve_temp_folder_name. Resolving the name
        # dynamically allows a same reusable executor to use different
        # folders across calls.
        job_reducers, result_reducers = get_memmapping_reducers(
            unlink_on_gc_collect=True,
            temp_folder_resolver=folder_manager.resolve_temp_folder_name,
            **backend_args,
        )
        executor, was_reused = super().get_reusable_executor(
            n_jobs,
            job_reducers=job_reducers,
            result_reducers=result_reducers,
            reuse=can_reuse,
            timeout=timeout,
            initializer=initializer,
            initargs=initargs,
            env=env,
        )

        if not was_reused:
            # Attach the manager to brand new executors only. A reused
            # executor already has a manager that is referenced in various
            # places of its reducing machinery, so it must not be replaced.
            executor._temp_folder_manager = folder_manager

        if context_id is not None:
            # Register the context only now that we know which manager the
            # executor is using, so that an atexit finalizer is not
            # registered twice for the same folder.
            executor._temp_folder_manager.register_new_context(context_id)

        return executor

    def terminate(self, kill_workers=False):
        """Shut the executor down, then clean up its temporary resources.

        ``kill_workers`` is forwarded to ``shutdown`` and also decides whether
        the cleanup is forced.
        """
        self.shutdown(kill_workers=kill_workers)

        # Workers killed in a brutal manner cannot run the finalizers of
        # their shared memmaps, so the refcount of those memmaps may be off
        # by an unknown number. Instead of decref'ing them, the whole
        # temporary folder is force-deleted and the memmaps unregistered.
        # There is no risk of PermissionError at folder deletion because all
        # child processes are dead at this point, so every reference to the
        # temporary memmaps is closed. Otherwise, delete as much as possible
        # with allow_non_empty=True; anything left over will be cleaned up
        # later on by the resource_tracker.
        with self._submit_resize_lock:
            self._temp_folder_manager._clean_temporary_resources(
                force=kill_workers, allow_non_empty=True
            )

    @property
    def _temp_folder(self):
        # Legacy property in tests. Could be removed if the memmapping tests
        # were refactored. SHOULD ONLY BE USED IN TESTS!
        # The value is cached because the tests read it late: by then every
        # context has been unregistered and resolve_temp_folder_name raises
        # an error.
        folder = getattr(self, "_cached_temp_folder", None)
        if folder is None:
            folder = self._temp_folder_manager.resolve_temp_folder_name()
            self._cached_temp_folder = folder
        return folder
class _TestingMemmappingExecutor(MemmappingExecutor):
    """Wrapper around ReusableExecutor to ease memmapping testing with Pool
    and Executor. This is only for testing purposes.

    """

    def apply_async(self, func, args):
        """Schedule a func to be run"""
        pending = self.submit(func, *args)
        # Expose the future's ``result`` under the name ``get`` as well.
        pending.get = pending.result
        return pending

    def map(self, f, *args):
        """Run the parent ``map`` and return its results as a list."""
        results = super().map(f, *args)
        return list(results)