Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/joblib/_store_backends.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

229 statements  

1"""Storage providers backends for Memory caching.""" 

2 

3import collections 

4import datetime 

5import json 

6import operator 

7import os 

8import os.path 

9import re 

10import shutil 

11import threading 

12import time 

13import warnings 

14from abc import ABCMeta, abstractmethod 

15from pickle import PicklingError 

16 

17from . import numpy_pickle 

18from .backports import concurrency_safe_rename 

19from .disk import memstr_to_bytes, mkdirp, rm_subdirs 

20from .logger import format_time 

21 

# Lightweight record describing one cache entry: the directory path of the
# entry, its total size in bytes, and the datetime of its last access.
CacheItemInfo = collections.namedtuple("CacheItemInfo", ["path", "size", "last_access"])

23 

24 

class CacheWarning(Warning):
    """Warning emitted when persisting an item to the cache fails for any
    reason other than a PicklingError (which gets its own FutureWarning)."""

29 

30 

def concurrency_safe_write(object_to_write, filename, write_func):
    """Writes an object into a unique file in a concurrency-safe way."""
    # Make the temporary name unique per thread and per process so that
    # concurrent writers never collide on the same intermediate file; the
    # caller is responsible for atomically renaming it to `filename`.
    temporary_filename = "{}.thread-{}-pid-{}".format(
        filename, id(threading.current_thread()), os.getpid()
    )
    write_func(object_to_write, temporary_filename)
    return temporary_filename

38 

39 

class StoreBackendBase(metaclass=ABCMeta):
    """Helper Abstract Base Class which defines all methods that
    a StorageBackend must implement."""

    # Base location of the store (e.g. a directory path for a filesystem
    # backend); concrete backends set this in configure().
    location = None

    @abstractmethod
    def _open_item(self, f, mode):
        """Open an item in the store and return a file-like object.

        Private hook, used only by StoreBackendMixin.

        Parameters
        ----------
        f: a file-like object
            Where the item is stored and retrieved from.
        mode: string, optional
            Mode used to open the file-like object; allowed values are
            'rb' and 'wb'.

        Returns
        -------
        a file-like object
        """

    @abstractmethod
    def _item_exists(self, location):
        """Tell whether an item location is present in the store.

        Private hook, used only by StoreBackendMixin.

        Parameters
        ----------
        location: string
            Location of an item. On a filesystem this is the absolute
            path of a file, filename included.

        Returns
        -------
        True when the item exists, False otherwise
        """

    @abstractmethod
    def _move_item(self, src, dst):
        """Move an item from src to dst inside the store.

        Private hook, used only by StoreBackendMixin.

        Parameters
        ----------
        src: string
            Source location of the item
        dst: string
            Destination location of the item
        """

    @abstractmethod
    def create_location(self, location):
        """Create a location on the store.

        Parameters
        ----------
        location: string
            Location in the store; on a filesystem this is a directory.
        """

    @abstractmethod
    def clear_location(self, location):
        """Clear a location on the store.

        Parameters
        ----------
        location: string
            Location in the store; on a filesystem this is a directory
            or an absolute file path.
        """

    @abstractmethod
    def get_items(self):
        """List every item available in the store.

        Returns
        -------
        The list of items identified by their ids (e.g filename in a
        filesystem).
        """

    @abstractmethod
    def configure(self, location, verbose=0, backend_options=dict()):
        """Configure the store.

        Parameters
        ----------
        location: string
            Base location used by the store; on a filesystem this is a
            directory.
        verbose: int
            Verbosity level of the store.
        backend_options: dict
            Named parameters used to configure the store backend.
        """

143 

144 

class StoreBackendMixin(object):
    """Class providing all logic for managing the store in a generic way.

    The StoreBackend subclass has to implement 3 methods: create_location,
    clear_location and configure. The StoreBackend also has to provide
    a private _open_item, _item_exists and _move_item methods. The _open_item
    method has to have the same signature as the builtin open and return a
    file-like object.
    """

    def load_item(self, call_id, verbose=1, timestamp=None, metadata=None):
        """Load an item from the store given its id as a list of str.

        Parameters
        ----------
        call_id: list of str
            Id of the item, e.g. ``[func_path, args_hash]``.
        verbose: int
            Verbosity level; above 1 a "Loading" message is printed.
        timestamp: float, optional
            Reference time used to display the elapsed time in verbose mode.
        metadata: dict, optional
            Item metadata; when it contains "input_args" they are shown in
            the verbose message.

        Returns
        -------
        The unpickled item.

        Raises
        ------
        KeyError when the item is not present in the store.
        """
        full_path = os.path.join(self.location, *call_id)

        if verbose > 1:
            ts_string = (
                "{: <16}".format(format_time(time.time() - timestamp))
                if timestamp is not None
                else ""
            )
            signature = os.path.basename(call_id[0])
            if metadata is not None and "input_args" in metadata:
                kwargs = ", ".join(
                    "{}={}".format(*item) for item in metadata["input_args"].items()
                )
                signature += "({})".format(kwargs)
            msg = "[Memory]{}: Loading {}".format(ts_string, signature)
            if verbose < 10:
                print("{0}...".format(msg))
            else:
                print("{0} from {1}".format(msg, full_path))

        mmap_mode = None if not hasattr(self, "mmap_mode") else self.mmap_mode

        filename = os.path.join(full_path, "output.pkl")
        if not self._item_exists(filename):
            raise KeyError(
                "Non-existing item (may have been "
                "cleared).\nFile %s does not exist" % filename
            )

        # file-like object cannot be used when mmap_mode is set
        if mmap_mode is None:
            with self._open_item(filename, "rb") as f:
                item = numpy_pickle.load(f)
        else:
            item = numpy_pickle.load(filename, mmap_mode=mmap_mode)
        return item

    def dump_item(self, call_id, item, verbose=1):
        """Dump an item in the store at the id given as a list of str.

        Best-effort: any failure is reported as a CacheWarning instead of
        raising, so a caching problem never breaks the user's computation.
        """
        try:
            item_path = os.path.join(self.location, *call_id)
            if not self._item_exists(item_path):
                self.create_location(item_path)
            filename = os.path.join(item_path, "output.pkl")
            if verbose > 10:
                print("Persisting in %s" % item_path)

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    try:
                        numpy_pickle.dump(to_write, f, compress=self.compress)
                    except PicklingError as e:
                        # TODO(1.5) turn into error
                        warnings.warn(
                            "Unable to cache to disk: failed to pickle "
                            "output. In version 1.5 this will raise an "
                            f"exception. Exception: {e}.",
                            FutureWarning,
                        )

            self._concurrency_safe_write(item, filename, write_func)
        except Exception as e:
            warnings.warn(
                "Unable to cache to disk. Possibly a race condition in the "
                f"creation of the directory. Exception: {e}.",
                CacheWarning,
            )

    def clear_item(self, call_id):
        """Clear the item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        if self._item_exists(item_path):
            self.clear_location(item_path)

    def contains_item(self, call_id):
        """Check if there is an item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        filename = os.path.join(item_path, "output.pkl")

        return self._item_exists(filename)

    def get_item_info(self, call_id):
        """Return information about item.

        Returns
        -------
        dict with the item's store location under the "location" key.
        """
        return {"location": os.path.join(self.location, *call_id)}

    def get_metadata(self, call_id):
        """Return actual metadata of an item.

        Best-effort: returns an empty dict when the metadata file is
        missing or unreadable.
        """
        try:
            item_path = os.path.join(self.location, *call_id)
            filename = os.path.join(item_path, "metadata.json")
            with self._open_item(filename, "rb") as f:
                return json.loads(f.read().decode("utf-8"))
        except Exception:
            # Narrowed from a bare `except:` so that KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            return {}

    def store_metadata(self, call_id, metadata):
        """Store metadata of a computation.

        Best-effort: metadata is optional, so any failure is ignored.
        """
        try:
            item_path = os.path.join(self.location, *call_id)
            self.create_location(item_path)
            filename = os.path.join(item_path, "metadata.json")

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    f.write(json.dumps(to_write).encode("utf-8"))

            self._concurrency_safe_write(metadata, filename, write_func)
        except Exception:
            # Narrowed from a bare `except:` so that KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            pass

    def contains_path(self, call_id):
        """Check cached function is available in store."""
        func_path = os.path.join(self.location, *call_id)
        # BUG FIX: this used to call self.object_exists(), a method defined
        # neither by StoreBackendBase nor by any visible backend, which would
        # raise AttributeError. _item_exists is the documented backend hook
        # (used by clear_path just below).
        return self._item_exists(func_path)

    def clear_path(self, call_id):
        """Clear all items with a common path in the store."""
        func_path = os.path.join(self.location, *call_id)
        if self._item_exists(func_path):
            self.clear_location(func_path)

    def store_cached_func_code(self, call_id, func_code=None):
        """Store the code of the cached function."""
        func_path = os.path.join(self.location, *call_id)
        if not self._item_exists(func_path):
            self.create_location(func_path)

        if func_code is not None:
            filename = os.path.join(func_path, "func_code.py")
            with self._open_item(filename, "wb") as f:
                f.write(func_code.encode("utf-8"))

    def get_cached_func_code(self, call_id):
        """Return the source code of the cached function.

        Any error (e.g. missing file) propagates to the caller.
        """
        filename = os.path.join(self.location, *call_id, "func_code.py")
        # The original wrapped this in `try: ... except: raise`, which is a
        # no-op handler; it has been removed.
        with self._open_item(filename, "rb") as f:
            return f.read().decode("utf-8")

    def get_cached_func_info(self, call_id):
        """Return information related to the cached function if it exists."""
        return {"location": os.path.join(self.location, *call_id)}

    def clear(self):
        """Clear the whole store content."""
        self.clear_location(self.location)

    def enforce_store_limits(self, bytes_limit, items_limit=None, age_limit=None):
        """
        Remove the store's oldest files to enforce item, byte, and age limits.
        """
        items_to_delete = self._get_items_to_delete(bytes_limit, items_limit, age_limit)

        for item in items_to_delete:
            if self.verbose > 10:
                print("Deleting item {0}".format(item))
            try:
                self.clear_location(item.path)
            except OSError:
                # Even with ignore_errors=True shutil.rmtree can raise OSError
                # with:
                # [Errno 116] Stale file handle if another process has deleted
                # the folder already.
                pass

    def _get_items_to_delete(self, bytes_limit, items_limit=None, age_limit=None):
        """
        Get items to delete to keep the store under size, file, & age limits.

        Parameters
        ----------
        bytes_limit: int, str or None
            Maximum total store size; memory strings such as "1G" are
            accepted.
        items_limit: int, optional
            Maximum number of items in the store.
        age_limit: datetime.timedelta, optional
            Maximum time since an item's last access; must be positive.

        Returns
        -------
        List of CacheItemInfo to delete, least recently accessed first.
        """
        if isinstance(bytes_limit, str):
            bytes_limit = memstr_to_bytes(bytes_limit)

        items = self.get_items()
        if not items:
            return []

        size = sum(item.size for item in items)

        if bytes_limit is not None:
            to_delete_size = size - bytes_limit
        else:
            to_delete_size = 0

        if items_limit is not None:
            to_delete_items = len(items) - items_limit
        else:
            to_delete_items = 0

        if age_limit is not None:
            older_item = min(item.last_access for item in items)
            if age_limit.total_seconds() < 0:
                raise ValueError("age_limit has to be a positive timedelta")
            deadline = datetime.datetime.now() - age_limit
        else:
            deadline = None

        # Nothing to do when every limit is already satisfied.
        if (
            to_delete_size <= 0
            and to_delete_items <= 0
            and (deadline is None or older_item > deadline)
        ):
            return []

        # We want to delete first the cache items that were accessed a
        # long time ago
        items.sort(key=operator.attrgetter("last_access"))

        items_to_delete = []
        size_so_far = 0
        items_so_far = 0

        for item in items:
            # Stop as soon as all three limits are met for the remainder.
            if (
                (size_so_far >= to_delete_size)
                and items_so_far >= to_delete_items
                and (deadline is None or deadline < item.last_access)
            ):
                break

            items_to_delete.append(item)
            size_so_far += item.size
            items_so_far += 1

        return items_to_delete

    def _concurrency_safe_write(self, to_write, filename, write_func):
        """Writes an object into a file in a concurrency-safe way."""
        temporary_filename = concurrency_safe_write(to_write, filename, write_func)
        self._move_item(temporary_filename, filename)

    def __repr__(self):
        """Printable representation of the store location."""
        return '{class_name}(location="{location}")'.format(
            class_name=self.__class__.__name__, location=self.location
        )

394 

395 

class FileSystemStoreBackend(StoreBackendBase, StoreBackendMixin):
    """A StoreBackend used with local or network file systems."""

    # The generic hooks required by StoreBackendMixin map directly onto
    # builtin/os-level primitives for a filesystem store.
    _open_item = staticmethod(open)
    _item_exists = staticmethod(os.path.exists)
    _move_item = staticmethod(concurrency_safe_rename)

    def clear_location(self, location):
        """Delete location on store."""
        if location == self.location:
            # Keep the store root itself, only remove its sub-directories.
            rm_subdirs(location)
        else:
            shutil.rmtree(location, ignore_errors=True)

    def create_location(self, location):
        """Create object location on store"""
        mkdirp(location)

    def get_items(self):
        """Returns the whole list of items available in the store."""
        cache_items = []

        for dirpath, _, filenames in os.walk(self.location):
            # Cache entries live in directories whose basename starts with a
            # 32-character hex hash; skip everything else.
            if not re.match("[a-f0-9]{32}", os.path.basename(dirpath)):
                continue

            pickle_path = os.path.join(dirpath, "output.pkl")
            try:
                atime = os.path.getatime(pickle_path)
            except OSError:
                try:
                    atime = os.path.getatime(dirpath)
                except OSError:
                    # The directory has already been deleted
                    continue

            last_access = datetime.datetime.fromtimestamp(atime)
            try:
                dirsize = sum(
                    os.path.getsize(os.path.join(dirpath, fn)) for fn in filenames
                )
            except OSError:
                # One of the files in dirpath does not exist any more: the
                # directory is being cleaned by another process already.
                continue

            cache_items.append(CacheItemInfo(dirpath, dirsize, last_access))

        return cache_items

    def configure(self, location, verbose=1, backend_options=None):
        """Configure the store backend.

        For this backend, valid store options are 'compress' and 'mmap_mode'
        """
        if backend_options is None:
            backend_options = {}

        # setup location directory
        self.location = location
        if not os.path.exists(self.location):
            mkdirp(self.location)

        # Automatically add `.gitignore` file to the cache folder.
        # XXX: the condition is necessary because in `Memory.__init__`, the
        # user-provided `location` param becomes either `{location}` or
        # `{location}/joblib` depending on input type (`pathlib.Path` vs
        # `str`). The proper resolution of this inconsistency is tracked in:
        # https://github.com/joblib/joblib/issues/1684
        if os.path.dirname(location) and os.path.basename(location) == "joblib":
            cache_directory = os.path.dirname(location)
        else:
            cache_directory = location
        with open(os.path.join(cache_directory, ".gitignore"), "w") as gitignore:
            gitignore.write("# Created by joblib automatically.\n")
            gitignore.write("*\n")

        # item can be stored compressed for faster I/O
        self.compress = backend_options.get("compress", False)

        # FileSystemStoreBackend can be used with mmap_mode options under
        # certain conditions.
        mmap_mode = backend_options.get("mmap_mode")
        if self.compress and mmap_mode is not None:
            warnings.warn(
                "Compressed items cannot be memmapped in a "
                "filesystem store. Option will be ignored.",
                stacklevel=2,
            )

        self.mmap_mode = mmap_mode
        self.verbose = verbose
488 self.verbose = verbose