1"""Storage providers backends for Memory caching."""

from pickle import PicklingError
import re
import os
import os.path
import datetime
import json
import shutil
import time
import warnings
import collections
import operator
import threading
from abc import ABCMeta, abstractmethod

from .backports import concurrency_safe_rename
from .disk import mkdirp, memstr_to_bytes, rm_subdirs
from .logger import format_time
from . import numpy_pickle

CacheItemInfo = collections.namedtuple('CacheItemInfo',
                                       'path size last_access')


class CacheWarning(Warning):
    """Warning to capture dump failures except for PicklingError."""
    pass


def concurrency_safe_write(object_to_write, filename, write_func):
    """Writes an object into a unique file in a concurrency-safe way."""
    thread_id = id(threading.current_thread())
    temporary_filename = '{}.thread-{}-pid-{}'.format(
        filename, thread_id, os.getpid())
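    # The temporary name embeds both the thread id and the pid, so concurrent
    # writers (threads or processes) never collide; the caller is expected to
    # move the finished file onto ``filename`` afterwards (see
    # StoreBackendMixin._concurrency_safe_write below).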
    write_func(object_to_write, temporary_filename)

    return temporary_filename


class StoreBackendBase(metaclass=ABCMeta):
    """Helper Abstract Base Class which defines all methods that
    a StoreBackend must implement."""

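    # Root location of the store (for a filesystem backend, the cache
    # directory); it is filled in by ``configure``.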
    location = None

    @abstractmethod
    def _open_item(self, f, mode):
        """Opens an item on the store and returns a file-like object.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        f: string
            The location of the item in the store. On a filesystem, this
            corresponds to the absolute path, including the filename, of a
            file.
        mode: string
            The mode in which the file-like object is opened. Allowed values
            are 'rb' and 'wb'.

        Returns
        -------
        a file-like object
        """

    @abstractmethod
    def _item_exists(self, location):
        """Checks if an item location exists in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        location: string
            The location of an item. On a filesystem, this corresponds to the
            absolute path, including the filename, of a file.

        Returns
        -------
        True if the item exists, False otherwise
        """

    @abstractmethod
    def _move_item(self, src, dst):
        """Moves an item from src to dst in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        src: string
            The source location of an item
        dst: string
            The destination location of an item
        """

    @abstractmethod
    def create_location(self, location):
        """Creates a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory.
        """

    @abstractmethod
    def clear_location(self, location):
        """Clears a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory or to the absolute path of a file.
        """

    @abstractmethod
    def get_items(self):
        """Returns the whole list of items available in the store.

        Returns
        -------
        The list of items identified by their ids (e.g. filename in a
        filesystem).
        """

    @abstractmethod
    def configure(self, location, verbose=0, backend_options=dict()):
        """Configures the store.

        Parameters
        ----------
        location: string
            The base location used by the store. On a filesystem, this
            corresponds to a directory.
        verbose: int
            The level of verbosity of the store
        backend_options: dict
            Contains a dictionary of named parameters used to configure the
            store backend.
        """


class StoreBackendMixin(object):
    """Class providing all logic for managing the store in a generic way.

    The StoreBackend subclass has to implement 4 methods: create_location,
    clear_location, get_items and configure. The StoreBackend also has to
    provide private _open_item, _item_exists and _move_item methods. The
    _open_item method has to have the same signature as the builtin open
    and return a file-like object.
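
    Examples
    --------
    A minimal sketch of a complete backend built from this mixin (purely
    illustrative, not a backend shipped with this module); everything it
    relies on is already imported at the top of this file::

        class LoggingFileSystemBackend(StoreBackendBase, StoreBackendMixin):
            _item_exists = staticmethod(os.path.exists)
            _move_item = staticmethod(concurrency_safe_rename)

            def _open_item(self, f, mode):
                print('opening', f, 'with mode', mode)
                return open(f, mode)

            def create_location(self, location):
                mkdirp(location)

            def clear_location(self, location):
                shutil.rmtree(location, ignore_errors=True)

            def get_items(self):
                return []  # size/age limits become a no-op with no items

            def configure(self, location, verbose=0, backend_options=None):
                self.location = location
                self.verbose = verbose
                self.compress = False
                self.mmap_mode = None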
154 """
155
    def load_item(self, call_id, verbose=1, timestamp=None, metadata=None):
        """Load an item from the store given its id as a list of str."""
        full_path = os.path.join(self.location, *call_id)

        if verbose > 1:
            ts_string = ('{: <16}'.format(format_time(time.time() - timestamp))
                         if timestamp is not None else '')
            signature = os.path.basename(call_id[0])
            if metadata is not None and 'input_args' in metadata:
                kwargs = ', '.join('{}={}'.format(*item)
                                   for item in metadata['input_args'].items())
                signature += '({})'.format(kwargs)
            msg = '[Memory]{}: Loading {}'.format(ts_string, signature)
            if verbose < 10:
                print('{0}...'.format(msg))
            else:
                print('{0} from {1}'.format(msg, full_path))

        mmap_mode = (None if not hasattr(self, 'mmap_mode')
                     else self.mmap_mode)

        filename = os.path.join(full_path, 'output.pkl')
        if not self._item_exists(filename):
            raise KeyError("Non-existing item (may have been "
                           "cleared).\nFile %s does not exist" % filename)

        # file-like object cannot be used when mmap_mode is set
        if mmap_mode is None:
            with self._open_item(filename, "rb") as f:
                item = numpy_pickle.load(f)
        else:
            item = numpy_pickle.load(filename, mmap_mode=mmap_mode)
        return item

    def dump_item(self, call_id, item, verbose=1):
        """Dump an item in the store at the id given as a list of str."""
        try:
            item_path = os.path.join(self.location, *call_id)
            if not self._item_exists(item_path):
                self.create_location(item_path)
            filename = os.path.join(item_path, 'output.pkl')
            if verbose > 10:
                print('Persisting in %s' % item_path)

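            # ``write_func`` below is handed to ``_concurrency_safe_write``,
            # which writes to a uniquely named temporary file and then moves
            # it into place, so concurrent callers never observe a partially
            # written ``output.pkl``.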
            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    try:
                        numpy_pickle.dump(to_write, f, compress=self.compress)
                    except PicklingError as e:
                        # TODO(1.5) turn into error
                        warnings.warn(
                            "Unable to cache to disk: failed to pickle "
                            "output. In version 1.5 this will raise an "
                            f"exception. Exception: {e}.",
                            FutureWarning
                        )

            self._concurrency_safe_write(item, filename, write_func)
        except Exception as e:  # noqa: E722
            warnings.warn(
                "Unable to cache to disk. Possibly a race condition in the "
                f"creation of the directory. Exception: {e}.",
                CacheWarning
            )

    def clear_item(self, call_id):
        """Clear the item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        if self._item_exists(item_path):
            self.clear_location(item_path)

    def contains_item(self, call_id):
        """Check if there is an item at the id, given as a list of str."""
        item_path = os.path.join(self.location, *call_id)
        filename = os.path.join(item_path, 'output.pkl')

        return self._item_exists(filename)

    def get_item_info(self, call_id):
        """Return information about item."""
        return {'location': os.path.join(self.location, *call_id)}

    def get_metadata(self, call_id):
        """Return actual metadata of an item."""
        try:
            item_path = os.path.join(self.location, *call_id)
            filename = os.path.join(item_path, 'metadata.json')
            with self._open_item(filename, 'rb') as f:
                return json.loads(f.read().decode('utf-8'))
        except:  # noqa: E722
            return {}

    def store_metadata(self, call_id, metadata):
        """Store metadata of a computation."""
        try:
            item_path = os.path.join(self.location, *call_id)
            self.create_location(item_path)
            filename = os.path.join(item_path, 'metadata.json')

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    f.write(json.dumps(to_write).encode('utf-8'))

            self._concurrency_safe_write(metadata, filename, write_func)
        except:  # noqa: E722
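            # Failing to store metadata only degrades introspection of the
            # cache; it should not break the cached call, so the error is
            # deliberately swallowed.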
            pass

    def contains_path(self, call_id):
        """Check if the cached function is available in the store."""
        func_path = os.path.join(self.location, *call_id)
        return self._item_exists(func_path)

    def clear_path(self, call_id):
        """Clear all items with a common path in the store."""
        func_path = os.path.join(self.location, *call_id)
        if self._item_exists(func_path):
            self.clear_location(func_path)

    def store_cached_func_code(self, call_id, func_code=None):
        """Store the code of the cached function."""
        func_path = os.path.join(self.location, *call_id)
        if not self._item_exists(func_path):
            self.create_location(func_path)

        if func_code is not None:
            filename = os.path.join(func_path, "func_code.py")
            with self._open_item(filename, 'wb') as f:
                f.write(func_code.encode('utf-8'))

    def get_cached_func_code(self, call_id):
        """Return the code of the cached function."""
        filename = os.path.join(self.location, *call_id, 'func_code.py')
        try:
            with self._open_item(filename, 'rb') as f:
                return f.read().decode('utf-8')
        except:  # noqa: E722
            raise

    def get_cached_func_info(self, call_id):
        """Return information related to the cached function if it exists."""
        return {'location': os.path.join(self.location, *call_id)}

    def clear(self):
        """Clear the whole store content."""
        self.clear_location(self.location)

    def enforce_store_limits(
            self, bytes_limit, items_limit=None, age_limit=None
    ):
        """
        Remove the store's oldest files to enforce item, byte, and age limits.
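
        Examples
        --------
        A sketch of a periodic clean-up call (``backend`` is any configured
        store backend; the limits below are illustrative)::

            backend.enforce_store_limits(
                bytes_limit='1G', items_limit=10000,
                age_limit=datetime.timedelta(days=30))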
307 """
308 items_to_delete = self._get_items_to_delete(
309 bytes_limit, items_limit, age_limit
310 )
311
312 for item in items_to_delete:
313 if self.verbose > 10:
314 print('Deleting item {0}'.format(item))
315 try:
316 self.clear_location(item.path)
317 except OSError:
318 # Even with ignore_errors=True shutil.rmtree can raise OSError
319 # with:
320 # [Errno 116] Stale file handle if another process has deleted
321 # the folder already.
322 pass
323
    def _get_items_to_delete(
            self, bytes_limit, items_limit=None, age_limit=None
    ):
        """
        Get items to delete to keep the store under size, file, & age limits.
        """
        if isinstance(bytes_limit, str):
            bytes_limit = memstr_to_bytes(bytes_limit)

        items = self.get_items()
        if not items:
            return []

        size = sum(item.size for item in items)

        if bytes_limit is not None:
            to_delete_size = size - bytes_limit
        else:
            to_delete_size = 0

        if items_limit is not None:
            to_delete_items = len(items) - items_limit
        else:
            to_delete_items = 0

        if age_limit is not None:
            older_item = min(item.last_access for item in items)
            deadline = datetime.datetime.now() - age_limit
        else:
            deadline = None

        if (
            to_delete_size <= 0 and to_delete_items <= 0
            and (deadline is None or older_item > deadline)
        ):
            return []

        # We want to delete first the cache items that were accessed a
        # long time ago
        items.sort(key=operator.attrgetter('last_access'))

        items_to_delete = []
        size_so_far = 0
        items_so_far = 0

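        # Greedily take the oldest items until all three constraints are
        # satisfied at once: enough bytes freed, enough items freed, and no
        # remaining item last accessed before the age deadline.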
        for item in items:
            if (
                (size_so_far >= to_delete_size)
                and items_so_far >= to_delete_items
                and (deadline is None or deadline < item.last_access)
            ):
                break

            items_to_delete.append(item)
            size_so_far += item.size
            items_so_far += 1

        return items_to_delete

    def _concurrency_safe_write(self, to_write, filename, write_func):
        """Writes an object into a file in a concurrency-safe way."""
        temporary_filename = concurrency_safe_write(to_write,
                                                    filename, write_func)
        self._move_item(temporary_filename, filename)

    def __repr__(self):
        """Printable representation of the store location."""
        return '{class_name}(location="{location}")'.format(
            class_name=self.__class__.__name__, location=self.location)


class FileSystemStoreBackend(StoreBackendBase, StoreBackendMixin):
    """A StoreBackend used with local or network file systems."""

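    # On a filesystem the low-level primitives map directly onto the standard
    # library: items are plain files, so the builtin ``open``,
    # ``os.path.exists`` and a concurrency-safe rename are sufficient.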
    _open_item = staticmethod(open)
    _item_exists = staticmethod(os.path.exists)
    _move_item = staticmethod(concurrency_safe_rename)

    def clear_location(self, location):
        """Delete location on store."""
        if location == self.location:
            rm_subdirs(location)
        else:
            shutil.rmtree(location, ignore_errors=True)

    def create_location(self, location):
        """Create object location on store"""
        mkdirp(location)

    def get_items(self):
        """Returns the whole list of items available in the store."""
        items = []

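        # Cache item directories are named after the hexadecimal hash of the
        # call arguments, hence the 32 hexadecimal character pattern below.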
        for dirpath, _, filenames in os.walk(self.location):
            is_cache_hash_dir = re.match('[a-f0-9]{32}',
                                         os.path.basename(dirpath))

            if is_cache_hash_dir:
                output_filename = os.path.join(dirpath, 'output.pkl')
                try:
                    last_access = os.path.getatime(output_filename)
                except OSError:
                    try:
                        last_access = os.path.getatime(dirpath)
                    except OSError:
                        # The directory has already been deleted
                        continue

                last_access = datetime.datetime.fromtimestamp(last_access)
                try:
                    full_filenames = [os.path.join(dirpath, fn)
                                      for fn in filenames]
                    dirsize = sum(os.path.getsize(fn)
                                  for fn in full_filenames)
                except OSError:
                    # Either output_filename or one of the files in
                    # dirpath does not exist any more. We assume this
                    # directory is being cleaned by another process already
                    continue

                items.append(CacheItemInfo(dirpath, dirsize,
                                           last_access))

        return items

    def configure(self, location, verbose=1, backend_options=None):
        """Configure the store backend.

        For this backend, valid store options are 'compress' and 'mmap_mode'.
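
        Examples
        --------
        Backends are normally configured by ``Memory`` rather than by hand,
        but a manual configuration sketch looks like this (the cache path is
        illustrative)::

            backend = FileSystemStoreBackend()
            backend.configure('/tmp/joblib_cache',
                              backend_options={'compress': True})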
453 """
        if backend_options is None:
            backend_options = {}

        # setup location directory
        self.location = location
        if not os.path.exists(self.location):
            mkdirp(self.location)

        # item can be stored compressed for faster I/O
        self.compress = backend_options.get('compress', False)

        # FileSystemStoreBackend can be used with mmap_mode options under
        # certain conditions.
        mmap_mode = backend_options.get('mmap_mode')
        if self.compress and mmap_mode is not None:
            warnings.warn('Compressed items cannot be memmapped in a '
                          'filesystem store. Option will be ignored.',
                          stacklevel=2)

        self.mmap_mode = mmap_mode
        self.verbose = verbose