"""Custom implementation of multiprocessing.Pool with custom pickler.

This module provides efficient ways of working with data stored in
shared memory with numpy.memmap arrays without inducing any memory
copy between the parent and child processes.

This module should not be imported if multiprocessing is not
available as it implements subclasses of multiprocessing Pool
that use a custom alternative to SimpleQueue.

"""
# Author: Olivier Grisel <olivier.grisel@ensta.org>
# Copyright: 2012, Olivier Grisel
# License: BSD 3 clause

import copyreg
import sys
import warnings
from time import sleep

try:
    WindowsError
except NameError:
    WindowsError = type(None)

from io import BytesIO

# We need the class definition to derive from it, not the multiprocessing.Pool
# factory function
from multiprocessing.pool import Pool
from pickle import HIGHEST_PROTOCOL, Pickler

from ._memmapping_reducer import TemporaryResourcesManager, get_memmapping_reducers
from ._multiprocessing_helpers import assert_spawning, mp

try:
    import numpy as np
except ImportError:
    np = None


###############################################################################
# Enable custom pickling in Pool queues


class CustomizablePickler(Pickler):
    """Pickler that accepts custom reducers.

    TODO python2_drop: can this be simplified?

    HIGHEST_PROTOCOL is selected by default as this pickler is used
    to pickle ephemeral data structures for interprocess communication,
    hence no backward compatibility is required.

    `reducers` is expected to be a dictionary with key/value pairs
    being `(type, callable)` pairs where `callable` is a function that,
    given an instance of `type`, will return a tuple `(constructor,
    tuple_of_objects)` to rebuild an instance out of the pickled
    `tuple_of_objects` as a `__reduce__` method would. See the
    standard library documentation on pickling for more details.
    """

    # We override the pure Python pickler as it's the only way to be able to
    # customize the dispatch table without side effects in Python 2.7
    # to 3.2. For Python 3.3+ we leverage the dispatch_table
    # feature from https://bugs.python.org/issue14166 that makes it possible
    # to use the C implementation of the Pickler which is faster.

    def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL):
        Pickler.__init__(self, writer, protocol=protocol)
        if reducers is None:
            reducers = {}
        if hasattr(Pickler, "dispatch"):
            # Make the dispatch registry an instance level attribute instead of
            # a reference to the class dictionary under Python 2
            self.dispatch = Pickler.dispatch.copy()
        else:
            # Under Python 3 initialize the dispatch table with a copy of the
            # default registry
            self.dispatch_table = copyreg.dispatch_table.copy()
        for type, reduce_func in reducers.items():
            self.register(type, reduce_func)

    def register(self, type, reduce_func):
        """Attach a reducer function to a given type in the dispatch table."""
        if hasattr(Pickler, "dispatch"):
            # Python 2 pickler dispatching is not explicitly customizable.
            # Let us use a closure to workaround this limitation.
            def dispatcher(self, obj):
                reduced = reduce_func(obj)
                self.save_reduce(obj=obj, *reduced)

            self.dispatch[type] = dispatcher
        else:
            self.dispatch_table[type] = reduce_func
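

# Illustrative sketch (not part of the joblib API; ``Point`` and
# ``reduce_point`` are hypothetical names): the reducer returns a
# ``(constructor, args)`` tuple exactly as a ``__reduce__`` method would,
# so pickling an instance only ships the args tuple:
#
#     class Point:
#         def __init__(self, x, y):
#             self.x, self.y = x, y
#
#     def reduce_point(p):
#         return (Point, (p.x, p.y))
#
#     buffer = BytesIO()
#     pickler = CustomizablePickler(buffer, reducers={Point: reduce_point})
#     pickler.dump(Point(1, 2))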


class CustomizablePicklingQueue(object):
    """Locked Pipe implementation that uses a customizable pickler.

    This class is an alternative to the multiprocessing implementation
    of SimpleQueue in order to make it possible to pass custom
    pickling reducers, for instance to avoid memory copy when passing
    memory mapped data structures.

    `reducers` is expected to be a dict with key/value pairs
    being `(type, callable)` pairs where `callable` is a function that, given an
    instance of `type`, will return a tuple `(constructor, tuple_of_objects)`
    to rebuild an instance out of the pickled `tuple_of_objects` as a
    `__reduce__` method would.

    See the standard library documentation on pickling for more details.
    """

    def __init__(self, context, reducers=None):
        self._reducers = reducers
        self._reader, self._writer = context.Pipe(duplex=False)
        self._rlock = context.Lock()
        if sys.platform == "win32":
            self._wlock = None
        else:
            self._wlock = context.Lock()
        self._make_methods()

    def __getstate__(self):
        assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock, self._reducers)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock, self._reducers) = state
        self._make_methods()

    def empty(self):
        return not self._reader.poll()

    def _make_methods(self):
        self._recv = recv = self._reader.recv
        racquire, rrelease = self._rlock.acquire, self._rlock.release

        def get():
            racquire()
            try:
                return recv()
            finally:
                rrelease()

        self.get = get

        if self._reducers:

            def send(obj):
                buffer = BytesIO()
                CustomizablePickler(buffer, self._reducers).dump(obj)
                self._writer.send_bytes(buffer.getvalue())

            self._send = send
        else:
            self._send = send = self._writer.send
        if self._wlock is None:
            # Writes to a message oriented win32 pipe are atomic.
            self.put = send
        else:
            wlock_acquire, wlock_release = (self._wlock.acquire, self._wlock.release)

            def put(obj):
                wlock_acquire()
                try:
                    return send(obj)
                finally:
                    wlock_release()

            self.put = put
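

# Illustrative sketch (reusing the hypothetical ``Point`` / ``reduce_point``
# names from above): the writing side pickles outgoing objects with the
# custom reducer, and the reading side rebuilds them through the registered
# constructor:
#
#     import multiprocessing
#
#     ctx = multiprocessing.get_context()
#     queue = CustomizablePicklingQueue(ctx, reducers={Point: reduce_point})
#     queue.put(Point(1, 2))
#     restored = queue.get()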


class PicklingPool(Pool):
    """Pool implementation with customizable pickling reducers.

    This is useful to control how data is shipped between processes
    and makes it possible to use shared memory without useless
    copies induced by the default pickling methods of the original
    objects passed as arguments to dispatch.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/value pairs being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return a
    tuple `(constructor, tuple_of_objects)` to rebuild an instance out of the
    pickled `tuple_of_objects` as a `__reduce__` method would.
    See the standard library documentation about pickling for more details.
    """

    def __init__(
        self, processes=None, forward_reducers=None, backward_reducers=None, **kwargs
    ):
        if forward_reducers is None:
            forward_reducers = dict()
        if backward_reducers is None:
            backward_reducers = dict()
        self._forward_reducers = forward_reducers
        self._backward_reducers = backward_reducers
        poolargs = dict(processes=processes)
        poolargs.update(kwargs)
        super(PicklingPool, self).__init__(**poolargs)

    def _setup_queues(self):
        context = getattr(self, "_ctx", mp)
        self._inqueue = CustomizablePicklingQueue(context, self._forward_reducers)
        self._outqueue = CustomizablePicklingQueue(context, self._backward_reducers)
        self._quick_put = self._inqueue._send
        self._quick_get = self._outqueue._recv
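

# Illustrative sketch (hypothetical names again): a pool whose inbound
# traffic serializes ``Point`` arguments with the custom reducer while
# results flow back through the default pickling path:
#
#     pool = PicklingPool(processes=2, forward_reducers={Point: reduce_point})
#     try:
#         results = pool.map(str, [Point(1, 2), Point(3, 4)])
#     finally:
#         pool.terminate()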


class MemmappingPool(PicklingPool):
    """Process pool that shares large arrays to avoid memory copy.

    This drop-in replacement for `multiprocessing.pool.Pool` makes
    it possible to work efficiently with shared memory in a numpy
    context.

    Existing instances of numpy.memmap are preserved: the child
    subprocesses will have access to the same shared memory in the
    original mode except for the 'w+' mode that is automatically
    transformed into 'r+' to avoid zeroing the original data upon
    instantiation.

    Furthermore large arrays from the parent process are automatically
    dumped to a temporary folder on the filesystem so that child
    processes can access their content via memmapping (file system
    backed shared memory).

    Note: it is important to call the terminate method to collect
    the temporary folder used by the pool.

    Parameters
    ----------
    processes: int, optional
        Number of worker processes running concurrently in the pool.
    initializer: callable, optional
        Callable executed on worker process creation.
    initargs: tuple, optional
        Arguments passed to the initializer callable.
    temp_folder: str or callable, optional
        If str:
            Folder to be used by the pool for memmapping large arrays
            for sharing memory with worker processes. If None, this will try in
            order:
            - a folder pointed by the JOBLIB_TEMP_FOLDER environment variable,
            - /dev/shm if the folder exists and is writable: this is a RAMdisk
              filesystem available by default on modern Linux distributions,
            - the default system temporary folder that can be overridden
              with TMP, TMPDIR or TEMP environment variables, typically /tmp
              under Unix operating systems.
        If callable:
            A callable in charge of dynamically resolving a temporary folder
            for memmapping large arrays.
    max_nbytes: int or None, optional, 1e6 by default
        Threshold on the size of arrays passed to the workers that
        triggers automated memory mapping in temp_folder.
        Use None to disable memmapping of large arrays.
    mmap_mode: {'r+', 'r', 'w+', 'c'}
        Memmapping mode for numpy arrays passed to workers.
        See 'max_nbytes' parameter documentation for more details.
    forward_reducers: dictionary, optional
        Reducers used to pickle objects passed from the main process to worker
        processes: see below.
    backward_reducers: dictionary, optional
        Reducers used to pickle return values from workers back to the
        main process.
    verbose: int, optional
        Make it possible to monitor how the communication of numpy arrays
        with the subprocesses is handled (pickling or memmapping).
    prewarm: bool or str, optional, "auto" by default
        If True, force a read on newly memmapped arrays to make sure that the
        OS pre-caches them in memory. This can be useful to avoid concurrent
        disk access when the same data array is passed to different worker
        processes. If "auto" (the default), prewarm is set to True, unless the
        Linux shared memory partition /dev/shm is available and used as temp
        folder.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/value pairs being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return
    a tuple `(constructor, tuple_of_objects)` to rebuild an instance out
    of the pickled `tuple_of_objects` as a `__reduce__` method would.
    See the standard library documentation on pickling for more details.

    """

    def __init__(
        self,
        processes=None,
        temp_folder=None,
        max_nbytes=1e6,
        mmap_mode="r",
        forward_reducers=None,
        backward_reducers=None,
        verbose=0,
        prewarm=False,
        **kwargs,
    ):
        manager = TemporaryResourcesManager(temp_folder)
        self._temp_folder_manager = manager

        # The usage of a temp_folder_resolver over a simple temp_folder is
        # superfluous for multiprocessing pools, as they don't get reused, see
        # get_memmapping_executor for more details. We still use it for code
        # simplicity.
        forward_reducers, backward_reducers = get_memmapping_reducers(
            temp_folder_resolver=manager.resolve_temp_folder_name,
            max_nbytes=max_nbytes,
            mmap_mode=mmap_mode,
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers,
            verbose=verbose,
            unlink_on_gc_collect=False,
            prewarm=prewarm,
        )

        poolargs = dict(
            processes=processes,
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers,
        )
        poolargs.update(kwargs)
        super(MemmappingPool, self).__init__(**poolargs)

    def terminate(self):
        n_retries = 10
        for i in range(n_retries):
            try:
                super(MemmappingPool, self).terminate()
                break
            except OSError as e:
                if isinstance(e, WindowsError):
                    # Workaround occasional "[Error 5] Access is denied" issue
                    # when trying to terminate a process under Windows.
                    sleep(0.1)
                    if i + 1 == n_retries:
                        warnings.warn(
                            "Failed to terminate worker processes in"
                            " multiprocessing pool: %r" % e
                        )

        # Clean up the temporary resources as the workers should now be off.
        self._temp_folder_manager._clean_temporary_resources()

    @property
    def _temp_folder(self):
        # Legacy property used in tests. Could be removed if we refactored the
        # memmapping tests. SHOULD ONLY BE USED IN TESTS!
        # We cache this property because it is called late in the tests - at
        # this point, all contexts have been unregistered, and
        # resolve_temp_folder_name raises an error.
        if getattr(self, "_cached_temp_folder", None) is not None:
            return self._cached_temp_folder
        else:
            self._cached_temp_folder = (
                self._temp_folder_manager.resolve_temp_folder_name()
            )  # noqa
            return self._cached_temp_folder
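

# Illustrative usage sketch (assuming numpy is installed; the array size is
# arbitrary): arrays larger than ``max_nbytes`` are dumped to the pool's
# temporary folder and passed to the workers as numpy.memmap instances
# instead of being pickled, and the explicit ``terminate()`` call collects
# that temporary folder:
#
#     data = np.random.random(int(1e6))  # ~8 MB, above the 1e6 bytes threshold
#     pool = MemmappingPool(processes=2, max_nbytes=1e6, mmap_mode="r")
#     try:
#         results = pool.map(np.sum, [data] * 4)
#     finally:
#         pool.terminate()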