1"""Storage providers backends for Memory caching."""
2
3import collections
4import datetime
5import json
6import operator
7import os
8import os.path
9import re
10import shutil
11import threading
12import time
13import warnings
14from abc import ABCMeta, abstractmethod
15from pickle import PicklingError
16
17from . import numpy_pickle
18from .backports import concurrency_safe_rename
19from .disk import memstr_to_bytes, mkdirp, rm_subdirs
20from .logger import format_time
21
22CacheItemInfo = collections.namedtuple("CacheItemInfo", "path size last_access")
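# Each CacheItemInfo describes one cache entry: ``path`` is the item
# directory, ``size`` its total size in bytes and ``last_access`` a
# ``datetime.datetime`` built from the entry's last access time.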


class CacheWarning(Warning):
    """Warning used to report dump failures other than PicklingError."""


def concurrency_safe_write(object_to_write, filename, write_func):
    """Write an object to a unique temporary file in a concurrency-safe way."""
    thread_id = id(threading.current_thread())
    temporary_filename = "{}.thread-{}-pid-{}".format(filename, thread_id, os.getpid())
    write_func(object_to_write, temporary_filename)

    return temporary_filename
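
# A minimal sketch of how the write-then-rename pattern above is meant to be
# composed (the ``write_func`` body and the target path here are
# illustrative, not part of the public API):
#
#     def write_func(obj, path):
#         with open(path, "wb") as f:
#             f.write(obj)
#
#     tmp = concurrency_safe_write(b"payload", "cache/output.pkl", write_func)
#     concurrency_safe_rename(tmp, "cache/output.pkl")  # atomic publish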


class StoreBackendBase(metaclass=ABCMeta):
    """Helper Abstract Base Class which defines all methods that
    a StoreBackend must implement."""

    location = None

    @abstractmethod
    def _open_item(self, f, mode):
        """Opens an item in the store and returns a file-like object.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        f: string
            The location of the item to open. On a filesystem, this
            corresponds to a filename.
        mode: string
            The mode in which the file-like object is opened; allowed values
            are 'rb' and 'wb'.

        Returns
        -------
        a file-like object
        """

    @abstractmethod
    def _item_exists(self, location):
        """Checks if an item location exists in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        location: string
            The location of an item. On a filesystem, this corresponds to the
            absolute path, including the filename, of a file.

        Returns
        -------
        True if the item exists, False otherwise
        """

    @abstractmethod
    def _move_item(self, src, dst):
        """Moves an item from src to dst in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        src: string
            The source location of an item
        dst: string
            The destination location of an item
        """

    @abstractmethod
    def create_location(self, location):
        """Creates a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory.
        """

    @abstractmethod
    def clear_location(self, location):
        """Clears a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory or to the absolute path of a file.
        """

    @abstractmethod
    def get_items(self):
        """Returns the whole list of items available in the store.

        Returns
        -------
        The list of items identified by their ids (e.g. filename in a
        filesystem).
        """

    @abstractmethod
    def configure(self, location, verbose=0, backend_options=dict()):
        """Configures the store.

        Parameters
        ----------
        location: string
            The base location used by the store. On a filesystem, this
            corresponds to a directory.
        verbose: int
            The level of verbosity of the store
        backend_options: dict
            Contains a dictionary of named parameters used to configure the
            store backend.
        """


class StoreBackendMixin(object):
    """Class providing all logic for managing the store in a generic way.

    A StoreBackend subclass has to implement 3 methods: create_location,
    clear_location and configure. It also has to provide the private
    _open_item, _item_exists and _move_item methods. The _open_item method
    has to have the same signature as the built-in open and return a
    file-like object.
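
    A minimal sketch of a conforming subclass, assuming a plain filesystem
    (``FileSystemStoreBackend`` below is the real reference implementation;
    ``DummyFSBackend`` is purely illustrative)::

        class DummyFSBackend(StoreBackendBase, StoreBackendMixin):
            _open_item = staticmethod(open)
            _item_exists = staticmethod(os.path.exists)
            _move_item = staticmethod(concurrency_safe_rename)

            def create_location(self, location):
                mkdirp(location)

            def clear_location(self, location):
                shutil.rmtree(location, ignore_errors=True)

            def configure(self, location, verbose=0, backend_options=dict()):
                self.location = location
                self.compress = backend_options.get("compress", False)
                self.verbose = verbose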
    """

    def load_item(self, call_id, verbose=1, timestamp=None, metadata=None):
        """Load an item from the store, given its id as a list of str.
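
        A hedged usage sketch (the function name and hash value are
        illustrative)::

            backend = FileSystemStoreBackend()
            backend.configure("/tmp/joblib_cache")
            result = backend.load_item(
                ["my_func", "d41d8cd98f00b204e9800998ecf8427e"]
            )
        """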
        full_path = os.path.join(self.location, *call_id)

        if verbose > 1:
            ts_string = (
                "{: <16}".format(format_time(time.time() - timestamp))
                if timestamp is not None
                else ""
            )
            signature = os.path.basename(call_id[0])
            if metadata is not None and "input_args" in metadata:
                kwargs = ", ".join(
                    "{}={}".format(*item) for item in metadata["input_args"].items()
                )
                signature += "({})".format(kwargs)
            msg = "[Memory]{}: Loading {}".format(ts_string, signature)
            if verbose < 10:
                print("{0}...".format(msg))
            else:
                print("{0} from {1}".format(msg, full_path))

        mmap_mode = getattr(self, "mmap_mode", None)

        filename = os.path.join(full_path, "output.pkl")
        if not self._item_exists(filename):
            raise KeyError(
                "Non-existing item (may have been "
                "cleared).\nFile %s does not exist" % filename
            )

        # A file-like object cannot be used when mmap_mode is set.
        if mmap_mode is None:
            with self._open_item(filename, "rb") as f:
                item = numpy_pickle.load(f)
        else:
            item = numpy_pickle.load(filename, mmap_mode=mmap_mode)
        return item

    def dump_item(self, call_id, item, verbose=1):
        """Dump an item in the store at the id given as a list of str.
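
        A hedged round-trip sketch (``backend`` is a configured store
        backend; the call id is illustrative)::

            call_id = ["my_func", "d41d8cd98f00b204e9800998ecf8427e"]
            backend.dump_item(call_id, {"answer": 42})
            backend.load_item(call_id)  # -> {"answer": 42}
        """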
        try:
            item_path = os.path.join(self.location, *call_id)
            if not self._item_exists(item_path):
                self.create_location(item_path)
            filename = os.path.join(item_path, "output.pkl")
            if verbose > 10:
                print("Persisting in %s" % item_path)

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    try:
                        numpy_pickle.dump(to_write, f, compress=self.compress)
                    except PicklingError as e:
                        # TODO(1.5) turn into error
                        warnings.warn(
                            "Unable to cache to disk: failed to pickle "
                            "output. In version 1.5 this will raise an "
                            f"exception. Exception: {e}.",
                            FutureWarning,
                        )

            self._concurrency_safe_write(item, filename, write_func)
        except Exception as e:
            warnings.warn(
                "Unable to cache to disk. Possibly a race condition in the "
                f"creation of the directory. Exception: {e}.",
                CacheWarning,
            )

    def clear_item(self, call_id):
        """Clear the item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        if self._item_exists(item_path):
            self.clear_location(item_path)

    def contains_item(self, call_id):
        """Check if there is an item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        filename = os.path.join(item_path, "output.pkl")

        return self._item_exists(filename)

    def get_item_info(self, call_id):
        """Return information about the item."""
        return {"location": os.path.join(self.location, *call_id)}

    def get_metadata(self, call_id):
        """Return the metadata stored for an item, or an empty dict.
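
        A hedged sketch (``backend`` is a configured store backend; the call
        id is illustrative and the available keys depend on what
        ``store_metadata`` recorded)::

            meta = backend.get_metadata(
                ["my_func", "d41d8cd98f00b204e9800998ecf8427e"]
            )
            meta.get("duration")  # e.g. the recorded computation time
        """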
        try:
            item_path = os.path.join(self.location, *call_id)
            filename = os.path.join(item_path, "metadata.json")
            with self._open_item(filename, "rb") as f:
                return json.loads(f.read().decode("utf-8"))
        except:  # noqa: E722
            return {}

    def store_metadata(self, call_id, metadata):
        """Store metadata of a computation."""
        try:
            item_path = os.path.join(self.location, *call_id)
            self.create_location(item_path)
            filename = os.path.join(item_path, "metadata.json")

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    f.write(json.dumps(to_write).encode("utf-8"))

            self._concurrency_safe_write(metadata, filename, write_func)
        except:  # noqa: E722
            pass

    def contains_path(self, call_id):
        """Check whether the cached function is available in the store."""
        func_path = os.path.join(self.location, *call_id)
        return self._item_exists(func_path)

    def clear_path(self, call_id):
        """Clear all items with a common path in the store."""
        func_path = os.path.join(self.location, *call_id)
        if self._item_exists(func_path):
            self.clear_location(func_path)

    def store_cached_func_code(self, call_id, func_code=None):
        """Store the code of the cached function."""
        func_path = os.path.join(self.location, *call_id)
        if not self._item_exists(func_path):
            self.create_location(func_path)

        if func_code is not None:
            filename = os.path.join(func_path, "func_code.py")
            with self._open_item(filename, "wb") as f:
                f.write(func_code.encode("utf-8"))

    def get_cached_func_code(self, call_id):
        """Get the code of the cached function."""
        filename = os.path.join(self.location, *call_id, "func_code.py")
        with self._open_item(filename, "rb") as f:
            return f.read().decode("utf-8")

    def get_cached_func_info(self, call_id):
        """Return information related to the cached function if it exists."""
        return {"location": os.path.join(self.location, *call_id)}

    def clear(self):
        """Clear the whole store content."""
        self.clear_location(self.location)

    def enforce_store_limits(self, bytes_limit, items_limit=None, age_limit=None):
        """
        Remove the store's oldest files to enforce item, byte, and age limits.
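
        A hedged usage sketch (``backend`` is a configured store backend and
        the limit values are illustrative)::

            backend.enforce_store_limits(
                bytes_limit="1G",
                items_limit=1000,
                age_limit=datetime.timedelta(days=30),
            )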
        """
        items_to_delete = self._get_items_to_delete(
            bytes_limit, items_limit, age_limit
        )

        for item in items_to_delete:
            if self.verbose > 10:
                print("Deleting item {0}".format(item))
            try:
                self.clear_location(item.path)
            except OSError:
                # Even with ignore_errors=True, shutil.rmtree can raise
                # OSError with [Errno 116] (stale file handle) if another
                # process has already deleted the folder.
                pass

    def _get_items_to_delete(self, bytes_limit, items_limit=None, age_limit=None):
        """
        Get items to delete to keep the store under size, item-count and age
        limits.
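
        For example (illustrative numbers): with ``bytes_limit=1_000_000``
        and a store currently holding 1,500,000 bytes, the least recently
        accessed items totalling at least 500,000 bytes are returned for
        deletion.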
        """
        if isinstance(bytes_limit, str):
            bytes_limit = memstr_to_bytes(bytes_limit)

        items = self.get_items()
        if not items:
            return []

        size = sum(item.size for item in items)

        if bytes_limit is not None:
            to_delete_size = size - bytes_limit
        else:
            to_delete_size = 0

        if items_limit is not None:
            to_delete_items = len(items) - items_limit
        else:
            to_delete_items = 0

        if age_limit is not None:
            if age_limit.total_seconds() < 0:
                raise ValueError("age_limit has to be a positive timedelta")
            oldest_access = min(item.last_access for item in items)
            deadline = datetime.datetime.now() - age_limit
        else:
            deadline = None

        if (
            to_delete_size <= 0
            and to_delete_items <= 0
            and (deadline is None or oldest_access > deadline)
        ):
            return []

        # Delete the least recently accessed cache items first.
        items.sort(key=operator.attrgetter("last_access"))

        items_to_delete = []
        size_so_far = 0
        items_so_far = 0

        for item in items:
            if (
                (size_so_far >= to_delete_size)
                and items_so_far >= to_delete_items
                and (deadline is None or deadline < item.last_access)
            ):
                break

            items_to_delete.append(item)
            size_so_far += item.size
            items_so_far += 1

        return items_to_delete

    def _concurrency_safe_write(self, to_write, filename, write_func):
        """Writes an object into a file in a concurrency-safe way."""
        temporary_filename = concurrency_safe_write(to_write, filename, write_func)
        self._move_item(temporary_filename, filename)

    def __repr__(self):
        """Printable representation of the store location."""
        return '{class_name}(location="{location}")'.format(
            class_name=self.__class__.__name__, location=self.location
        )


class FileSystemStoreBackend(StoreBackendBase, StoreBackendMixin):
    """A StoreBackend used with local or network file systems."""

    _open_item = staticmethod(open)
    _item_exists = staticmethod(os.path.exists)
    _move_item = staticmethod(concurrency_safe_rename)

    def clear_location(self, location):
        """Delete a location on the store."""
        if location == self.location:
            rm_subdirs(location)
        else:
            shutil.rmtree(location, ignore_errors=True)

    def create_location(self, location):
        """Create an object location on the store."""
        mkdirp(location)

    def get_items(self):
        """Returns the whole list of items available in the store."""
        items = []

        for dirpath, _, filenames in os.walk(self.location):
            is_cache_hash_dir = re.match("[a-f0-9]{32}", os.path.basename(dirpath))

            if is_cache_hash_dir:
                output_filename = os.path.join(dirpath, "output.pkl")
                try:
                    last_access = os.path.getatime(output_filename)
                except OSError:
                    try:
                        last_access = os.path.getatime(dirpath)
                    except OSError:
                        # The directory has already been deleted
                        continue

                last_access = datetime.datetime.fromtimestamp(last_access)
                try:
                    full_filenames = [os.path.join(dirpath, fn) for fn in filenames]
                    dirsize = sum(os.path.getsize(fn) for fn in full_filenames)
                except OSError:
                    # Either output_filename or one of the files in dirpath
                    # does not exist any more. We assume this directory is
                    # already being cleaned by another process.
                    continue

                items.append(CacheItemInfo(dirpath, dirsize, last_access))

        return items

    def configure(self, location, verbose=1, backend_options=None):
        """Configure the store backend.

        For this backend, valid store options are 'compress' and 'mmap_mode'.
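
        A hedged example (the location value is illustrative)::

            backend = FileSystemStoreBackend()
            backend.configure(
                "/tmp/joblib_cache", backend_options={"compress": True}
            )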
        """
        if backend_options is None:
            backend_options = {}

        # Set up the location directory.
        self.location = location
        if not os.path.exists(self.location):
            mkdirp(self.location)

        # Automatically add a `.gitignore` file to the cache folder.
        # XXX: the condition is necessary because in `Memory.__init__`, the
        # user-passed `location` param is modified to be either `{location}`
        # or `{location}/joblib` depending on input type (`pathlib.Path` vs
        # `str`). The proper resolution of this inconsistency is tracked in:
        # https://github.com/joblib/joblib/issues/1684
        cache_directory = (
            os.path.dirname(location)
            if os.path.dirname(location) and os.path.basename(location) == "joblib"
            else location
        )
        with open(os.path.join(cache_directory, ".gitignore"), "w") as file:
            file.write("# Created by joblib automatically.\n")
            file.write("*\n")

        # Items can be stored compressed for faster I/O.
        self.compress = backend_options.get("compress", False)

        # FileSystemStoreBackend can be used with mmap_mode options under
        # certain conditions.
        mmap_mode = backend_options.get("mmap_mode")
        if self.compress and mmap_mode is not None:
            warnings.warn(
                "Compressed items cannot be memmapped in a "
                "filesystem store. Option will be ignored.",
                stacklevel=2,
            )

        self.mmap_mode = mmap_mode
        self.verbose = verbose