Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/cache.py: 72%


# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import threading
from collections import OrderedDict
from typing import Any, Generic, TypeVar

import attr

from hypothesis.errors import InvalidArgument

K = TypeVar("K")
V = TypeVar("V")


@attr.s(slots=True)
class Entry(Generic[K, V]):
    key: K = attr.ib()
    value: V = attr.ib()
    score: int = attr.ib()
    pins: int = attr.ib(default=0)

    @property
    def sort_key(self) -> tuple[int, ...]:
        if self.pins == 0:
            # Unpinned entries are sorted by score.
            return (0, self.score)
        else:
            # Pinned entries sort after unpinned ones. Beyond that, we don't
            # worry about their relative order.
            return (1,)
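

# For example, two unpinned entries with scores 3 and 7 compare as
# (0, 3) < (0, 7), while any pinned entry compares as (1,), which sorts after
# every unpinned entry regardless of score - so the heap root below is always
# an evictable entry whenever one exists.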



class GenericCache(Generic[K, V]):
    """Generic supertype for cache implementations.

    Defines a dict-like mapping with a maximum size, where as well as mapping
    to a value, each key also maps to a score. When a write would cause the
    dict to exceed its maximum size, it first evicts the existing key with
    the smallest score, then adds the new key to the map. If no key can be
    evicted due to pinning, ValueError is raised.

    A key has the following lifecycle:

    1. When a key is written for the first time, it is given the score
       self.new_entry(key, value).
    2. Whenever an existing key is read or written, self.on_access(key, value,
       score) is called. This returns a new score for the key.
    3. After a key is evicted, self.on_evict(key, value, score) is called.

    The cache will be in a valid state in all of these cases.

    Implementations are expected to implement new_entry and optionally
    on_access and on_evict to implement a specific scoring strategy.
    """

    __slots__ = ("_threadlocal", "max_size")

    def __init__(self, max_size: int):
        if max_size <= 0:
            raise InvalidArgument("Cache size must be at least one.")

        self.max_size = max_size

        # Implementation: We store a binary heap of Entry objects in self.data,
        # with the heap property requiring that a parent's score is <= that of
        # its children. keys_to_indices then maps keys to their index in the
        # heap. We keep these two in sync automatically - the heap is never
        # reordered without updating the index.
        self._threadlocal = threading.local()

    @property
    def keys_to_indices(self) -> dict[K, int]:
        try:
            return self._threadlocal.keys_to_indices
        except AttributeError:
            self._threadlocal.keys_to_indices = {}
            return self._threadlocal.keys_to_indices

    @property
    def data(self) -> list[Entry[K, V]]:
        try:
            return self._threadlocal.data
        except AttributeError:
            self._threadlocal.data = []
            return self._threadlocal.data

    def __len__(self) -> int:
        assert len(self.keys_to_indices) == len(self.data)
        return len(self.data)

    def __contains__(self, key: K) -> bool:
        return key in self.keys_to_indices

    def __getitem__(self, key: K) -> V:
        i = self.keys_to_indices[key]
        result = self.data[i]
        self.__entry_was_accessed(i)
        return result.value

    def __setitem__(self, key: K, value: V) -> None:
        evicted = None
        try:
            i = self.keys_to_indices[key]
        except KeyError:
            entry = Entry(key, value, self.new_entry(key, value))
            if len(self.data) >= self.max_size:
                evicted = self.data[0]
                if evicted.pins > 0:
                    raise ValueError(
                        "Cannot increase size of cache where all keys have been pinned."
                    ) from None

                # It's not clear to me how this can occur with a thread-local
                # cache, but we've seen failures here before (specifically under
                # the windows ci tests).
                try:
                    del self.keys_to_indices[evicted.key]
                except KeyError:  # pragma: no cover
                    pass

                i = 0
                self.data[0] = entry
            else:
                i = len(self.data)
                self.data.append(entry)
            self.keys_to_indices[key] = i
            self.__balance(i)
        else:
            entry = self.data[i]
            assert entry.key == key
            entry.value = value
            self.__entry_was_accessed(i)

        if evicted is not None:
            if self.data[0] is not entry:
                assert evicted.sort_key <= self.data[0].sort_key
            self.on_evict(evicted.key, evicted.value, evicted.score)

    def __iter__(self):
        return iter(self.keys_to_indices)

    def pin(self, key: K, value: V) -> None:
        """Mark ``key`` as pinned (with the given value). That is, it may not
        be evicted until ``unpin(key)`` has been called. The same key may be
        pinned multiple times, possibly changing its value, and will not be
        unpinned until the same number of calls to unpin have been made.
        """
        self[key] = value

        i = self.keys_to_indices[key]
        entry = self.data[i]
        entry.pins += 1
        if entry.pins == 1:
            self.__balance(i)

    def unpin(self, key: K) -> None:
        """Undo one previous call to ``pin(key)``. The value stays the same.
        Once all calls are undone this key may be evicted as normal."""
        i = self.keys_to_indices[key]
        entry = self.data[i]
        if entry.pins == 0:
            raise ValueError(f"Key {key!r} has not been pinned")
        entry.pins -= 1
        if entry.pins == 0:
            self.__balance(i)

    def is_pinned(self, key: K) -> bool:
        """Returns True if the key is currently pinned."""
        i = self.keys_to_indices[key]
        return self.data[i].pins > 0
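
    # Illustrative pin semantics, assuming a concrete subclass instance
    # ``cache`` (hypothetical) with max_size == 1:
    #
    #   cache.pin("a", 1)   # "a" can no longer be evicted
    #   cache["b"] = 2      # raises ValueError: the only slot is pinned
    #   cache.unpin("a")
    #   cache["b"] = 2      # now evicts "a" as normal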

    def clear(self) -> None:
        """Remove all keys, regardless of their pinned status."""
        del self.data[:]
        self.keys_to_indices.clear()

    def __repr__(self) -> str:
        return "{" + ", ".join(f"{e.key!r}: {e.value!r}" for e in self.data) + "}"

    def new_entry(self, key: K, value: V) -> int:
        """Called when a key is written that does not currently appear in the
        map.

        Returns the score to associate with the key.
        """
        raise NotImplementedError

    def on_access(self, key: K, value: V, score: Any) -> Any:
        """Called every time a key that is already in the map is read or
        written.

        Returns the new score for the key.
        """
        return score

    def on_evict(self, key: K, value: V, score: Any) -> Any:
        """Called after a key has been evicted, with the score it had at
        the point of eviction."""

    def check_valid(self) -> None:
        """Debugging method for use in tests.

        Asserts that all of the cache's invariants hold. When everything
        is working correctly this should be an expensive no-op.
        """
        assert len(self.keys_to_indices) == len(self.data)
        for i, e in enumerate(self.data):
            assert self.keys_to_indices[e.key] == i
            for j in [i * 2 + 1, i * 2 + 2]:
                if j < len(self.data):
                    assert e.sort_key <= self.data[j].sort_key, self.data

    def __entry_was_accessed(self, i: int) -> None:
        entry = self.data[i]
        new_score = self.on_access(entry.key, entry.value, entry.score)
        if new_score != entry.score:
            entry.score = new_score
            # Changing the score of a pinned entry cannot unbalance the heap, as
            # we place all pinned entries after unpinned ones, regardless of score.
            if entry.pins == 0:
                self.__balance(i)

    def __swap(self, i: int, j: int) -> None:
        assert i < j
        assert self.data[j].sort_key < self.data[i].sort_key
        self.data[i], self.data[j] = self.data[j], self.data[i]
        self.keys_to_indices[self.data[i].key] = i
        self.keys_to_indices[self.data[j].key] = j

    def __balance(self, i: int) -> None:
        """When we have made a modification to the heap such that
        the heap property has been violated locally around i but previously
        held for all other indexes (and no other values have been modified),
        this fixes the heap so that the heap property holds everywhere."""
        # Bubble up (if the score is too low for the current position)...
        while (parent := (i - 1) // 2) >= 0:
            if self.__out_of_order(parent, i):
                self.__swap(parent, i)
                i = parent
            else:
                break
        # ...or bubble down (if the score is too high for the current position).
        while children := [j for j in (2 * i + 1, 2 * i + 2) if j < len(self.data)]:
            smallest_child = min(children, key=lambda j: self.data[j].sort_key)
            if self.__out_of_order(i, smallest_child):
                self.__swap(i, smallest_child)
                i = smallest_child
            else:
                break

    def __out_of_order(self, i: int, j: int) -> bool:
        """Returns True if the indices i, j are in the wrong order.

        i must be the parent of j.
        """
        assert i == (j - 1) // 2
        return self.data[j].sort_key < self.data[i].sort_key
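

# A minimal sketch of a concrete subclass, hypothetical and for illustration
# only (the implementation actually used is LRUReusedCache below): scoring
# entries by insertion order gives FIFO eviction, because the entry with the
# smallest score - the oldest - always sits at the heap root.
class _ExampleFIFOCache(GenericCache[K, V]):
    __slots__ = ("_counter",)

    def __init__(self, max_size: int):
        super().__init__(max_size)
        self._counter = 0

    def new_entry(self, key: K, value: V) -> int:
        # Monotonically increasing scores: the oldest entry is evicted first.
        # The default on_access keeps the score unchanged, so reads do not
        # affect eviction order.
        self._counter += 1
        return self._counter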



class LRUReusedCache(GenericCache[K, V]):
    """The only concrete implementation of GenericCache we use outside of
    tests currently.

    Adopts a modified least-recently-used eviction policy: it evicts the key
    that has been used least recently, but it will always preferentially evict
    keys that have never been accessed after insertion. Among keys that have
    been accessed, it ignores the number of accesses.

    This retains most of the benefits of an LRU cache, but adds an element of
    scan-resistance to the process: if we end up scanning through a large
    number of keys without reusing them, this does not evict the existing
    entries in preference for the new ones.
    """

    __slots__ = ("__tick",)

    def __init__(self, max_size: int):
        super().__init__(max_size)
        self.__tick: int = 0

    def tick(self) -> int:
        self.__tick += 1
        return self.__tick

    def new_entry(self, key: K, value: V) -> Any:
        return (1, self.tick())

    def on_access(self, key: K, value: V, score: Any) -> Any:
        return (2, self.tick())
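

# A hypothetical demo helper (not used by the library) showing the
# scan-resistance described above: fresh entries score (1, tick) and reused
# entries score (2, tick), so a scan of never-reused keys evicts other
# never-reused keys before touching anything that has been read at least once.
def _demo_lru_reused_scan_resistance() -> None:
    cache: LRUReusedCache[str, int] = LRUReusedCache(2)
    cache["hot"] = 1
    assert cache["hot"] == 1  # reused: score is bumped to (2, tick)
    cache["scan-1"] = 2  # fresh: score (1, tick), sits at the heap root
    cache["scan-2"] = 3  # evicts "scan-1", not "hot"
    assert "hot" in cache and "scan-2" in cache and "scan-1" not in cache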



class LRUCache(Generic[K, V]):
    """
    This is a drop-in replacement for a GenericCache (despite the lack of
    inheritance) in performance-critical environments. It turns out that
    GenericCache's heap balancing for arbitrary scores can be quite expensive
    compared to the doubly linked list approach of lru_cache or OrderedDict.

    This class is a pure LRU and does not provide any sort of affinity towards
    the number of accesses beyond recency. If soft-pinning entries which have
    been accessed at least once is important, use LRUReusedCache.
    """

    # Here are some nice performance references for lru_cache vs OrderedDict:
    # https://github.com/python/cpython/issues/72426#issuecomment-1093727671
    # https://discuss.python.org/t/simplify-lru-cache/18192/6
    #
    # We use OrderedDict here because it is unclear to me whether we can
    # provide the same API as GenericCache using @lru_cache without messing
    # with lru_cache internals.
    #
    # Anecdotally, OrderedDict seems quite competitive with lru_cache, but
    # perhaps that is localized to our access patterns.

    def __init__(self, max_size: int) -> None:
        assert max_size > 0
        self.max_size = max_size
        self._threadlocal = threading.local()

    @property
    def cache(self) -> OrderedDict[K, V]:
        try:
            return self._threadlocal.cache
        except AttributeError:
            self._threadlocal.cache = OrderedDict()
            return self._threadlocal.cache

    def __setitem__(self, key: K, value: V) -> None:
        self.cache[key] = value
        self.cache.move_to_end(key)

        while len(self.cache) > self.max_size:
            self.cache.popitem(last=False)

    def __getitem__(self, key: K) -> V:
        val = self.cache[key]
        self.cache.move_to_end(key)
        return val

    def __iter__(self):
        return iter(self.cache)

    def __len__(self) -> int:
        return len(self.cache)

    def __contains__(self, key: K) -> bool:
        return key in self.cache

    # Implement the GenericCache interface, for tests.
    def check_valid(self) -> None:
        pass
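

# A hypothetical demo helper (not used by the library) contrasting LRUCache
# with LRUReusedCache: eviction here is purely recency-based, so even a key
# that has been read is evicted once it becomes the least recently used.
def _demo_lru_eviction() -> None:
    cache: LRUCache[str, int] = LRUCache(2)
    cache["a"] = 1
    cache["b"] = 2
    assert cache["a"] == 1  # "a" is now the most recently used key
    cache["c"] = 3  # evicts "b", the least recently used key
    assert "a" in cache and "c" in cache and "b" not in cache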