1""" 
    2Reducer using memory mapping for numpy arrays 
    3""" 
    4# Author: Thomas Moreau <thomas.moreau.2010@gmail.com> 
    5# Copyright: 2017, Thomas Moreau 
    6# License: BSD 3 clause 
    7 
    8import atexit 
    9import errno 
    10import os 
    11import stat 
    12import tempfile 
    13import threading 
    14import time 
    15import warnings 
    16import weakref 
    17from mmap import mmap 
    18from multiprocessing import util 
    19from pickle import HIGHEST_PROTOCOL, PicklingError, dumps, loads, whichmodule 
    20from uuid import uuid4 
    21 
    22try: 
    23    WindowsError 
    24except NameError: 
    25    WindowsError = type(None) 
    26 
    27try: 
    28    import numpy as np 
    29    from numpy.lib.stride_tricks import as_strided 
    30except ImportError: 
    31    np = None 
    32 
    33from .backports import make_memmap 
    34from .disk import delete_folder 
    35from .externals.loky.backend import resource_tracker 
    36from .numpy_pickle import dump, load, load_temporary_memmap 

# Some systems have a ramdisk mounted by default, which we can use instead of
# /tmp as the default folder to dump big arrays to share with subprocesses.
SYSTEM_SHARED_MEM_FS = "/dev/shm"

# Minimal number of bytes available on SYSTEM_SHARED_MEM_FS to consider using
# it as the default folder to dump big arrays to share with subprocesses.
SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(2e9)

# Folder and file permissions to chmod temporary files generated by the
# memmapping pool. Only the owner of the Python process can access the
# temporary files and folder.
FOLDER_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
FILE_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR

# Set used in joblib workers, referencing the filenames of temporary memmaps
# created by joblib to speed up data communication. In child processes, we add
# a finalizer to these memmaps that sends a maybe_unlink call to the
# resource_tracker, in order to free main memory as fast as possible.
JOBLIB_MMAPS = set()


def _log_and_unlink(filename):
    from .externals.loky.backend.resource_tracker import _resource_tracker

    util.debug(
        "[FINALIZER CALL] object mapping to {} about to be deleted,"
        " decrementing the refcount of the file (pid: {})".format(
            os.path.basename(filename), os.getpid()
        )
    )
    _resource_tracker.maybe_unlink(filename, "file")


def add_maybe_unlink_finalizer(memmap):
    util.debug(
        "[FINALIZER ADD] adding finalizer to {} (id {}, filename {}, pid: {})".format(
            type(memmap), id(memmap), os.path.basename(memmap.filename), os.getpid()
        )
    )
    weakref.finalize(memmap, _log_and_unlink, memmap.filename)


def unlink_file(filename):
    """Wrapper around os.unlink with a retry mechanism.

    The retry mechanism has been implemented primarily to overcome a race
    condition happening during the finalizer of a np.memmap: when a process
    holding the last reference to a mmap-backed np.memmap/np.array is about to
    delete this array (and close the reference), it sends a maybe_unlink
    request to the resource_tracker. This request can be processed faster than
    it takes for the last reference of the memmap to be closed, yielding (on
    Windows) a PermissionError in the resource_tracker loop.
    """
    NUM_RETRIES = 10
    for retry_no in range(1, NUM_RETRIES + 1):
        try:
            os.unlink(filename)
            break
        except PermissionError:
            util.debug(
                "[ResourceTracker] tried to unlink {}, got PermissionError".format(
                    filename
                )
            )
            if retry_no == NUM_RETRIES:
                raise
            else:
                time.sleep(0.2)
        except FileNotFoundError:
            # In case of a race condition when deleting the temporary folder,
            # avoid noisy FileNotFoundError exceptions in the resource tracker.
            pass


resource_tracker._CLEANUP_FUNCS["file"] = unlink_file
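
# Illustrative lifecycle sketch (comments only, hypothetical filename): a file
# registered with the resource_tracker is reference counted; once every
# registered user has sent a maybe_unlink() decref, the tracker invokes the
# cleanup function registered above, i.e. unlink_file(filename).
#
#     >>> resource_tracker.register("/tmp/joblib_xyz.pkl", "file")      # incref
#     >>> resource_tracker.maybe_unlink("/tmp/joblib_xyz.pkl", "file")  # decref
#     ... # refcount reaches zero: the tracker calls unlink_file on the path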


class _WeakArrayKeyMap:
    """A variant of weakref.WeakKeyDictionary for unhashable numpy arrays.

    This datastructure will be used with numpy arrays as obj keys, therefore we
    do not use the __get__ / __set__ methods to avoid any conflict with the
    numpy fancy indexing syntax.
    """

    def __init__(self):
        self._data = {}

    def get(self, obj):
        ref, val = self._data[id(obj)]
        if ref() is not obj:
            # In case of race condition with on_destroy: could never be
            # triggered by the joblib tests with CPython.
            raise KeyError(obj)
        return val

    def set(self, obj, value):
        key = id(obj)
        try:
            ref, _ = self._data[key]
            if ref() is not obj:
                # In case of race condition with on_destroy: could never be
                # triggered by the joblib tests with CPython.
                raise KeyError(obj)
        except KeyError:
            # Insert the new entry in the mapping along with a weakref
            # callback to automatically delete the entry from the mapping
            # as soon as the object used as key is garbage collected.
            def on_destroy(_):
                del self._data[key]

            ref = weakref.ref(obj, on_destroy)
        self._data[key] = ref, value

    def __getstate__(self):
        raise PicklingError("_WeakArrayKeyMap is not pickleable")
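
# Minimal usage sketch for _WeakArrayKeyMap (illustrative only):
#
#     >>> import numpy as np
#     >>> cache = _WeakArrayKeyMap()
#     >>> a = np.zeros(10)
#     >>> cache.set(a, "1234-5678.pkl")
#     >>> cache.get(a)
#     '1234-5678.pkl'
#     >>> del a  # the entry is dropped once the array is garbage collected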


###############################################################################
# Support for efficient transient pickling of numpy data structures


def _get_backing_memmap(a):
    """Recursively look up the original np.memmap instance base if any."""
    b = getattr(a, "base", None)
    if b is None:
        # TODO: check scipy sparse datastructure if scipy is installed
        # neither a nor any of its ancestors has a memmap base
        return None

    elif isinstance(b, mmap):
        # a is already a real memmap instance.
        return a

    else:
        # Recursive exploration of the base ancestry
        return _get_backing_memmap(b)


def _get_temp_dir(pool_folder_name, temp_folder=None):
    """Get the full path to a subfolder inside the temporary folder.

    Parameters
    ----------
    pool_folder_name : str
        Sub-folder name used for the serialization of a pool instance.

    temp_folder : str, optional
        Folder to be used by the pool for memmapping large arrays
        for sharing memory with worker processes. If None, this will try in
        order:

        - a folder pointed by the JOBLIB_TEMP_FOLDER environment
          variable,
        - /dev/shm if the folder exists and is writable: this is a
          RAMdisk filesystem available by default on modern Linux
          distributions,
        - the default system temporary folder that can be
          overridden with TMP, TMPDIR or TEMP environment
          variables, typically /tmp under Unix operating systems.

    Returns
    -------
    pool_folder : str
       full path to the temporary folder
    use_shared_mem : bool
       whether the temporary folder is located on the system shared memory
       filesystem (/dev/shm) or on some other temporary folder.
    """
    use_shared_mem = False
    if temp_folder is None:
        temp_folder = os.environ.get("JOBLIB_TEMP_FOLDER", None)
    if temp_folder is None:
        if os.path.exists(SYSTEM_SHARED_MEM_FS) and hasattr(os, "statvfs"):
            try:
                shm_stats = os.statvfs(SYSTEM_SHARED_MEM_FS)
                available_nbytes = shm_stats.f_bsize * shm_stats.f_bavail
                if available_nbytes > SYSTEM_SHARED_MEM_FS_MIN_SIZE:
                    # Try to see if we have write access to the shared mem
                    # folder only if it is reasonably large (that is 2GB or
                    # more).
                    temp_folder = SYSTEM_SHARED_MEM_FS
                    pool_folder = os.path.join(temp_folder, pool_folder_name)
                    if not os.path.exists(pool_folder):
                        os.makedirs(pool_folder)
                    use_shared_mem = True
            except (IOError, OSError):
                # Missing rights in the /dev/shm partition, fallback to regular
                # temp folder.
                temp_folder = None
    if temp_folder is None:
        # Fallback to the default tmp folder, typically /tmp
        temp_folder = tempfile.gettempdir()
    temp_folder = os.path.abspath(os.path.expanduser(temp_folder))
    pool_folder = os.path.join(temp_folder, pool_folder_name)
    return pool_folder, use_shared_mem
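
# Resolution sketch (illustrative): on a Linux box with no JOBLIB_TEMP_FOLDER
# set and a large enough /dev/shm, the shared memory filesystem is preferred.
#
#     >>> _get_temp_dir("joblib_memmapping_folder_1234")
#     ('/dev/shm/joblib_memmapping_folder_1234', True)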


def has_shareable_memory(a):
    """Return True if a is backed by some mmap buffer, directly or indirectly."""
    return _get_backing_memmap(a) is not None
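
# For instance (illustrative): views on a memmap share its buffer, while
# regular in-memory arrays do not have a shareable buffer.
#
#     >>> import numpy as np
#     >>> m = np.memmap("/tmp/buf.dat", dtype="float64", shape=10, mode="w+")
#     >>> has_shareable_memory(m), has_shareable_memory(m[2:5])
#     (True, True)
#     >>> has_shareable_memory(np.zeros(10))
#     False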


def _strided_from_memmap(
    filename,
    dtype,
    mode,
    offset,
    order,
    shape,
    strides,
    total_buffer_len,
    unlink_on_gc_collect,
):
    """Reconstruct an array view on a memory mapped file."""
    if mode == "w+":
        # Do not zero the original data when unpickling
        mode = "r+"

    if strides is None:
        # Simple, contiguous memmap
        return make_memmap(
            filename,
            dtype=dtype,
            shape=shape,
            mode=mode,
            offset=offset,
            order=order,
            unlink_on_gc_collect=unlink_on_gc_collect,
        )
    else:
        # For non-contiguous data, memmap the total enclosing buffer and then
        # extract the non-contiguous view with the stride-tricks API
        base = make_memmap(
            filename,
            dtype=dtype,
            shape=total_buffer_len,
            offset=offset,
            mode=mode,
            order=order,
            unlink_on_gc_collect=unlink_on_gc_collect,
        )
        return as_strided(base, shape=shape, strides=strides)


def _reduce_memmap_backed(a, m):
    """Pickling reduction for memmap backed arrays.

    a is expected to be an instance of np.ndarray (or np.memmap)
    m is expected to be an instance of np.memmap on the top of the ``base``
    attribute ancestry of a. ``m.base`` should be the real python mmap object.
    """
    util.debug(
        "[MEMMAP REDUCE] reducing a memmap-backed array (shape: {}, pid: {})".format(
            a.shape, os.getpid()
        )
    )
    try:
        from numpy.lib.array_utils import byte_bounds
    except (ModuleNotFoundError, ImportError):
        # Backward-compat for numpy < 2.0
        from numpy import byte_bounds

    # offset that comes from the striding differences between a and m
    a_start, a_end = byte_bounds(a)
    m_start = byte_bounds(m)[0]
    offset = a_start - m_start

    # offset from the backing memmap
    offset += m.offset

    # 1D arrays are both F and C contiguous, so only set the flag in
    # higher dimensions. See https://github.com/joblib/joblib/pull/1704.
    if m.ndim > 1 and m.flags["F_CONTIGUOUS"]:
        order = "F"
    else:
        # The backing memmap buffer is necessarily contiguous, hence C-ordered
        # if not Fortran-ordered.
        order = "C"

    if a.flags["F_CONTIGUOUS"] or a.flags["C_CONTIGUOUS"]:
        # If the array is a contiguous view, no need to pass the strides
        strides = None
        total_buffer_len = None
    else:
        # Compute the total number of items to map from which the strided
        # view will be extracted.
        strides = a.strides
        total_buffer_len = (a_end - a_start) // a.itemsize

    return (
        _strided_from_memmap,
        (
            m.filename,
            a.dtype,
            m.mode,
            offset,
            order,
            a.shape,
            strides,
            total_buffer_len,
            False,
        ),
    )
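
# The returned pair follows the pickle reduce protocol: unpickling calls
# _strided_from_memmap(*args) and rebuilds a view on the same backing file.
# Illustrative sketch, assuming m is a np.memmap whose base is the raw mmap:
#
#     >>> reconstruct, args = _reduce_memmap_backed(m[::2], m)
#     >>> np.array_equal(reconstruct(*args), m[::2])
#     True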


def reduce_array_memmap_backward(a):
    """Reduce a np.array or a np.memmap coming from a child process."""
    m = _get_backing_memmap(a)
    if isinstance(m, np.memmap) and m.filename not in JOBLIB_MMAPS:
        # if a is backed by a memmapped file, reconstruct a using the
        # memmapped file.
        return _reduce_memmap_backed(a, m)
    else:
        # a is either a regular (not memmap-backed) numpy array, or an array
        # backed by a shared temporary file created by joblib. In the latter
        # case, in order to limit the lifespan of these temporary files, we
        # serialize the memmap as a regular numpy array, and decref the
        # file backing the memmap (done implicitly in a previously registered
        # finalizer, see ``unlink_on_gc_collect`` for more details)
        return (loads, (dumps(np.asarray(a), protocol=HIGHEST_PROTOCOL),))
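
# Backward path sketch (illustrative): a regular in-memory array coming back
# from a worker is simply round-tripped through the standard pickle machinery.
#
#     >>> import numpy as np
#     >>> rebuild, args = reduce_array_memmap_backward(np.arange(3))
#     >>> rebuild(*args)
#     array([0, 1, 2])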


class ArrayMemmapForwardReducer(object):
    """Reducer callable to dump large arrays to memmap files.

    Parameters
    ----------
    max_nbytes: int
        Threshold to trigger memmapping of large arrays to files created in a
        temporary folder.
    temp_folder_resolver: callable
        A callable in charge of resolving a temporary folder name where files
        for backing memmapped arrays are created.
    mmap_mode: 'r', 'r+' or 'c'
        Mode for the created memmap datastructure. See the documentation of
        numpy.memmap for more details. Note: 'w+' is coerced to 'r+'
        automatically to avoid zeroing the data on unpickling.
    verbose: int, optional, 0 by default
        If verbose > 0, memmap creations are logged.
        If verbose > 1, both memmap creations, reuse and array pickling are
        logged.
    prewarm: bool or "auto", optional, True by default
        Force a read on newly memmapped arrays to make sure that the OS
        pre-caches them in memory. This can be useful to avoid concurrent
        disk access when the same data array is passed to different worker
        processes.
    """

    def __init__(
        self,
        max_nbytes,
        temp_folder_resolver,
        mmap_mode,
        unlink_on_gc_collect,
        verbose=0,
        prewarm=True,
    ):
        self._max_nbytes = max_nbytes
        self._temp_folder_resolver = temp_folder_resolver
        self._mmap_mode = mmap_mode
        self.verbose = int(verbose)
        if prewarm == "auto":
            self._prewarm = not self._temp_folder.startswith(SYSTEM_SHARED_MEM_FS)
        else:
            self._prewarm = prewarm
        self._memmaped_arrays = _WeakArrayKeyMap()
        self._temporary_memmaped_filenames = set()
        self._unlink_on_gc_collect = unlink_on_gc_collect

    @property
    def _temp_folder(self):
        return self._temp_folder_resolver()

    def __reduce__(self):
        # The ArrayMemmapForwardReducer is passed to the children processes: it
        # needs to be pickled but the _WeakArrayKeyMap needs to be skipped as
        # it's only guaranteed to be consistent with the parent process memory
        # garbage collection.
        # Although this reducer is pickled, it is not needed in its destination
        # process (child processes), as we only use this reducer to send
        # memmaps from the parent process to the children processes. For this
        # reason, we can afford to skip the resolver (which would otherwise be
        # unpicklable) and pass it as None instead.
        args = (self._max_nbytes, None, self._mmap_mode, self._unlink_on_gc_collect)
        kwargs = {
            "verbose": self.verbose,
            "prewarm": self._prewarm,
        }
        return ArrayMemmapForwardReducer, args, kwargs

    def __call__(self, a):
        m = _get_backing_memmap(a)
        if m is not None and isinstance(m, np.memmap):
            # a is already backed by a memmap file, let's reuse it directly
            return _reduce_memmap_backed(a, m)

        if (
            not a.dtype.hasobject
            and self._max_nbytes is not None
            and a.nbytes > self._max_nbytes
        ):
            # check that the folder exists (lazily create the pool temp folder
            # if required)
            try:
                os.makedirs(self._temp_folder)
                os.chmod(self._temp_folder, FOLDER_PERMISSIONS)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise e

            try:
                basename = self._memmaped_arrays.get(a)
            except KeyError:
                # Generate a new unique random filename. The process and thread
                # ids are only useful for debugging purposes and to make it
                # easier to clean up orphaned files in case of hard process
                # kill (e.g. by "kill -9" or segfault).
                basename = "{}-{}-{}.pkl".format(
                    os.getpid(), id(threading.current_thread()), uuid4().hex
                )
                self._memmaped_arrays.set(a, basename)
            filename = os.path.join(self._temp_folder, basename)

            # In case the same array with the same content is passed several
            # times to the pool subprocess children, serialize it only once

            is_new_memmap = filename not in self._temporary_memmaped_filenames

            # add the memmap to the list of temporary memmaps created by joblib
            self._temporary_memmaped_filenames.add(filename)

            if self._unlink_on_gc_collect:
                # Bump reference count of the memmap by 1 to account for
                # shared usage of the memmap by a child process. The
                # corresponding decref call will be executed upon calling
                # resource_tracker.maybe_unlink, registered as a finalizer in
                # the child.
                # the incref/decref calls here are only possible when the child
                # and the parent share the same resource_tracker. It is not the
                # case for the multiprocessing backend, but it does not matter
                # because unlinking a memmap from a child process is only
                # useful to control the memory usage of long-lasting child
                # processes, while the multiprocessing-based pools terminate
                # their workers at the end of a map() call.
                resource_tracker.register(filename, "file")

            if is_new_memmap:
                # Incref each temporary memmap created by joblib one extra
                # time.  This means that these memmaps will only be deleted
                # once an extra maybe_unlink() is called, which is done once
                # all the jobs have completed (or been canceled) in the
                # Parallel._terminate_backend() method.
                resource_tracker.register(filename, "file")

            if not os.path.exists(filename):
                util.debug(
                    "[ARRAY DUMP] Pickling new array (shape={}, dtype={}) "
                    "creating a new memmap at {}".format(a.shape, a.dtype, filename)
                )
                for dumped_filename in dump(a, filename):
                    os.chmod(dumped_filename, FILE_PERMISSIONS)

                if self._prewarm:
                    # Warm up the data by accessing it. This operation ensures
                    # that the disk accesses required to create the memmapped
                    # file are performed in the reducing process, and avoids
                    # concurrent memmap creation in multiple child processes.
                    load(filename, mmap_mode=self._mmap_mode).max()

            else:
                util.debug(
                    "[ARRAY DUMP] Pickling known array (shape={}, dtype={}) "
                    "reusing memmap file: {}".format(
                        a.shape, a.dtype, os.path.basename(filename)
                    )
                )

            # The worker process will use joblib.load to memmap the data
            return (
                load_temporary_memmap,
                (filename, self._mmap_mode, self._unlink_on_gc_collect),
            )
        else:
            # do not convert a into a memmap; let the pickler make its usual
            # in-memory copy with the default system pickler
            util.debug(
                "[ARRAY DUMP] Pickling array (NO MEMMAPPING) (shape={}, "
                "dtype={}).".format(a.shape, a.dtype)
            )
            return (loads, (dumps(a, protocol=HIGHEST_PROTOCOL),))


def get_memmapping_reducers(
    forward_reducers=None,
    backward_reducers=None,
    temp_folder_resolver=None,
    max_nbytes=1e6,
    mmap_mode="r",
    verbose=0,
    prewarm=False,
    unlink_on_gc_collect=True,
    **kwargs,
):
    """Construct a pair of memmapping reducers linked to a tmpdir.

    This function manages the creation and the cleanup of the temporary folders
    underlying the memory maps and should be used to get the reducers necessary
    to construct a joblib pool or executor.
    """
    if forward_reducers is None:
        forward_reducers = dict()
    if backward_reducers is None:
        backward_reducers = dict()

    if np is not None:
        # Register smart numpy.ndarray reducers that detect memmap-backed
        # arrays and that are also able to dump large in-memory arrays over
        # the max_nbytes threshold to memmap files
        forward_reduce_ndarray = ArrayMemmapForwardReducer(
            max_nbytes,
            temp_folder_resolver,
            mmap_mode,
            unlink_on_gc_collect,
            verbose,
            prewarm=prewarm,
        )
        forward_reducers[np.ndarray] = forward_reduce_ndarray
        forward_reducers[np.memmap] = forward_reduce_ndarray

        # Communication from child process to the parent process always
        # pickles in-memory numpy.ndarray without dumping them as memmap
        # to avoid confusing the caller and making it tricky to collect the
        # temporary folder
        backward_reducers[np.ndarray] = reduce_array_memmap_backward
        backward_reducers[np.memmap] = reduce_array_memmap_backward

    return forward_reducers, backward_reducers
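
# Typical wiring sketch (illustrative; TemporaryResourcesManager is defined
# just below and provides the folder name resolver):
#
#     >>> manager = TemporaryResourcesManager()
#     >>> forward, backward = get_memmapping_reducers(
#     ...     temp_folder_resolver=manager.resolve_temp_folder_name,
#     ...     max_nbytes=int(1e6),
#     ... )
#     >>> # forward[np.ndarray] memmaps large arrays sent to workers, while
#     >>> # backward[np.ndarray] pickles arrays sent back to the parent.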


class TemporaryResourcesManager(object):
    """Stateful object able to manage temporary folders and pickles.

    It exposes:
    - a per-context folder name resolving API that memmap-based reducers will
      rely on to know where to pickle the temporary memmaps
    - a temporary file/folder management API that internally uses the
      resource_tracker.
    """

    def __init__(self, temp_folder_root=None, context_id=None):
        self._current_temp_folder = None
        self._temp_folder_root = temp_folder_root
        self._use_shared_mem = None
        self._cached_temp_folders = dict()
        self._id = uuid4().hex
        self._finalizers = {}
        if context_id is None:
            # It would be safer to not assign a default context id (less silent
            # bugs), but doing this while maintaining backward compatibility
            # with the previous, context-unaware version of
            # get_memmapping_executor exposes too many low-level details.
            context_id = uuid4().hex
        self.set_current_context(context_id)

    def set_current_context(self, context_id):
        self._current_context_id = context_id
        self.register_new_context(context_id)

    def register_new_context(self, context_id):
        # Prepare a sub-folder name specific to a context (usually a unique id
        # generated by each instance of the Parallel class). Do not create it
        # in advance, to spare FS write access if no array is to be dumped.
        if context_id in self._cached_temp_folders:
            return
        else:
            # During its lifecycle, one Parallel object can have several
            # executors associated to it (for instance, if a loky worker raises
            # an exception, joblib shuts down the executor and instantly
            # recreates a new one before raising the error - see
            # ``ensure_ready``). Because we don't want two executors tied to
            # the same Parallel object (and thus the same context id) to
            # register/use/delete the same folder, we also add an id specific
            # to the current Manager (and thus specific to its associated
            # executor) to the folder name.
            new_folder_name = "joblib_memmapping_folder_{}_{}_{}".format(
                os.getpid(), self._id, context_id
            )
            new_folder_path, _ = _get_temp_dir(new_folder_name, self._temp_folder_root)
            self.register_folder_finalizer(new_folder_path, context_id)
            self._cached_temp_folders[context_id] = new_folder_path

    def resolve_temp_folder_name(self):
        """Return a folder name specific to the currently activated context"""
        return self._cached_temp_folders[self._current_context_id]

    # resource management API

    def register_folder_finalizer(self, pool_subfolder, context_id):
        # Register the garbage collector at program exit in case caller forgets
        # to call terminate explicitly: note we do not pass any reference to
        # ensure that this callback won't prevent garbage collection of
        # parallel instance and related file handler resources such as POSIX
        # semaphores and pipes
        pool_module_name = whichmodule(delete_folder, "delete_folder")
        resource_tracker.register(pool_subfolder, "folder")

        def _cleanup():
            # In some cases the Python runtime seems to set delete_folder to
            # None just before exiting when accessing the delete_folder
            # function from the closure namespace. So instead we reimport
            # the delete_folder function explicitly.
            # https://github.com/joblib/joblib/issues/328
            # We cannot just use from 'joblib.pool import delete_folder'
            # because joblib should only use relative imports to allow
            # easy vendoring.
            delete_folder = __import__(
                pool_module_name, fromlist=["delete_folder"]
            ).delete_folder
            try:
                delete_folder(pool_subfolder, allow_non_empty=True)
                resource_tracker.unregister(pool_subfolder, "folder")
            except OSError:
                warnings.warn(
                    "Failed to delete temporary folder: {}".format(pool_subfolder)
                )

        self._finalizers[context_id] = atexit.register(_cleanup)

    def _clean_temporary_resources(
        self, context_id=None, force=False, allow_non_empty=False
    ):
        """Clean temporary resources created by a process-based pool"""
        if context_id is None:
            # Iterate over a copy of the cache keys to avoid a RuntimeError
            # due to the dictionary changing size during iteration.
            for context_id in list(self._cached_temp_folders):
                self._clean_temporary_resources(
                    context_id, force=force, allow_non_empty=allow_non_empty
                )
        else:
            temp_folder = self._cached_temp_folders.get(context_id)
            if temp_folder and os.path.exists(temp_folder):
                for filename in os.listdir(temp_folder):
                    if force:
                        # Some workers have failed and the reference counts
                        # might be off. The workers should have shut down by
                        # this time, so forcefully clean up the files.
                        resource_tracker.unregister(
                            os.path.join(temp_folder, filename), "file"
                        )
                    else:
                        resource_tracker.maybe_unlink(
                            os.path.join(temp_folder, filename), "file"
                        )

                # When forcing clean-up, try to delete the folder even if some
                # files are still in it. Otherwise, only try to delete the
                # folder if it is empty.
                allow_non_empty |= force

                # Clean up the folder if possible: either if it is empty, or
                # if none of its files are still in use and allow_non_empty
                # is True.
                try:
                    delete_folder(temp_folder, allow_non_empty=allow_non_empty)
                    # Forget the folder once it has been deleted
                    self._cached_temp_folders.pop(context_id, None)
                    resource_tracker.unregister(temp_folder, "folder")

                    # Also cancel the finalizer that would otherwise run at
                    # interpreter exit.
                    finalizer = self._finalizers.pop(context_id, None)
                    if finalizer is not None:
                        atexit.unregister(finalizer)

                except OSError:
                    # Temporary folder cannot be deleted right now.
                    # This folder will be cleaned up by an atexit
                    # finalizer registered by the memmapping_reducer.
                    pass