Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/black/cache.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

91 statements  

1"""Caching of formatted files with feature-based invalidation.""" 

2 

3import hashlib 

4import os 

5import pickle 

6import sys 

7import tempfile 

8from collections.abc import Iterable 

9from dataclasses import dataclass, field 

10from pathlib import Path 

11from typing import NamedTuple 

12 

13from platformdirs import user_cache_dir 

14 

15from _black_version import version as __version__ 

16from black.mode import Mode 

17from black.output import err 

18 

19if sys.version_info >= (3, 11): 

20 from typing import Self 

21else: 

22 from typing_extensions import Self 

23 

24 

# Snapshot of a formatted file used for cache invalidation: the stat
# signature (mtime + size) plus a content hash as the tie-breaker.
FileData = NamedTuple(
    "FileData",
    [
        ("st_mtime", float),  # modification time when the file was cached
        ("st_size", int),  # size in bytes when the file was cached
        ("hash", str),  # hex digest of the file contents
    ],
)

29 

30 

def get_cache_dir() -> Path:
    """Get the cache directory used by black.

    Users can customize this directory on all systems using `BLACK_CACHE_DIR`
    environment variable. By default, the cache directory is the user cache
    directory under the black application.

    This result is immediately set to a constant `black.cache.CACHE_DIR` as to
    avoid repeated calls.
    """
    # NOTE: Function mostly exists as a clean way to test getting the cache
    # directory.
    base = os.environ.get("BLACK_CACHE_DIR", user_cache_dir("black"))
    # Scope the cache by black version so stale entries from other releases
    # are never consulted.
    return Path(base) / __version__

46 

47 

# Resolved once at import time; all cache files for this black version live here.
CACHE_DIR = get_cache_dir()

49 

50 

def get_cache_file(mode: Mode) -> Path:
    """Return the path of the pickle cache file for the given ``mode``."""
    cache_name = f"cache.{mode.get_cache_key()}.pickle"
    return CACHE_DIR / cache_name

53 

54 

@dataclass
class Cache:
    """On-disk cache of formatting results for one `Mode`.

    Maps each source file's resolved path to the stat signature and content
    hash it had when last formatted, so unchanged files can be skipped on
    subsequent runs. Persistence is strictly best-effort: any OS-level
    failure while reading or writing the cache is swallowed.
    """

    mode: Mode
    cache_file: Path
    file_data: dict[str, FileData] = field(default_factory=dict)

    @classmethod
    def read(cls, mode: Mode) -> Self:
        """Read the cache if it exists and is well-formed.

        If it is not well-formed, the call to write later should
        resolve the issue.
        """
        cache_file = get_cache_file(mode)
        try:
            exists = cache_file.exists()
        except OSError as e:
            # Likely file too long; see #4172 and #4174
            err(f"Unable to read cache file {cache_file} due to {e}")
            return cls(mode, cache_file)
        if not exists:
            return cls(mode, cache_file)

        with cache_file.open("rb") as fobj:
            try:
                data: dict[str, tuple[float, int, str]] = pickle.load(fobj)
                file_data = {k: FileData(*v) for k, v in data.items()}
            except (pickle.UnpicklingError, ValueError, IndexError):
                # Corrupt or incompatible cache: start empty; a later
                # write() replaces the bad file.
                return cls(mode, cache_file)

        return cls(mode, cache_file, file_data)

    @staticmethod
    def hash_digest(path: Path) -> str:
        """Return the sha256 hex digest of the contents of ``path``."""
        data = path.read_bytes()
        return hashlib.sha256(data).hexdigest()

    @staticmethod
    def get_file_data(path: Path) -> FileData:
        """Return the current stat signature and content hash for ``path``."""
        stat = path.stat()
        # ``digest`` rather than ``hash``: don't shadow the builtin.
        digest = Cache.hash_digest(path)
        return FileData(stat.st_mtime, stat.st_size, digest)

    def is_changed(self, source: Path) -> bool:
        """Check if source has changed compared to the cached version.

        A size mismatch is a definite change; an mtime mismatch alone only
        triggers the (more expensive) content-hash comparison.
        """
        res_src = source.resolve()
        old = self.file_data.get(str(res_src))
        if old is None:
            # Never seen before (or cache was unreadable): must reformat.
            return True

        st = res_src.stat()
        if st.st_size != old.st_size:
            return True
        if st.st_mtime != old.st_mtime:
            new_hash = Cache.hash_digest(res_src)
            if new_hash != old.hash:
                return True
        return False

    def filtered_cached(self, sources: Iterable[Path]) -> tuple[set[Path], set[Path]]:
        """Split an iterable of paths in `sources` into two sets.

        The first contains paths of files that modified on disk or are not in the
        cache. The other contains paths to non-modified files.
        """
        changed: set[Path] = set()
        done: set[Path] = set()
        for src in sources:
            if self.is_changed(src):
                changed.add(src)
            else:
                done.add(src)
        return changed, done

    def write(self, sources: Iterable[Path]) -> None:
        """Update the cache file data and write a new cache file."""
        # Plain mapping argument instead of ``update(**{...})``: keyword
        # expansion of arbitrary absolute-path strings is fragile and
        # non-idiomatic; dict.update takes a mapping directly.
        self.file_data.update(
            {str(src.resolve()): Cache.get_file_data(src) for src in sources}
        )
        try:
            CACHE_DIR.mkdir(parents=True, exist_ok=True)
            with tempfile.NamedTemporaryFile(
                dir=str(self.cache_file.parent), delete=False
            ) as f:
                # We store raw tuples in the cache because it's faster.
                data: dict[str, tuple[float, int, str]] = {
                    k: (*v,) for k, v in self.file_data.items()
                }
                pickle.dump(data, f, protocol=4)
            # Rename only after the temp file is closed so its contents are
            # flushed and platforms that forbid replacing an open file (e.g.
            # Windows) succeed; os.replace makes the swap atomic.
            os.replace(f.name, self.cache_file)
        except OSError:
            # Best-effort cache: failing to persist must never break a run.
            pass