Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/_store_backends.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

222 statements  

1"""Storage providers backends for Memory caching.""" 

2 

3from pickle import PicklingError 

4import re 

5import os 

6import os.path 

7import datetime 

8import json 

9import shutil 

10import time 

11import warnings 

12import collections 

13import operator 

14import threading 

15from abc import ABCMeta, abstractmethod 

16 

17from .backports import concurrency_safe_rename 

18from .disk import mkdirp, memstr_to_bytes, rm_subdirs 

19from .logger import format_time 

20from . import numpy_pickle 

21 

# Lightweight record describing one cache entry on disk: where it lives,
# how many bytes it occupies, and when it was last read.
CacheItemInfo = collections.namedtuple(
    'CacheItemInfo', ['path', 'size', 'last_access'])

24 

25 

class CacheWarning(Warning):
    """Warning emitted when persisting a cache item fails for any reason
    other than a PicklingError (which gets its own FutureWarning)."""

29 

30 

def concurrency_safe_write(object_to_write, filename, write_func):
    """Writes an object into a unique file in a concurrency-safe way."""
    # Make the temporary name unique per writer: the current thread id and
    # the process id together guarantee that concurrent writers targeting
    # the same final filename never collide on the temporary file.
    temporary_filename = '{}.thread-{}-pid-{}'.format(
        filename, id(threading.current_thread()), os.getpid())
    write_func(object_to_write, temporary_filename)
    return temporary_filename

39 

40 

class StoreBackendBase(metaclass=ABCMeta):
    """Helper Abstract Base Class which defines all methods that
    a StorageBackend must implement."""

    # Base location of the store; set by configure(). On a filesystem this
    # is a directory path.
    location = None

    @abstractmethod
    def _open_item(self, f, mode):
        """Open an item in the store and return a file-like object.

        Private primitive consumed only by StoreBackendMixin; it must have
        the same signature as the builtin ``open``.

        Parameters
        ----------
        f: a file-like object
            The file-like object where an item is stored and retrieved
        mode: string, optional
            the mode in which the file-like object is opened allowed valued
            are 'rb', 'wb'

        Returns
        -------
        a file-like object
        """

    @abstractmethod
    def _item_exists(self, location):
        """Tell whether an item location exists in the store.

        Private primitive consumed only by StoreBackendMixin.

        Parameters
        ----------
        location: string
            The location of an item. On a filesystem, this corresponds to the
            absolute path, including the filename, of a file.

        Returns
        -------
        True if the item exists, False otherwise
        """

    @abstractmethod
    def _move_item(self, src, dst):
        """Move an item from src to dst inside the store.

        Private primitive consumed only by StoreBackendMixin.

        Parameters
        ----------
        src: string
            The source location of an item
        dst: string
            The destination location of an item
        """

    @abstractmethod
    def create_location(self, location):
        """Create a location in the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory.
        """

    @abstractmethod
    def clear_location(self, location):
        """Clear a location in the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory or a filename absolute path
        """

    @abstractmethod
    def get_items(self):
        """Return every item currently available in the store.

        Returns
        -------
        The list of items identified by their ids (e.g filename in a
        filesystem).
        """

    @abstractmethod
    def configure(self, location, verbose=0, backend_options=dict()):
        """Configure the store.

        Parameters
        ----------
        location: string
            The base location used by the store. On a filesystem, this
            corresponds to a directory.
        verbose: int
            The level of verbosity of the store
        backend_options: dict
            Contains a dictionary of named parameters used to configure the
            store backend.
        """

144 

145 

class StoreBackendMixin(object):
    """Class providing all logic for managing the store in a generic way.

    The StoreBackend subclass has to implement 3 methods: create_location,
    clear_location and configure. The StoreBackend also has to provide
    a private _open_item, _item_exists and _move_item methods. The _open_item
    method has to have the same signature as the builtin open and return a
    file-like object.
    """

    def load_item(self, call_id, verbose=1, timestamp=None, metadata=None):
        """Load an item from the store given its id as a list of str."""
        full_path = os.path.join(self.location, *call_id)

        if verbose > 1:
            ts_string = ('{: <16}'.format(format_time(time.time() - timestamp))
                         if timestamp is not None else '')
            signature = os.path.basename(call_id[0])
            if metadata is not None and 'input_args' in metadata:
                kwargs = ', '.join('{}={}'.format(*item)
                                   for item in metadata['input_args'].items())
                signature += '({})'.format(kwargs)
            msg = '[Memory]{}: Loading {}'.format(ts_string, signature)
            if verbose < 10:
                print('{0}...'.format(msg))
            else:
                print('{0} from {1}'.format(msg, full_path))

        mmap_mode = (None if not hasattr(self, 'mmap_mode')
                     else self.mmap_mode)

        filename = os.path.join(full_path, 'output.pkl')
        if not self._item_exists(filename):
            raise KeyError("Non-existing item (may have been "
                           "cleared).\nFile %s does not exist" % filename)

        # file-like object cannot be used when mmap_mode is set
        if mmap_mode is None:
            with self._open_item(filename, "rb") as f:
                item = numpy_pickle.load(f)
        else:
            item = numpy_pickle.load(filename, mmap_mode=mmap_mode)
        return item

    def dump_item(self, call_id, item, verbose=1):
        """Dump an item in the store at the id given as a list of str."""
        try:
            item_path = os.path.join(self.location, *call_id)
            if not self._item_exists(item_path):
                self.create_location(item_path)
            filename = os.path.join(item_path, 'output.pkl')
            if verbose > 10:
                print('Persisting in %s' % item_path)

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    try:
                        numpy_pickle.dump(to_write, f, compress=self.compress)
                    except PicklingError as e:
                        # TODO(1.5) turn into error
                        warnings.warn(
                            "Unable to cache to disk: failed to pickle "
                            "output. In version 1.5 this will raise an "
                            f"exception. Exception: {e}.",
                            FutureWarning
                        )

            self._concurrency_safe_write(item, filename, write_func)
        except Exception as e:
            # Caching is best-effort: a failure to persist (e.g. a race on
            # directory creation) must never break the cached computation.
            warnings.warn(
                "Unable to cache to disk. Possibly a race condition in the "
                f"creation of the directory. Exception: {e}.",
                CacheWarning
            )

    def clear_item(self, call_id):
        """Clear the item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        if self._item_exists(item_path):
            self.clear_location(item_path)

    def contains_item(self, call_id):
        """Check if there is an item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        filename = os.path.join(item_path, 'output.pkl')

        return self._item_exists(filename)

    def get_item_info(self, call_id):
        """Return information about item."""
        return {'location': os.path.join(self.location, *call_id)}

    def get_metadata(self, call_id):
        """Return actual metadata of an item."""
        try:
            item_path = os.path.join(self.location, *call_id)
            filename = os.path.join(item_path, 'metadata.json')
            with self._open_item(filename, 'rb') as f:
                return json.loads(f.read().decode('utf-8'))
        except Exception:
            # Metadata is best-effort: missing or unreadable metadata is not
            # an error, simply report an empty dict. (Narrowed from a bare
            # `except:` so KeyboardInterrupt/SystemExit still propagate.)
            return {}

    def store_metadata(self, call_id, metadata):
        """Store metadata of a computation."""
        try:
            item_path = os.path.join(self.location, *call_id)
            self.create_location(item_path)
            filename = os.path.join(item_path, 'metadata.json')

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    f.write(json.dumps(to_write).encode('utf-8'))

            self._concurrency_safe_write(metadata, filename, write_func)
        except Exception:
            # Storing metadata is best-effort: never fail the computation
            # because its metadata could not be persisted. (Narrowed from a
            # bare `except:` so KeyboardInterrupt/SystemExit still propagate.)
            pass

    def contains_path(self, call_id):
        """Check cached function is available in store."""
        func_path = os.path.join(self.location, *call_id)
        # Fixed: this previously called `self.object_exists`, which is not
        # defined anywhere in the store backend API (neither in
        # StoreBackendBase nor in FileSystemStoreBackend) and raised
        # AttributeError when invoked. `_item_exists` is the documented
        # existence primitive.
        return self._item_exists(func_path)

    def clear_path(self, call_id):
        """Clear all items with a common path in the store."""
        func_path = os.path.join(self.location, *call_id)
        if self._item_exists(func_path):
            self.clear_location(func_path)

    def store_cached_func_code(self, call_id, func_code=None):
        """Store the code of the cached function."""
        func_path = os.path.join(self.location, *call_id)
        if not self._item_exists(func_path):
            self.create_location(func_path)

        if func_code is not None:
            filename = os.path.join(func_path, "func_code.py")
            with self._open_item(filename, 'wb') as f:
                f.write(func_code.encode('utf-8'))

    def get_cached_func_code(self, call_id):
        """Return the code of the cached function."""
        # The previous `try: ... except: raise` wrapper was a no-op and has
        # been removed; errors (e.g. a missing func_code.py) propagate
        # unchanged to the caller.
        filename = os.path.join(self.location, *call_id, 'func_code.py')
        with self._open_item(filename, 'rb') as f:
            return f.read().decode('utf-8')

    def get_cached_func_info(self, call_id):
        """Return information related to the cached function if it exists."""
        return {'location': os.path.join(self.location, *call_id)}

    def clear(self):
        """Clear the whole store content."""
        self.clear_location(self.location)

    def enforce_store_limits(
            self, bytes_limit, items_limit=None, age_limit=None
    ):
        """
        Remove the store's oldest files to enforce item, byte, and age limits.
        """
        items_to_delete = self._get_items_to_delete(
            bytes_limit, items_limit, age_limit
        )

        for item in items_to_delete:
            if self.verbose > 10:
                print('Deleting item {0}'.format(item))
            try:
                self.clear_location(item.path)
            except OSError:
                # Even with ignore_errors=True shutil.rmtree can raise OSError
                # with:
                # [Errno 116] Stale file handle if another process has deleted
                # the folder already.
                pass

    def _get_items_to_delete(
            self, bytes_limit, items_limit=None, age_limit=None
    ):
        """
        Get items to delete to keep the store under size, file, & age limits.
        """
        if isinstance(bytes_limit, str):
            bytes_limit = memstr_to_bytes(bytes_limit)

        items = self.get_items()
        if not items:
            return []

        size = sum(item.size for item in items)

        if bytes_limit is not None:
            to_delete_size = size - bytes_limit
        else:
            to_delete_size = 0

        if items_limit is not None:
            to_delete_items = len(items) - items_limit
        else:
            to_delete_items = 0

        if age_limit is not None:
            older_item = min(item.last_access for item in items)
            deadline = datetime.datetime.now() - age_limit
        else:
            deadline = None

        # Nothing to do if every limit is already satisfied.
        if (
            to_delete_size <= 0 and to_delete_items <= 0
            and (deadline is None or older_item > deadline)
        ):
            return []

        # We want to delete first the cache items that were accessed a
        # long time ago
        items.sort(key=operator.attrgetter('last_access'))

        items_to_delete = []
        size_so_far = 0
        items_so_far = 0

        for item in items:
            if (
                (size_so_far >= to_delete_size)
                and items_so_far >= to_delete_items
                and (deadline is None or deadline < item.last_access)
            ):
                break

            items_to_delete.append(item)
            size_so_far += item.size
            items_so_far += 1

        return items_to_delete

    def _concurrency_safe_write(self, to_write, filename, write_func):
        """Writes an object into a file in a concurrency-safe way."""
        temporary_filename = concurrency_safe_write(to_write,
                                                    filename, write_func)
        self._move_item(temporary_filename, filename)

    def __repr__(self):
        """Printable representation of the store location."""
        return '{class_name}(location="{location}")'.format(
            class_name=self.__class__.__name__, location=self.location)

393 

394 

class FileSystemStoreBackend(StoreBackendBase, StoreBackendMixin):
    """A StoreBackend used with local or network file systems."""

    # The generic mixin logic is driven through these three primitives,
    # which map directly onto the standard library for a filesystem store.
    _open_item = staticmethod(open)
    _item_exists = staticmethod(os.path.exists)
    _move_item = staticmethod(concurrency_safe_rename)

    def clear_location(self, location):
        """Delete location on store."""
        if location != self.location:
            shutil.rmtree(location, ignore_errors=True)
        else:
            # Clearing the root: remove its children but keep the root
            # directory itself.
            rm_subdirs(location)

    def create_location(self, location):
        """Create object location on store"""
        mkdirp(location)

    def get_items(self):
        """Returns the whole list of items available in the store."""
        cache_items = []

        for dirpath, _, filenames in os.walk(self.location):
            # Cache entries live in directories whose basename starts with
            # a 32-character hex hash.
            if not re.match('[a-f0-9]{32}', os.path.basename(dirpath)):
                continue

            output_filename = os.path.join(dirpath, 'output.pkl')
            try:
                last_access = os.path.getatime(output_filename)
            except OSError:
                try:
                    last_access = os.path.getatime(dirpath)
                except OSError:
                    # The directory has already been deleted
                    continue

            last_access = datetime.datetime.fromtimestamp(last_access)
            try:
                dirsize = sum(os.path.getsize(os.path.join(dirpath, name))
                              for name in filenames)
            except OSError:
                # Either output_filename or one of the files in dirpath
                # does not exist any more. We assume this directory is
                # being cleaned by another process already.
                continue

            cache_items.append(CacheItemInfo(dirpath, dirsize, last_access))

        return cache_items

    def configure(self, location, verbose=1, backend_options=None):
        """Configure the store backend.

        For this backend, valid store options are 'compress' and 'mmap_mode'
        """
        if backend_options is None:
            backend_options = {}

        # setup location directory
        self.location = location
        if not os.path.exists(self.location):
            mkdirp(self.location)

        # item can be stored compressed for faster I/O
        self.compress = backend_options.get('compress', False)

        # Memory-mapping is only meaningful for uncompressed items.
        requested_mmap_mode = backend_options.get('mmap_mode')
        if self.compress and requested_mmap_mode is not None:
            warnings.warn('Compressed items cannot be memmapped in a '
                          'filesystem store. Option will be ignored.',
                          stacklevel=2)

        self.mmap_mode = requested_mmap_mode
        self.verbose = verbose