Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/_store_backends.py: 28%


1"""Storage providers backends for Memory caching.""" 

2 

3import collections 

4import datetime 

5import json 

6import operator 

7import os 

8import os.path 

9import re 

10import shutil 

11import threading 

12import time 

13import uuid 

14import warnings 

15from abc import ABCMeta, abstractmethod 

16from pickle import PicklingError 

17 

18from . import numpy_pickle 

19from .backports import concurrency_safe_rename 

20from .disk import memstr_to_bytes, mkdirp, rm_subdirs 

21from .logger import format_time 

22 

23CacheItemInfo = collections.namedtuple("CacheItemInfo", "path size last_access") 

24 

25 

26class CacheWarning(Warning): 

27 """Warning to capture dump failures except for PicklingError.""" 

28 

29 pass 

30 

31 

32def concurrency_safe_write(object_to_write, filename, write_func): 

33 """Writes an object into a unique file in a concurrency-safe way.""" 

34 # Temporary name is composed of UUID, process_id and thread_id to avoid 

35 # collisions due to concurrent write. 

36 # UUID is unique across nodes and time and help avoid collisions, even if 

37 # the cache folder is shared by several Python processes with the same pid and 

38 # thread id on different nodes of a cluster for instance. 

39 thread_id = id(threading.current_thread()) 

40 temporary_filename = f"{filename}.{uuid.uuid4().hex}-{os.getpid()}-{thread_id}" 

41 

42 write_func(object_to_write, temporary_filename) 

43 

44 return temporary_filename 

45 

46 
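
# A minimal sketch (not part of the original module, assuming plain pickle is
# an acceptable writer) of how the helper above is meant to be used: write to
# the unique temporary file, then atomically rename it into place. The
# StoreBackendMixin._concurrency_safe_write method below does exactly this,
# via the backend's _move_item hook.
def _example_concurrency_safe_dump(obj, filename):
    """Illustrative only; hypothetical helper, not part of joblib."""
    import pickle

    def write_func(to_write, dest_filename):
        with open(dest_filename, "wb") as f:
            pickle.dump(to_write, f)

    temporary_filename = concurrency_safe_write(obj, filename, write_func)
    os.replace(temporary_filename, filename)  # atomic rename on POSIX
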

class StoreBackendBase(metaclass=ABCMeta):
    """Helper Abstract Base Class which defines all methods that
    a StorageBackend must implement."""

    location = None

    @abstractmethod
    def _open_item(self, f, mode):
        """Opens an item on the store and returns a file-like object.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        f: a file-like object
            The file-like object where an item is stored and retrieved
        mode: string, optional
            The mode in which the file-like object is opened; allowed values
            are 'rb' and 'wb'.

        Returns
        -------
        a file-like object
        """

    @abstractmethod
    def _item_exists(self, location):
        """Checks if an item location exists in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        location: string
            The location of an item. On a filesystem, this corresponds to the
            absolute path, including the filename, of a file.

        Returns
        -------
        True if the item exists, False otherwise
        """

    @abstractmethod
    def _move_item(self, src, dst):
        """Moves an item from src to dst in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        src: string
            The source location of an item
        dst: string
            The destination location of an item
        """

    @abstractmethod
    def create_location(self, location):
        """Creates a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory.
        """

    @abstractmethod
    def clear_location(self, location):
        """Clears a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory or the absolute path of a file.
        """

    @abstractmethod
    def get_items(self):
        """Returns the whole list of items available in the store.

        Returns
        -------
        The list of items identified by their ids (e.g. filename in a
        filesystem).
        """

    @abstractmethod
    def configure(self, location, verbose=0, backend_options=dict()):
        """Configures the store.

        Parameters
        ----------
        location: string
            The base location used by the store. On a filesystem, this
            corresponds to a directory.
        verbose: int
            The level of verbosity of the store
        backend_options: dict
            Contains a dictionary of named parameters used to configure the
            store backend.
        """
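
# A hypothetical sketch (not part of joblib) of the smallest backend
# satisfying the contract above: the seven abstract methods, backed by a plain
# dict mapping locations to bytes. It only illustrates which hooks a custom
# storage backend must provide; a real backend would also inherit from
# StoreBackendMixin (defined below) to get load_item/dump_item for free.
import io


class _InMemoryStoreBackend(StoreBackendBase):
    """Toy backend keeping items in a process-local dict (illustrative only)."""

    def configure(self, location, verbose=0, backend_options=dict()):
        self.location = location
        self.verbose = verbose
        self._store = {}  # maps item location (string) -> bytes

    def _open_item(self, f, mode):
        if mode == "rb":
            return io.BytesIO(self._store[f])
        # For 'wb', hand out a buffer whose contents are committed on close.
        store, key = self._store, f

        class _Buffer(io.BytesIO):
            def close(self):
                store[key] = self.getvalue()
                super().close()

        return _Buffer()

    def _item_exists(self, location):
        return location in self._store

    def _move_item(self, src, dst):
        self._store[dst] = self._store.pop(src)

    def create_location(self, location):
        pass  # locations are implicit in the flat dict

    def clear_location(self, location):
        for key in [k for k in self._store if k.startswith(location)]:
            del self._store[key]

    def get_items(self):
        # Sizes are exact; last_access is approximated by "now".
        now = datetime.datetime.now()
        return [CacheItemInfo(k, len(v), now) for k, v in self._store.items()]
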

class StoreBackendMixin(object):
    """Class providing all logic for managing the store in a generic way.

    The StoreBackend subclass has to implement 3 methods: create_location,
    clear_location and configure. The StoreBackend also has to provide
    private _open_item, _item_exists and _move_item methods. The _open_item
    method has to have the same signature as the builtin open and return a
    file-like object.
    """

    def load_item(self, call_id, verbose=1, timestamp=None, metadata=None):
        """Load an item from the store given its id as a list of str."""
        full_path = os.path.join(self.location, *call_id)

        if verbose > 1:
            ts_string = (
                "{: <16}".format(format_time(time.time() - timestamp))
                if timestamp is not None
                else ""
            )
            signature = os.path.basename(call_id[0])
            if metadata is not None and "input_args" in metadata:
                kwargs = ", ".join(
                    "{}={}".format(*item) for item in metadata["input_args"].items()
                )
                signature += "({})".format(kwargs)
            msg = "[Memory]{}: Loading {}".format(ts_string, signature)
            if verbose < 10:
                print("{0}...".format(msg))
            else:
                print("{0} from {1}".format(msg, full_path))

        mmap_mode = None if not hasattr(self, "mmap_mode") else self.mmap_mode

        filename = os.path.join(full_path, "output.pkl")
        if not self._item_exists(filename):
            raise KeyError(
                "Non-existing item (may have been "
                "cleared).\nFile %s does not exist" % filename
            )

        # A file-like object cannot be used when mmap_mode is set.
        if mmap_mode is None:
            with self._open_item(filename, "rb") as f:
                item = numpy_pickle.load(f)
        else:
            item = numpy_pickle.load(filename, mmap_mode=mmap_mode)
        return item

    def dump_item(self, call_id, item, verbose=1):
        """Dump an item in the store at the id given as a list of str."""
        try:
            item_path = os.path.join(self.location, *call_id)
            if not self._item_exists(item_path):
                self.create_location(item_path)
            filename = os.path.join(item_path, "output.pkl")
            if verbose > 10:
                print("Persisting in %s" % item_path)

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    try:
                        numpy_pickle.dump(to_write, f, compress=self.compress)
                    except PicklingError as e:
                        # TODO(1.5) turn into error
                        warnings.warn(
                            "Unable to cache to disk: failed to pickle "
                            "output. In version 1.5 this will raise an "
                            f"exception. Exception: {e}.",
                            FutureWarning,
                        )

            self._concurrency_safe_write(item, filename, write_func)
        except Exception as e:  # noqa: E722
            warnings.warn(
                "Unable to cache to disk. Possibly a race condition in the "
                f"creation of the directory. Exception: {e}.",
                CacheWarning,
            )

    def clear_item(self, call_id):
        """Clear the item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        if self._item_exists(item_path):
            self.clear_location(item_path)

    def contains_item(self, call_id):
        """Check if there is an item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        filename = os.path.join(item_path, "output.pkl")

        return self._item_exists(filename)

    def get_item_info(self, call_id):
        """Return information about an item."""
        return {"location": os.path.join(self.location, *call_id)}

    def get_metadata(self, call_id):
        """Return the actual metadata of an item."""
        try:
            item_path = os.path.join(self.location, *call_id)
            filename = os.path.join(item_path, "metadata.json")
            with self._open_item(filename, "rb") as f:
                return json.loads(f.read().decode("utf-8"))
        except:  # noqa: E722
            return {}

    def store_metadata(self, call_id, metadata):
        """Store metadata of a computation."""
        try:
            item_path = os.path.join(self.location, *call_id)
            self.create_location(item_path)
            filename = os.path.join(item_path, "metadata.json")

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    f.write(json.dumps(to_write).encode("utf-8"))

            self._concurrency_safe_write(metadata, filename, write_func)
        except:  # noqa: E722
            pass

    def contains_path(self, call_id):
        """Check if the cached function is available in the store."""
        func_path = os.path.join(self.location, *call_id)
        return self.object_exists(func_path)

    def clear_path(self, call_id):
        """Clear all items with a common path in the store."""
        func_path = os.path.join(self.location, *call_id)
        if self._item_exists(func_path):
            self.clear_location(func_path)

    def store_cached_func_code(self, call_id, func_code=None):
        """Store the code of the cached function."""
        func_path = os.path.join(self.location, *call_id)
        if not self._item_exists(func_path):
            self.create_location(func_path)

        if func_code is not None:
            filename = os.path.join(func_path, "func_code.py")
            with self._open_item(filename, "wb") as f:
                f.write(func_code.encode("utf-8"))

    def get_cached_func_code(self, call_id):
        """Get the code of the cached function."""
        filename = os.path.join(self.location, *call_id, "func_code.py")
        try:
            with self._open_item(filename, "rb") as f:
                return f.read().decode("utf-8")
        except:  # noqa: E722
            raise

    def get_cached_func_info(self, call_id):
        """Return information related to the cached function if it exists."""
        return {"location": os.path.join(self.location, *call_id)}

    def clear(self):
        """Clear the whole store content."""
        self.clear_location(self.location)

    def enforce_store_limits(self, bytes_limit, items_limit=None, age_limit=None):
        """
        Remove the store's oldest files to enforce item, byte, and age limits.
        """
        items_to_delete = self._get_items_to_delete(bytes_limit, items_limit, age_limit)

        for item in items_to_delete:
            if self.verbose > 10:
                print("Deleting item {0}".format(item))
            try:
                self.clear_location(item.path)
            except OSError:
                # Even with ignore_errors=True, shutil.rmtree can raise OSError
                # with [Errno 116] Stale file handle if another process has
                # deleted the folder already.
                pass

    def _get_items_to_delete(self, bytes_limit, items_limit=None, age_limit=None):
        """
        Get items to delete to keep the store under size, file, & age limits.
        """
        if isinstance(bytes_limit, str):
            bytes_limit = memstr_to_bytes(bytes_limit)

        items = self.get_items()
        if not items:
            return []

        size = sum(item.size for item in items)

        if bytes_limit is not None:
            to_delete_size = size - bytes_limit
        else:
            to_delete_size = 0

        if items_limit is not None:
            to_delete_items = len(items) - items_limit
        else:
            to_delete_items = 0

        if age_limit is not None:
            older_item = min(item.last_access for item in items)
            if age_limit.total_seconds() < 0:
                raise ValueError("age_limit has to be a positive timedelta")
            deadline = datetime.datetime.now() - age_limit
        else:
            deadline = None

        if (
            to_delete_size <= 0
            and to_delete_items <= 0
            and (deadline is None or older_item > deadline)
        ):
            return []

        # We want to delete first the cache items that were accessed a
        # long time ago.
        items.sort(key=operator.attrgetter("last_access"))

        items_to_delete = []
        size_so_far = 0
        items_so_far = 0

        for item in items:
            if (
                (size_so_far >= to_delete_size)
                and items_so_far >= to_delete_items
                and (deadline is None or deadline < item.last_access)
            ):
                break

            items_to_delete.append(item)
            size_so_far += item.size
            items_so_far += 1

        return items_to_delete

    def _concurrency_safe_write(self, to_write, filename, write_func):
        """Writes an object into a file in a concurrency-safe way."""
        temporary_filename = concurrency_safe_write(to_write, filename, write_func)
        self._move_item(temporary_filename, filename)

    def __repr__(self):
        """Printable representation of the store location."""
        return '{class_name}(location="{location}")'.format(
            class_name=self.__class__.__name__, location=self.location
        )
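
# A small worked example (hypothetical, not part of the module) of the
# eviction rule implemented in _get_items_to_delete above: items are sorted by
# last_access and deleted oldest-first until both the byte and item budgets
# are satisfied.
def _example_eviction_order():
    now = datetime.datetime.now()
    items = [
        CacheItemInfo("b", 100, now - datetime.timedelta(hours=1)),
        CacheItemInfo("a", 100, now - datetime.timedelta(hours=2)),
        CacheItemInfo("c", 100, now),
    ]
    # Total size is 300 bytes. With bytes_limit=250, 50 bytes must go; after
    # sorting by last_access, the oldest item "a" (100 bytes) is enough.
    items.sort(key=operator.attrgetter("last_access"))
    to_delete_size = sum(item.size for item in items) - 250
    deleted, freed = [], 0
    for item in items:
        if freed >= to_delete_size:
            break
        deleted.append(item.path)
        freed += item.size
    assert deleted == ["a"]
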

class FileSystemStoreBackend(StoreBackendBase, StoreBackendMixin):
    """A StoreBackend used with local or network file systems."""

    _open_item = staticmethod(open)
    _item_exists = staticmethod(os.path.exists)
    _move_item = staticmethod(concurrency_safe_rename)

    def clear_location(self, location):
        """Delete location on store."""
        if location == self.location:
            rm_subdirs(location)
        else:
            shutil.rmtree(location, ignore_errors=True)

    def create_location(self, location):
        """Create object location on store."""
        mkdirp(location)

    def get_items(self):
        """Returns the whole list of items available in the store."""
        items = []

        for dirpath, _, filenames in os.walk(self.location):
            is_cache_hash_dir = re.match("[a-f0-9]{32}", os.path.basename(dirpath))

            if is_cache_hash_dir:
                output_filename = os.path.join(dirpath, "output.pkl")
                try:
                    last_access = os.path.getatime(output_filename)
                except OSError:
                    try:
                        last_access = os.path.getatime(dirpath)
                    except OSError:
                        # The directory has already been deleted
                        continue

                last_access = datetime.datetime.fromtimestamp(last_access)
                try:
                    full_filenames = [os.path.join(dirpath, fn) for fn in filenames]
                    dirsize = sum(os.path.getsize(fn) for fn in full_filenames)
                except OSError:
                    # Either output_filename or one of the files in dirpath
                    # does not exist any more. We assume this directory is
                    # being cleaned by another process already.
                    continue

                items.append(CacheItemInfo(dirpath, dirsize, last_access))

        return items

    def configure(self, location, verbose=1, backend_options=None):
        """Configure the store backend.

        For this backend, valid store options are 'compress' and 'mmap_mode'.
        """
        if backend_options is None:
            backend_options = {}

        # Set up the location directory.
        self.location = location
        if not os.path.exists(self.location):
            mkdirp(self.location)

        # Automatically add a `.gitignore` file to the cache folder.
        # XXX: the condition is necessary because in `Memory.__init__`, the
        # user-passed `location` param is modified to be either `{location}`
        # or `{location}/joblib` depending on input type (`pathlib.Path` vs
        # `str`). The proper resolution of this inconsistency is tracked in:
        # https://github.com/joblib/joblib/issues/1684
        cache_directory = (
            os.path.dirname(location)
            if os.path.dirname(location) and os.path.basename(location) == "joblib"
            else location
        )
        with open(os.path.join(cache_directory, ".gitignore"), "w") as file:
            file.write("# Created by joblib automatically.\n")
            file.write("*\n")

        # Items can be stored compressed for faster I/O.
        self.compress = backend_options.get("compress", False)

        # FileSystemStoreBackend can be used with mmap_mode options under
        # certain conditions.
        mmap_mode = backend_options.get("mmap_mode")
        if self.compress and mmap_mode is not None:
            warnings.warn(
                "Compressed items cannot be memmapped in a "
                "filesystem store. Option will be ignored.",
                stacklevel=2,
            )

        self.mmap_mode = mmap_mode
        self.verbose = verbose
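
# Hedged usage sketch (not part of the module): driving the filesystem backend
# directly. Real code normally goes through joblib.Memory, which computes the
# call_id; the function name and 32-character hash below are made up.
if __name__ == "__main__":
    import tempfile

    backend = FileSystemStoreBackend()
    backend.configure(tempfile.mkdtemp(), verbose=0)
    call_id = ("example_func", "0123456789abcdef0123456789abcdef")
    backend.dump_item(call_id, {"answer": 42}, verbose=0)
    assert backend.contains_item(call_id)
    print(backend.load_item(call_id, verbose=0))  # -> {'answer': 42}
    backend.enforce_store_limits(bytes_limit="1M")  # no-op here: under budget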