1############################################################################### 
# Server process to keep track of resources that have not been unlinked yet,
# like folders and semaphores, and to clean them up.
    4# 
    5# author: Thomas Moreau 
    6# 
    7# Adapted from multiprocessing/resource_tracker.py 
    8#  * add some VERBOSE logging, 
    9#  * add support to track folders, 
    10#  * add Windows support, 
    11#  * refcounting scheme to avoid unlinking resources still in use. 
    12# 
# We run (on both Unix and Windows) a server process which keeps track of
# resources that have not been unlinked yet. The server ignores SIGINT and
# SIGTERM and reads from a
    15# pipe. The resource_tracker implements a reference counting scheme: each time 
    16# a Python process anticipates the shared usage of a resource by another 
    17# process, it signals the resource_tracker of this shared usage, and in return, 
    18# the resource_tracker increments the resource's reference count by 1. 
    19# Similarly, when access to a resource is closed by a Python process, the 
    20# process notifies the resource_tracker by asking it to decrement the 
    21# resource's reference count by 1.  When the reference count drops to 0, the 
    22# resource_tracker attempts to clean up the underlying resource. 
    23 
    24# Finally, every other process connected to the resource tracker has a copy of 
    25# the writable end of the pipe used to communicate with it, so the resource 
# tracker gets EOF when all other processes have exited. Then the
# resource_tracker process unlinks any remaining leaked resources (those
# with a reference count above 0).
    29 
# For semaphores, this is important because the system only supports a limited
# number of named semaphores, and they will not be automatically removed until
# the next reboot. Without this resource tracker process, "killall python"
# would probably leave behind semaphores that are never unlinked.
    34 
# Note that this behavior differs from CPython's resource_tracker, which only
# implements a list of shared resources, and not a proper refcounting scheme.
# Also, CPython's resource tracker will only attempt to clean up those shared
# resources once all processes connected to the resource tracker have exited.
    39 
    40 
    41import os 
    42import shutil 
    43import sys 
    44import signal 
    45import warnings 
    46from multiprocessing import util 
    47from multiprocessing.resource_tracker import ( 
    48    ResourceTracker as _ResourceTracker, 
    49) 
    50 
    51from . import spawn 
    52 
    53if sys.platform == "win32": 
    54    import _winapi 
    55    import msvcrt 
    56    from multiprocessing.reduction import duplicate 
    57 
    58 
    59__all__ = ["ensure_running", "register", "unregister"] 
    60 
    61_HAVE_SIGMASK = hasattr(signal, "pthread_sigmask") 
    62_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM) 
    63 
    64 
    65def cleanup_noop(name): 
    66    raise RuntimeError("noop should never be registered or cleaned up") 
    67 
    68 
    69_CLEANUP_FUNCS = { 
    70    "noop": cleanup_noop, 
    71    "folder": shutil.rmtree, 
    72    "file": os.unlink, 
    73} 
    74 
    75if os.name == "posix": 
    76    import _multiprocessing 
    77 
    78    # Use sem_unlink() to clean up named semaphores. 
    79    # 
    80    # sem_unlink() may be missing if the Python build process detected the 
    81    # absence of POSIX named semaphores. In that case, no named semaphores were 
    82    # ever opened, so no cleanup would be necessary. 
    83    if hasattr(_multiprocessing, "sem_unlink"): 
    84        _CLEANUP_FUNCS.update( 
    85            { 
    86                "semlock": _multiprocessing.sem_unlink, 
    87            } 
    88        ) 
    89 
    90 
    91VERBOSE = False 
    92 
    93 
    94class ResourceTracker(_ResourceTracker): 
    95    """Resource tracker with refcounting scheme. 
    96 
    97    This class is an extension of the multiprocessing ResourceTracker class 
    98    which implements a reference counting scheme to avoid unlinking shared 
    99    resources still in use in other processes. 
    100 
    101    This feature is notably used by `joblib.Parallel` to share temporary 
    102    folders and memory mapped files between the main process and the worker 
    103    processes. 
    104 
    105    The actual implementation of the refcounting scheme is in the main 
    106    function, which is run in a dedicated process. 
    107    """ 
    108 
    109    def maybe_unlink(self, name, rtype): 
    110        """Decrement the refcount of a resource, and delete it if it hits 0""" 
    111        self._send("MAYBE_UNLINK", name, rtype) 
    112 
    113    def ensure_running(self): 
    114        """Make sure that resource tracker process is running. 
    115 
    116        This can be run from any process.  Usually a child process will use 
    117        the resource created by its parent. 
    118 
    119        This function is necessary for backward compatibility with python 
    120        versions before 3.13.7. 
    121        """ 
    122        return self._ensure_running_and_write() 
    123 
    124    def _teardown_dead_process(self): 
    125        # Override this function for compatibility with windows and 
    126        # for python version before 3.13.7 
    127 
    128        # At this point, the resource_tracker process has been killed 
    129        # or crashed. 
    130        os.close(self._fd) 
    131 
    132        # Let's remove the process entry from the process table on POSIX system 
    133        # to avoid zombie processes. 
    134        if os.name == "posix": 
    135            try: 
    136                # _pid can be None if this process is a child from another 
    137                # python process, which has started the resource_tracker. 
    138                if self._pid is not None: 
    139                    os.waitpid(self._pid, 0) 
    140            except OSError: 
    141                # The resource_tracker has already been terminated. 
    142                pass 
    143        self._fd = None 
    144        self._pid = None 
    145 
    146        warnings.warn( 
    147            "resource_tracker: process died unexpectedly, relaunching. " 
    148            "Some folders/semaphores might leak." 
    149        ) 
    150 
    151    def _launch(self): 
    152        # This is the overridden part of the resource tracker, which launches 
    153        # loky's version, which is compatible with windows and allow to track 
    154        # folders with external ref counting. 
    155 
    156        fds_to_pass = [] 
    157        try: 
    158            fds_to_pass.append(sys.stderr.fileno()) 
    159        except Exception: 
    160            pass 
    161 
    162        # Create a pipe for posix and windows 
    163        r, w = os.pipe() 
    164        if sys.platform == "win32": 
    165            _r = duplicate(msvcrt.get_osfhandle(r), inheritable=True) 
    166            os.close(r) 
    167            r = _r 
    168 
    169        cmd = f"from {main.__module__} import main; main({r}, {VERBOSE})" 
    170        try: 
    171            fds_to_pass.append(r) 
    172            # process will out live us, so no need to wait on pid 
    173            exe = spawn.get_executable() 
    174            args = [exe, *util._args_from_interpreter_flags(), "-c", cmd] 
    175            util.debug(f"launching resource tracker: {args}") 
    176            # bpo-33613: Register a signal mask that will block the 
    177            # signals.  This signal mask will be inherited by the child 
    178            # that is going to be spawned and will protect the child from a 
    179            # race condition that can make the child die before it 
    180            # registers signal handlers for SIGINT and SIGTERM. The mask is 
    181            # unregistered after spawning the child. 
    182            try: 
    183                if _HAVE_SIGMASK: 
    184                    signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS) 
    185                pid = spawnv_passfds(exe, args, fds_to_pass) 
    186            finally: 
    187                if _HAVE_SIGMASK: 
    188                    signal.pthread_sigmask( 
    189                        signal.SIG_UNBLOCK, _IGNORED_SIGNALS 
    190                    ) 
    191        except BaseException: 
    192            os.close(w) 
    193            raise 
    194        else: 
    195            self._fd = w 
    196            self._pid = pid 
    197        finally: 
    198            if sys.platform == "win32": 
    199                _winapi.CloseHandle(r) 
    200            else: 
    201                os.close(r) 
    202 
    203    def _ensure_running_and_write(self, msg=None): 
    204        """Make sure that resource tracker process is running. 
    205 
    206        This can be run from any process.  Usually a child process will use 
    207        the resource created by its parent. 
    208 
    209 
    210        This function is added for compatibility with python version before 3.13.7. 
    211        """ 
    212        with self._lock: 
    213            if ( 
    214                self._fd is not None 
    215            ):  # resource tracker was launched before, is it still running? 
    216                if msg is None: 
    217                    to_send = b"PROBE:0:noop\n" 
    218                else: 
    219                    to_send = msg 
    220                try: 
    221                    self._write(to_send) 
    222                except OSError: 
    223                    self._teardown_dead_process() 
    224                    self._launch() 
    225 
    226                msg = None  # message was sent in probe 
    227            else: 
    228                self._launch() 
    229 
    230        if msg is not None: 
    231            self._write(msg) 
    232 
    233    def _write(self, msg): 
    234        nbytes = os.write(self._fd, msg) 
    235        assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}" 
    236 
    237    def __del__(self): 
    238        # ignore error due to trying to clean up child process which has already been 
    239        # shutdown on windows. See https://github.com/joblib/loky/pull/450 
    240        # This is only required if __del__ is defined 
    241        if not hasattr(_ResourceTracker, "__del__"): 
    242            return 
    243        try: 
    244            super().__del__() 
    245        except ChildProcessError: 
    246            pass 
    247 
    248 
    249_resource_tracker = ResourceTracker() 
    250ensure_running = _resource_tracker.ensure_running 
    251register = _resource_tracker.register 
    252maybe_unlink = _resource_tracker.maybe_unlink 
    253unregister = _resource_tracker.unregister 
    254getfd = _resource_tracker.getfd 
    255 
    256 
    257def main(fd, verbose=0): 
    258    """Run resource tracker.""" 
    259    if verbose: 
    260        util.log_to_stderr(level=util.DEBUG) 
    261 
    262    # protect the process from ^C and "killall python" etc 
    263    signal.signal(signal.SIGINT, signal.SIG_IGN) 
    264    signal.signal(signal.SIGTERM, signal.SIG_IGN) 
    265 
    266    if _HAVE_SIGMASK: 
    267        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS) 
    268 
    269    for f in (sys.stdin, sys.stdout): 
    270        try: 
    271            f.close() 
    272        except Exception: 
    273            pass 
    274 
    275    if verbose: 
    276        util.debug("Main resource tracker is running") 
    277 
    278    registry = {rtype: {} for rtype in _CLEANUP_FUNCS.keys()} 
    279 
    280    try: 
    281        if sys.platform == "win32": 
    282            fd = msvcrt.open_osfhandle(fd, os.O_RDONLY) 
    283        # keep track of registered/unregistered resources 
    284        with open(fd, "rb") as f: 
    285            for line in f: 
    286                try: 
    287                    splitted = line.strip().decode("ascii").split(":") 
    288                    # name can potentially contain separator symbols (for 
    289                    # instance folders on Windows) 
    290                    cmd, name, rtype = ( 
    291                        splitted[0], 
    292                        ":".join(splitted[1:-1]), 
    293                        splitted[-1], 
    294                    ) 
    295 
    296                    if rtype not in _CLEANUP_FUNCS: 
    297                        raise ValueError( 
    298                            f"Cannot register {name} for automatic cleanup: " 
    299                            f"unknown resource type ({rtype}). Resource type " 
    300                            "should be one of the following: " 
    301                            f"{list(_CLEANUP_FUNCS.keys())}" 
    302                        ) 
    303 
    304                    if cmd == "PROBE": 
    305                        pass 
    306                    elif cmd == "REGISTER": 
    307                        if name not in registry[rtype]: 
    308                            registry[rtype][name] = 1 
    309                        else: 
    310                            registry[rtype][name] += 1 
    311 
    312                        if verbose: 
    313                            util.debug( 
    314                                "[ResourceTracker] incremented refcount of " 
    315                                f"{rtype} {name} " 
    316                                f"(current {registry[rtype][name]})" 
    317                            ) 
    318                    elif cmd == "UNREGISTER": 
    319                        del registry[rtype][name] 
    320                        if verbose: 
    321                            util.debug( 
    322                                f"[ResourceTracker] unregister {name} {rtype}: " 
    323                                f"registry({len(registry)})" 
    324                            ) 
    325                    elif cmd == "MAYBE_UNLINK": 
    326                        registry[rtype][name] -= 1 
    327                        if verbose: 
    328                            util.debug( 
    329                                "[ResourceTracker] decremented refcount of " 
    330                                f"{rtype} {name} " 
    331                                f"(current {registry[rtype][name]})" 
    332                            ) 
    333 
    334                        if registry[rtype][name] == 0: 
    335                            del registry[rtype][name] 
    336                            try: 
    337                                if verbose: 
    338                                    util.debug( 
    339                                        f"[ResourceTracker] unlink {name}" 
    340                                    ) 
    341                                _CLEANUP_FUNCS[rtype](name) 
    342                            except Exception as e: 
    343                                warnings.warn( 
    344                                    f"resource_tracker: {name}: {e!r}" 
    345                                ) 
    346 
    347                    else: 
    348                        raise RuntimeError(f"unrecognized command {cmd!r}") 
    349                except BaseException: 
    350                    try: 
    351                        sys.excepthook(*sys.exc_info()) 
    352                    except BaseException: 
    353                        pass 
    354    finally: 
    355        # all processes have terminated; cleanup any remaining resources 
    356        def _unlink_resources(rtype_registry, rtype): 
    357            if rtype_registry: 
    358                try: 
    359                    warnings.warn( 
    360                        "resource_tracker: There appear to be " 
    361                        f"{len(rtype_registry)} leaked {rtype} objects to " 
    362                        "clean up at shutdown" 
    363                    ) 
    364                except Exception: 
    365                    pass 
    366            for name in rtype_registry: 
    367                # For some reason the process which created and registered this 
    368                # resource has failed to unregister it. Presumably it has 
    369                # died.  We therefore clean it up. 
    370                try: 
    371                    _CLEANUP_FUNCS[rtype](name) 
    372                    if verbose: 
    373                        util.debug(f"[ResourceTracker] unlink {name}") 
    374                except Exception as e: 
    375                    warnings.warn(f"resource_tracker: {name}: {e!r}") 
    376 
    377        for rtype, rtype_registry in registry.items(): 
    378            if rtype == "folder": 
    379                continue 
    380            else: 
    381                _unlink_resources(rtype_registry, rtype) 
    382 
    383        # The default cleanup routine for folders deletes everything inside 
    384        # those folders recursively, which can include other resources tracked 
    385        # by the resource tracker). To limit the risk of the resource tracker 
    386        # attempting to delete twice a resource (once as part of a tracked 
    387        # folder, and once as a resource), we delete the folders after all 
    388        # other resource types. 
    389        if "folder" in registry: 
    390            _unlink_resources(registry["folder"], "folder") 
    391 
    392    if verbose: 
    393        util.debug("resource tracker shut down") 
    394 
    395 
    396def spawnv_passfds(path, args, passfds): 
    397    if sys.platform != "win32": 
    398        args = [arg.encode("utf-8") for arg in args] 
    399        path = path.encode("utf-8") 
    400        return util.spawnv_passfds(path, args, passfds) 
    401    else: 
    402        passfds = sorted(passfds) 
    403        cmd = " ".join(f'"{x}"' for x in args) 
    404        try: 
    405            _, ht, pid, _ = _winapi.CreateProcess( 
    406                path, cmd, None, None, True, 0, None, None, None 
    407            ) 
    408            _winapi.CloseHandle(ht) 
    409        except BaseException: 
    410            pass 
    411        return pid