1"""Storage providers backends for Memory caching."""
2
3import collections
4import datetime
5import json
6import operator
7import os
8import os.path
9import re
10import shutil
11import threading
12import time
13import uuid
14import warnings
15from abc import ABCMeta, abstractmethod
16from pickle import PicklingError
17
18from . import numpy_pickle
19from .backports import concurrency_safe_rename
20from .disk import memstr_to_bytes, mkdirp, rm_subdirs
21from .logger import format_time
22
23CacheItemInfo = collections.namedtuple("CacheItemInfo", "path size last_access")
24
25
26class CacheWarning(Warning):
27 """Warning to capture dump failures except for PicklingError."""
28
29 pass
30
31
32def concurrency_safe_write(object_to_write, filename, write_func):
33 """Writes an object into a unique file in a concurrency-safe way."""
34 # Temporary name is composed of UUID, process_id and thread_id to avoid
35 # collisions due to concurrent write.
36 # UUID is unique across nodes and time and help avoid collisions, even if
37 # the cache folder is shared by several Python processes with the same pid and
38 # thread id on different nodes of a cluster for instance.
39 thread_id = id(threading.current_thread())
40 temporary_filename = f"{filename}.{uuid.uuid4().hex}-{os.getpid()}-{thread_id}"
41
42 write_func(object_to_write, temporary_filename)
43
44 return temporary_filename
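
# Usage sketch (paths and payload are illustrative): the write is split into
# two steps so that readers of ``filename`` never observe a partially written
# file. This helper writes to a unique temporary name; the caller then
# publishes the result with a single rename, as done by
# ``StoreBackendMixin._concurrency_safe_write`` below:
#
#     def write_func(obj, path):
#         with open(path, "wb") as f:
#             f.write(obj)
#
#     tmp = concurrency_safe_write(b"payload", "/tmp/cache/output.pkl", write_func)
#     concurrency_safe_rename(tmp, "/tmp/cache/output.pkl")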


class StoreBackendBase(metaclass=ABCMeta):
    """Helper Abstract Base Class defining all methods that a store backend
    must implement."""

    location = None

    @abstractmethod
    def _open_item(self, f, mode):
        """Open an item on the store and return a file-like object.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        f: string
            The location of the item to open. On a filesystem, this
            corresponds to the absolute path, including the filename, of a
            file.
        mode: string
            The mode in which the file-like object is opened. Allowed values
            are 'rb' and 'wb'.

        Returns
        -------
        a file-like object
        """

    @abstractmethod
    def _item_exists(self, location):
        """Check if an item location exists in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        location: string
            The location of an item. On a filesystem, this corresponds to the
            absolute path, including the filename, of a file.

        Returns
        -------
        True if the item exists, False otherwise
        """

    @abstractmethod
    def _move_item(self, src, dst):
        """Move an item from src to dst in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        src: string
            The source location of an item
        dst: string
            The destination location of an item
        """

    @abstractmethod
    def create_location(self, location):
        """Create a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory.
        """

    @abstractmethod
    def clear_location(self, location):
        """Clear a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory or to the absolute path of a file.
        """

    @abstractmethod
    def get_items(self):
        """Return the whole list of items available in the store.

        Returns
        -------
        The list of items identified by their ids (e.g. filename in a
        filesystem).
        """

    @abstractmethod
    def configure(self, location, verbose=0, backend_options=dict()):
        """Configure the store.

        Parameters
        ----------
        location: string
            The base location used by the store. On a filesystem, this
            corresponds to a directory.
        verbose: int
            The level of verbosity of the store
        backend_options: dict
            A dictionary of named parameters used to configure the store
            backend.
        """


class StoreBackendMixin(object):
    """Class providing all logic for managing the store in a generic way.

    The StoreBackend subclass has to implement 3 methods: create_location,
    clear_location and configure. It must also provide private _open_item,
    _item_exists and _move_item methods. The _open_item method has to have
    the same signature as the builtin open and return a file-like object.
    A minimal subclass skeleton is sketched after this class.
    """

    def load_item(self, call_id, verbose=1, timestamp=None, metadata=None):
        """Load an item from the store given its id as a list of str."""
        full_path = os.path.join(self.location, *call_id)

        if verbose > 1:
            ts_string = (
                "{: <16}".format(format_time(time.time() - timestamp))
                if timestamp is not None
                else ""
            )
            signature = os.path.basename(call_id[0])
            if metadata is not None and "input_args" in metadata:
                kwargs = ", ".join(
                    "{}={}".format(*item) for item in metadata["input_args"].items()
                )
                signature += "({})".format(kwargs)
            msg = "[Memory]{}: Loading {}".format(ts_string, signature)
            if verbose < 10:
                print("{0}...".format(msg))
            else:
                print("{0} from {1}".format(msg, full_path))

        mmap_mode = None if not hasattr(self, "mmap_mode") else self.mmap_mode

        filename = os.path.join(full_path, "output.pkl")
        if not self._item_exists(filename):
            raise KeyError(
                "Non-existing item (may have been "
                "cleared).\nFile %s does not exist" % filename
            )

        # A file-like object cannot be used when mmap_mode is set.
        if mmap_mode is None:
            with self._open_item(filename, "rb") as f:
                item = numpy_pickle.load(f)
        else:
            item = numpy_pickle.load(filename, mmap_mode=mmap_mode)
        return item
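
    # Usage sketch (ids are illustrative): a call id is a list of path
    # segments, typically an identifier derived from the cached function's
    # module and name, followed by a hash of its arguments:
    #
    #     call_id = ["mymodule/myfunc", "0123456789abcdef0123456789abcdef"]
    #     result = backend.load_item(call_id, verbose=2)
    #
    # which resolves to ``<location>/mymodule/myfunc/<hash>/output.pkl``.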

    def dump_item(self, call_id, item, verbose=1):
        """Dump an item in the store at the id given as a list of str."""
        try:
            item_path = os.path.join(self.location, *call_id)
            if not self._item_exists(item_path):
                self.create_location(item_path)
            filename = os.path.join(item_path, "output.pkl")
            if verbose > 10:
                print("Persisting in %s" % item_path)

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    try:
                        numpy_pickle.dump(to_write, f, compress=self.compress)
                    except PicklingError as e:
                        # TODO(1.5) turn into error
                        warnings.warn(
                            "Unable to cache to disk: failed to pickle "
                            "output. In version 1.5 this will raise an "
                            f"exception. Exception: {e}.",
                            FutureWarning,
                        )

            self._concurrency_safe_write(item, filename, write_func)
        except Exception as e:
            warnings.warn(
                "Unable to cache to disk. Possibly a race condition in the "
                f"creation of the directory. Exception: {e}.",
                CacheWarning,
            )

    def clear_item(self, call_id):
        """Clear the item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        if self._item_exists(item_path):
            self.clear_location(item_path)

    def contains_item(self, call_id):
        """Check if there is an item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        filename = os.path.join(item_path, "output.pkl")

        return self._item_exists(filename)

    def get_item_info(self, call_id):
        """Return information about an item."""
        return {"location": os.path.join(self.location, *call_id)}

    def get_metadata(self, call_id):
        """Return the actual metadata of an item."""
        try:
            item_path = os.path.join(self.location, *call_id)
            filename = os.path.join(item_path, "metadata.json")
            with self._open_item(filename, "rb") as f:
                return json.loads(f.read().decode("utf-8"))
        except:  # noqa: E722
            return {}

    def store_metadata(self, call_id, metadata):
        """Store the metadata of a computation."""
        try:
            item_path = os.path.join(self.location, *call_id)
            self.create_location(item_path)
            filename = os.path.join(item_path, "metadata.json")

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    f.write(json.dumps(to_write).encode("utf-8"))

            self._concurrency_safe_write(metadata, filename, write_func)
        except:  # noqa: E722
            pass

    def contains_path(self, call_id):
        """Check if the cached function is available in the store."""
        func_path = os.path.join(self.location, *call_id)
        return self._item_exists(func_path)

    def clear_path(self, call_id):
        """Clear all items with a common path in the store."""
        func_path = os.path.join(self.location, *call_id)
        if self._item_exists(func_path):
            self.clear_location(func_path)

    def store_cached_func_code(self, call_id, func_code=None):
        """Store the code of the cached function."""
        func_path = os.path.join(self.location, *call_id)
        if not self._item_exists(func_path):
            self.create_location(func_path)

        if func_code is not None:
            filename = os.path.join(func_path, "func_code.py")
            with self._open_item(filename, "wb") as f:
                f.write(func_code.encode("utf-8"))

    def get_cached_func_code(self, call_id):
        """Return the code of the cached function."""
        filename = os.path.join(self.location, *call_id, "func_code.py")
        with self._open_item(filename, "rb") as f:
            return f.read().decode("utf-8")

    def get_cached_func_info(self, call_id):
        """Return information related to the cached function if it exists."""
        return {"location": os.path.join(self.location, *call_id)}

    def clear(self):
        """Clear the whole store content."""
        self.clear_location(self.location)

    def enforce_store_limits(self, bytes_limit, items_limit=None, age_limit=None):
        """
        Remove the store's oldest files to enforce item, byte, and age limits.
        """
        items_to_delete = self._get_items_to_delete(bytes_limit, items_limit, age_limit)

        for item in items_to_delete:
            if self.verbose > 10:
                print("Deleting item {0}".format(item))
            try:
                self.clear_location(item.path)
            except OSError:
                # Even with ignore_errors=True, shutil.rmtree can raise an
                # OSError ([Errno 116] Stale file handle) if another process
                # has already deleted the folder.
                pass
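
    # Usage sketch (limit values are illustrative): cap the store at 1 GiB
    # and 1000 items, and evict anything not accessed for a week. The limits
    # compose, and the least recently accessed items are deleted first (see
    # ``_get_items_to_delete`` below):
    #
    #     backend.enforce_store_limits(
    #         bytes_limit="1G",
    #         items_limit=1000,
    #         age_limit=datetime.timedelta(days=7),
    #     )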

    def _get_items_to_delete(self, bytes_limit, items_limit=None, age_limit=None):
        """
        Get items to delete to keep the store under size, file, & age limits.
        """
        if isinstance(bytes_limit, str):
            bytes_limit = memstr_to_bytes(bytes_limit)

        items = self.get_items()
        if not items:
            return []

        size = sum(item.size for item in items)

        if bytes_limit is not None:
            to_delete_size = size - bytes_limit
        else:
            to_delete_size = 0

        if items_limit is not None:
            to_delete_items = len(items) - items_limit
        else:
            to_delete_items = 0

        if age_limit is not None:
            if age_limit.total_seconds() < 0:
                raise ValueError("age_limit has to be a positive timedelta")
            oldest_access = min(item.last_access for item in items)
            deadline = datetime.datetime.now() - age_limit
        else:
            deadline = None

        if (
            to_delete_size <= 0
            and to_delete_items <= 0
            and (deadline is None or oldest_access > deadline)
        ):
            return []

        # Delete first the cache items that were accessed the longest time
        # ago.
        items.sort(key=operator.attrgetter("last_access"))

        items_to_delete = []
        size_so_far = 0
        items_so_far = 0

        for item in items:
            if (
                (size_so_far >= to_delete_size)
                and items_so_far >= to_delete_items
                and (deadline is None or deadline < item.last_access)
            ):
                break

            items_to_delete.append(item)
            size_so_far += item.size
            items_so_far += 1

        return items_to_delete
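
    # A worked example of the arithmetic above (numbers are illustrative):
    # with three items of sizes 400, 300 and 300 bytes and bytes_limit=600,
    # to_delete_size is 1000 - 600 = 400 bytes. Walking the items from least
    # to most recently accessed, the loop stops as soon as at least 400 bytes
    # have been freed: one deletion if the oldest item is the 400-byte one,
    # two deletions if the two oldest are the 300-byte ones.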

    def _concurrency_safe_write(self, to_write, filename, write_func):
        """Writes an object into a file in a concurrency-safe way."""
        temporary_filename = concurrency_safe_write(to_write, filename, write_func)
        self._move_item(temporary_filename, filename)

    def __repr__(self):
        """Printable representation of the store location."""
        return '{class_name}(location="{location}")'.format(
            class_name=self.__class__.__name__, location=self.location
        )
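
# A minimal skeleton (not part of joblib) of what the contract described in
# the StoreBackendMixin docstring looks like for a custom backend: subclass
# both base classes and provide the three private hooks and the public
# methods. Bodies are elided; FileSystemStoreBackend below is a real,
# complete implementation.
#
#     class MyStoreBackend(StoreBackendBase, StoreBackendMixin):
#         def _open_item(self, f, mode):
#             ...  # return a file-like object, like the builtin open()
#
#         def _item_exists(self, location):
#             ...  # return True if `location` exists in the store
#
#         def _move_item(self, src, dst):
#             ...  # atomically move `src` to `dst`
#
#         def create_location(self, location):
#             ...  # create a container (e.g. a directory) for items
#
#         def clear_location(self, location):
#             ...  # recursively delete `location`
#
#         def get_items(self):
#             ...  # list of CacheItemInfo(path, size, last_access)
#
#         def configure(self, location, verbose=0, backend_options=dict()):
#             ...  # record `location` and options on `self`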


class FileSystemStoreBackend(StoreBackendBase, StoreBackendMixin):
    """A StoreBackend used with local or network file systems."""

    _open_item = staticmethod(open)
    _item_exists = staticmethod(os.path.exists)
    _move_item = staticmethod(concurrency_safe_rename)

    def clear_location(self, location):
        """Delete a location on the store."""
        if location == self.location:
            rm_subdirs(location)
        else:
            shutil.rmtree(location, ignore_errors=True)

    def create_location(self, location):
        """Create an object location on the store."""
        mkdirp(location)

    def get_items(self):
        """Return the whole list of items available in the store."""
        items = []

        for dirpath, _, filenames in os.walk(self.location):
            is_cache_hash_dir = re.match("[a-f0-9]{32}", os.path.basename(dirpath))

            if is_cache_hash_dir:
                output_filename = os.path.join(dirpath, "output.pkl")
                try:
                    last_access = os.path.getatime(output_filename)
                except OSError:
                    try:
                        last_access = os.path.getatime(dirpath)
                    except OSError:
                        # The directory has already been deleted
                        continue

                last_access = datetime.datetime.fromtimestamp(last_access)
                try:
                    full_filenames = [os.path.join(dirpath, fn) for fn in filenames]
                    dirsize = sum(os.path.getsize(fn) for fn in full_filenames)
                except OSError:
                    # Either output_filename or one of the files in dirpath
                    # does not exist any more. We assume this directory is
                    # already being cleaned by another process.
                    continue

                items.append(CacheItemInfo(dirpath, dirsize, last_access))

        return items
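
    # The directory layout this walk expects (names are illustrative): only
    # directories whose basename starts with 32 hexadecimal characters (the
    # hash of the call arguments) are treated as cache items:
    #
    #     <location>/
    #         mymodule/
    #             myfunc/
    #                 func_code.py
    #                 0123456789abcdef0123456789abcdef/
    #                     output.pkl
    #                     metadata.json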

    def configure(self, location, verbose=1, backend_options=None):
        """Configure the store backend.

        For this backend, valid store options are 'compress' and 'mmap_mode'.
        """
        if backend_options is None:
            backend_options = {}

        # Set up the location directory.
        self.location = location
        if not os.path.exists(self.location):
            mkdirp(self.location)

        # Automatically add a `.gitignore` file to the cache folder.
        # XXX: the condition is necessary because in `Memory.__init__`, the
        # user-passed `location` param is modified to be either `{location}`
        # or `{location}/joblib` depending on its type (`pathlib.Path` vs
        # `str`). The proper resolution of this inconsistency is tracked in:
        # https://github.com/joblib/joblib/issues/1684
        cache_directory = (
            os.path.dirname(location)
            if os.path.dirname(location) and os.path.basename(location) == "joblib"
            else location
        )
        with open(os.path.join(cache_directory, ".gitignore"), "w") as file:
            file.write("# Created by joblib automatically.\n")
            file.write("*\n")

        # Items can be stored compressed for faster I/O.
        self.compress = backend_options.get("compress", False)

        # FileSystemStoreBackend can be used with mmap_mode options under
        # certain conditions.
        mmap_mode = backend_options.get("mmap_mode")
        if self.compress and mmap_mode is not None:
            warnings.warn(
                "Compressed items cannot be memmapped in a "
                "filesystem store. Option will be ignored.",
                stacklevel=2,
            )

        self.mmap_mode = mmap_mode
        self.verbose = verbose
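

# End-to-end usage sketch (paths and ids are illustrative; in practice
# ``joblib.Memory`` drives the backend, so this only shows how the pieces
# fit together):
#
#     backend = FileSystemStoreBackend()
#     backend.configure("/tmp/joblib_cache", verbose=0,
#                       backend_options={"compress": False})
#     call_id = ["mymodule/myfunc", "0123456789abcdef0123456789abcdef"]
#     backend.dump_item(call_id, {"answer": 42})
#     assert backend.contains_item(call_id)
#     print(backend.load_item(call_id))  # {'answer': 42}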