1###############################################################################
# Server process to keep track of resources that have not been unlinked yet,
# such as folders and semaphores, and to clean them up.
4#
5# author: Thomas Moreau
6#
7# adapted from multiprocessing/semaphore_tracker.py (17/02/2017)
8# * include custom spawnv_passfds to start the process
9# * add some VERBOSE logging
10#
11# TODO: multiprocessing.resource_tracker was contributed to Python 3.8 so
12# once loky drops support for Python 3.7 it might be possible to stop
13# maintaining this loky-specific fork. As a consequence, it might also be
14# possible to stop maintaining the loky.backend.synchronize fork of
15# multiprocessing.synchronize.
16
17#
# We run a server process (on POSIX and on Windows) which keeps track of
# resources that have not been unlinked yet. The server ignores SIGINT and
# SIGTERM and reads from a pipe. The resource_tracker implements a reference
# counting scheme: each time a Python process anticipates the shared usage of
# a resource by another process, it signals the resource_tracker of this
# shared usage, and in return, the resource_tracker increments the resource's
# reference count by 1. Similarly, when access to a resource is closed by a
# Python process, the process notifies the resource_tracker by asking it to
# decrement the resource's reference count by 1. When the reference count
# drops to 0, the resource_tracker attempts to clean up the underlying
# resource.
28
# Finally, every other process connected to the resource tracker has a copy of
# the writable end of the pipe used to communicate with it, so the resource
# tracker gets EOF when all other processes have exited. Then the
# resource_tracker process unlinks any remaining leaked resources (those with
# a reference count above 0).
34
35# For semaphores, this is important because the system only supports a limited
36# number of named semaphores, and they will not be automatically removed till
37# the next reboot. Without this resource tracker process, "killall python"
38# would probably leave unlinked semaphores.
39
# Note that this behavior differs from CPython's resource_tracker, which only
# implements a list of shared resources, and not a proper refcounting scheme.
# Also, CPython's resource tracker will only attempt to clean up those shared
# resources once all processes connected to the resource tracker have exited.
44
45
46import os
47import shutil
48import sys
49import signal
50import warnings
51import threading
52from _multiprocessing import sem_unlink
53from multiprocessing import util
54
55from . import spawn
56
57if sys.platform == "win32":
58 import _winapi
59 import msvcrt
60 from multiprocessing.reduction import duplicate
61
62
# Names exported by ``from ... import *``. The module-level ``maybe_unlink``
# and ``getfd`` aliases defined further down are public too, but are not
# listed here.
__all__ = ["ensure_running", "register", "unregister"]
64
# Signals the tracker process must survive. When the platform supports
# ``pthread_sigmask`` they are blocked in the parent around the spawn of the
# tracker, and unblocked again in the tracker once it ignores them.
_HAVE_SIGMASK = hasattr(signal, "pthread_sigmask")
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Resource type -> callable used by the tracker to delete a resource of that
# type given its name. Insertion order is "folder", "file", then (POSIX only)
# "semlock".
_CLEANUP_FUNCS = {
    "folder": shutil.rmtree,
    "file": os.unlink,
}
if os.name == "posix":
    # Named semaphores are only tracked on POSIX.
    _CLEANUP_FUNCS.update(semlock=sem_unlink)

# Forwarded to the tracker's ``main``; when true the tracker logs debug
# messages to stderr.
VERBOSE = False
75
76
class ResourceTracker:
    """Client-side handle on the resource tracker server process.

    It owns the write end of the pipe connected to the tracker process and
    sends it newline-terminated ASCII commands of the form
    ``"CMD:name:rtype\\n"`` (``PROBE``, ``REGISTER``, ``UNREGISTER``,
    ``MAYBE_UNLINK``). The tracker process is launched lazily and relaunched
    if it is found dead.
    """

    def __init__(self):
        # Serializes launching / relaunching of the tracker process.
        self._lock = threading.Lock()
        # Write end of the pipe to the tracker; None until launched.
        self._fd = None
        # Pid of the tracker process; None until launched.
        self._pid = None

    def getfd(self):
        """Return the pipe fd to the tracker, launching the tracker if needed."""
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        """Make sure that resource tracker process is running.

        This can be run from any process. Usually a child process will use
        the resource created by its parent.

        If a tracker was launched before but no longer accepts messages, its
        pipe is closed, the dead process is reaped (POSIX only), a warning is
        emitted and a new tracker process is spawned.
        """
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again. Closing the stale fd is
                # best-effort: a failure here must not prevent the relaunch.
                try:
                    os.close(self._fd)
                except OSError:
                    pass
                if os.name == "posix":
                    try:
                        # At this point, the resource_tracker process has been
                        # killed or crashed. Let's remove the process entry
                        # from the process table to avoid zombie processes.
                        os.waitpid(self._pid, 0)
                    except OSError:
                        # The process was terminated or is a child from an
                        # ancestor of the current process.
                        pass
                self._fd = None
                self._pid = None

                warnings.warn(
                    "resource_tracker: process died unexpectedly, "
                    "relaunching. Some folders/semaphores might "
                    "leak."
                )

            # Let the tracker inherit our stderr so its warnings/logs show up.
            fds_to_pass = []
            try:
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass

            r, w = os.pipe()
            if sys.platform == "win32":
                # The child needs an inheritable OS handle, not a CRT fd.
                _r = duplicate(msvcrt.get_osfhandle(r), inheritable=True)
                os.close(r)
                r = _r

            cmd = f"from {main.__module__} import main; main({r}, {VERBOSE})"
            try:
                fds_to_pass.append(r)
                # process will out live us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe, *util._args_from_interpreter_flags(), "-c", cmd]
                util.debug(f"launching resource tracker: {args}")
                # bpo-33613: Register a signal mask that will block the
                # signals. This signal mask will be inherited by the child
                # that is going to be spawned and will protect the child from a
                # race condition that can make the child die before it
                # registers signal handlers for SIGINT and SIGTERM. The mask is
                # unregistered after spawning the child.
                try:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(
                            signal.SIG_BLOCK, _IGNORED_SIGNALS
                        )
                    pid = spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(
                            signal.SIG_UNBLOCK, _IGNORED_SIGNALS
                        )
            except BaseException:
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # The read end now belongs to the child; drop our copy.
                if sys.platform == "win32":
                    _winapi.CloseHandle(r)
                else:
                    os.close(r)

    def _check_alive(self):
        """Check for the existence of the resource tracker process.

        Sends a no-op ``PROBE`` message and returns False if writing to the
        pipe fails, True otherwise.
        """
        try:
            self._send("PROBE", "", "")
        except OSError:
            # BrokenPipeError is the usual POSIX signal of a dead reader, but
            # other platforms may report a closed pipe as a different OSError
            # (NOTE(review): e.g. EINVAL on Windows); all of them mean the
            # tracker can no longer be reached.
            return False
        else:
            return True

    def register(self, name, rtype):
        """Register a named resource, and increment its refcount."""
        self.ensure_running()
        self._send("REGISTER", name, rtype)

    def unregister(self, name, rtype):
        """Unregister a named resource with resource tracker."""
        self.ensure_running()
        self._send("UNREGISTER", name, rtype)

    def maybe_unlink(self, name, rtype):
        """Decrement the refcount of a resource, and delete it if it hits 0"""
        self.ensure_running()
        self._send("MAYBE_UNLINK", name, rtype)

    def _send(self, cmd, name, rtype):
        """Write one ``cmd:name:rtype`` line to the tracker pipe.

        Raises ValueError if ``name`` is longer than 512 characters, and
        UnicodeEncodeError if the message is not pure ASCII.
        """
        if len(name) > 512:
            # posix guarantees that writes to a pipe of less than PIPE_BUF
            # bytes are atomic, and that PIPE_BUF >= 512
            # NOTE(review): only ``name`` is bounded here, so the full message
            # (with cmd, rtype and separators) can slightly exceed 512 bytes.
            raise ValueError("name too long")
        msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), f"partial write: {nbytes} != {len(msg)}"
199
200
# Single tracker handle for this process. The public module-level functions
# are bound methods of it, so callers can use e.g.
# ``resource_tracker.register(name, rtype)`` directly.
_resource_tracker = ResourceTracker()
getfd = _resource_tracker.getfd
ensure_running = _resource_tracker.ensure_running
register = _resource_tracker.register
unregister = _resource_tracker.unregister
maybe_unlink = _resource_tracker.maybe_unlink
207
208
def main(fd, verbose=0):
    """Run resource tracker.

    Entry point of the tracker process, launched by
    ``ResourceTracker.ensure_running`` through ``python -c``.

    Reads newline-terminated ``CMD:name:rtype`` messages from the pipe ``fd``
    until EOF and maintains a per-resource-type refcount registry:

    * ``PROBE``: liveness check, ignored.
    * ``REGISTER``: increment the refcount of ``name`` (starting at 1).
    * ``UNREGISTER``: forget ``name`` without cleaning it up.
    * ``MAYBE_UNLINK``: decrement the refcount and, when it reaches 0,
      clean the resource up with ``_CLEANUP_FUNCS[rtype]``.

    An error while handling one message (unknown rtype or command, unknown
    name, ...) is reported through ``sys.excepthook`` and does not stop the
    loop. Once EOF is reached, every resource still in the registry is
    cleaned up, folders last.

    Parameters
    ----------
    fd : int
        Read end of the command pipe. On Windows this is an OS handle that is
        converted to a C runtime file descriptor before use.
    verbose : bool or int, default 0
        If true, log debug messages to stderr.
    """
    # protect the process from ^C and "killall python" etc
    if verbose:
        util.log_to_stderr(level=util.DEBUG)

    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)

    # The parent blocked these signals around our spawn (bpo-33613); now
    # that they are ignored it is safe to unblock them.
    if _HAVE_SIGMASK:
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    # stdin/stdout are unused; stderr is kept for warnings and debug logs.
    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    if verbose:
        util.debug("Main resource tracker is running")

    # rtype -> {name: refcount}
    registry = {rtype: {} for rtype in _CLEANUP_FUNCS}
    try:
        # keep track of registered/unregistered resources
        if sys.platform == "win32":
            fd = msvcrt.open_osfhandle(fd, os.O_RDONLY)
        with open(fd, "rb") as f:
            while True:
                line = f.readline()
                if line == b"":  # EOF
                    break
                try:
                    splitted = line.strip().decode("ascii").split(":")
                    # name can potentially contain separator symbols (for
                    # instance folders on Windows)
                    cmd, name, rtype = (
                        splitted[0],
                        ":".join(splitted[1:-1]),
                        splitted[-1],
                    )

                    if cmd == "PROBE":
                        continue

                    if rtype not in _CLEANUP_FUNCS:
                        raise ValueError(
                            f"Cannot register {name} for automatic cleanup: "
                            f"unknown resource type ({rtype}). Resource type "
                            "should be one of the following: "
                            f"{list(_CLEANUP_FUNCS.keys())}"
                        )

                    if cmd == "REGISTER":
                        registry[rtype][name] = registry[rtype].get(name, 0) + 1

                        if verbose:
                            util.debug(
                                "[ResourceTracker] incremented refcount of "
                                f"{rtype} {name} "
                                f"(current {registry[rtype][name]})"
                            )
                    elif cmd == "UNREGISTER":
                        # KeyError for an unknown name is reported below.
                        del registry[rtype][name]
                        if verbose:
                            # Report the number of resources still tracked
                            # for this rtype (not the number of rtypes).
                            util.debug(
                                f"[ResourceTracker] unregister {name} {rtype}: "
                                f"registry({len(registry[rtype])})"
                            )
                    elif cmd == "MAYBE_UNLINK":
                        # KeyError for an unknown name is reported below.
                        registry[rtype][name] -= 1
                        if verbose:
                            util.debug(
                                "[ResourceTracker] decremented refcount of "
                                f"{rtype} {name} "
                                f"(current {registry[rtype][name]})"
                            )

                        if registry[rtype][name] == 0:
                            del registry[rtype][name]
                            try:
                                if verbose:
                                    util.debug(
                                        f"[ResourceTracker] unlink {name}"
                                    )
                                _CLEANUP_FUNCS[rtype](name)
                            except Exception as e:
                                warnings.warn(
                                    f"resource_tracker: {name}: {e!r}"
                                )

                    else:
                        raise RuntimeError(f"unrecognized command {cmd!r}")
                except BaseException:
                    # Report the failure but keep serving other messages.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except BaseException:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        def _unlink_resources(rtype_registry, rtype):
            if rtype_registry:
                try:
                    warnings.warn(
                        "resource_tracker: There appear to be "
                        f"{len(rtype_registry)} leaked {rtype} objects to "
                        "clean up at shutdown"
                    )
                except Exception:
                    pass
            for name in rtype_registry:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died. We therefore clean it up.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                    if verbose:
                        util.debug(f"[ResourceTracker] unlink {name}")
                except Exception as e:
                    warnings.warn(f"resource_tracker: {name}: {e!r}")

        for rtype, rtype_registry in registry.items():
            if rtype != "folder":
                _unlink_resources(rtype_registry, rtype)

        # The default cleanup routine for folders deletes everything inside
        # those folders recursively, which can include other resources tracked
        # by the resource tracker. To limit the risk of the resource tracker
        # attempting to delete twice a resource (once as part of a tracked
        # folder, and once as a resource), we delete the folders after all
        # other resource types.
        if "folder" in registry:
            _unlink_resources(registry["folder"], "folder")

    if verbose:
        util.debug("resource tracker shut down")
349
350
351#
352# Start a program with only specified fds kept open
353#
354
355
def spawnv_passfds(path, args, passfds):
    """Start ``args`` as a new process and return its pid.

    Parameters
    ----------
    path : str
        Executable to run. Only used on Windows, where it is handed to
        ``CreateProcess``; on POSIX only ``args`` is given to ``fork_exec``
        (presumably ``args[0]`` is executed there).
    args : list of str
        Full command line, executable first.
    passfds : iterable of int
        Descriptors the child must keep open. On POSIX they are made
        inheritable and handed to ``fork_exec``. On Windows the process is
        created with handle inheritance enabled, so every inheritable handle
        is passed (the caller makes the pipe handle inheritable).

    Returns
    -------
    int
        Pid of the spawned process.

    Raises
    ------
    OSError
        If the process cannot be created. (Previously, on Windows, this
        failure was swallowed and surfaced as an ``UnboundLocalError``.)
    """
    passfds = sorted(passfds)
    if sys.platform != "win32":
        from .reduction import _mk_inheritable
        from .fork_exec import fork_exec

        _pass = [_mk_inheritable(fd) for fd in passfds]
        return fork_exec(args, _pass)

    cmd = " ".join(f'"{x}"' for x in args)
    hp, ht, pid, _ = _winapi.CreateProcess(
        path, cmd, None, None, True, 0, None, None, None
    )
    # Neither the thread nor the process handle is used afterwards; close
    # both so they do not leak in the parent (the child keeps running).
    # Closing is best-effort: a failure here must not hide a valid pid.
    for handle in (ht, hp):
        try:
            _winapi.CloseHandle(handle)
        except OSError:
            pass
    return pid