1"""Custom implementation of multiprocessing.Pool with custom pickler.
3This module provides efficient ways of working with data stored in
4shared memory with numpy.memmap arrays without inducing any memory
5copy between the parent and child processes.
7This module should not be imported if multiprocessing is not
8available as it implements subclasses of multiprocessing Pool
9that uses a custom alternative to SimpleQueue.
11"""
12# Author: Olivier Grisel <olivier.grisel@ensta.org>
13# Copyright: 2012, Olivier Grisel
14# License: BSD 3 clause
16import copyreg
17import sys
18import warnings
19from time import sleep
21try:
22 WindowsError
23except NameError:
24 WindowsError = type(None)
26from pickle import Pickler
28from pickle import HIGHEST_PROTOCOL
29from io import BytesIO
31from ._memmapping_reducer import get_memmapping_reducers
32from ._memmapping_reducer import TemporaryResourcesManager
33from ._multiprocessing_helpers import mp, assert_spawning
35# We need the class definition to derive from it, not the multiprocessing.Pool
36# factory function
37from multiprocessing.pool import Pool
39try:
40 import numpy as np
41except ImportError:
42 np = None
45###############################################################################
46# Enable custom pickling in Pool queues
48class CustomizablePickler(Pickler):
49 """Pickler that accepts custom reducers.
51 TODO python2_drop : can this be simplified ?
53 HIGHEST_PROTOCOL is selected by default as this pickler is used
54 to pickle ephemeral datastructures for interprocess communication
55 hence no backward compatibility is required.
57 `reducers` is expected to be a dictionary with key/values
58 being `(type, callable)` pairs where `callable` is a function that
59 give an instance of `type` will return a tuple `(constructor,
60 tuple_of_objects)` to rebuild an instance out of the pickled
61 `tuple_of_objects` as would return a `__reduce__` method. See the
62 standard library documentation on pickling for more details.
64 """
66 # We override the pure Python pickler as its the only way to be able to
67 # customize the dispatch table without side effects in Python 2.7
68 # to 3.2. For Python 3.3+ leverage the new dispatch_table
69 # feature from https://bugs.python.org/issue14166 that makes it possible
70 # to use the C implementation of the Pickler which is faster.
72 def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL):
73 Pickler.__init__(self, writer, protocol=protocol)
74 if reducers is None:
75 reducers = {}
76 if hasattr(Pickler, 'dispatch'):
77 # Make the dispatch registry an instance level attribute instead of
78 # a reference to the class dictionary under Python 2
79 self.dispatch = Pickler.dispatch.copy()
80 else:
81 # Under Python 3 initialize the dispatch table with a copy of the
82 # default registry
83 self.dispatch_table = copyreg.dispatch_table.copy()
84 for type, reduce_func in reducers.items():
85 self.register(type, reduce_func)
87 def register(self, type, reduce_func):
88 """Attach a reducer function to a given type in the dispatch table."""
89 if hasattr(Pickler, 'dispatch'):
90 # Python 2 pickler dispatching is not explicitly customizable.
91 # Let us use a closure to workaround this limitation.
92 def dispatcher(self, obj):
93 reduced = reduce_func(obj)
94 self.save_reduce(obj=obj, *reduced)
95 self.dispatch[type] = dispatcher
96 else:
97 self.dispatch_table[type] = reduce_func
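
# Illustrative sketch (not part of the original module): registering a custom
# reducer with CustomizablePickler. The `Point` type and `reduce_point`
# helper below are hypothetical, introduced only to show the
# `(constructor, tuple_of_objects)` contract described in the docstring.
#
#     from io import BytesIO
#
#     class Point:
#         def __init__(self, x, y):
#             self.x, self.y = x, y
#
#     def reduce_point(p):
#         # Mirrors what a __reduce__ method would return.
#         return (Point, (p.x, p.y))
#
#     buffer = BytesIO()
#     pickler = CustomizablePickler(buffer, reducers={Point: reduce_point})
#     pickler.dump(Point(1.0, 2.0))
#     payload = buffer.getvalue()  # bytes ready to be sent over a pipe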


class CustomizablePicklingQueue(object):
    """Locked Pipe implementation that uses a customizable pickler.

    This class is an alternative to the multiprocessing implementation
    of SimpleQueue in order to make it possible to pass custom
    pickling reducers, for instance to avoid memory copy when passing
    memory mapped datastructures.

    `reducers` is expected to be a dict with key/values being
    `(type, callable)` pairs where `callable` is a function that, given an
    instance of `type`, will return a tuple `(constructor, tuple_of_objects)`
    to rebuild an instance out of the pickled `tuple_of_objects` as would
    return a `__reduce__` method.

    See the standard library documentation on pickling for more details.
    """

    def __init__(self, context, reducers=None):
        self._reducers = reducers
        self._reader, self._writer = context.Pipe(duplex=False)
        self._rlock = context.Lock()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = context.Lock()
        self._make_methods()

    def __getstate__(self):
        assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock,
                self._reducers)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock,
         self._reducers) = state
        self._make_methods()

    def empty(self):
        return not self._reader.poll()

    def _make_methods(self):
        self._recv = recv = self._reader.recv
        racquire, rrelease = self._rlock.acquire, self._rlock.release

        def get():
            racquire()
            try:
                return recv()
            finally:
                rrelease()

        self.get = get

        if self._reducers:
            def send(obj):
                buffer = BytesIO()
                CustomizablePickler(buffer, self._reducers).dump(obj)
                self._writer.send_bytes(buffer.getvalue())
            self._send = send
        else:
            self._send = send = self._writer.send
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self.put = send
        else:
            wlock_acquire, wlock_release = (
                self._wlock.acquire, self._wlock.release)

            def put(obj):
                wlock_acquire()
                try:
                    return send(obj)
                finally:
                    wlock_release()

            self.put = put
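
# Illustrative sketch (not part of the original module): round-tripping an
# object through a CustomizablePicklingQueue within a single process. A
# standard multiprocessing context is assumed; the queue behaves like a
# locked SimpleQueue that pickles with the given reducers.
#
#     import multiprocessing
#
#     ctx = multiprocessing.get_context()
#     queue = CustomizablePicklingQueue(ctx, reducers=None)
#     queue.put({'payload': 42})
#     assert queue.get() == {'payload': 42}
#     assert queue.empty()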


class PicklingPool(Pool):
    """Pool implementation with customizable pickling reducers.

    This is useful to control how data is shipped between processes
    and makes it possible to use shared memory without useless
    copies induced by the default pickling methods of the original
    objects passed as arguments to dispatch.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return a
    tuple `(constructor, tuple_of_objects)` to rebuild an instance out of the
    pickled `tuple_of_objects` as would return a `__reduce__` method.
    See the standard library documentation about pickling for more details.

    """

    def __init__(self, processes=None, forward_reducers=None,
                 backward_reducers=None, **kwargs):
        if forward_reducers is None:
            forward_reducers = dict()
        if backward_reducers is None:
            backward_reducers = dict()
        self._forward_reducers = forward_reducers
        self._backward_reducers = backward_reducers
        poolargs = dict(processes=processes)
        poolargs.update(kwargs)
        super(PicklingPool, self).__init__(**poolargs)

    def _setup_queues(self):
        context = getattr(self, '_ctx', mp)
        self._inqueue = CustomizablePicklingQueue(context,
                                                  self._forward_reducers)
        self._outqueue = CustomizablePicklingQueue(context,
                                                   self._backward_reducers)
        self._quick_put = self._inqueue._send
        self._quick_get = self._outqueue._recv
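
# Illustrative sketch (not part of the original module): a PicklingPool with
# a custom forward reducer, so instances of the hypothetical `Point` type
# (defined with `reduce_point` in the CustomizablePickler example above) are
# shipped to workers via the `(constructor, tuple_of_objects)` path instead
# of the default pickling. `some_function` and `list_of_points` are
# placeholders for user code.
#
#     pool = PicklingPool(processes=2,
#                         forward_reducers={Point: reduce_point})
#     try:
#         results = pool.map(some_function, list_of_points)
#     finally:
#         pool.terminate()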


class MemmappingPool(PicklingPool):
    """Process pool that shares large arrays to avoid memory copy.

    This drop-in replacement for `multiprocessing.pool.Pool` makes
    it possible to work efficiently with shared memory in a numpy
    context.

    Existing instances of numpy.memmap are preserved: the child
    subprocesses will have access to the same shared memory in the
    original mode except for the 'w+' mode that is automatically
    transformed as 'r+' to avoid zeroing the original data upon
    instantiation.

    Furthermore large arrays from the parent process are automatically
    dumped to a temporary folder on the filesystem so that child
    processes can access their content via memmapping (file system
    backed shared memory).

    Note: it is important to call the terminate method to collect
    the temporary folder used by the pool.

    Parameters
    ----------
    processes: int, optional
        Number of worker processes running concurrently in the pool.
    initializer: callable, optional
        Callable executed on worker process creation.
    initargs: tuple, optional
        Arguments passed to the initializer callable.
    temp_folder: (str, callable) optional
        If str:
          Folder to be used by the pool for memmapping large arrays
          for sharing memory with worker processes. If None, this will try in
          order:
          - a folder pointed to by the JOBLIB_TEMP_FOLDER environment
            variable,
          - /dev/shm if the folder exists and is writable: this is a RAMdisk
            filesystem available by default on modern Linux distributions,
          - the default system temporary folder that can be overridden
            with TMP, TMPDIR or TEMP environment variables, typically /tmp
            under Unix operating systems.
        If callable:
          A callable in charge of dynamically resolving a temporary folder
          for memmapping large arrays.
    max_nbytes: int or None, optional, 1e6 by default
        Threshold on the size of arrays passed to the workers that
        triggers automated memory mapping in temp_folder.
        Use None to disable memmapping of large arrays.
    mmap_mode: {'r+', 'r', 'w+', 'c'}
        Memmapping mode for numpy arrays passed to workers.
        See 'max_nbytes' parameter documentation for more details.
    forward_reducers: dictionary, optional
        Reducers used to pickle objects passed from main process to worker
        processes: see below.
    backward_reducers: dictionary, optional
        Reducers used to pickle return values from workers back to the
        main process.
    verbose: int, optional
        Make it possible to monitor how the communication of numpy arrays
        with the subprocess is handled (pickling or memmapping).
    prewarm: bool or str, optional, "auto" by default
        If True, force a read on newly memmapped arrays to make sure the OS
        pre-caches them in memory. This can be useful to avoid concurrent
        disk access when the same data array is passed to different worker
        processes. If "auto" (the default), prewarm is set to True unless
        the Linux shared memory partition /dev/shm is available and used as
        temp folder.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return
    a tuple `(constructor, tuple_of_objects)` to rebuild an instance out
    of the pickled `tuple_of_objects` as would return a `__reduce__`
    method. See the standard library documentation on pickling for more
    details.

    """

    def __init__(self, processes=None, temp_folder=None, max_nbytes=1e6,
                 mmap_mode='r', forward_reducers=None, backward_reducers=None,
                 verbose=0, context_id=None, prewarm=False, **kwargs):

        if context_id is not None:
            warnings.warn('context_id is deprecated and ignored in joblib'
                          ' 0.9.4 and will be removed in 0.11',
                          DeprecationWarning)

        manager = TemporaryResourcesManager(temp_folder)
        self._temp_folder_manager = manager

        # The usage of a temp_folder_resolver over a simple temp_folder is
        # superfluous for multiprocessing pools, as they don't get reused, see
        # get_memmapping_executor for more details. We still use it for code
        # simplicity.
        forward_reducers, backward_reducers = \
            get_memmapping_reducers(
                temp_folder_resolver=manager.resolve_temp_folder_name,
                max_nbytes=max_nbytes, mmap_mode=mmap_mode,
                forward_reducers=forward_reducers,
                backward_reducers=backward_reducers, verbose=verbose,
                unlink_on_gc_collect=False, prewarm=prewarm)

        poolargs = dict(
            processes=processes,
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers)
        poolargs.update(kwargs)
        super(MemmappingPool, self).__init__(**poolargs)

    def terminate(self):
        n_retries = 10
        for i in range(n_retries):
            try:
                super(MemmappingPool, self).terminate()
                break
            except OSError as e:
                if isinstance(e, WindowsError):
                    # Workaround occasional "[Error 5] Access is denied" issue
                    # when trying to terminate a process under Windows.
                    sleep(0.1)
                    if i + 1 == n_retries:
                        warnings.warn("Failed to terminate worker processes"
                                      " in multiprocessing pool: %r" % e)

        # Clean up the temporary resources as the workers should now be off.
        self._temp_folder_manager._clean_temporary_resources()

    @property
    def _temp_folder(self):
        # Legacy property used in tests. Could be removed if we refactored
        # the memmapping tests. SHOULD ONLY BE USED IN TESTS!
        # We cache this property because it is called late in the tests; at
        # that point, all contexts have been unregistered and
        # resolve_temp_folder_name raises an error.
        if getattr(self, '_cached_temp_folder', None) is not None:
            return self._cached_temp_folder
        else:
            self._cached_temp_folder = self._temp_folder_manager.resolve_temp_folder_name()  # noqa
            return self._cached_temp_folder
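

# Illustrative sketch (not part of the original module): typical
# MemmappingPool usage. Arrays larger than max_nbytes are dumped to a
# temporary folder and memmapped into the workers; calling terminate()
# collects that folder, as the class docstring above recommends.
#
#     import numpy as np
#
#     data = np.random.random((int(1e6),))  # large enough to be memmapped
#     pool = MemmappingPool(processes=2, max_nbytes=1e5, mmap_mode='r')
#     try:
#         sums = pool.map(np.sum, [data[i::4] for i in range(4)])
#     finally:
#         pool.terminate()  # collects the temporary memmapping folder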