Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/externals/loky/backend/resource_tracker.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

195 statements  

1############################################################################### 

2# Server process to keep track of unlinked resources, like folders and

3# semaphores, and clean them up.

4# 

5# author: Thomas Moreau 

6# 

7# adapted from multiprocessing/semaphore_tracker.py (17/02/2017) 

8# * include custom spawnv_passfds to start the process 

9# * add some VERBOSE logging 

10# 

11# TODO: multiprocessing.resource_tracker was contributed to Python 3.8 so 

12# once loky drops support for Python 3.7 it might be possible to stop 

13# maintaining this loky-specific fork. As a consequence, it might also be 

14# possible to stop maintaining the loky.backend.synchronize fork of 

15# multiprocessing.synchronize. 

16 

17# 

18# On Unix we run a server process which keeps track of unlinked 

19# resources. The server ignores SIGINT and SIGTERM and reads from a 

20# pipe. The resource_tracker implements a reference counting scheme: each time 

21# a Python process anticipates the shared usage of a resource by another 

22# process, it signals the resource_tracker of this shared usage, and in return, 

23# the resource_tracker increments the resource's reference count by 1. 

24# Similarly, when access to a resource is closed by a Python process, the 

25# process notifies the resource_tracker by asking it to decrement the 

26# resource's reference count by 1. When the reference count drops to 0, the 

27# resource_tracker attempts to clean up the underlying resource. 

28 

29# Finally, every other process connected to the resource tracker has a copy of 

30# the writable end of the pipe used to communicate with it, so the resource 

31# tracker gets EOF when all other processes have exited. Then the 

32# resource_tracker process unlinks any remaining leaked resources (with 

33# reference count above 0) 

34 

35# For semaphores, this is important because the system only supports a limited 

36# number of named semaphores, and they will not be automatically removed till 

37# the next reboot. Without this resource tracker process, "killall python" 

38# would probably leave unlinked semaphores. 

39 

40# Note that this behavior differs from CPython's resource_tracker, which only

41# implements a list of shared resources, and not a proper refcounting scheme.

42# Also, CPython's resource tracker will only attempt to clean up those shared

43# resources once all processes connected to the resource tracker have exited.

44 

45 

46import os 

47import shutil 

48import sys 

49import signal 

50import warnings 

51import threading 

52from _multiprocessing import sem_unlink 

53from multiprocessing import util 

54 

55from . import spawn 

56 

57if sys.platform == "win32": 

58 import _winapi 

59 import msvcrt 

60 from multiprocessing.reduction import duplicate 

61 

62 

__all__ = ["ensure_running", "register", "unregister"]

# Blocking signals around the spawn of the tracker needs
# signal.pthread_sigmask, which is not exposed on every platform.
_HAVE_SIGMASK = hasattr(signal, "pthread_sigmask")
# Signals blocked while spawning the tracker and ignored inside the tracker.
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

# Resource type -> callable that deletes a resource of that type by name.
_CLEANUP_FUNCS = dict(folder=shutil.rmtree, file=os.unlink)

if os.name == "posix":
    # Semaphore cleanup goes through sem_unlink and is only registered on
    # POSIX.
    _CLEANUP_FUNCS.update(semlock=sem_unlink)


# Forwarded to the spawned tracker process as its ``verbose`` argument.
VERBOSE = False

75 

76 

class ResourceTracker:
    """Client-side handle on the resource tracker process.

    Holds the write end of the pipe to the tracker (``_fd``) and the
    tracker's pid (``_pid``). The public methods make sure the tracker is
    running, then send one newline-terminated ``CMD:name:rtype`` message.
    """

    def __init__(self):
        # Serializes the check-then-(re)launch sequence in ensure_running.
        self._lock = threading.Lock()
        self._fd = None
        self._pid = None

    def getfd(self):
        """Return the write end of the tracker pipe, launching it if needed."""
        self.ensure_running()
        return self._fd

    def ensure_running(self):
        """Make sure that resource tracker process is running.

        This can be run from any process. Usually a child process will use
        the resource created by its parent.
        """
        with self._lock:
            if self._fd is not None:
                # resource tracker was launched before, is it still running?
                if self._check_alive():
                    # => still alive
                    return
                # => dead, launch it again
                os.close(self._fd)
                if os.name == "posix":
                    try:
                        # At this point, the resource_tracker process has been
                        # killed or crashed. Let's remove the process entry
                        # from the process table to avoid zombie processes.
                        os.waitpid(self._pid, 0)
                    except OSError:
                        # The process was terminated or is a child from an
                        # ancestor of the current process.
                        pass
                self._fd = None
                self._pid = None

                warnings.warn(
                    "resource_tracker: process died unexpectedly, "
                    "relaunching. Some folders/semaphores might "
                    "leak."
                )

            fds_to_pass = []
            try:
                # Best effort: stderr may not be backed by a real fd.
                fds_to_pass.append(sys.stderr.fileno())
            except Exception:
                pass

            r, w = os.pipe()
            if sys.platform == "win32":
                # The child receives the read end as an inheritable OS handle.
                _r = duplicate(msvcrt.get_osfhandle(r), inheritable=True)
                os.close(r)
                r = _r

            cmd = f"from {main.__module__} import main; main({r}, {VERBOSE})"
            try:
                fds_to_pass.append(r)
                # process will out live us, so no need to wait on pid
                exe = spawn.get_executable()
                args = [exe, *util._args_from_interpreter_flags(), "-c", cmd]
                util.debug(f"launching resource tracker: {args}")
                # bpo-33613: Register a signal mask that will block the
                # signals. This signal mask will be inherited by the child
                # that is going to be spawned and will protect the child from
                # a race condition that can make the child die before it
                # registers signal handlers for SIGINT and SIGTERM. The mask
                # is unregistered after spawning the child.
                try:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(
                            signal.SIG_BLOCK, _IGNORED_SIGNALS
                        )
                    pid = spawnv_passfds(exe, args, fds_to_pass)
                finally:
                    if _HAVE_SIGMASK:
                        signal.pthread_sigmask(
                            signal.SIG_UNBLOCK, _IGNORED_SIGNALS
                        )
            except BaseException:
                # Spawning failed: nobody will ever read from the pipe.
                os.close(w)
                raise
            else:
                self._fd = w
                self._pid = pid
            finally:
                # The read end now belongs to the child (or is useless).
                if sys.platform == "win32":
                    _winapi.CloseHandle(r)
                else:
                    os.close(r)

    def _check_alive(self):
        """Check for the existence of the resource tracker process.

        Sends a PROBE message, which the tracker ignores, and reports whether
        the write succeeded. Any ``OSError`` counts as "dead". A pipe with no
        reader raises ``BrokenPipeError`` on POSIX, but other platforms may
        report it with a different ``OSError``.
        """
        try:
            self._send("PROBE", "", "")
        except OSError:
            return False
        else:
            return True

    def register(self, name, rtype):
        """Register a named resource, and increment its refcount."""
        self.ensure_running()
        self._send("REGISTER", name, rtype)

    def unregister(self, name, rtype):
        """Unregister a named resource with resource tracker."""
        self.ensure_running()
        self._send("UNREGISTER", name, rtype)

    def maybe_unlink(self, name, rtype):
        """Decrement the refcount of a resource, and delete it if it hits 0"""
        self.ensure_running()
        self._send("MAYBE_UNLINK", name, rtype)

    def _send(self, cmd, name, rtype):
        """Write one ``cmd:name:rtype`` line to the tracker pipe.

        Raises ValueError if the encoded message is longer than 512 bytes.
        A non-ASCII name raises UnicodeEncodeError, a ValueError subclass.
        """
        msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
        # posix guarantees that writes to a pipe of less than PIPE_BUF bytes
        # are atomic, and that PIPE_BUF >= 512. The guarantee covers the
        # whole write, so bound the full message, not only the name.
        if len(msg) > 512:
            raise ValueError("name too long")
        nbytes = os.write(self._fd, msg)
        assert nbytes == len(msg)

199 

200 

# One tracker instance per process; the module-level API below consists of
# bound methods of this singleton.
_resource_tracker = ResourceTracker()
(
    ensure_running,
    register,
    maybe_unlink,
    unregister,
    getfd,
) = (
    _resource_tracker.ensure_running,
    _resource_tracker.register,
    _resource_tracker.maybe_unlink,
    _resource_tracker.unregister,
    _resource_tracker.getfd,
)

207 

208 

def main(fd, verbose=0):
    """Run resource tracker.

    Entry point of the tracker process; ``ResourceTracker.ensure_running``
    launches it with ``-c "from <module> import main; main(fd, VERBOSE)"``.
    Reads ``CMD:name:rtype`` lines from ``fd`` until EOF while keeping a
    per-type reference count of registered resources. Once EOF is reached,
    it cleans up every resource that is still registered.

    Parameters
    ----------
    fd : int
        Read end of the command pipe. On Windows this is an OS handle that is
        converted to a file descriptor with ``msvcrt.open_osfhandle``.
    verbose : bool or int
        When truthy, log debug messages to stderr.
    """
    if verbose:
        util.log_to_stderr(level=util.DEBUG)

    # protect the process from ^C and "killall python" etc
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)

    if _HAVE_SIGMASK:
        # Lift the mask inherited from the parent (set around the spawn in
        # ensure_running) now that the signals are ignored.
        signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

    for f in (sys.stdin, sys.stdout):
        try:
            f.close()
        except Exception:
            pass

    if verbose:
        util.debug("Main resource tracker is running")

    # rtype -> {name: refcount}
    registry = {rtype: {} for rtype in _CLEANUP_FUNCS.keys()}
    try:
        # keep track of registered/unregistered resources
        if sys.platform == "win32":
            fd = msvcrt.open_osfhandle(fd, os.O_RDONLY)
        with open(fd, "rb") as f:
            while True:
                line = f.readline()
                if line == b"":  # EOF
                    break
                try:
                    splitted = line.strip().decode("ascii").split(":")
                    # name can potentially contain separator symbols (for
                    # instance folders on Windows)
                    cmd, name, rtype = (
                        splitted[0],
                        ":".join(splitted[1:-1]),
                        splitted[-1],
                    )

                    if cmd == "PROBE":
                        # Liveness check from ResourceTracker._check_alive.
                        continue

                    if rtype not in _CLEANUP_FUNCS:
                        raise ValueError(
                            f"Cannot register {name} for automatic cleanup: "
                            f"unknown resource type ({rtype}). Resource type "
                            "should be one of the following: "
                            f"{list(_CLEANUP_FUNCS.keys())}"
                        )

                    if cmd == "REGISTER":
                        if name not in registry[rtype]:
                            registry[rtype][name] = 1
                        else:
                            registry[rtype][name] += 1

                        if verbose:
                            util.debug(
                                "[ResourceTracker] incremented refcount of "
                                f"{rtype} {name} "
                                f"(current {registry[rtype][name]})"
                            )
                    elif cmd == "UNREGISTER":
                        # Drop the entry entirely, whatever its refcount.
                        del registry[rtype][name]
                        if verbose:
                            # Report how many resources of this type remain
                            # tracked, not the number of resource types.
                            util.debug(
                                f"[ResourceTracker] unregister {name} {rtype}: "
                                f"registry({len(registry[rtype])})"
                            )
                    elif cmd == "MAYBE_UNLINK":
                        registry[rtype][name] -= 1
                        if verbose:
                            util.debug(
                                "[ResourceTracker] decremented refcount of "
                                f"{rtype} {name} "
                                f"(current {registry[rtype][name]})"
                            )

                        if registry[rtype][name] == 0:
                            del registry[rtype][name]
                            try:
                                if verbose:
                                    util.debug(
                                        f"[ResourceTracker] unlink {name}"
                                    )
                                _CLEANUP_FUNCS[rtype](name)
                            except Exception as e:
                                warnings.warn(
                                    f"resource_tracker: {name}: {e!r}"
                                )

                    else:
                        raise RuntimeError(f"unrecognized command {cmd!r}")
                except BaseException:
                    # A bad message is reported but does not stop the loop.
                    try:
                        sys.excepthook(*sys.exc_info())
                    except BaseException:
                        pass
    finally:
        # all processes have terminated; cleanup any remaining resources
        def _unlink_resources(rtype_registry, rtype):
            if rtype_registry:
                try:
                    warnings.warn(
                        "resource_tracker: There appear to be "
                        f"{len(rtype_registry)} leaked {rtype} objects to "
                        "clean up at shutdown"
                    )
                except Exception:
                    pass
            for name in rtype_registry:
                # For some reason the process which created and registered
                # this resource has failed to unregister it. Presumably it
                # has died. We therefore clean it up.
                try:
                    _CLEANUP_FUNCS[rtype](name)
                    if verbose:
                        util.debug(f"[ResourceTracker] unlink {name}")
                except Exception as e:
                    warnings.warn(f"resource_tracker: {name}: {e!r}")

        for rtype, rtype_registry in registry.items():
            if rtype == "folder":
                continue
            else:
                _unlink_resources(rtype_registry, rtype)

        # The default cleanup routine for folders deletes everything inside
        # those folders recursively, which can include other resources
        # tracked by the resource tracker. To limit the risk of the resource
        # tracker attempting to delete twice a resource (once as part of a
        # tracked folder, and once as a resource), we delete the folders
        # after all other resource types.
        if "folder" in registry:
            _unlink_resources(registry["folder"], "folder")

    if verbose:
        util.debug("resource tracker shut down")

349 

350 

351# 

352# Start a program with only specified fds kept open 

353# 

354 

355 

def spawnv_passfds(path, args, passfds):
    """Start ``args`` as a child process, passing down the fds in ``passfds``.

    On POSIX, each fd in ``passfds`` is passed through loky's
    ``_mk_inheritable`` and handed to ``fork_exec``. On Windows, ``path`` is
    started with ``_winapi.CreateProcess``. Handle inheritance is enabled,
    and the command line is formed by double-quoting each element of
    ``args``.

    Returns the pid of the child process. Spawn errors propagate to the
    caller.
    """
    passfds = sorted(passfds)
    if sys.platform != "win32":
        from .reduction import _mk_inheritable
        from .fork_exec import fork_exec

        _pass = [_mk_inheritable(fd) for fd in passfds]
        return fork_exec(args, _pass)

    cmd = " ".join(f'"{x}"' for x in args)
    # Errors are not swallowed here. Previously they were, and the function
    # then failed with an UnboundLocalError on ``pid`` that hid the real
    # cause.
    hp, ht, pid, _ = _winapi.CreateProcess(
        path, cmd, None, None, True, 0, None, None, None
    )
    # Only the pid is returned, so release both handles now to avoid
    # leaking them. Closing a handle does not terminate the child.
    _winapi.CloseHandle(ht)
    _winapi.CloseHandle(hp)
    return pid