Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/black/cache.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Caching of formatted files with feature-based invalidation."""
3import hashlib
4import os
5import pickle
6import sys
7import tempfile
8from collections.abc import Iterable
9from dataclasses import dataclass, field
10from pathlib import Path
11from typing import NamedTuple
13from platformdirs import user_cache_dir
15from _black_version import version as __version__
16from black.mode import Mode
17from black.output import err
19if sys.version_info >= (3, 11):
20 from typing import Self
21else:
22 from typing_extensions import Self
25class FileData(NamedTuple):
26 st_mtime: float
27 st_size: int
28 hash: str
31def get_cache_dir() -> Path:
32 """Get the cache directory used by black.
34 Users can customize this directory on all systems using `BLACK_CACHE_DIR`
35 environment variable. By default, the cache directory is the user cache directory
36 under the black application.
38 This result is immediately set to a constant `black.cache.CACHE_DIR` as to avoid
39 repeated calls.
40 """
41 # NOTE: Function mostly exists as a clean way to test getting the cache directory.
42 default_cache_dir = user_cache_dir("black")
43 cache_dir = Path(os.environ.get("BLACK_CACHE_DIR", default_cache_dir))
44 cache_dir = cache_dir / __version__
45 return cache_dir
48CACHE_DIR = get_cache_dir()
51def get_cache_file(mode: Mode) -> Path:
52 return CACHE_DIR / f"cache.{mode.get_cache_key()}.pickle"
55@dataclass
56class Cache:
57 mode: Mode
58 cache_file: Path
59 file_data: dict[str, FileData] = field(default_factory=dict)
61 @classmethod
62 def read(cls, mode: Mode) -> Self:
63 """Read the cache if it exists and is well-formed.
65 If it is not well-formed, the call to write later should
66 resolve the issue.
67 """
68 cache_file = get_cache_file(mode)
69 try:
70 exists = cache_file.exists()
71 except OSError as e:
72 # Likely file too long; see #4172 and #4174
73 err(f"Unable to read cache file {cache_file} due to {e}")
74 return cls(mode, cache_file)
75 if not exists:
76 return cls(mode, cache_file)
78 with cache_file.open("rb") as fobj:
79 try:
80 data: dict[str, tuple[float, int, str]] = pickle.load(fobj)
81 file_data = {k: FileData(*v) for k, v in data.items()}
82 except (pickle.UnpicklingError, ValueError, IndexError):
83 return cls(mode, cache_file)
85 return cls(mode, cache_file, file_data)
87 @staticmethod
88 def hash_digest(path: Path) -> str:
89 """Return hash digest for path."""
91 data = path.read_bytes()
92 return hashlib.sha256(data).hexdigest()
94 @staticmethod
95 def get_file_data(path: Path) -> FileData:
96 """Return file data for path."""
98 stat = path.stat()
99 hash = Cache.hash_digest(path)
100 return FileData(stat.st_mtime, stat.st_size, hash)
102 def is_changed(self, source: Path) -> bool:
103 """Check if source has changed compared to cached version."""
104 res_src = source.resolve()
105 old = self.file_data.get(str(res_src))
106 if old is None:
107 return True
109 st = res_src.stat()
110 if st.st_size != old.st_size:
111 return True
112 if st.st_mtime != old.st_mtime:
113 new_hash = Cache.hash_digest(res_src)
114 if new_hash != old.hash:
115 return True
116 return False
118 def filtered_cached(self, sources: Iterable[Path]) -> tuple[set[Path], set[Path]]:
119 """Split an iterable of paths in `sources` into two sets.
121 The first contains paths of files that modified on disk or are not in the
122 cache. The other contains paths to non-modified files.
123 """
124 changed: set[Path] = set()
125 done: set[Path] = set()
126 for src in sources:
127 if self.is_changed(src):
128 changed.add(src)
129 else:
130 done.add(src)
131 return changed, done
133 def write(self, sources: Iterable[Path]) -> None:
134 """Update the cache file data and write a new cache file."""
135 self.file_data.update(
136 **{str(src.resolve()): Cache.get_file_data(src) for src in sources}
137 )
138 try:
139 CACHE_DIR.mkdir(parents=True, exist_ok=True)
140 with tempfile.NamedTemporaryFile(
141 dir=str(self.cache_file.parent), delete=False
142 ) as f:
143 # We store raw tuples in the cache because it's faster.
144 data: dict[str, tuple[float, int, str]] = {
145 k: (*v,) for k, v in self.file_data.items()
146 }
147 pickle.dump(data, f, protocol=4)
148 os.replace(f.name, self.cache_file)
149 except OSError:
150 pass