Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/joblib/_store_backends.py: 29%
213 statements
coverage.py v7.3.2, created at 2023-12-12 06:31 +0000
1"""Storage providers backends for Memory caching."""
3from pickle import PicklingError
4import re
5import os
6import os.path
7import datetime
8import json
9import shutil
10import warnings
11import collections
12import operator
13import threading
14from abc import ABCMeta, abstractmethod
16from .backports import concurrency_safe_rename
17from .disk import mkdirp, memstr_to_bytes, rm_subdirs
18from . import numpy_pickle
20CacheItemInfo = collections.namedtuple('CacheItemInfo',
21 'path size last_access')
24class CacheWarning(Warning):
25 """Warning to capture dump failures except for PicklingError."""
26 pass
29def concurrency_safe_write(object_to_write, filename, write_func):
30 """Writes an object into a unique file in a concurrency-safe way."""
31 thread_id = id(threading.current_thread())
32 temporary_filename = '{}.thread-{}-pid-{}'.format(
33 filename, thread_id, os.getpid())
34 write_func(object_to_write, temporary_filename)
36 return temporary_filename
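

# Usage sketch (illustrative, not part of joblib's source): the temporary
# file returned above is unique per thread and process; the caller is
# expected to rename it onto the final name afterwards, as
# StoreBackendMixin._concurrency_safe_write does below. 'payload' and
# 'target' are hypothetical names.
#
#     def _write(obj, dest_filename):
#         with open(dest_filename, 'wb') as f:
#             f.write(json.dumps(obj).encode('utf-8'))
#
#     temp = concurrency_safe_write(payload, target, _write)
#     os.replace(temp, target)  # atomic within one filesystem on POSIX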


class StoreBackendBase(metaclass=ABCMeta):
    """Helper Abstract Base Class which defines all methods that
    a StorageBackend must implement."""

    location = None

    @abstractmethod
    def _open_item(self, f, mode):
        """Opens an item on the store and returns a file-like object.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        f: a file-like object
            The file-like object where an item is stored and retrieved
        mode: string, optional
            the mode in which the file-like object is opened; allowed values
            are 'rb' and 'wb'

        Returns
        -------
        a file-like object
        """

    @abstractmethod
    def _item_exists(self, location):
        """Checks if an item location exists in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        location: string
            The location of an item. On a filesystem, this corresponds to the
            absolute path, including the filename, of a file.

        Returns
        -------
        True if the item exists, False otherwise
        """

    @abstractmethod
    def _move_item(self, src, dst):
        """Moves an item from src to dst in the store.

        This method is private and only used by the StoreBackendMixin object.

        Parameters
        ----------
        src: string
            The source location of an item
        dst: string
            The destination location of an item
        """

    @abstractmethod
    def create_location(self, location):
        """Creates a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory.
        """

    @abstractmethod
    def clear_location(self, location):
        """Clears a location on the store.

        Parameters
        ----------
        location: string
            The location in the store. On a filesystem, this corresponds to a
            directory or to the absolute path of a file.
        """

    @abstractmethod
    def get_items(self):
        """Returns the whole list of items available in the store.

        Returns
        -------
        The list of items identified by their ids (e.g. filename in a
        filesystem).
        """

    @abstractmethod
    def configure(self, location, verbose=0, backend_options=dict()):
        """Configures the store.

        Parameters
        ----------
        location: string
            The base location used by the store. On a filesystem, this
            corresponds to a directory.
        verbose: int
            The level of verbosity of the store
        backend_options: dict
            Contains a dictionary of named parameters used to configure the
            store backend.
        """


class StoreBackendMixin(object):
    """Class providing all logic for managing the store in a generic way.

    The StoreBackend subclass has to implement 3 methods: create_location,
    clear_location and configure. The StoreBackend also has to provide
    private _open_item, _item_exists and _move_item methods. The _open_item
    method has to have the same signature as the builtin open and return a
    file-like object.
    """

    def load_item(self, path, verbose=1, msg=None):
        """Load an item from the store given its path as a list of
        strings."""
        full_path = os.path.join(self.location, *path)

        if verbose > 1:
            if verbose < 10:
                print('{0}...'.format(msg))
            else:
                print('{0} from {1}'.format(msg, full_path))

        mmap_mode = (None if not hasattr(self, 'mmap_mode')
                     else self.mmap_mode)

        filename = os.path.join(full_path, 'output.pkl')
        if not self._item_exists(filename):
            raise KeyError("Non-existing item (may have been "
                           "cleared).\nFile %s does not exist" % filename)

        # file-like object cannot be used when mmap_mode is set
        if mmap_mode is None:
            with self._open_item(filename, "rb") as f:
                item = numpy_pickle.load(f)
        else:
            item = numpy_pickle.load(filename, mmap_mode=mmap_mode)
        return item

    def dump_item(self, path, item, verbose=1):
        """Dump an item in the store at the path given as a list of
        strings."""
        try:
            item_path = os.path.join(self.location, *path)
            if not self._item_exists(item_path):
                self.create_location(item_path)
            filename = os.path.join(item_path, 'output.pkl')
            if verbose > 10:
                print('Persisting in %s' % item_path)

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    try:
                        numpy_pickle.dump(to_write, f, compress=self.compress)
                    except PicklingError as e:
                        # TODO(1.5) turn into error
                        warnings.warn(
                            "Unable to cache to disk: failed to pickle "
                            "output. In version 1.5 this will raise an "
                            f"exception. Exception: {e}.",
                            FutureWarning
                        )

            self._concurrency_safe_write(item, filename, write_func)
        except Exception as e:  # noqa: E722
            warnings.warn(
                "Unable to cache to disk. Possibly a race condition in the "
                f"creation of the directory. Exception: {e}.",
                CacheWarning
            )
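
    # Usage sketch (illustrative, not part of joblib's source): items are
    # addressed by a list of path segments, in practice a function directory
    # followed by an argument hash. 'backend' stands for any configured
    # store backend; the key below is hypothetical.
    #
    #     key = ['my_func', 'a' * 32]
    #     backend.dump_item(key, {'result': 42})
    #     if backend.contains_item(key):
    #         value = backend.load_item(key)   # -> {'result': 42}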

    def clear_item(self, path):
        """Clear the item at the path, given as a list of strings."""
        item_path = os.path.join(self.location, *path)
        if self._item_exists(item_path):
            self.clear_location(item_path)

    def contains_item(self, path):
        """Check if there is an item at the path, given as a list of
        strings."""
        item_path = os.path.join(self.location, *path)
        filename = os.path.join(item_path, 'output.pkl')

        return self._item_exists(filename)

    def get_item_info(self, path):
        """Return information about item."""
        return {'location': os.path.join(self.location,
                                         *path)}

    def get_metadata(self, path):
        """Return actual metadata of an item."""
        try:
            item_path = os.path.join(self.location, *path)
            filename = os.path.join(item_path, 'metadata.json')
            with self._open_item(filename, 'rb') as f:
                return json.loads(f.read().decode('utf-8'))
        except:  # noqa: E722
            return {}

    def store_metadata(self, path, metadata):
        """Store metadata of a computation."""
        try:
            item_path = os.path.join(self.location, *path)
            self.create_location(item_path)
            filename = os.path.join(item_path, 'metadata.json')

            def write_func(to_write, dest_filename):
                with self._open_item(dest_filename, "wb") as f:
                    f.write(json.dumps(to_write).encode('utf-8'))

            self._concurrency_safe_write(metadata, filename, write_func)
        except:  # noqa: E722
            pass
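
    # Usage sketch (illustrative, not part of joblib's source): metadata is
    # stored as JSON next to the pickled output, so values must be
    # JSON-serializable. The 'duration' key is hypothetical.
    #
    #     backend.store_metadata(key, {'duration': 0.5})
    #     backend.get_metadata(key)   # -> {'duration': 0.5}, or {} on error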

    def contains_path(self, path):
        """Check if a cached function is available in the store."""
        func_path = os.path.join(self.location, *path)
        return self._item_exists(func_path)

    def clear_path(self, path):
        """Clear all items with a common path in the store."""
        func_path = os.path.join(self.location, *path)
        if self._item_exists(func_path):
            self.clear_location(func_path)

    def store_cached_func_code(self, path, func_code=None):
        """Store the code of the cached function."""
        func_path = os.path.join(self.location, *path)
        if not self._item_exists(func_path):
            self.create_location(func_path)

        if func_code is not None:
            filename = os.path.join(func_path, "func_code.py")
            with self._open_item(filename, 'wb') as f:
                f.write(func_code.encode('utf-8'))

    def get_cached_func_code(self, path):
        """Return the code of the cached function."""
        path += ['func_code.py', ]
        filename = os.path.join(self.location, *path)
        try:
            with self._open_item(filename, 'rb') as f:
                return f.read().decode('utf-8')
        except:  # noqa: E722
            raise

    def get_cached_func_info(self, path):
        """Return information related to the cached function if it exists."""
        return {'location': os.path.join(self.location, *path)}

    def clear(self):
        """Clear the whole store content."""
        self.clear_location(self.location)

    def enforce_store_limits(
            self, bytes_limit, items_limit=None, age_limit=None
    ):
        """
        Remove the store's oldest files to enforce item, byte, and age limits.
        """
        items_to_delete = self._get_items_to_delete(
            bytes_limit, items_limit, age_limit
        )

        for item in items_to_delete:
            if self.verbose > 10:
                print('Deleting item {0}'.format(item))
            try:
                self.clear_location(item.path)
            except OSError:
                # Even with ignore_errors=True shutil.rmtree can raise OSError
                # with [Errno 116] Stale file handle if another process has
                # deleted the folder already.
                pass
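
    # Usage sketch (illustrative, not part of joblib's source): cap the store
    # at 1 GiB, 1000 items, and one week since last access. bytes_limit also
    # accepts plain integers; the limits below are hypothetical.
    #
    #     backend.enforce_store_limits(
    #         bytes_limit='1G',
    #         items_limit=1000,
    #         age_limit=datetime.timedelta(days=7),
    #     )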

    def _get_items_to_delete(
            self, bytes_limit, items_limit=None, age_limit=None
    ):
        """
        Get items to delete to keep the store under size, file, & age limits.
        """
        if isinstance(bytes_limit, str):
            bytes_limit = memstr_to_bytes(bytes_limit)

        items = self.get_items()
        size = sum(item.size for item in items)

        if bytes_limit is not None:
            to_delete_size = size - bytes_limit
        else:
            to_delete_size = 0

        if items_limit is not None:
            to_delete_items = len(items) - items_limit
        else:
            to_delete_items = 0

        if age_limit is not None:
            older_item = min(item.last_access for item in items)
            deadline = datetime.datetime.now() - age_limit
        else:
            deadline = None

        if (
            to_delete_size <= 0 and to_delete_items <= 0
            and (deadline is None or older_item > deadline)
        ):
            return []

        # We want to delete first the cache items that were accessed a
        # long time ago
        items.sort(key=operator.attrgetter('last_access'))

        items_to_delete = []
        size_so_far = 0
        items_so_far = 0

        for item in items:
            if (
                (size_so_far >= to_delete_size)
                and items_so_far >= to_delete_items
                and (deadline is None or deadline < item.last_access)
            ):
                break

            items_to_delete.append(item)
            size_so_far += item.size
            items_so_far += 1

        return items_to_delete

    def _concurrency_safe_write(self, to_write, filename, write_func):
        """Writes an object into a file in a concurrency-safe way."""
        temporary_filename = concurrency_safe_write(to_write,
                                                    filename, write_func)
        self._move_item(temporary_filename, filename)

    def __repr__(self):
        """Printable representation of the store location."""
        return '{class_name}(location="{location}")'.format(
            class_name=self.__class__.__name__, location=self.location)
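

# Backend skeleton (illustrative, not part of joblib's source): the contract
# a custom store has to satisfy, per the StoreBackendMixin docstring above.
# Bodies are elided; FileSystemStoreBackend below is the complete, real
# implementation of this contract.
#
#     class MyStoreBackend(StoreBackendBase, StoreBackendMixin):
#         def _open_item(self, f, mode): ...      # same signature as open()
#         def _item_exists(self, location): ...   # -> bool
#         def _move_item(self, src, dst): ...     # atomic rename if possible
#         def create_location(self, location): ...
#         def clear_location(self, location): ...
#         def get_items(self): ...                # -> list of CacheItemInfo
#         def configure(self, location, verbose=0, backend_options=None): ...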


class FileSystemStoreBackend(StoreBackendBase, StoreBackendMixin):
    """A StoreBackend used with local or network file systems."""

    _open_item = staticmethod(open)
    _item_exists = staticmethod(os.path.exists)
    _move_item = staticmethod(concurrency_safe_rename)

    def clear_location(self, location):
        """Delete location on store."""
        if (location == self.location):
            rm_subdirs(location)
        else:
            shutil.rmtree(location, ignore_errors=True)

    def create_location(self, location):
        """Create object location on store."""
        mkdirp(location)

    def get_items(self):
        """Returns the whole list of items available in the store."""
        items = []

        for dirpath, _, filenames in os.walk(self.location):
            is_cache_hash_dir = re.match('[a-f0-9]{32}',
                                         os.path.basename(dirpath))

            if is_cache_hash_dir:
                output_filename = os.path.join(dirpath, 'output.pkl')
                try:
                    last_access = os.path.getatime(output_filename)
                except OSError:
                    try:
                        last_access = os.path.getatime(dirpath)
                    except OSError:
                        # The directory has already been deleted
                        continue

                last_access = datetime.datetime.fromtimestamp(last_access)
                try:
                    full_filenames = [os.path.join(dirpath, fn)
                                      for fn in filenames]
                    dirsize = sum(os.path.getsize(fn)
                                  for fn in full_filenames)
                except OSError:
                    # Either output_filename or one of the files in dirpath
                    # does not exist any more. We assume this directory is
                    # being cleaned by another process already.
                    continue

                items.append(CacheItemInfo(dirpath, dirsize, last_access))

        return items
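
    # Inventory sketch (illustrative, not part of joblib's source): each
    # entry is a CacheItemInfo(path, size, last_access) for a directory whose
    # basename is a 32-character hex hash, i.e. one cached computation.
    #
    #     for info in store.get_items():
    #         print(info.path, info.size, info.last_access)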

    def configure(self, location, verbose=1, backend_options=None):
        """Configure the store backend.

        For this backend, valid store options are 'compress' and 'mmap_mode'.
        """
        if backend_options is None:
            backend_options = {}

        # setup location directory
        self.location = location
        if not os.path.exists(self.location):
            mkdirp(self.location)

        # item can be stored compressed for faster I/O
        self.compress = backend_options.get('compress', False)

        # FileSystemStoreBackend can be used with mmap_mode options under
        # certain conditions.
        mmap_mode = backend_options.get('mmap_mode')
        if self.compress and mmap_mode is not None:
            warnings.warn('Compressed items cannot be memmapped in a '
                          'filesystem store. Option will be ignored.',
                          stacklevel=2)

        self.mmap_mode = mmap_mode
        self.verbose = verbose
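

# End-to-end sketch (illustrative, not part of joblib's source): configure a
# filesystem store and exercise the mixin API directly. In normal use,
# joblib.Memory drives these calls; 'cachedir' is hypothetical.
#
#     import tempfile
#
#     cachedir = tempfile.mkdtemp()
#     store = FileSystemStoreBackend()
#     store.configure(cachedir, verbose=0,
#                     backend_options={'compress': False})
#     store.dump_item(['f', 'a' * 32], [1, 2, 3])
#     assert store.load_item(['f', 'a' * 32]) == [1, 2, 3]
#     store.clear()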