Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/_store_backends.py: 29%

213 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-12 06:31 +0000

1"""Storage providers backends for Memory caching.""" 

2 

3from pickle import PicklingError 

4import re 

5import os 

6import os.path 

7import datetime 

8import json 

9import shutil 

10import warnings 

11import collections 

12import operator 

13import threading 

14from abc import ABCMeta, abstractmethod 

15 

16from .backports import concurrency_safe_rename 

17from .disk import mkdirp, memstr_to_bytes, rm_subdirs 

18from . import numpy_pickle 

19 

# Lightweight record describing one cached item on disk: its directory
# path, its total size in bytes, and the datetime of its last access.
CacheItemInfo = collections.namedtuple(
    'CacheItemInfo', ['path', 'size', 'last_access'])

22 

23 

class CacheWarning(Warning):
    """Warning emitted when persisting a cache item fails for any reason
    other than a ``PicklingError`` (which gets its own FutureWarning)."""

27 

28 

def concurrency_safe_write(object_to_write, filename, write_func):
    """Writes an object into a unique file in a concurrency-safe way."""
    # Make the temporary name unique per writer: combining the id of the
    # current thread with the pid ensures concurrent writers (threads or
    # processes) each target a distinct file.
    temporary_filename = '{}.thread-{}-pid-{}'.format(
        filename, id(threading.current_thread()), os.getpid())
    write_func(object_to_write, temporary_filename)
    return temporary_filename

37 

38 

class StoreBackendBase(metaclass=ABCMeta):
    """Helper Abstract Base Class which defines all methods that
    a StorageBackend must implement."""

    # Base location of the store, set by ``configure`` (for a filesystem
    # backend this is a directory path).
    location = None

    @abstractmethod
    def _open_item(self, f, mode):
        """Opens an item on the store and return a file-like object.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        f: a file-like object
            The file-like object where an item is stored and retrieved
        mode: string, optional
            the mode in which the file-like object is opened allowed valued
            are 'rb', 'wb'

        Returns
        -------
        a file-like object
        """

    @abstractmethod
    def _item_exists(self, location):
        """Checks if an item location exists in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        location: string
            The location of an item. On a filesystem, this corresponds to the
            absolute path, including the filename, of a file.

        Returns
        -------
        True if the item exists, False otherwise
        """

    @abstractmethod
    def _move_item(self, src, dst):
        """Moves an item from src to dst in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        src: string
            The source location of an item
        dst: string
            The destination location of an item
        """

    @abstractmethod
    def create_location(self, location):
        """Creates a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory.
        """

    @abstractmethod
    def clear_location(self, location):
        """Clears a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory or a filename absolute path
        """

    @abstractmethod
    def get_items(self):
        """Returns the whole list of items available in the store.

        Returns
        -------
        The list of items identified by their ids (e.g filename in a
        filesystem).
        """

    @abstractmethod
    def configure(self, location, verbose=0, backend_options=None):
        """Configures the store.

        Parameters
        ----------
        location: string
            The base location used by the store. On a filesystem, this
            corresponds to a directory.
        verbose: int
            The level of verbosity of the store
        backend_options: dict or None
            Contains a dictionary of named parameters used to configure the
            store backend. None (replacing the previous mutable ``dict()``
            default, an anti-pattern) is treated as an empty dictionary.
        """

142 

143 

class StoreBackendMixin(object):
    """Class providing all logic for managing the store in a generic way.

    The StoreBackend subclass has to implement 3 methods: create_location,
    clear_location and configure. The StoreBackend also has to provide
    a private _open_item, _item_exists and _move_item methods. The _open_item
    method has to have the same signature as the builtin open and return a
    file-like object.
    """

    def load_item(self, path, verbose=1, msg=None):
        """Load an item from the store given its path as a list of
        strings.

        Raises
        ------
        KeyError
            If the item is missing (e.g. cleared by a concurrent process).
        """
        full_path = os.path.join(self.location, *path)

        if verbose > 1:
            if verbose < 10:
                print('{0}...'.format(msg))
            else:
                print('{0} from {1}'.format(msg, full_path))

        mmap_mode = (None if not hasattr(self, 'mmap_mode')
                     else self.mmap_mode)

        filename = os.path.join(full_path, 'output.pkl')
        if not self._item_exists(filename):
            raise KeyError("Non-existing item (may have been "
                           "cleared).\nFile %s does not exist" % filename)

        # file-like object cannot be used when mmap_mode is set
        if mmap_mode is None:
            with self._open_item(filename, "rb") as f:
                item = numpy_pickle.load(f)
        else:
            item = numpy_pickle.load(filename, mmap_mode=mmap_mode)
        return item

    def dump_item(self, path, item, verbose=1):
        """Dump an item in the store at the path given as a list of
        strings.

        Dumping is best-effort: any failure is reported as a warning
        instead of propagating, so caching never breaks the cached call.
        """
        try:
            item_path = os.path.join(self.location, *path)
            if not self._item_exists(item_path):
                self.create_location(item_path)
            filename = os.path.join(item_path, 'output.pkl')
            if verbose > 10:
                print('Persisting in %s' % item_path)

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    try:
                        numpy_pickle.dump(to_write, f, compress=self.compress)
                    except PicklingError as e:
                        # TODO(1.5) turn into error
                        warnings.warn(
                            "Unable to cache to disk: failed to pickle "
                            "output. In version 1.5 this will raise an "
                            f"exception. Exception: {e}.",
                            FutureWarning
                        )

            self._concurrency_safe_write(item, filename, write_func)
        except Exception as e:
            # Removed the stale ``# noqa: E722`` (this except is not bare).
            warnings.warn(
                "Unable to cache to disk. Possibly a race condition in the "
                f"creation of the directory. Exception: {e}.",
                CacheWarning
            )

    def clear_item(self, path):
        """Clear the item at the path, given as a list of strings."""
        item_path = os.path.join(self.location, *path)
        if self._item_exists(item_path):
            self.clear_location(item_path)

    def contains_item(self, path):
        """Check if there is an item at the path, given as a list of
        strings"""
        item_path = os.path.join(self.location, *path)
        filename = os.path.join(item_path, 'output.pkl')

        return self._item_exists(filename)

    def get_item_info(self, path):
        """Return information about item.

        Returns
        -------
        dict with a single 'location' key: the full path of the item.
        """
        return {'location': os.path.join(self.location, *path)}

    def get_metadata(self, path):
        """Return actual metadata of an item.

        Returns an empty dict when the metadata file is missing or
        unreadable: metadata is informative only and must not fail.
        """
        try:
            item_path = os.path.join(self.location, *path)
            filename = os.path.join(item_path, 'metadata.json')
            with self._open_item(filename, 'rb') as f:
                return json.loads(f.read().decode('utf-8'))
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            return {}

    def store_metadata(self, path, metadata):
        """Store metadata of a computation.

        Best-effort: failures are silently ignored so they never break
        the cached call itself.
        """
        try:
            item_path = os.path.join(self.location, *path)
            self.create_location(item_path)
            filename = os.path.join(item_path, 'metadata.json')

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    f.write(json.dumps(to_write).encode('utf-8'))

            self._concurrency_safe_write(metadata, filename, write_func)
        except Exception:
            # Narrowed from a bare ``except:``; see get_metadata.
            pass

    def contains_path(self, path):
        """Check cached function is available in store."""
        func_path = os.path.join(self.location, *path)
        # Fixed: the previous ``self.object_exists(func_path)`` called a
        # method that no backend defines (the contract only provides
        # ``_item_exists``), so every call raised AttributeError.
        return self._item_exists(func_path)

    def clear_path(self, path):
        """Clear all items with a common path in the store."""
        func_path = os.path.join(self.location, *path)
        if self._item_exists(func_path):
            self.clear_location(func_path)

    def store_cached_func_code(self, path, func_code=None):
        """Store the code of the cached function."""
        func_path = os.path.join(self.location, *path)
        if not self._item_exists(func_path):
            self.create_location(func_path)

        if func_code is not None:
            filename = os.path.join(func_path, "func_code.py")
            with self._open_item(filename, 'wb') as f:
                f.write(func_code.encode('utf-8'))

    def get_cached_func_code(self, path):
        """Return the code of the cached function."""
        # Fixed: build the filename without mutating the caller's ``path``
        # list (the previous ``path += [...]`` modified it in place), and
        # dropped the no-op ``try: ... except: raise`` wrapper — errors
        # propagate unchanged either way.
        filename = os.path.join(self.location, *path, 'func_code.py')
        with self._open_item(filename, 'rb') as f:
            return f.read().decode('utf-8')

    def get_cached_func_info(self, path):
        """Return information related to the cached function if it exists."""
        return {'location': os.path.join(self.location, *path)}

    def clear(self):
        """Clear the whole store content."""
        self.clear_location(self.location)

    def enforce_store_limits(
            self, bytes_limit, items_limit=None, age_limit=None
    ):
        """
        Remove the store's oldest files to enforce item, byte, and age limits.
        """
        items_to_delete = self._get_items_to_delete(
            bytes_limit, items_limit, age_limit
        )

        for item in items_to_delete:
            if self.verbose > 10:
                print('Deleting item {0}'.format(item))
            try:
                self.clear_location(item.path)
            except OSError:
                # Even with ignore_errors=True shutil.rmtree can raise OSError
                # with:
                # [Errno 116] Stale file handle if another process has deleted
                # the folder already.
                pass

    def _get_items_to_delete(
            self, bytes_limit, items_limit=None, age_limit=None
    ):
        """
        Get items to delete to keep the store under size, file, & age limits.
        """
        if isinstance(bytes_limit, str):
            bytes_limit = memstr_to_bytes(bytes_limit)

        items = self.get_items()
        if not items:
            # Fixed: an empty store needs no deletion. This also protects
            # the ``min`` below from raising ValueError on an empty
            # sequence when age_limit is set.
            return []
        size = sum(item.size for item in items)

        if bytes_limit is not None:
            to_delete_size = size - bytes_limit
        else:
            to_delete_size = 0

        if items_limit is not None:
            to_delete_items = len(items) - items_limit
        else:
            to_delete_items = 0

        if age_limit is not None:
            older_item = min(item.last_access for item in items)
            deadline = datetime.datetime.now() - age_limit
        else:
            deadline = None

        if (
            to_delete_size <= 0 and to_delete_items <= 0
            and (deadline is None or older_item > deadline)
        ):
            return []

        # We want to delete first the cache items that were accessed a
        # long time ago
        items.sort(key=operator.attrgetter('last_access'))

        items_to_delete = []
        size_so_far = 0
        items_so_far = 0

        for item in items:
            if (
                (size_so_far >= to_delete_size)
                and items_so_far >= to_delete_items
                and (deadline is None or deadline < item.last_access)
            ):
                break

            items_to_delete.append(item)
            size_so_far += item.size
            items_so_far += 1

        return items_to_delete

    def _concurrency_safe_write(self, to_write, filename, write_func):
        """Writes an object into a file in a concurrency-safe way."""
        temporary_filename = concurrency_safe_write(to_write,
                                                    filename, write_func)
        # Promote the fully-written temporary file to its final name so
        # readers never observe a partially-written item.
        self._move_item(temporary_filename, filename)

    def __repr__(self):
        """Printable representation of the store location."""
        return '{class_name}(location="{location}")'.format(
            class_name=self.__class__.__name__, location=self.location)

385 

386 

class FileSystemStoreBackend(StoreBackendBase, StoreBackendMixin):
    """A StoreBackend used with local or network file systems."""

    # The filesystem primitives map directly onto the backend contract.
    _open_item = staticmethod(open)
    _item_exists = staticmethod(os.path.exists)
    _move_item = staticmethod(concurrency_safe_rename)

    def clear_location(self, location):
        """Delete location on store."""
        if location == self.location:
            # Clearing the store root: keep the root directory itself and
            # only remove what it contains.
            rm_subdirs(location)
        else:
            shutil.rmtree(location, ignore_errors=True)

    def create_location(self, location):
        """Create object location on store"""
        mkdirp(location)

    def get_items(self):
        """Returns the whole list of items available in the store."""
        items = []
        for dirpath, _, filenames in os.walk(self.location):
            # Cache item directories are named by a 32-hex-digit hash.
            if not re.match('[a-f0-9]{32}', os.path.basename(dirpath)):
                continue
            info = self._collect_item_info(dirpath, filenames)
            if info is not None:
                items.append(info)
        return items

    def _collect_item_info(self, dirpath, filenames):
        """Build the CacheItemInfo for one cache-hash directory, or return
        None when the directory vanished under a concurrent cleanup."""
        output_filename = os.path.join(dirpath, 'output.pkl')
        try:
            atime = os.path.getatime(output_filename)
        except OSError:
            try:
                atime = os.path.getatime(dirpath)
            except OSError:
                # The directory has already been deleted
                return None
        last_access = datetime.datetime.fromtimestamp(atime)

        try:
            dirsize = sum(os.path.getsize(os.path.join(dirpath, fn))
                          for fn in filenames)
        except OSError:
            # Either output_filename or one of the files in dirpath does
            # not exist any more. We assume this directory is being
            # cleaned by another process already.
            return None
        return CacheItemInfo(dirpath, dirsize, last_access)

    def configure(self, location, verbose=1, backend_options=None):
        """Configure the store backend.

        For this backend, valid store options are 'compress' and 'mmap_mode'
        """
        if backend_options is None:
            backend_options = {}

        # setup location directory
        self.location = location
        if not os.path.exists(self.location):
            mkdirp(self.location)

        # item can be stored compressed for faster I/O
        self.compress = backend_options.get('compress', False)

        # FileSystemStoreBackend can be used with mmap_mode options under
        # certain conditions.
        mmap_mode = backend_options.get('mmap_mode')
        if mmap_mode is not None and self.compress:
            warnings.warn('Compressed items cannot be memmapped in a '
                          'filesystem store. Option will be ignored.',
                          stacklevel=2)

        self.mmap_mode = mmap_mode
        self.verbose = verbose