1""" 

2Reducer using memory mapping for numpy arrays 

3""" 

4# Author: Thomas Moreau <thomas.moreau.2010@gmail.com> 

5# Copyright: 2017, Thomas Moreau 

6# License: BSD 3 clause 

7 

8import atexit 

9import errno 

10import os 

11import stat 

12import tempfile 

13import threading 

14import time 

15import warnings 

16import weakref 

17from mmap import mmap 

18from multiprocessing import util 

19from pickle import HIGHEST_PROTOCOL, PicklingError, dumps, loads, whichmodule 

20from uuid import uuid4 

21 

22try: 

23 WindowsError 

24except NameError: 

25 WindowsError = type(None) 

26 

27try: 

28 import numpy as np 

29 from numpy.lib.stride_tricks import as_strided 

30except ImportError: 

31 np = None 

32 

33from .backports import make_memmap 

34from .disk import delete_folder 

35from .externals.loky.backend import resource_tracker 

36from .numpy_pickle import dump, load, load_temporary_memmap 

37 

38# Some system have a ramdisk mounted by default, we can use it instead of /tmp 

39# as the default folder to dump big arrays to share with subprocesses. 

40SYSTEM_SHARED_MEM_FS = "/dev/shm" 

41 

42# Minimal number of bytes available on SYSTEM_SHARED_MEM_FS to consider using 

43# it as the default folder to dump big arrays to share with subprocesses. 

44SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(2e9) 

45 

46# Folder and file permissions to chmod temporary files generated by the 

47# memmapping pool. Only the owner of the Python process can access the 

48# temporary files and folder. 

49FOLDER_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR 

50FILE_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR 

51 

52# Set used in joblib workers, referencing the filenames of temporary memmaps 

53# created by joblib to speed up data communication. In child processes, we add 

54# a finalizer to these memmaps that sends a maybe_unlink call to the 

55# resource_tracker, in order to free main memory as fast as possible. 

56JOBLIB_MMAPS = set() 

57 

58 

59def _log_and_unlink(filename): 

60 from .externals.loky.backend.resource_tracker import _resource_tracker 

61 

62 util.debug( 

63 "[FINALIZER CALL] object mapping to {} about to be deleted," 

64 " decrementing the refcount of the file (pid: {})".format( 

65 os.path.basename(filename), os.getpid() 

66 ) 

67 ) 

68 _resource_tracker.maybe_unlink(filename, "file") 

69 

70 

71def add_maybe_unlink_finalizer(memmap): 

72 util.debug( 

73 "[FINALIZER ADD] adding finalizer to {} (id {}, filename {}, pid {})".format( 

74 type(memmap), id(memmap), os.path.basename(memmap.filename), os.getpid() 

75 ) 

76 ) 

77 weakref.finalize(memmap, _log_and_unlink, memmap.filename) 

78 

79 

80def unlink_file(filename): 

81 """Wrapper around os.unlink with a retry mechanism. 

82 

83 The retry mechanism has been implemented primarily to overcome a race 

84 condition happening during the finalizer of a np.memmap: when a process 

85 holding the last reference to a mmap-backed np.memmap/np.array is about to 

86 delete this array (and close the reference), it sends a maybe_unlink 

87 request to the resource_tracker. This request can be processed faster than 

88 it takes for the last reference of the memmap to be closed, yielding (on 

89 Windows) a PermissionError in the resource_tracker loop. 

90 """ 

91 NUM_RETRIES = 10 

92 for retry_no in range(1, NUM_RETRIES + 1): 

93 try: 

94 os.unlink(filename) 

95 break 

96 except PermissionError: 

97 util.debug( 

98 "[ResourceTracker] tried to unlink {}, got PermissionError".format( 

99 filename 

100 ) 

101 ) 

102 if retry_no == NUM_RETRIES: 

103 raise 

104 else: 

105 time.sleep(0.2) 

106 except FileNotFoundError: 

107 # In case of a race condition when deleting the temporary folder, 

108 # avoid noisy FileNotFoundError exception in the resource tracker. 

109 pass 

110 

111 

112resource_tracker._CLEANUP_FUNCS["file"] = unlink_file 
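

# Illustrative sketch (not part of the original module): exercising the
# retry-based unlink helper directly on a throwaway file, the same code path
# the resource_tracker runs for registered "file" resources. The demo function
# name is hypothetical; it is defined for documentation only and never called
# at import time.
def _demo_unlink_file():
    import tempfile as _tempfile

    fd, path = _tempfile.mkstemp()
    os.close(fd)
    unlink_file(path)  # removes the file, retrying on PermissionError
    unlink_file(path)  # a second call hits the silent FileNotFoundError branch
    return not os.path.exists(path)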


class _WeakArrayKeyMap:
    """A variant of weakref.WeakKeyDictionary for unhashable numpy arrays.

    This datastructure will be used with numpy arrays as obj keys, therefore we
    do not use the __get__ / __set__ methods to avoid any conflict with the
    numpy fancy indexing syntax.
    """

    def __init__(self):
        self._data = {}

    def get(self, obj):
        ref, val = self._data[id(obj)]
        if ref() is not obj:
            # In case of race condition with on_destroy: could never be
            # triggered by the joblib tests with CPython.
            raise KeyError(obj)
        return val

    def set(self, obj, value):
        key = id(obj)
        try:
            ref, _ = self._data[key]
            if ref() is not obj:
                # In case of race condition with on_destroy: could never be
                # triggered by the joblib tests with CPython.
                raise KeyError(obj)
        except KeyError:
            # Insert the new entry in the mapping along with a weakref
            # callback to automatically delete the entry from the mapping
            # as soon as the object used as key is garbage collected.
            def on_destroy(_):
                del self._data[key]

            ref = weakref.ref(obj, on_destroy)
        self._data[key] = ref, value

    def __getstate__(self):
        raise PicklingError("_WeakArrayKeyMap is not pickleable")
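

# Illustrative sketch (not part of the original module): how _WeakArrayKeyMap
# associates metadata with unhashable numpy arrays, and how entries vanish
# once the key array is garbage collected. The demo function name is
# hypothetical and it is never called at import time.
def _demo_weak_array_key_map():
    if np is None:  # numpy is an optional dependency in this module
        return

    mapping = _WeakArrayKeyMap()
    a = np.zeros(3)
    mapping.set(a, "some-basename.pkl")
    assert mapping.get(a) == "some-basename.pkl"

    # Dropping the last reference to ``a`` fires the weakref callback, which
    # removes the entry from the internal dict keyed by id(a).
    key = id(a)
    del a
    import gc

    gc.collect()
    assert key not in mapping._data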


###############################################################################
# Support for efficient transient pickling of numpy data structures


def _get_backing_memmap(a):
    """Recursively look up the original np.memmap instance base if any."""
    b = getattr(a, "base", None)
    if b is None:
        # TODO: check scipy sparse datastructure if scipy is installed
        # neither a nor its ancestors have a memmap base
        return None

    elif isinstance(b, mmap):
        # a is already a real memmap instance.
        return a

    else:
        # Recursive exploration of the base ancestry
        return _get_backing_memmap(b)


def _get_temp_dir(pool_folder_name, temp_folder=None):
    """Get the full path to a subfolder inside the temporary folder.

    Parameters
    ----------
    pool_folder_name : str
        Sub-folder name used for the serialization of a pool instance.

    temp_folder: str, optional
        Folder to be used by the pool for memmapping large arrays
        for sharing memory with worker processes. If None, this will try in
        order:

        - a folder pointed to by the JOBLIB_TEMP_FOLDER environment
          variable,
        - /dev/shm if the folder exists and is writable: this is a
          RAMdisk filesystem available by default on modern Linux
          distributions,
        - the default system temporary folder that can be
          overridden with the TMP, TMPDIR or TEMP environment
          variables, typically /tmp under Unix operating systems.

    Returns
    -------
    pool_folder : str
        full path to the temporary folder
    use_shared_mem : bool
        whether the temporary folder is written to the system shared memory
        folder or some other temporary folder.
    """
    use_shared_mem = False
    if temp_folder is None:
        temp_folder = os.environ.get("JOBLIB_TEMP_FOLDER", None)
    if temp_folder is None:
        if os.path.exists(SYSTEM_SHARED_MEM_FS) and hasattr(os, "statvfs"):
            try:
                shm_stats = os.statvfs(SYSTEM_SHARED_MEM_FS)
                available_nbytes = shm_stats.f_bsize * shm_stats.f_bavail
                if available_nbytes > SYSTEM_SHARED_MEM_FS_MIN_SIZE:
                    # Try to see if we have write access to the shared mem
                    # folder only if it is reasonably large (that is 2GB or
                    # more).
                    temp_folder = SYSTEM_SHARED_MEM_FS
                    pool_folder = os.path.join(temp_folder, pool_folder_name)
                    if not os.path.exists(pool_folder):
                        os.makedirs(pool_folder)
                    use_shared_mem = True
            except (IOError, OSError):
                # Missing rights in the /dev/shm partition, fallback to regular
                # temp folder.
                temp_folder = None
    if temp_folder is None:
        # Fallback to the default tmp folder, typically /tmp
        temp_folder = tempfile.gettempdir()
    temp_folder = os.path.abspath(os.path.expanduser(temp_folder))
    pool_folder = os.path.join(temp_folder, pool_folder_name)
    return pool_folder, use_shared_mem
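

# Illustrative sketch (not part of the original module): resolving the pool
# folder for a made-up pool name. On a Linux machine with a large enough
# /dev/shm this returns a path under /dev/shm with use_shared_mem=True (and
# eagerly creates it as a side effect of the write-access check); otherwise it
# returns a yet-to-be-created path under the regular temp dir. Hypothetical
# demo, never called at import time.
def _demo_get_temp_dir():
    pool_folder, use_shared_mem = _get_temp_dir("joblib_demo_pool")
    return pool_folder, use_shared_mem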


def has_shareable_memory(a):
    """Return True if a is backed by some mmap buffer, directly or indirectly."""
    return _get_backing_memmap(a) is not None
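

# Illustrative sketch (not part of the original module): views on a memmap
# share its buffer, so they are detected as shareable, while plain in-memory
# arrays are not. Note this creates a small scratch file at the given path.
# Hypothetical demo, never called at import time.
def _demo_has_shareable_memory(tmp_filename):
    if np is None:
        return

    m = np.memmap(tmp_filename, dtype=np.float64, shape=(4, 4), mode="w+")
    assert has_shareable_memory(m)
    # A slice is an array whose ``base`` chain leads back to the mmap buffer.
    assert has_shareable_memory(m[1:3])
    # A regular array has no mmap anywhere in its ``base`` ancestry.
    assert not has_shareable_memory(np.zeros(4))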


def _strided_from_memmap(
    filename,
    dtype,
    mode,
    offset,
    order,
    shape,
    strides,
    total_buffer_len,
    unlink_on_gc_collect,
):
    """Reconstruct an array view on a memory mapped file."""
    if mode == "w+":
        # Do not zero the original data when unpickling
        mode = "r+"

    if strides is None:
        # Simple, contiguous memmap
        return make_memmap(
            filename,
            dtype=dtype,
            shape=shape,
            mode=mode,
            offset=offset,
            order=order,
            unlink_on_gc_collect=unlink_on_gc_collect,
        )
    else:
        # For non-contiguous data, memmap the total enclosing buffer and then
        # extract the non-contiguous view with the stride-tricks API
        base = make_memmap(
            filename,
            dtype=dtype,
            shape=total_buffer_len,
            offset=offset,
            mode=mode,
            order=order,
            unlink_on_gc_collect=unlink_on_gc_collect,
        )
        return as_strided(base, shape=shape, strides=strides)
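

# Illustrative sketch (not part of the original module): rebuilding a
# non-contiguous view. For the transpose of a C-contiguous (3, 4) float64
# memmap, shape=(4, 3) and strides=(8, 32) address all 12 items of the
# enclosing buffer without any copy. The file is assumed to hold at least
# 96 bytes. Hypothetical demo, never called at import time.
def _demo_strided_from_memmap(filename):
    if np is None:
        return
    return _strided_from_memmap(
        filename,
        dtype=np.float64,
        mode="r",
        offset=0,
        order="C",
        shape=(4, 3),
        strides=(8, 32),
        total_buffer_len=12,
        unlink_on_gc_collect=False,
    )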


def _reduce_memmap_backed(a, m):
    """Pickling reduction for memmap backed arrays.

    a is expected to be an instance of np.ndarray (or np.memmap)
    m is expected to be an instance of np.memmap on the top of the ``base``
    attribute ancestry of a. ``m.base`` should be the real python mmap object.
    """
    # offset that comes from the striding differences between a and m
    util.debug(
        "[MEMMAP REDUCE] reducing a memmap-backed array (shape={}, pid={})".format(
            a.shape, os.getpid()
        )
    )
    try:
        from numpy.lib.array_utils import byte_bounds
    except (ModuleNotFoundError, ImportError):
        # Backward-compat for numpy < 2.0
        from numpy import byte_bounds
    a_start, a_end = byte_bounds(a)
    m_start = byte_bounds(m)[0]
    offset = a_start - m_start

    # offset from the backing memmap
    offset += m.offset

    # 1D arrays are both F and C contiguous, so only set the flag in
    # higher dimensions. See https://github.com/joblib/joblib/pull/1704.
    if m.ndim > 1 and m.flags["F_CONTIGUOUS"]:
        order = "F"
    else:
        # The backing memmap buffer is necessarily contiguous hence C if not
        # Fortran
        order = "C"

    if a.flags["F_CONTIGUOUS"] or a.flags["C_CONTIGUOUS"]:
        # If the array is a contiguous view, no need to pass the strides
        strides = None
        total_buffer_len = None
    else:
        # Compute the total number of items to map from which the strided
        # view will be extracted.
        strides = a.strides
        total_buffer_len = (a_end - a_start) // a.itemsize

    return (
        _strided_from_memmap,
        (
            m.filename,
            a.dtype,
            m.mode,
            offset,
            order,
            a.shape,
            strides,
            total_buffer_len,
            False,
        ),
    )
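

# Illustrative sketch (not part of the original module): the offset arithmetic
# above in miniature. For the view ``m[2:]`` on a 1D memmap, the byte_bounds
# difference is exactly 2 * itemsize, which, added to m.offset, locates the
# view inside the backing file. Hypothetical demo, never called at import time.
def _demo_memmap_offset(m):
    try:
        from numpy.lib.array_utils import byte_bounds
    except ImportError:  # numpy < 2.0
        from numpy import byte_bounds

    a = m[2:]
    offset = byte_bounds(a)[0] - byte_bounds(m)[0]
    assert offset == 2 * m.itemsize
    return offset + m.offset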


def reduce_array_memmap_backward(a):
    """reduce a np.array or a np.memmap from a child process"""
    m = _get_backing_memmap(a)
    if isinstance(m, np.memmap) and m.filename not in JOBLIB_MMAPS:
        # if a is backed by a memmapped file, reconstruct a using that
        # memmapped file.
        return _reduce_memmap_backed(a, m)
    else:
        # a is either a regular (not memmap-backed) numpy array, or an array
        # backed by a shared temporary file created by joblib. In the latter
        # case, in order to limit the lifespan of these temporary files, we
        # serialize the memmap as a regular numpy array, and decref the
        # file backing the memmap (done implicitly in a previously registered
        # finalizer, see ``unlink_on_gc_collect`` for more details)
        return (loads, (dumps(np.asarray(a), protocol=HIGHEST_PROTOCOL),))
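

# Illustrative sketch (not part of the original module): a backward reduction
# yields a standard pickle (callable, args) pair; applying the callable to the
# args reconstructs an equivalent array on the parent side. Hypothetical demo,
# never called at import time.
def _demo_backward_reduce():
    if np is None:
        return

    a = np.arange(5)
    reconstructor, args = reduce_array_memmap_backward(a)
    b = reconstructor(*args)
    assert np.array_equal(a, b)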


class ArrayMemmapForwardReducer(object):
    """Reducer callable to dump large arrays to memmap files.

    Parameters
    ----------
    max_nbytes: int
        Threshold to trigger memmapping of large arrays to files created in a
        temporary folder.
    temp_folder_resolver: callable
        A callable in charge of resolving a temporary folder name where files
        for backing memmapped arrays are created.
    mmap_mode: 'r', 'r+' or 'c'
        Mode for the created memmap datastructure. See the documentation of
        numpy.memmap for more details. Note: 'w+' is coerced to 'r+'
        automatically to avoid zeroing the data on unpickling.
    verbose: int, optional, 0 by default
        If verbose > 0, memmap creations are logged.
        If verbose > 1, both memmap creations, reuse and array pickling are
        logged.
    prewarm: bool, optional, True by default.
        Force a read on newly memmapped arrays to make sure that the OS
        pre-caches them in memory. This can be useful to avoid concurrent disk
        access when the same data array is passed to different worker
        processes.
    """

    def __init__(
        self,
        max_nbytes,
        temp_folder_resolver,
        mmap_mode,
        unlink_on_gc_collect,
        verbose=0,
        prewarm=True,
    ):
        self._max_nbytes = max_nbytes
        self._temp_folder_resolver = temp_folder_resolver
        self._mmap_mode = mmap_mode
        self.verbose = int(verbose)
        if prewarm == "auto":
            self._prewarm = not self._temp_folder.startswith(SYSTEM_SHARED_MEM_FS)
        else:
            self._prewarm = prewarm
        self._memmaped_arrays = _WeakArrayKeyMap()
        self._temporary_memmaped_filenames = set()
        self._unlink_on_gc_collect = unlink_on_gc_collect

    @property
    def _temp_folder(self):
        return self._temp_folder_resolver()

    def __reduce__(self):
        # The ArrayMemmapForwardReducer is passed to the children processes: it
        # needs to be pickled but the _WeakArrayKeyMap needs to be skipped as
        # it's only guaranteed to be consistent with the parent process memory
        # garbage collection.
        # Although this reducer is pickled, it is not needed in its destination
        # process (child processes), as we only use this reducer to send
        # memmaps from the parent process to the children processes. For this
        # reason, we can afford to skip the resolver (which would otherwise be
        # unpicklable) and pass it as None instead.
        args = (self._max_nbytes, None, self._mmap_mode, self._unlink_on_gc_collect)
        kwargs = {
            "verbose": self.verbose,
            "prewarm": self._prewarm,
        }
        return ArrayMemmapForwardReducer, args, kwargs

    def __call__(self, a):
        m = _get_backing_memmap(a)
        if m is not None and isinstance(m, np.memmap):
            # a is already backed by a memmap file, let's reuse it directly
            return _reduce_memmap_backed(a, m)

        if (
            not a.dtype.hasobject
            and self._max_nbytes is not None
            and a.nbytes > self._max_nbytes
        ):
            # check that the folder exists (lazily create the pool temp folder
            # if required)
            try:
                os.makedirs(self._temp_folder)
                os.chmod(self._temp_folder, FOLDER_PERMISSIONS)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise e

            try:
                basename = self._memmaped_arrays.get(a)
            except KeyError:
                # Generate a new unique random filename. The process and thread
                # ids are only useful for debugging purposes and to make it
                # easier to clean up orphaned files in case of hard process
                # kill (e.g. by "kill -9" or segfault).
                basename = "{}-{}-{}.pkl".format(
                    os.getpid(), id(threading.current_thread()), uuid4().hex
                )
                self._memmaped_arrays.set(a, basename)
            filename = os.path.join(self._temp_folder, basename)

            # In case the same array with the same content is passed several
            # times to the pool subprocess children, serialize it only once

            is_new_memmap = filename not in self._temporary_memmaped_filenames

            # add the memmap to the list of temporary memmaps created by joblib
            self._temporary_memmaped_filenames.add(filename)

            if self._unlink_on_gc_collect:
                # Bump reference count of the memmap by 1 to account for
                # shared usage of the memmap by a child process. The
                # corresponding decref call will be executed upon calling
                # resource_tracker.maybe_unlink, registered as a finalizer in
                # the child.
                # The incref/decref calls here are only possible when the child
                # and the parent share the same resource_tracker. It is not the
                # case for the multiprocessing backend, but it does not matter
                # because unlinking a memmap from a child process is only
                # useful to control the memory usage of long-lasting child
                # processes, while the multiprocessing-based pools terminate
                # their workers at the end of a map() call.
                resource_tracker.register(filename, "file")

            if is_new_memmap:
                # Incref each temporary memmap created by joblib one extra
                # time. This means that these memmaps will only be deleted
                # once an extra maybe_unlink() is called, which is done once
                # all the jobs have completed (or been canceled) in the
                # Parallel._terminate_backend() method.
                resource_tracker.register(filename, "file")

            if not os.path.exists(filename):
                util.debug(
                    "[ARRAY DUMP] Pickling new array (shape={}, dtype={}) "
                    "creating a new memmap at {}".format(a.shape, a.dtype, filename)
                )
                for dumped_filename in dump(a, filename):
                    os.chmod(dumped_filename, FILE_PERMISSIONS)

                if self._prewarm:
                    # Warm up the data by accessing it. This operation ensures
                    # that the disk accesses required to create the memmapping
                    # file are performed in the reducing process and avoids
                    # concurrent memmap creation in multiple children
                    # processes.
                    load(filename, mmap_mode=self._mmap_mode).max()

            else:
                util.debug(
                    "[ARRAY DUMP] Pickling known array (shape={}, dtype={}) "
                    "reusing memmap file: {}".format(
                        a.shape, a.dtype, os.path.basename(filename)
                    )
                )

            # The worker process will use joblib.load to memmap the data
            return (
                load_temporary_memmap,
                (filename, self._mmap_mode, self._unlink_on_gc_collect),
            )
        else:
            # do not convert a into memmap, let pickler do its usual copy with
            # the default system pickler
            util.debug(
                "[ARRAY DUMP] Pickling array (NO MEMMAPPING) (shape={}, "
                "dtype={}).".format(a.shape, a.dtype)
            )
            return (loads, (dumps(a, protocol=HIGHEST_PROTOCOL),))
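

# Illustrative sketch (not part of the original module): a forward reducer
# wired by hand. An array above max_nbytes is dumped once to a file in the
# resolved folder, and later picklings of the same array reuse that file.
# All names below are hypothetical; the demo is never called at import time.
def _demo_forward_reducer(temp_folder):
    if np is None:
        return

    reducer = ArrayMemmapForwardReducer(
        max_nbytes=100,  # tiny threshold so the demo array gets memmapped
        temp_folder_resolver=lambda: temp_folder,
        mmap_mode="r",
        unlink_on_gc_collect=False,
        verbose=0,
        prewarm=False,
    )
    big = np.zeros(1000)  # 8000 bytes > max_nbytes -> dumped to a memmap file
    rebuild, args = reducer(big)
    # args is (filename, mmap_mode, unlink_on_gc_collect); a worker calls
    # rebuild(*args) to load the array back as a memmap.
    return rebuild, args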


def get_memmapping_reducers(
    forward_reducers=None,
    backward_reducers=None,
    temp_folder_resolver=None,
    max_nbytes=1e6,
    mmap_mode="r",
    verbose=0,
    prewarm=False,
    unlink_on_gc_collect=True,
    **kwargs,
):
    """Construct a pair of memmapping reducers linked to a tmpdir.

    This function manages the creation and clean-up of the temporary folders
    underlying the memory maps and should be used to get the reducers
    necessary to construct a joblib pool or executor.
    """
    if forward_reducers is None:
        forward_reducers = dict()
    if backward_reducers is None:
        backward_reducers = dict()

    if np is not None:
        # Register smart numpy.ndarray reducers that detect memmap-backed
        # arrays and that are also able to dump large in-memory arrays over
        # the max_nbytes threshold to memmap files
        forward_reduce_ndarray = ArrayMemmapForwardReducer(
            max_nbytes,
            temp_folder_resolver,
            mmap_mode,
            unlink_on_gc_collect,
            verbose,
            prewarm=prewarm,
        )
        forward_reducers[np.ndarray] = forward_reduce_ndarray
        forward_reducers[np.memmap] = forward_reduce_ndarray

        # Communication from child process to the parent process always
        # pickles in-memory numpy.ndarray without dumping them as memmap
        # to avoid confusing the caller and making it tricky to collect the
        # temporary folder
        backward_reducers[np.ndarray] = reduce_array_memmap_backward
        backward_reducers[np.memmap] = reduce_array_memmap_backward

    return forward_reducers, backward_reducers
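

# Illustrative sketch (not part of the original module): the reducer pair
# returned above is meant to be installed in a process pool's pickler,
# conceptually the same way copyreg.pickle(type, reduce_func) customizes
# pickling. Hypothetical demo, never called at import time.
def _demo_get_memmapping_reducers(temp_folder):
    forward, backward = get_memmapping_reducers(
        temp_folder_resolver=lambda: temp_folder,
        max_nbytes=1e6,
        mmap_mode="r",
    )
    if np is not None:
        # Reduce a small array by hand, as the pool pickler would. Below the
        # max_nbytes threshold this is a plain pickle round-trip, no memmap.
        rebuild, args = forward[np.ndarray](np.zeros(10))
        return rebuild(*args)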


class TemporaryResourcesManager(object):
    """Stateful object able to manage temporary folders and pickles.

    It exposes:
    - a per-context folder name resolving API that memmap-based reducers will
      rely on to know where to pickle the temporary memmaps
    - a temporary file/folder management API that internally uses the
      resource_tracker.
    """

    def __init__(self, temp_folder_root=None, context_id=None):
        self._current_temp_folder = None
        self._temp_folder_root = temp_folder_root
        self._use_shared_mem = None
        self._cached_temp_folders = dict()
        self._id = uuid4().hex
        self._finalizers = {}
        if context_id is None:
            # It would be safer to not assign a default context id (less silent
            # bugs), but doing this while maintaining backward compatibility
            # with the previous, context-unaware version of
            # get_memmapping_executor exposes too many low-level details.
            context_id = uuid4().hex
        self.set_current_context(context_id)

    def set_current_context(self, context_id):
        self._current_context_id = context_id
        self.register_new_context(context_id)

    def register_new_context(self, context_id):
        # Prepare a sub-folder name specific to a context (usually a unique id
        # generated by each instance of the Parallel class). Do not create it
        # in advance, to spare FS write access if no array is to be dumped.
        if context_id in self._cached_temp_folders:
            return
        else:
            # During its lifecycle, one Parallel object can have several
            # executors associated to it (for instance, if a loky worker raises
            # an exception, joblib shuts down the executor and instantly
            # recreates a new one before raising the error - see
            # ``ensure_ready``). Because we don't want two executors tied to
            # the same Parallel object (and thus the same context id) to
            # register/use/delete the same folder, we also add an id specific
            # to the current Manager (and thus specific to its associated
            # executor) to the folder name.
            new_folder_name = "joblib_memmapping_folder_{}_{}_{}".format(
                os.getpid(), self._id, context_id
            )
            new_folder_path, _ = _get_temp_dir(new_folder_name, self._temp_folder_root)
            self.register_folder_finalizer(new_folder_path, context_id)
            self._cached_temp_folders[context_id] = new_folder_path

    def resolve_temp_folder_name(self):
        """Return a folder name specific to the currently activated context"""
        return self._cached_temp_folders[self._current_context_id]

    # resource management API

    def register_folder_finalizer(self, pool_subfolder, context_id):
        # Register the garbage collector at program exit in case caller forgets
        # to call terminate explicitly: note we do not pass any reference to
        # ensure that this callback won't prevent garbage collection of the
        # parallel instance and related file handler resources such as POSIX
        # semaphores and pipes
        pool_module_name = whichmodule(delete_folder, "delete_folder")
        resource_tracker.register(pool_subfolder, "folder")

        def _cleanup():
            # In some cases the Python runtime seems to set delete_folder to
            # None just before exiting when accessing the delete_folder
            # function from the closure namespace. So instead we reimport
            # the delete_folder function explicitly.
            # https://github.com/joblib/joblib/issues/328
            # We cannot just use 'from joblib.pool import delete_folder'
            # because joblib should only use relative imports to allow
            # easy vendoring.
            delete_folder = __import__(
                pool_module_name, fromlist=["delete_folder"]
            ).delete_folder
            try:
                delete_folder(pool_subfolder, allow_non_empty=True)
                resource_tracker.unregister(pool_subfolder, "folder")
            except OSError:
                warnings.warn(
                    "Failed to delete temporary folder: {}".format(pool_subfolder)
                )

        self._finalizers[context_id] = atexit.register(_cleanup)

    def _clean_temporary_resources(
        self, context_id=None, force=False, allow_non_empty=False
    ):
        """Clean temporary resources created by a process-based pool"""
        if context_id is None:
            # Iterate over a copy of the cache keys to avoid errors due to
            # the dictionary changing size during iteration.
            for context_id in list(self._cached_temp_folders):
                self._clean_temporary_resources(
                    context_id, force=force, allow_non_empty=allow_non_empty
                )
        else:
            temp_folder = self._cached_temp_folders.get(context_id)
            if temp_folder and os.path.exists(temp_folder):
                for filename in os.listdir(temp_folder):
                    if force:
                        # Some workers have failed and the refcount might
                        # be off. The workers should have shut down by this
                        # time so forcefully clean up the files.
                        resource_tracker.unregister(
                            os.path.join(temp_folder, filename), "file"
                        )
                    else:
                        resource_tracker.maybe_unlink(
                            os.path.join(temp_folder, filename), "file"
                        )

                # When forcing clean-up, try to delete the folder even if some
                # files are still in it. Otherwise, only delete the folder if
                # it is empty.
                allow_non_empty |= force

                # Clean up the folder if possible, either if it is empty or
                # if none of the files in it are in use and allow_non_empty
                # is True.
                try:
                    delete_folder(temp_folder, allow_non_empty=allow_non_empty)
                    # Forget the folder once it has been deleted
                    self._cached_temp_folders.pop(context_id, None)
                    resource_tracker.unregister(temp_folder, "folder")

                    # Also cancel the finalizer that would otherwise be
                    # triggered at interpreter exit.
                    finalizer = self._finalizers.pop(context_id, None)
                    if finalizer is not None:
                        atexit.unregister(finalizer)

                except OSError:
                    # Temporary folder cannot be deleted right now.
                    # This folder will be cleaned up by an atexit
                    # finalizer registered by the memmapping_reducer.
                    pass
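

# Illustrative sketch (not part of the original module): the typical lifecycle
# of a TemporaryResourcesManager, as joblib's executor backends drive it.
# Hypothetical demo, never called at import time (instantiating the manager
# registers folders with the resource_tracker).
def _demo_temporary_resources_manager():
    manager = TemporaryResourcesManager()
    # Reducers pickle memmaps into the folder resolved for the current context.
    folder = manager.resolve_temp_folder_name()
    # A new context (e.g. a new Parallel call) gets its own sub-folder.
    manager.set_current_context(uuid4().hex)
    # On shutdown, clean everything up, deleting folders even if non-empty.
    manager._clean_temporary_resources(force=True, allow_non_empty=True)
    return folder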