Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/externals/loky/backend/resource_tracker.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

191 statements  

1############################################################################### 

2# Server process to keep track of unlinked resources, like folders and 

3# semaphores and clean them. 

4# 

5# author: Thomas Moreau 

6# 

7# Adapted from multiprocessing/resource_tracker.py 

8# * add some VERBOSE logging, 

9# * add support to track folders, 

10# * add Windows support, 

11# * refcounting scheme to avoid unlinking resources still in use. 

12# 

13# On Unix we run a server process which keeps track of unlinked 

14# resources. The server ignores SIGINT and SIGTERM and reads from a 

15# pipe. The resource_tracker implements a reference counting scheme: each time 

16# a Python process anticipates the shared usage of a resource by another 

17# process, it signals the resource_tracker of this shared usage, and in return, 

18# the resource_tracker increments the resource's reference count by 1. 

19# Similarly, when access to a resource is closed by a Python process, the 

20# process notifies the resource_tracker by asking it to decrement the 

21# resource's reference count by 1. When the reference count drops to 0, the 

22# resource_tracker attempts to clean up the underlying resource. 

23 

24# Finally, every other process connected to the resource tracker has a copy of 

25# the writable end of the pipe used to communicate with it, so the resource 

26# tracker gets EOF when all other processes have exited. Then the 

27# resource_tracker process unlinks any remaining leaked resources (with 

28# reference count above 0) 

29 

30# For semaphores, this is important because the system only supports a limited 

31# number of named semaphores, and they will not be automatically removed till 

32# the next reboot. Without this resource tracker process, "killall python" 

33# would probably leave unlinked semaphores. 

34 

# Note that this behavior differs from CPython's resource_tracker, which only
# implements a list of shared resources, and not a proper refcounting scheme.
# Also, CPython's resource tracker will only attempt to clean up those shared
# resources once all processes connected to the resource tracker have exited.

39 

40 

41import os 

42import shutil 

43import sys 

44import signal 

45import warnings 

46from multiprocessing import util 

47from multiprocessing.resource_tracker import ( 

48 ResourceTracker as _ResourceTracker, 

49) 

50 

51from . import spawn 

52 

53if sys.platform == "win32": 

54 import _winapi 

55 import msvcrt 

56 from multiprocessing.reduction import duplicate 

57 

58 

# Public API: thin aliases bound to the module-level ResourceTracker instance.
# ``maybe_unlink`` (the refcount-decrement entry point specific to this
# tracker) is exported alongside the CPython-compatible names.
__all__ = ["ensure_running", "register", "unregister", "maybe_unlink"]

60 

# True when the platform lets us block signals around the tracker spawn.
_HAVE_SIGMASK = getattr(signal, "pthread_sigmask", None) is not None
# Signals the tracker process ignores, and which are blocked while spawning it.
_IGNORED_SIGNALS = signal.SIGINT, signal.SIGTERM

63 

64 

def cleanup_noop(name):
    """Cleanup callable for the ``noop`` resource type.

    ``noop`` exists only so that PROBE messages carry a resource type that the
    tracker accepts; no resource of this type may ever be cleaned up, so this
    always raises.
    """
    message = "noop should never be registered or cleaned up"
    raise RuntimeError(message)


# Resource type -> callable that destroys a resource given its name. Insertion
# order matters: it is the order in which leaked resources are cleaned up.
_CLEANUP_FUNCS = dict(
    noop=cleanup_noop,
    folder=shutil.rmtree,
    file=os.unlink,
)

if os.name == "posix":
    import _multiprocessing

    # Named semaphores are removed with sem_unlink(). The function is absent
    # when the interpreter was built without POSIX named semaphores; in that
    # case none can have been created, so there is nothing to clean up.
    if hasattr(_multiprocessing, "sem_unlink"):
        _CLEANUP_FUNCS["semlock"] = _multiprocessing.sem_unlink


# Forwarded as the ``verbose`` argument of the spawned tracker's ``main``.
VERBOSE = False

92 

93 

class ResourceTracker(_ResourceTracker):
    """Resource tracker with refcounting scheme.

    This class is an extension of the multiprocessing ResourceTracker class
    which implements a reference counting scheme to avoid unlinking shared
    resources still in use in other processes.

    This feature is notably used by `joblib.Parallel` to share temporary
    folders and memory mapped files between the main process and the worker
    processes.

    The actual implementation of the refcounting scheme is in the main
    function, which is run in a dedicated process.
    """

    def maybe_unlink(self, name, rtype):
        """Decrement the refcount of a resource, and delete it if it hits 0.

        Sends a ``MAYBE_UNLINK`` command to the tracker process through the
        inherited ``_send``; the decrement and the cleanup at zero are
        performed by ``main`` in the tracker process.
        """
        self._send("MAYBE_UNLINK", name, rtype)

    def ensure_running(self):
        """Make sure that the resource tracker process is running.

        This can be run from any process. Usually a child process will use
        the resource tracker created by its parent.

        Delegates to ``_ensure_running_and_write`` with no message, which
        probes an existing tracker or launches a new one. This method is
        necessary for backward compatibility with python versions before
        3.13.7.
        """
        return self._ensure_running_and_write()

    def _teardown_dead_process(self):
        """Release the state of a tracker process that stopped responding.

        Closes the write end of the pipe, reaps the child on POSIX when its
        pid is known, resets ``_fd``/``_pid`` to None and emits a warning.
        Overridden for compatibility with Windows and with python versions
        before 3.13.7.
        """
        # At this point, the resource_tracker process has been killed
        # or crashed.
        os.close(self._fd)

        # Let's remove the process entry from the process table on POSIX system
        # to avoid zombie processes.
        if os.name == "posix":
            try:
                # _pid can be None if this process is a child from another
                # python process, which has started the resource_tracker.
                if self._pid is not None:
                    os.waitpid(self._pid, 0)
            except OSError:
                # The resource_tracker has already been terminated.
                pass
        self._fd = None
        self._pid = None

        warnings.warn(
            "resource_tracker: process died unexpectedly, relaunching. "
            "Some folders/semaphores might leak."
        )

    def _launch(self):
        """Spawn loky's tracker process and keep the write end of its pipe.

        This is the overridden part of the resource tracker: it launches
        loky's ``main`` (compatible with Windows and able to track folders
        with external ref counting) instead of CPython's. On success,
        ``self._fd`` is the pipe's write end and ``self._pid`` the child's
        pid. On failure the write end is closed and the exception re-raised.
        In every case, this process's copy of the read end is released.
        """
        # Descriptors the child must inherit: stderr (when it is backed by a
        # real fd) and, below, the read end of the command pipe.
        fds_to_pass = []
        try:
            fds_to_pass.append(sys.stderr.fileno())
        except Exception:
            pass

        # Create a pipe for posix and windows
        r, w = os.pipe()
        if sys.platform == "win32":
            # On Windows the child receives an inheritable OS handle rather
            # than a CRT fd; ``main`` converts it back with open_osfhandle.
            _r = duplicate(msvcrt.get_osfhandle(r), inheritable=True)
            os.close(r)
            r = _r

        # The child imports this module's ``main`` and runs it on the read end.
        cmd = f"from {main.__module__} import main; main({r}, {VERBOSE})"
        try:
            fds_to_pass.append(r)
            # The process will outlive us, so no need to wait on its pid here.
            exe = spawn.get_executable()
            args = [exe, *util._args_from_interpreter_flags(), "-c", cmd]
            util.debug(f"launching resource tracker: {args}")
            # bpo-33613: Register a signal mask that will block the
            # signals. This signal mask will be inherited by the child
            # that is going to be spawned and will protect the child from a
            # race condition that can make the child die before it
            # registers signal handlers for SIGINT and SIGTERM. The mask is
            # unregistered after spawning the child.
            try:
                if _HAVE_SIGMASK:
                    signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
                pid = spawnv_passfds(exe, args, fds_to_pass)
            finally:
                if _HAVE_SIGMASK:
                    signal.pthread_sigmask(
                        signal.SIG_UNBLOCK, _IGNORED_SIGNALS
                    )
        except BaseException:
            # Launch failed: nothing will ever read from the pipe.
            os.close(w)
            raise
        else:
            self._fd = w
            self._pid = pid
        finally:
            # Only the child reads from the pipe; drop the parent's read end.
            if sys.platform == "win32":
                _winapi.CloseHandle(r)
            else:
                os.close(r)

    def _ensure_running_and_write(self, msg=None):
        """Make sure the tracker process is running, then optionally send ``msg``.

        This can be run from any process. Usually a child process will use
        the resource tracker created by its parent.

        If a tracker was launched before, ``msg`` (or a PROBE message when
        ``msg`` is None) is written to it; an ``OSError`` on that write is
        treated as a dead tracker, which is torn down and relaunched.
        Otherwise a tracker is launched and ``msg``, if any, is written to it.

        This method is added for compatibility with python versions before
        3.13.7.
        """
        with self._lock:
            if (
                self._fd is not None
            ):  # resource tracker was launched before, is it still running?
                if msg is None:
                    to_send = b"PROBE:0:noop\n"
                else:
                    to_send = msg
                try:
                    self._write(to_send)
                except OSError:
                    self._teardown_dead_process()
                    self._launch()

                # NOTE(review): ``msg`` is cleared even when the write above
                # failed and the tracker was relaunched, so that message is
                # not re-sent to the new tracker — confirm this is intended.
                msg = None  # message was sent in probe
            else:
                self._launch()

        if msg is not None:
            self._write(msg)

    def _write(self, msg):
        """Write the raw bytes ``msg`` to the tracker pipe in a single call.

        The length check is an ``assert`` and is therefore skipped when
        Python runs with ``-O``.
        """
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"

    def __del__(self):
        # Ignore the error raised when trying to clean up a child process
        # that has already been shut down on Windows.
        # See https://github.com/joblib/loky/pull/450
        # This is only required when the base class defines __del__;
        # otherwise there is nothing to delegate to.
        if not hasattr(_ResourceTracker, "__del__"):
            return
        try:
            super().__del__()
        except ChildProcessError:
            pass

247 

248 

# Single tracker instance for this process; the public helpers below are its
# bound methods, so every caller goes through the same tracker state.
_resource_tracker = ResourceTracker()
(
    ensure_running,
    register,
    maybe_unlink,
    unregister,
    getfd,
) = (
    _resource_tracker.ensure_running,
    _resource_tracker.register,
    _resource_tracker.maybe_unlink,
    _resource_tracker.unregister,
    _resource_tracker.getfd,
)

255 

256 

def main(fd, verbose=0):
    """Run resource tracker.

    Entry point of the dedicated tracker process. Reads newline-terminated
    ASCII commands of the form ``CMD:NAME:RTYPE`` from ``fd`` and maintains a
    per-type reference count for each resource name.

    Commands:
        PROBE         -- no-op, used to check that the tracker is alive.
        REGISTER      -- increment the refcount of NAME.
        UNREGISTER    -- forget NAME entirely (no cleanup).
        MAYBE_UNLINK  -- decrement the refcount; clean NAME up when it hits 0.

    NAME may itself contain ``:`` (e.g. Windows folder paths); only the first
    and last fields are split off. Any error while handling a line is reported
    through ``sys.excepthook`` and processing continues with the next line.

    Once the pipe reaches EOF, every resource still registered is cleaned up,
    folders last.

    Args:
        fd: read end of the command pipe. On Windows this is an OS handle,
            converted to a CRT file descriptor before use.
        verbose: when truthy, log debug messages to stderr.
    """
    if verbose:
        util.log_to_stderr(level=util.DEBUG)

    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)

    # Lift any inherited mask on these signals now that they are ignored.
    if _HAVE_SIGMASK:
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    if verbose:
        util.debug("Main resource tracker is running")

    # rtype -> {name: refcount}; only known resource types can be tracked.
    registry = {rtype: {} for rtype in _CLEANUP_FUNCS}

    try:
        if sys.platform == "win32":
            fd = msvcrt.open_osfhandle(fd, os.O_RDONLY)
        # keep track of registered/unregistered resources
        with open(fd, "rb") as f:
            # The loop ends at EOF, i.e. once every process holding the
            # write end of the pipe has closed it.
            for line in f:
                try:
                    splitted = line.strip().decode("ascii").split(":")
                    # name can potentially contain separator symbols (for
                    # instance folders on Windows)
                    cmd, name, rtype = (
                        splitted[0],
                        ":".join(splitted[1:-1]),
                        splitted[-1],
                    )

                    if rtype not in _CLEANUP_FUNCS:
                        raise ValueError(
                            f"Cannot register {name} for automatic cleanup: "
                            f"unknown resource type ({rtype}). Resource type "
                            "should be one of the following: "
                            f"{list(_CLEANUP_FUNCS.keys())}"
                        )

                    if cmd == "PROBE":
                        pass
                    elif cmd == "REGISTER":
                        registry[rtype][name] = registry[rtype].get(name, 0) + 1

                        if verbose:
                            util.debug(
                                "[ResourceTracker] incremented refcount of "
                                f"{rtype} {name} "
                                f"(current {registry[rtype][name]})"
                            )
                    elif cmd == "UNREGISTER":
                        # Raises KeyError (reported below) if never registered.
                        del registry[rtype][name]
                        if verbose:
                            # Report how many resources of this type are still
                            # tracked; len(registry) only counts resource types.
                            util.debug(
                                f"[ResourceTracker] unregister {name} {rtype}: "
                                f"registry({len(registry[rtype])})"
                            )
                    elif cmd == "MAYBE_UNLINK":
                        registry[rtype][name] -= 1
                        if verbose:
                            util.debug(
                                "[ResourceTracker] decremented refcount of "
                                f"{rtype} {name} "
                                f"(current {registry[rtype][name]})"
                            )

                        if registry[rtype][name] == 0:
                            del registry[rtype][name]
                            try:
                                if verbose:
                                    util.debug(
                                        f"[ResourceTracker] unlink {name}"
                                    )
                                _CLEANUP_FUNCS[rtype](name)
                            except Exception as e:
                                warnings.warn(
                                    f"resource_tracker: {name}: {e!r}"
                                )

                    else:
                        raise RuntimeError(f"unrecognized command {cmd!r}")
                except BaseException:
                    # A malformed or failing command must not stop the tracker.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except BaseException:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        def _unlink_resources(rtype_registry, rtype):
            # Warn about and destroy every resource of one type still tracked.
            if rtype_registry:
                try:
                    warnings.warn(
                        "resource_tracker: There appear to be "
                        f"{len(rtype_registry)} leaked {rtype} objects to "
                        "clean up at shutdown"
                    )
                except Exception:
                    pass
            for name in rtype_registry:
                # For some reason the process which created and registered this
                # resource has failed to unregister it. Presumably it has
                # died. We therefore clean it up.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                    if verbose:
                        util.debug(f"[ResourceTracker] unlink {name}")
                except Exception as e:
                    warnings.warn(f"resource_tracker: {name}: {e!r}")

        for rtype, rtype_registry in registry.items():
            if rtype != "folder":
                _unlink_resources(rtype_registry, rtype)

        # The default cleanup routine for folders deletes everything inside
        # those folders recursively, which can include other resources tracked
        # by the resource tracker. To limit the risk of the resource tracker
        # attempting to delete a resource twice (once as part of a tracked
        # folder, and once as a resource), we delete the folders after all
        # other resource types.
        if "folder" in registry:
            _unlink_resources(registry["folder"], "folder")

    if verbose:
        util.debug("resource tracker shut down")

394 

395 

def spawnv_passfds(path, args, passfds):
    """Start ``args`` using the executable ``path`` and return the child's pid.

    On POSIX, the arguments are encoded to UTF-8 and the call is delegated to
    ``multiprocessing.util.spawnv_passfds`` together with ``passfds``.

    On Windows, the child is created with handle inheritance enabled
    (``bInheritHandles=True``), so ``passfds`` is not used; the caller is
    expected to have made the handles it wants to share inheritable.

    Raises:
        Any error raised while creating the process is propagated to the
        caller. Previously, Windows errors were swallowed and then surfaced as
        an unrelated ``UnboundLocalError`` on ``pid``.
    """
    if sys.platform != "win32":
        args = [arg.encode("utf-8") for arg in args]
        path = path.encode("utf-8")
        return util.spawnv_passfds(path, args, passfds)

    cmd = " ".join(f'"{arg}"' for arg in args)
    process_handle, thread_handle, pid, _ = _winapi.CreateProcess(
        path, cmd, None, None, True, 0, None, None, None
    )
    # Only the pid is returned, so neither handle is needed here; close both
    # so they are not leaked for the lifetime of this process.
    try:
        _winapi.CloseHandle(thread_handle)
    finally:
        _winapi.CloseHandle(process_handle)
    return pid