Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/externals/loky/backend/resource_tracker.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

174 statements  

1############################################################################### 

2# Server process to keep track of unlinked resources, like folders and 

3# semaphores, and clean them up.

4# 

5# author: Thomas Moreau 

6# 

7# Adapted from multiprocessing/resource_tracker.py 

8# * add some VERBOSE logging, 

9# * add support to track folders, 

10# * add Windows support, 

11# * refcounting scheme to avoid unlinking resources still in use. 

12# 

13# On Unix we run a server process which keeps track of unlinked 

14# resources. The server ignores SIGINT and SIGTERM and reads from a 

15# pipe. The resource_tracker implements a reference counting scheme: each time 

16# a Python process anticipates the shared usage of a resource by another 

17# process, it signals the resource_tracker of this shared usage, and in return, 

18# the resource_tracker increments the resource's reference count by 1. 

19# Similarly, when access to a resource is closed by a Python process, the 

20# process notifies the resource_tracker by asking it to decrement the 

21# resource's reference count by 1. When the reference count drops to 0, the 

22# resource_tracker attempts to clean up the underlying resource. 

23 

24# Finally, every other process connected to the resource tracker has a copy of 

25# the writable end of the pipe used to communicate with it, so the resource 

26# tracker gets EOF when all other processes have exited. Then the 

27# resource_tracker process unlinks any remaining leaked resources (with 

28# reference count above 0).

29 

30# For semaphores, this is important because the system only supports a limited 

31# number of named semaphores, and they will not be automatically removed till 

32# the next reboot. Without this resource tracker process, "killall python" 

33# would probably leave unlinked semaphores. 

34 

35# Note that this behavior differs from CPython's resource_tracker, which only 

36# implements a list of shared resources, and not a proper refcounting scheme.

37# Also, CPython's resource tracker will only attempt to clean up those shared

38# resources once all processes connected to the resource tracker have exited. 

39 

40 

41import os 

42import shutil 

43import sys 

44import signal 

45import warnings 

46from _multiprocessing import sem_unlink 

47from multiprocessing import util 

48from multiprocessing.resource_tracker import ( 

49 ResourceTracker as _ResourceTracker, 

50) 

51 

52from . import spawn 

53 

54if sys.platform == "win32": 

55 import _winapi 

56 import msvcrt 

57 from multiprocessing.reduction import duplicate 

58 

59 

# Names exported by a star-import of this module.
__all__ = ["ensure_running", "register", "unregister"]

# signal.pthread_sigmask is not available on every platform; the signal
# masking done around the tracker spawn is conditional on this flag.
_HAVE_SIGMASK = hasattr(signal, "pthread_sigmask")
# Signals that are blocked while the tracker is spawned and that the tracker
# process itself ignores.
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Maps a resource type name to the callable used to delete a resource of that
# type. The keys are the only resource types accepted by the tracker.
_CLEANUP_FUNCS = {"folder": shutil.rmtree, "file": os.unlink}

if os.name == "posix":
    # Named semaphores are only tracked on POSIX systems.
    _CLEANUP_FUNCS["semlock"] = sem_unlink


# Forwarded to ``main`` when the tracker process is launched; when true the
# tracker logs its activity at debug level.
VERBOSE = False

72 

73 

class ResourceTracker(_ResourceTracker):
    """Resource tracker with refcounting scheme.

    This class is an extension of the multiprocessing ResourceTracker class
    which implements a reference counting scheme to avoid unlinking shared
    resources still in use in other processes.

    This feature is notably used by `joblib.Parallel` to share temporary
    folders and memory mapped files between the main process and the worker
    processes.

    The actual implementation of the refcounting scheme is in the main
    function, which is run in a dedicated process.
    """

    def maybe_unlink(self, name, rtype):
        """Decrement the refcount of a resource, and delete it if it hits 0.

        Only the request is sent from here; the decrement and the eventual
        deletion are carried out by the tracker process.
        """
        self.ensure_running()
        self._send("MAYBE_UNLINK", name, rtype)

    def ensure_running(self):
        """Make sure that resource tracker process is running.

        This can be run from any process. Usually a child process will use
        the resource created by its parent.

        If a tracker was launched earlier and ``_check_alive()`` reports it
        dead, its pipe is closed, a warning is emitted and a new tracker
        process is spawned. Any exception raised while spawning is re-raised
        after the write end of the freshly created pipe has been closed.
        """
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)
                if os.name == "posix":
                    try:
                        # At this point, the resource_tracker process has been
                        # killed or crashed. Let's remove the process entry
                        # from the process table to avoid zombie processes.
                        os.waitpid(self._pid, 0)
                    except OSError:
                        # The process was terminated or is a child from an
                        # ancestor of the current process.
                        pass
                self._fd = None
                self._pid = None

                warnings.warn(
                    "resource_tracker: process died unexpectedly, "
                    "relaunching. Some folders/semaphores might "
                    "leak."
                )

            # stderr is passed to the child on a best-effort basis: it may
            # not be backed by a real file descriptor.
            fds_to_pass = []
            try:
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass

            r, w = os.pipe()
            if sys.platform == "win32":
                # Replace the read fd by an inheritable OS handle that the
                # child process can open.
                _r = duplicate(msvcrt.get_osfhandle(r), inheritable=True)
                os.close(r)
                r = _r

            cmd = f"from {main.__module__} import main; main({r}, {VERBOSE})"
            try:
                fds_to_pass.append(r)
                # process will out live us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe, *util._args_from_interpreter_flags(), "-c", cmd]
                util.debug(f"launching resource tracker: {args}")
                # bpo-33613: Register a signal mask that will block the
                # signals. This signal mask will be inherited by the child
                # that is going to be spawned and will protect the child from a
                # race condition that can make the child die before it
                # registers signal handlers for SIGINT and SIGTERM. The mask is
                # unregistered after spawning the child.
                try:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(
                            signal.SIG_BLOCK, _IGNORED_SIGNALS
                        )
                    pid = spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(
                            signal.SIG_UNBLOCK, _IGNORED_SIGNALS
                        )
            except BaseException:
                # The tracker could not be started: do not leak the write end.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # The read end belongs to the child; this process never
                # reads from it.
                if sys.platform == "win32":
                    _winapi.CloseHandle(r)
                else:
                    os.close(r)

    def __del__(self):
        # ignore error due to trying to clean up child process which has
        # already been shutdown on windows.
        # See https://github.com/joblib/loky/pull/450
        # This is only required if __del__ is defined
        if not hasattr(_ResourceTracker, "__del__"):
            return
        try:
            super().__del__()
        except ChildProcessError:
            pass

184 

185 

# Module-level tracker instance. The names below are its bound methods, so
# the module itself can be used as a functional API. ``register``,
# ``unregister`` and ``getfd`` are not defined on the subclass above and are
# therefore inherited from the multiprocessing base class.
_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running
register = _resource_tracker.register
maybe_unlink = _resource_tracker.maybe_unlink
unregister = _resource_tracker.unregister
getfd = _resource_tracker.getfd

192 

193 

def main(fd, verbose=0):
    """Run resource tracker.

    Read ``CMD:name:rtype`` lines (ASCII) from ``fd`` until EOF while keeping
    one reference count per registered resource, then clean up every resource
    that is still registered.

    ``fd`` is the read end of the command pipe; on Windows it is an OS handle
    that is first converted with ``msvcrt.open_osfhandle``. When ``verbose``
    is truthy, the tracker logs its activity to stderr at debug level.

    Recognised commands:

    * ``PROBE``: ignored.
    * ``REGISTER``: increment the refcount of ``name`` (starting at 1).
    * ``UNREGISTER``: forget ``name`` without deleting the resource.
    * ``MAYBE_UNLINK``: decrement the refcount of ``name`` and delete the
      resource with the matching ``_CLEANUP_FUNCS`` entry when it reaches 0.

    Any error raised while handling a line is reported through
    ``sys.excepthook`` and the loop carries on with the next line.
    """
    if verbose:
        util.log_to_stderr(level=util.DEBUG)

    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)

    if _HAVE_SIGMASK:
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for stream in (sys.stdin, sys.stdout):
        try:
            stream.close()
        except Exception:
            pass

    if verbose:
        util.debug("Main resource tracker is running")

    # One {name: refcount} mapping per known resource type.
    registry = {rtype: {} for rtype in _CLEANUP_FUNCS}
    try:
        # keep track of registered/unregistered resources
        if sys.platform == "win32":
            fd = msvcrt.open_osfhandle(fd, os.O_RDONLY)
        with open(fd, "rb") as f:
            while True:
                line = f.readline()
                if line == b"":  # EOF
                    break
                try:
                    splitted = line.strip().decode("ascii").split(":")
                    # name can potentially contain separator symbols (for
                    # instance folders on Windows)
                    cmd, name, rtype = (
                        splitted[0],
                        ":".join(splitted[1:-1]),
                        splitted[-1],
                    )

                    if cmd == "PROBE":
                        continue

                    if rtype not in _CLEANUP_FUNCS:
                        raise ValueError(
                            f"Cannot register {name} for automatic cleanup: "
                            f"unknown resource type ({rtype}). Resource type "
                            "should be one of the following: "
                            f"{list(_CLEANUP_FUNCS.keys())}"
                        )

                    if cmd == "REGISTER":
                        if name not in registry[rtype]:
                            registry[rtype][name] = 1
                        else:
                            registry[rtype][name] += 1

                        if verbose:
                            util.debug(
                                "[ResourceTracker] incremented refcount of "
                                f"{rtype} {name} "
                                f"(current {registry[rtype][name]})"
                            )
                    elif cmd == "UNREGISTER":
                        del registry[rtype][name]
                        if verbose:
                            # Report how many resources of this type are
                            # still tracked; len(registry) would only give
                            # the constant number of resource types.
                            util.debug(
                                f"[ResourceTracker] unregister {name} {rtype}: "
                                f"registry({len(registry[rtype])})"
                            )
                    elif cmd == "MAYBE_UNLINK":
                        registry[rtype][name] -= 1
                        if verbose:
                            util.debug(
                                "[ResourceTracker] decremented refcount of "
                                f"{rtype} {name} "
                                f"(current {registry[rtype][name]})"
                            )

                        if registry[rtype][name] == 0:
                            del registry[rtype][name]
                            try:
                                if verbose:
                                    util.debug(
                                        f"[ResourceTracker] unlink {name}"
                                    )
                                _CLEANUP_FUNCS[rtype](name)
                            except Exception as e:
                                warnings.warn(
                                    f"resource_tracker: {name}: {e!r}"
                                )

                    else:
                        raise RuntimeError(f"unrecognized command {cmd!r}")
                except BaseException:
                    # The tracker must survive malformed or unexpected
                    # requests: report the error and keep reading.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except BaseException:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        def _unlink_resources(rtype_registry, rtype):
            if rtype_registry:
                try:
                    warnings.warn(
                        "resource_tracker: There appear to be "
                        f"{len(rtype_registry)} leaked {rtype} objects to "
                        "clean up at shutdown"
                    )
                except Exception:
                    pass
            for name in rtype_registry:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died.  We therefore clean it up.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                    if verbose:
                        util.debug(f"[ResourceTracker] unlink {name}")
                except Exception as e:
                    warnings.warn(f"resource_tracker: {name}: {e!r}")

        for rtype, rtype_registry in registry.items():
            if rtype == "folder":
                continue
            else:
                _unlink_resources(rtype_registry, rtype)

        # The default cleanup routine for folders deletes everything inside
        # those folders recursively, which can include other resources tracked
        # by the resource tracker). To limit the risk of the resource tracker
        # attempting to delete twice a resource (once as part of a tracked
        # folder, and once as a resource), we delete the folders after all
        # other resource types.
        if "folder" in registry:
            _unlink_resources(registry["folder"], "folder")

    if verbose:
        util.debug("resource tracker shut down")

334 

335 

336def spawnv_passfds(path, args, passfds): 

337 if sys.platform != "win32": 

338 args = [arg.encode("utf-8") for arg in args] 

339 path = path.encode("utf-8") 

340 return util.spawnv_passfds(path, args, passfds) 

341 else: 

342 passfds = sorted(passfds) 

343 cmd = " ".join(f'"{x}"' for x in args) 

344 try: 

345 _, ht, pid, _ = _winapi.CreateProcess( 

346 path, cmd, None, None, True, 0, None, None, None 

347 ) 

348 _winapi.CloseHandle(ht) 

349 except BaseException: 

350 pass 

351 return pid