1###############################################################################
# Server process to keep track of unlinked resources, such as folders and
# semaphores, and clean them up.
4#
5# author: Thomas Moreau
6#
7# Adapted from multiprocessing/resource_tracker.py
8# * add some VERBOSE logging,
9# * add support to track folders,
10# * add Windows support,
11# * refcounting scheme to avoid unlinking resources still in use.
12#
13# On Unix we run a server process which keeps track of unlinked
14# resources. The server ignores SIGINT and SIGTERM and reads from a
15# pipe. The resource_tracker implements a reference counting scheme: each time
16# a Python process anticipates the shared usage of a resource by another
17# process, it signals the resource_tracker of this shared usage, and in return,
18# the resource_tracker increments the resource's reference count by 1.
19# Similarly, when access to a resource is closed by a Python process, the
20# process notifies the resource_tracker by asking it to decrement the
21# resource's reference count by 1. When the reference count drops to 0, the
22# resource_tracker attempts to clean up the underlying resource.
23
24# Finally, every other process connected to the resource tracker has a copy of
25# the writable end of the pipe used to communicate with it, so the resource
26# tracker gets EOF when all other processes have exited. Then the
27# resource_tracker process unlinks any remaining leaked resources (with
28# reference count above 0)
29
30# For semaphores, this is important because the system only supports a limited
31# number of named semaphores, and they will not be automatically removed till
32# the next reboot. Without this resource tracker process, "killall python"
33# would probably leave unlinked semaphores.
34
# Note that this behavior differs from CPython's resource_tracker, which only
# implements a list of shared resources, and not a proper refcounting scheme.
# Also, CPython's resource tracker will only attempt to clean up those shared
# resources once all processes connected to the resource tracker have exited.
39
40
41import os
42import shutil
43import sys
44import signal
45import warnings
46from _multiprocessing import sem_unlink
47from multiprocessing import util
48from multiprocessing.resource_tracker import (
49 ResourceTracker as _ResourceTracker,
50)
51
52from . import spawn
53
54if sys.platform == "win32":
55 import _winapi
56 import msvcrt
57 from multiprocessing.reduction import duplicate
58
59
# Names exported by a star-import of this module.
__all__ = ["ensure_running", "register", "unregister"]

# Signals the tracker process ignores; they are also temporarily blocked in
# the parent while the tracker is being spawned (when sigmask is available).
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
_HAVE_SIGMASK = hasattr(signal, "pthread_sigmask")

# Maps each supported resource type to the callable that destroys a resource
# of that type given its name. Insertion order is kept: folder, file, and
# (POSIX only) semlock.
_CLEANUP_FUNCS = dict(folder=shutil.rmtree, file=os.unlink)
if os.name == "posix":
    _CLEANUP_FUNCS.update(semlock=sem_unlink)


# Forwarded to the tracker process's ``main`` to enable debug logging.
VERBOSE = False
72
73
class ResourceTracker(_ResourceTracker):
    """Resource tracker with refcounting scheme.

    This class is an extension of the multiprocessing ResourceTracker class
    which implements a reference counting scheme to avoid unlinking shared
    resources still in use in other processes.

    This feature is notably used by `joblib.Parallel` to share temporary
    folders and memory mapped files between the main process and the worker
    processes.

    The actual implementation of the refcounting scheme is in the main
    function, which is run in a dedicated process.
    """

    def maybe_unlink(self, name, rtype):
        """Decrement the refcount of a resource, and delete it if it hits 0.

        ``name`` identifies the resource and ``rtype`` is its resource type;
        both are forwarded to the tracker process in a ``MAYBE_UNLINK``
        message.
        """
        self.ensure_running()
        self._send("MAYBE_UNLINK", name, rtype)

    def ensure_running(self):
        """Make sure that resource tracker process is running.

        This can be run from any process. Usually a child process will use
        the resource created by its parent.

        If a tracker was launched before and still answers, nothing is done.
        If it died, its pipe is closed, the dead process is reaped (POSIX
        only, when its pid is known), a warning is emitted and a new tracker
        process is spawned.
        """
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)
                # The pid is unknown (None) when this process only holds a
                # pipe to a tracker it did not launch itself; os.waitpid
                # would then raise TypeError, which is not an OSError, so
                # skip reaping in that case.
                if os.name == "posix" and self._pid is not None:
                    try:
                        # At this point, the resource_tracker process has been
                        # killed or crashed. Let's remove the process entry
                        # from the process table to avoid zombie processes.
                        os.waitpid(self._pid, 0)
                    except OSError:
                        # The process was terminated or is a child from an
                        # ancestor of the current process.
                        pass
                self._fd = None
                self._pid = None

                warnings.warn(
                    "resource_tracker: process died unexpectedly, "
                    "relaunching. Some folders/sempahores might "
                    "leak."
                )

            # stderr is passed on (when it has a real file descriptor) so
            # the tracker can report warnings and errors.
            fds_to_pass = []
            try:
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass

            r, w = os.pipe()
            if sys.platform == "win32":
                # Replace the read fd with an inheritable OS handle that the
                # child process can reopen.
                _r = duplicate(msvcrt.get_osfhandle(r), inheritable=True)
                os.close(r)
                r = _r

            cmd = f"from {main.__module__} import main; main({r}, {VERBOSE})"
            try:
                fds_to_pass.append(r)
                # process will out live us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe, *util._args_from_interpreter_flags(), "-c", cmd]
                util.debug(f"launching resource tracker: {args}")
                # bpo-33613: Register a signal mask that will block the
                # signals. This signal mask will be inherited by the child
                # that is going to be spawned and will protect the child from a
                # race condition that can make the child die before it
                # registers signal handlers for SIGINT and SIGTERM. The mask is
                # unregistered after spawning the child.
                try:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(
                            signal.SIG_BLOCK, _IGNORED_SIGNALS
                        )
                    pid = spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(
                            signal.SIG_UNBLOCK, _IGNORED_SIGNALS
                        )
            except BaseException:
                # Spawning failed: do not leak the write end of the pipe.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # The parent never reads from the pipe; always release its
                # copy of the read end (a handle on Windows, an fd elsewhere).
                if sys.platform == "win32":
                    _winapi.CloseHandle(r)
                else:
                    os.close(r)

    def __del__(self):
        # ignore error due to trying to clean up child process which has already been
        # shutdown on windows See https://github.com/joblib/loky/pull/450
        # This is only required if __del__ is defined
        if not hasattr(_ResourceTracker, "__del__"):
            return
        try:
            super().__del__()
        except ChildProcessError:
            pass
184
185
# Module-level tracker instance. The names below are its bound methods,
# exposed so callers can use them as plain module functions.
_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running
register = _resource_tracker.register
maybe_unlink = _resource_tracker.maybe_unlink
unregister = _resource_tracker.unregister
getfd = _resource_tracker.getfd
192
193
def main(fd, verbose=0):
    """Run resource tracker.

    Reads newline-terminated ASCII messages of the form ``CMD:name:rtype``
    from ``fd`` until EOF and maintains a per-type reference count for each
    named resource:

    * ``PROBE`` is ignored,
    * ``REGISTER`` increments the refcount (creating it at 1),
    * ``UNREGISTER`` forgets the resource without cleaning it up,
    * ``MAYBE_UNLINK`` decrements the refcount and cleans the resource up
      when it reaches 0.

    Errors raised while handling a message are reported through
    ``sys.excepthook`` and the loop carries on. On EOF (or on an error
    outside the per-message handling) every resource still registered is
    cleaned up, folders last.

    Parameters
    ----------
    fd
        Read end of the pipe. On Windows this is an OS handle that is
        converted to a file descriptor before being opened.
    verbose
        When truthy, debug messages are logged to stderr.
    """
    # protect the process from ^C and "killall python" etc
    if verbose:
        util.log_to_stderr(level=util.DEBUG)

    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)

    # The handlers are now installed: remove the mask set up in the parent
    # around the spawn.
    if _HAVE_SIGMASK:
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    if verbose:
        util.debug("Main resource tracker is running")

    # registry[rtype][name] -> current reference count
    registry = {rtype: {} for rtype in _CLEANUP_FUNCS}

    def _unlink_resources(rtype_registry, rtype):
        """Clean up every resource still registered for ``rtype``."""
        if rtype_registry:
            try:
                warnings.warn(
                    "resource_tracker: There appear to be "
                    f"{len(rtype_registry)} leaked {rtype} objects to "
                    "clean up at shutdown"
                )
            except Exception:
                pass
        for name in rtype_registry:
            # For some reason the process which created and registered this
            # resource has failed to unregister it. Presumably it has
            # died. We therefore clean it up.
            try:
                _CLEANUP_FUNCS[rtype](name)
                if verbose:
                    util.debug(f"[ResourceTracker] unlink {name}")
            except Exception as e:
                # Warnings can be turned into exceptions by the interpreter
                # flags inherited from the parent; a failure to report must
                # not prevent the remaining resources from being cleaned up.
                try:
                    warnings.warn(f"resource_tracker: {name}: {e!r}")
                except Exception:
                    pass

    try:
        # keep track of registered/unregistered resources
        if sys.platform == "win32":
            fd = msvcrt.open_osfhandle(fd, os.O_RDONLY)
        with open(fd, "rb") as f:
            while True:
                line = f.readline()
                if line == b"":  # EOF
                    break
                try:
                    splitted = line.strip().decode("ascii").split(":")
                    # name can potentially contain separator symbols (for
                    # instance folders on Windows)
                    cmd, name, rtype = (
                        splitted[0],
                        ":".join(splitted[1:-1]),
                        splitted[-1],
                    )

                    if cmd == "PROBE":
                        continue

                    if rtype not in _CLEANUP_FUNCS:
                        raise ValueError(
                            f"Cannot register {name} for automatic cleanup: "
                            f"unknown resource type ({rtype}). Resource type "
                            "should be one of the following: "
                            f"{list(_CLEANUP_FUNCS.keys())}"
                        )

                    if cmd == "REGISTER":
                        if name not in registry[rtype]:
                            registry[rtype][name] = 1
                        else:
                            registry[rtype][name] += 1

                        if verbose:
                            util.debug(
                                "[ResourceTracker] incremented refcount of "
                                f"{rtype} {name} "
                                f"(current {registry[rtype][name]})"
                            )
                    elif cmd == "UNREGISTER":
                        del registry[rtype][name]
                        if verbose:
                            # Report how many resources of this type are
                            # still tracked.
                            util.debug(
                                f"[ResourceTracker] unregister {name} {rtype}: "
                                f"registry({len(registry[rtype])})"
                            )
                    elif cmd == "MAYBE_UNLINK":
                        registry[rtype][name] -= 1
                        if verbose:
                            util.debug(
                                "[ResourceTracker] decremented refcount of "
                                f"{rtype} {name} "
                                f"(current {registry[rtype][name]})"
                            )

                        if registry[rtype][name] == 0:
                            del registry[rtype][name]
                            try:
                                if verbose:
                                    util.debug(
                                        f"[ResourceTracker] unlink {name}"
                                    )
                                _CLEANUP_FUNCS[rtype](name)
                            except Exception as e:
                                warnings.warn(
                                    f"resource_tracker: {name}: {e!r}"
                                )

                    else:
                        raise RuntimeError(f"unrecognized command {cmd!r}")
                except BaseException:
                    # Report the problem but keep serving further messages.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except BaseException:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        for rtype, rtype_registry in registry.items():
            if rtype == "folder":
                continue
            else:
                _unlink_resources(rtype_registry, rtype)

        # The default cleanup routine for folders deletes everything inside
        # those folders recursively, which can include other resources tracked
        # by the resource tracker). To limit the risk of the resource tracker
        # attempting to delete twice a resource (once as part of a tracked
        # folder, and once as a resource), we delete the folders after all
        # other resource types.
        if "folder" in registry:
            _unlink_resources(registry["folder"], "folder")

    if verbose:
        util.debug("resource tracker shut down")
334
335
def spawnv_passfds(path, args, passfds):
    """Spawn ``path`` with arguments ``args`` and return the new process pid.

    On non-Windows platforms the path and arguments are UTF-8 encoded and
    the call is delegated to ``multiprocessing.util.spawnv_passfds``, which
    keeps the descriptors listed in ``passfds`` open in the child.

    On Windows the process is created with handle inheritance enabled;
    ``passfds`` is not used there.

    Raises
    ------
    OSError
        If the process could not be created.
    """
    if sys.platform != "win32":
        encoded_args = [arg.encode("utf-8") for arg in args]
        encoded_path = path.encode("utf-8")
        return util.spawnv_passfds(encoded_path, encoded_args, passfds)

    cmd = " ".join(f'"{x}"' for x in args)
    # Let a failure to create the process propagate: swallowing it would
    # only resurface as an unbound ``pid`` on return, hiding the real cause.
    # The fifth argument (True) asks for inheritable handles to be inherited.
    # NOTE(review): the process handle (first item) is not closed here -
    # confirm whether that is intended.
    _, ht, pid, _ = _winapi.CreateProcess(
        path, cmd, None, None, True, 0, None, None, None
    )
    try:
        # The thread handle is not needed; closing it is best-effort.
        _winapi.CloseHandle(ht)
    except OSError:
        pass
    return pid