Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/black/cache.py: 44%

50 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:15 +0000

1"""Caching of formatted files with feature-based invalidation.""" 

2 

3import os 

4import pickle 

5import tempfile 

6from pathlib import Path 

7from typing import Dict, Iterable, Set, Tuple 

8 

9from platformdirs import user_cache_dir 

10 

11from _black_version import version as __version__ 

12from black.mode import Mode 

13 

14# types 

15Timestamp = float 

16FileSize = int 

17CacheInfo = Tuple[Timestamp, FileSize] 

18Cache = Dict[str, CacheInfo] 

19 

20 

21def get_cache_dir() -> Path: 

22 """Get the cache directory used by black. 

23 

24 Users can customize this directory on all systems using `BLACK_CACHE_DIR` 

25 environment variable. By default, the cache directory is the user cache directory 

26 under the black application. 

27 

28 This result is immediately set to a constant `black.cache.CACHE_DIR` as to avoid 

29 repeated calls. 

30 """ 

31 # NOTE: Function mostly exists as a clean way to test getting the cache directory. 

32 default_cache_dir = user_cache_dir("black", version=__version__) 

33 cache_dir = Path(os.environ.get("BLACK_CACHE_DIR", default_cache_dir)) 

34 return cache_dir 

35 

36 

37CACHE_DIR = get_cache_dir() 

38 

39 

40def read_cache(mode: Mode) -> Cache: 

41 """Read the cache if it exists and is well formed. 

42 

43 If it is not well formed, the call to write_cache later should resolve the issue. 

44 """ 

45 cache_file = get_cache_file(mode) 

46 if not cache_file.exists(): 

47 return {} 

48 

49 with cache_file.open("rb") as fobj: 

50 try: 

51 cache: Cache = pickle.load(fobj) 

52 except (pickle.UnpicklingError, ValueError, IndexError): 

53 return {} 

54 

55 return cache 

56 

57 

58def get_cache_file(mode: Mode) -> Path: 

59 return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle" 

60 

61 

62def get_cache_info(path: Path) -> CacheInfo: 

63 """Return the information used to check if a file is already formatted or not.""" 

64 stat = path.stat() 

65 return stat.st_mtime, stat.st_size 

66 

67 

68def filter_cached(cache: Cache, sources: Iterable[Path]) -> Tuple[Set[Path], Set[Path]]: 

69 """Split an iterable of paths in `sources` into two sets. 

70 

71 The first contains paths of files that modified on disk or are not in the 

72 cache. The other contains paths to non-modified files. 

73 """ 

74 todo, done = set(), set() 

75 for src in sources: 

76 res_src = src.resolve() 

77 if cache.get(str(res_src)) != get_cache_info(res_src): 

78 todo.add(src) 

79 else: 

80 done.add(src) 

81 return todo, done 

82 

83 

84def write_cache(cache: Cache, sources: Iterable[Path], mode: Mode) -> None: 

85 """Update the cache file.""" 

86 cache_file = get_cache_file(mode) 

87 try: 

88 CACHE_DIR.mkdir(parents=True, exist_ok=True) 

89 new_cache = { 

90 **cache, 

91 **{str(src.resolve()): get_cache_info(src) for src in sources}, 

92 } 

93 with tempfile.NamedTemporaryFile(dir=str(cache_file.parent), delete=False) as f: 

94 pickle.dump(new_cache, f, protocol=4) 

95 os.replace(f.name, cache_file) 

96 except OSError: 

97 pass