/rust/registry/src/index.crates.io-1949cf8c6b5b557f/moka-0.12.10/src/sync/segment.rs
Line | Count | Source |
1 | | use super::{cache::Cache, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector}; |
2 | | use crate::common::concurrent::Weigher; |
3 | | use crate::common::time::Clock; |
4 | | use crate::{ |
5 | | common::HousekeeperConfig, |
6 | | notification::EvictionListener, |
7 | | policy::{EvictionPolicy, ExpirationPolicy}, |
8 | | sync_base::iter::{Iter, ScanningGet}, |
9 | | Entry, Policy, PredicateError, |
10 | | }; |
11 | | |
12 | | use std::{ |
13 | | borrow::Borrow, |
14 | | collections::hash_map::RandomState, |
15 | | fmt, |
16 | | hash::{BuildHasher, Hash, Hasher}, |
17 | | sync::Arc, |
18 | | }; |
19 | | |
20 | | /// A thread-safe concurrent in-memory cache, with multiple internal segments. |
21 | | /// |
22 | | /// `SegmentedCache` has multiple internal [`Cache`][cache-struct] instances for |
23 | | /// increased concurrent update performance. However, it has little overheads on |
24 | | /// retrievals and updates for managing these segments. |
25 | | /// |
26 | | /// For usage examples, see the document of the [`Cache`][cache-struct]. |
27 | | /// |
28 | | /// [cache-struct]: ./struct.Cache.html |
29 | | /// |
30 | | pub struct SegmentedCache<K, V, S = RandomState> { |
31 | | inner: Arc<Inner<K, V, S>>, |
32 | | } |
33 | | |
34 | | // TODO: https://github.com/moka-rs/moka/issues/54 |
35 | | #[allow(clippy::non_send_fields_in_send_ty)] |
36 | | unsafe impl<K, V, S> Send for SegmentedCache<K, V, S> |
37 | | where |
38 | | K: Send + Sync, |
39 | | V: Send + Sync, |
40 | | S: Send, |
41 | | { |
42 | | } |
43 | | |
44 | | unsafe impl<K, V, S> Sync for SegmentedCache<K, V, S> |
45 | | where |
46 | | K: Send + Sync, |
47 | | V: Send + Sync, |
48 | | S: Sync, |
49 | | { |
50 | | } |
51 | | |
52 | | impl<K, V, S> Clone for SegmentedCache<K, V, S> { |
53 | | /// Makes a clone of this shared cache. |
54 | | /// |
55 | | /// This operation is cheap as it only creates thread-safe reference counted |
56 | | /// pointers to the shared internal data structures. |
57 | 0 | fn clone(&self) -> Self { |
58 | 0 | Self { |
59 | 0 | inner: Arc::clone(&self.inner), |
60 | 0 | } |
61 | 0 | } |
62 | | } |
63 | | |
64 | | impl<K, V, S> fmt::Debug for SegmentedCache<K, V, S> |
65 | | where |
66 | | K: fmt::Debug + Eq + Hash + Send + Sync + 'static, |
67 | | V: fmt::Debug + Clone + Send + Sync + 'static, |
68 | | // TODO: Remove these bounds from S. |
69 | | S: BuildHasher + Clone + Send + Sync + 'static, |
70 | | { |
71 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
72 | 0 | let mut d_map = f.debug_map(); |
73 | | |
74 | 0 | for (k, v) in self { |
75 | 0 | d_map.entry(&k, &v); |
76 | 0 | } |
77 | | |
78 | 0 | d_map.finish() |
79 | 0 | } |
80 | | } |
81 | | |
82 | | impl<K, V> SegmentedCache<K, V, RandomState> |
83 | | where |
84 | | K: Hash + Eq + Send + Sync + 'static, |
85 | | V: Clone + Send + Sync + 'static, |
86 | | { |
87 | | /// Constructs a new `SegmentedCache<K, V>` that has multiple internal |
88 | | /// segments and will store up to the `max_capacity`. |
89 | | /// |
90 | | /// To adjust various configuration knobs such as `initial_capacity` or |
91 | | /// `time_to_live`, use the [`CacheBuilder`][builder-struct]. |
92 | | /// |
93 | | /// [builder-struct]: ./struct.CacheBuilder.html |
94 | | /// |
95 | | /// # Panics |
96 | | /// |
97 | | /// Panics if `num_segments` is 0. |
98 | 0 | pub fn new(max_capacity: u64, num_segments: usize) -> Self { |
99 | 0 | let build_hasher = RandomState::default(); |
100 | 0 | Self::with_everything( |
101 | 0 | None, |
102 | 0 | Some(max_capacity), |
103 | 0 | None, |
104 | 0 | num_segments, |
105 | 0 | build_hasher, |
106 | 0 | None, |
107 | 0 | EvictionPolicy::default(), |
108 | 0 | None, |
109 | 0 | ExpirationPolicy::default(), |
110 | 0 | HousekeeperConfig::default(), |
111 | | false, |
112 | 0 | Clock::default(), |
113 | | ) |
114 | 0 | } |
115 | | |
116 | | /// Returns a [`CacheBuilder`][builder-struct], which can builds a |
117 | | /// `SegmentedCache` with various configuration knobs. |
118 | | /// |
119 | | /// [builder-struct]: ./struct.CacheBuilder.html |
120 | 0 | pub fn builder(num_segments: usize) -> CacheBuilder<K, V, SegmentedCache<K, V, RandomState>> { |
121 | 0 | CacheBuilder::default().segments(num_segments) |
122 | 0 | } |
123 | | } |
124 | | |
125 | | impl<K, V, S> SegmentedCache<K, V, S> { |
126 | | /// Returns cache’s name. |
127 | 0 | pub fn name(&self) -> Option<&str> { |
128 | 0 | self.inner.segments[0].name() |
129 | 0 | } |
130 | | |
131 | | /// Returns a read-only cache policy of this cache. |
132 | | /// |
133 | | /// At this time, cache policy cannot be modified after cache creation. |
134 | | /// A future version may support to modify it. |
135 | 0 | pub fn policy(&self) -> Policy { |
136 | 0 | let mut policy = self.inner.segments[0].policy(); |
137 | 0 | policy.set_max_capacity(self.inner.desired_capacity); |
138 | 0 | policy.set_num_segments(self.inner.segments.len()); |
139 | 0 | policy |
140 | 0 | } |
141 | | |
142 | | /// Returns an approximate number of entries in this cache. |
143 | | /// |
144 | | /// The value returned is _an estimate_; the actual count may differ if there are |
145 | | /// concurrent insertions or removals, or if some entries are pending removal due |
146 | | /// to expiration. This inaccuracy can be mitigated by performing a `sync()` |
147 | | /// first. |
148 | | /// |
149 | | /// # Example |
150 | | /// |
151 | | /// ```rust |
152 | | /// use moka::sync::SegmentedCache; |
153 | | /// |
154 | | /// let cache = SegmentedCache::new(10, 4); |
155 | | /// cache.insert('n', "Netherland Dwarf"); |
156 | | /// cache.insert('l', "Lop Eared"); |
157 | | /// cache.insert('d', "Dutch"); |
158 | | /// |
159 | | /// // Ensure an entry exists. |
160 | | /// assert!(cache.contains_key(&'n')); |
161 | | /// |
162 | | /// // However, followings may print stale number zeros instead of threes. |
163 | | /// println!("{}", cache.entry_count()); // -> 0 |
164 | | /// println!("{}", cache.weighted_size()); // -> 0 |
165 | | /// |
166 | | /// // To mitigate the inaccuracy, call `run_pending_tasks` method to run |
167 | | /// // pending internal tasks. |
168 | | /// cache.run_pending_tasks(); |
169 | | /// |
170 | | /// // Followings will print the actual numbers. |
171 | | /// println!("{}", cache.entry_count()); // -> 3 |
172 | | /// println!("{}", cache.weighted_size()); // -> 3 |
173 | | /// ``` |
174 | | /// |
175 | 0 | pub fn entry_count(&self) -> u64 { |
176 | 0 | self.inner |
177 | 0 | .segments |
178 | 0 | .iter() |
179 | 0 | .map(|seg| seg.entry_count()) |
180 | 0 | .sum() |
181 | 0 | } |
182 | | |
183 | | /// Returns an approximate total weighted size of entries in this cache. |
184 | | /// |
185 | | /// The value returned is _an estimate_; the actual size may differ if there are |
186 | | /// concurrent insertions or removals, or if some entries are pending removal due |
187 | | /// to expiration. This inaccuracy can be mitigated by performing a `sync()` |
188 | | /// first. See [`entry_count`](#method.entry_count) for a sample code. |
189 | 0 | pub fn weighted_size(&self) -> u64 { |
190 | 0 | self.inner |
191 | 0 | .segments |
192 | 0 | .iter() |
193 | 0 | .map(|seg| seg.weighted_size()) |
194 | 0 | .sum() |
195 | 0 | } |
196 | | } |
197 | | |
198 | | impl<K, V, S> SegmentedCache<K, V, S> |
199 | | where |
200 | | K: Hash + Eq + Send + Sync + 'static, |
201 | | V: Clone + Send + Sync + 'static, |
202 | | S: BuildHasher + Clone + Send + Sync + 'static, |
203 | | { |
204 | | /// # Panics |
205 | | /// |
206 | | /// Panics if `num_segments` is 0. |
207 | | #[allow(clippy::too_many_arguments)] |
208 | 0 | pub(crate) fn with_everything( |
209 | 0 | name: Option<String>, |
210 | 0 | max_capacity: Option<u64>, |
211 | 0 | initial_capacity: Option<usize>, |
212 | 0 | num_segments: usize, |
213 | 0 | build_hasher: S, |
214 | 0 | weigher: Option<Weigher<K, V>>, |
215 | 0 | eviction_policy: EvictionPolicy, |
216 | 0 | eviction_listener: Option<EvictionListener<K, V>>, |
217 | 0 | expiration_policy: ExpirationPolicy<K, V>, |
218 | 0 | housekeeper_config: HousekeeperConfig, |
219 | 0 | invalidator_enabled: bool, |
220 | 0 | clock: Clock, |
221 | 0 | ) -> Self { |
222 | 0 | Self { |
223 | 0 | inner: Arc::new(Inner::new( |
224 | 0 | name, |
225 | 0 | max_capacity, |
226 | 0 | initial_capacity, |
227 | 0 | num_segments, |
228 | 0 | build_hasher, |
229 | 0 | weigher, |
230 | 0 | eviction_policy, |
231 | 0 | eviction_listener, |
232 | 0 | expiration_policy, |
233 | 0 | housekeeper_config, |
234 | 0 | invalidator_enabled, |
235 | 0 | clock, |
236 | 0 | )), |
237 | 0 | } |
238 | 0 | } |
239 | | |
240 | | /// Returns `true` if the cache contains a value for the key. |
241 | | /// |
242 | | /// Unlike the `get` method, this method is not considered a cache read operation, |
243 | | /// so it does not update the historic popularity estimator or reset the idle |
244 | | /// timer for the key. |
245 | | /// |
246 | | /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq` |
247 | | /// on the borrowed form _must_ match those for the key type. |
248 | 0 | pub fn contains_key<Q>(&self, key: &Q) -> bool |
249 | 0 | where |
250 | 0 | K: Borrow<Q>, |
251 | 0 | Q: Hash + Eq + ?Sized, |
252 | | { |
253 | 0 | let hash = self.inner.hash(key); |
254 | 0 | self.inner.select(hash).contains_key_with_hash(key, hash) |
255 | 0 | } |
256 | | |
257 | | /// Returns a _clone_ of the value corresponding to the key. |
258 | | /// |
259 | | /// If you want to store values that will be expensive to clone, wrap them by |
260 | | /// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a |
261 | | /// thread-safe reference-counted pointer and its `clone()` method is cheap. |
262 | | /// |
263 | | /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq` |
264 | | /// on the borrowed form _must_ match those for the key type. |
265 | | /// |
266 | | /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html |
267 | 0 | pub fn get<Q>(&self, key: &Q) -> Option<V> |
268 | 0 | where |
269 | 0 | K: Borrow<Q>, |
270 | 0 | Q: Hash + Eq + ?Sized, |
271 | | { |
272 | 0 | let hash = self.inner.hash(key); |
273 | 0 | self.inner |
274 | 0 | .select(hash) |
275 | 0 | .get_with_hash(key, hash, false) |
276 | 0 | .map(Entry::into_value) |
277 | 0 | } |
278 | | |
279 | 0 | pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S> |
280 | 0 | where |
281 | 0 | K: Hash + Eq, |
282 | | { |
283 | 0 | let hash = self.inner.hash(&key); |
284 | 0 | let cache = self.inner.select(hash); |
285 | 0 | OwnedKeyEntrySelector::new(key, hash, cache) |
286 | 0 | } |
287 | | |
288 | 0 | pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S> |
289 | 0 | where |
290 | 0 | K: Borrow<Q>, |
291 | 0 | Q: ToOwned<Owned = K> + Hash + Eq + ?Sized, |
292 | | { |
293 | 0 | let hash = self.inner.hash(key); |
294 | 0 | let cache = self.inner.select(hash); |
295 | 0 | RefKeyEntrySelector::new(key, hash, cache) |
296 | 0 | } |
297 | | |
298 | | /// TODO: Remove this in v0.13.0. |
299 | | /// Deprecated, replaced with [`get_with`](#method.get_with) |
300 | | #[deprecated(since = "0.8.0", note = "Replaced with `get_with`")] |
301 | 0 | pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V { |
302 | 0 | self.get_with(key, init) |
303 | 0 | } |
304 | | |
305 | | /// TODO: Remove this in v0.13.0. |
306 | | /// Deprecated, replaced with [`try_get_with`](#method.try_get_with) |
307 | | #[deprecated(since = "0.8.0", note = "Replaced with `try_get_with`")] |
308 | 0 | pub fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>> |
309 | 0 | where |
310 | 0 | F: FnOnce() -> Result<V, E>, |
311 | 0 | E: Send + Sync + 'static, |
312 | | { |
313 | 0 | self.try_get_with(key, init) |
314 | 0 | } |
315 | | |
316 | | /// Returns a _clone_ of the value corresponding to the key. If the value does |
317 | | /// not exist, evaluates the `init` closure and inserts the output. |
318 | | /// |
319 | | /// # Concurrent calls on the same key |
320 | | /// |
321 | | /// This method guarantees that concurrent calls on the same not-existing key are |
322 | | /// coalesced into one evaluation of the `init` closure. Only one of the calls |
323 | | /// evaluates its closure, and other calls wait for that closure to complete. See |
324 | | /// [`Cache::get_with`][get-with-method] for more details. |
325 | | /// |
326 | | /// [get-with-method]: ./struct.Cache.html#method.get_with |
327 | 0 | pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V { |
328 | 0 | let hash = self.inner.hash(&key); |
329 | 0 | let key = Arc::new(key); |
330 | 0 | let replace_if = None as Option<fn(&V) -> bool>; |
331 | 0 | self.inner |
332 | 0 | .select(hash) |
333 | 0 | .get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false) |
334 | 0 | .into_value() |
335 | 0 | } |
336 | | |
337 | | /// Similar to [`get_with`](#method.get_with), but instead of passing an owned |
338 | | /// key, you can pass a reference to the key. If the key does not exist in the |
339 | | /// cache, the key will be cloned to create new entry in the cache. |
340 | 0 | pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V |
341 | 0 | where |
342 | 0 | K: Borrow<Q>, |
343 | 0 | Q: ToOwned<Owned = K> + Hash + Eq + ?Sized, |
344 | | { |
345 | 0 | let hash = self.inner.hash(key); |
346 | 0 | let replace_if = None as Option<fn(&V) -> bool>; |
347 | 0 | self.inner |
348 | 0 | .select(hash) |
349 | 0 | .get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false) |
350 | 0 | .into_value() |
351 | 0 | } |
352 | | |
353 | | /// Works like [`get_with`](#method.get_with), but takes an additional |
354 | | /// `replace_if` closure. |
355 | | /// |
356 | | /// This method will evaluate the `init` closure and insert the output to the |
357 | | /// cache when: |
358 | | /// |
359 | | /// - The key does not exist. |
360 | | /// - Or, `replace_if` closure returns `true`. |
361 | 0 | pub fn get_with_if( |
362 | 0 | &self, |
363 | 0 | key: K, |
364 | 0 | init: impl FnOnce() -> V, |
365 | 0 | replace_if: impl FnMut(&V) -> bool, |
366 | 0 | ) -> V { |
367 | 0 | let hash = self.inner.hash(&key); |
368 | 0 | let key = Arc::new(key); |
369 | 0 | self.inner |
370 | 0 | .select(hash) |
371 | 0 | .get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false) |
372 | 0 | .into_value() |
373 | 0 | } |
374 | | |
375 | | /// Returns a _clone_ of the value corresponding to the key. If the value does |
376 | | /// not exist, evaluates the `init` closure, and inserts the value if |
377 | | /// `Some(value)` was returned. If `None` was returned from the closure, this |
378 | | /// method does not insert a value and returns `None`. |
379 | | /// |
380 | | /// # Concurrent calls on the same key |
381 | | /// |
382 | | /// This method guarantees that concurrent calls on the same not-existing key are |
383 | | /// coalesced into one evaluation of the `init` closure. Only one of the calls |
384 | | /// evaluates its closure, and other calls wait for that closure to complete. |
385 | | /// See [`Cache::optionally_get_with`][opt-get-with-method] for more details. |
386 | | /// |
387 | | /// [opt-get-with-method]: ./struct.Cache.html#method.optionally_get_with |
388 | 0 | pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V> |
389 | 0 | where |
390 | 0 | F: FnOnce() -> Option<V>, |
391 | | { |
392 | 0 | let hash = self.inner.hash(&key); |
393 | 0 | let key = Arc::new(key); |
394 | 0 | self.inner |
395 | 0 | .select(hash) |
396 | 0 | .get_or_optionally_insert_with_hash_and_fun(key, hash, init, false) |
397 | 0 | .map(Entry::into_value) |
398 | 0 | } |
399 | | |
400 | | /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead |
401 | | /// of passing an owned key, you can pass a reference to the key. If the key does |
402 | | /// not exist in the cache, the key will be cloned to create new entry in the |
403 | | /// cache. |
404 | 0 | pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V> |
405 | 0 | where |
406 | 0 | F: FnOnce() -> Option<V>, |
407 | 0 | K: Borrow<Q>, |
408 | 0 | Q: ToOwned<Owned = K> + Hash + Eq + ?Sized, |
409 | | { |
410 | 0 | let hash = self.inner.hash(key); |
411 | 0 | self.inner |
412 | 0 | .select(hash) |
413 | 0 | .get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false) |
414 | 0 | .map(Entry::into_value) |
415 | 0 | } |
416 | | |
417 | | /// Returns a _clone_ of the value corresponding to the key. If the value does |
418 | | /// not exist, evaluates the `init` closure, and inserts the value if `Ok(value)` |
419 | | /// was returned. If `Err(_)` was returned from the closure, this method does not |
420 | | /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc]. |
421 | | /// |
422 | | /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html |
423 | | /// |
424 | | /// # Concurrent calls on the same key |
425 | | /// |
426 | | /// This method guarantees that concurrent calls on the same not-existing key are |
427 | | /// coalesced into one evaluation of the `init` closure (as long as these |
428 | | /// closures return the same error type). Only one of the calls evaluates its |
429 | | /// closure, and other calls wait for that closure to complete. See |
430 | | /// [`Cache::try_get_with`][try-get-with-method] for more details. |
431 | | /// |
432 | | /// [try-get-with-method]: ./struct.Cache.html#method.try_get_with |
433 | 0 | pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>> |
434 | 0 | where |
435 | 0 | F: FnOnce() -> Result<V, E>, |
436 | 0 | E: Send + Sync + 'static, |
437 | | { |
438 | 0 | let hash = self.inner.hash(&key); |
439 | 0 | let key = Arc::new(key); |
440 | 0 | self.inner |
441 | 0 | .select(hash) |
442 | 0 | .get_or_try_insert_with_hash_and_fun(key, hash, init, false) |
443 | 0 | .map(Entry::into_value) |
444 | 0 | } |
445 | | |
446 | | /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an |
447 | | /// owned key, you can pass a reference to the key. If the key does not exist in |
448 | | /// the cache, the key will be cloned to create new entry in the cache. |
449 | 0 | pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>> |
450 | 0 | where |
451 | 0 | F: FnOnce() -> Result<V, E>, |
452 | 0 | E: Send + Sync + 'static, |
453 | 0 | K: Borrow<Q>, |
454 | 0 | Q: ToOwned<Owned = K> + Hash + Eq + ?Sized, |
455 | | { |
456 | 0 | let hash = self.inner.hash(key); |
457 | 0 | self.inner |
458 | 0 | .select(hash) |
459 | 0 | .get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false) |
460 | 0 | .map(Entry::into_value) |
461 | 0 | } |
462 | | |
463 | | /// Inserts a key-value pair into the cache. |
464 | | /// |
465 | | /// If the cache has this key present, the value is updated. |
466 | 0 | pub fn insert(&self, key: K, value: V) { |
467 | 0 | let hash = self.inner.hash(&key); |
468 | 0 | let key = Arc::new(key); |
469 | 0 | self.inner.select(hash).insert_with_hash(key, hash, value); |
470 | 0 | } |
471 | | |
472 | | /// Discards any cached value for the key. |
473 | | /// |
474 | | /// If you need to get a the value that has been discarded, use the |
475 | | /// [`remove`](#method.remove) method instead. |
476 | | /// |
477 | | /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq` |
478 | | /// on the borrowed form _must_ match those for the key type. |
479 | 0 | pub fn invalidate<Q>(&self, key: &Q) |
480 | 0 | where |
481 | 0 | K: Borrow<Q>, |
482 | 0 | Q: Hash + Eq + ?Sized, |
483 | | { |
484 | 0 | let hash = self.inner.hash(key); |
485 | 0 | self.inner |
486 | 0 | .select(hash) |
487 | 0 | .invalidate_with_hash(key, hash, false); |
488 | 0 | } |
489 | | |
490 | | /// Discards any cached value for the key and returns a clone of the value. |
491 | | /// |
492 | | /// If you do not need to get the value that has been discarded, use the |
493 | | /// [`invalidate`](#method.invalidate) method instead. |
494 | | /// |
495 | | /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq` |
496 | | /// on the borrowed form _must_ match those for the key type. |
497 | 0 | pub fn remove<Q>(&self, key: &Q) -> Option<V> |
498 | 0 | where |
499 | 0 | K: Borrow<Q>, |
500 | 0 | Q: Hash + Eq + ?Sized, |
501 | | { |
502 | 0 | let hash = self.inner.hash(key); |
503 | 0 | self.inner |
504 | 0 | .select(hash) |
505 | 0 | .invalidate_with_hash(key, hash, true) |
506 | 0 | } |
507 | | |
508 | | /// Discards all cached values. |
509 | | /// |
510 | | /// This method returns immediately by just setting the current time as the |
511 | | /// invalidation time. `get` and other retrieval methods are guaranteed not to |
512 | | /// return the entries inserted before or at the invalidation time. |
513 | | /// |
514 | | /// The actual removal of the invalidated entries is done as a maintenance task |
515 | | /// driven by a user thread. For more details, see |
516 | | /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate |
517 | | /// level documentation. |
518 | | /// |
519 | | /// Like the `invalidate` method, this method does not clear the historic |
520 | | /// popularity estimator of keys so that it retains the client activities of |
521 | | /// trying to retrieve an item. |
522 | 0 | pub fn invalidate_all(&self) { |
523 | 0 | for segment in self.inner.segments.iter() { |
524 | 0 | segment.invalidate_all(); |
525 | 0 | } |
526 | 0 | } |
527 | | |
528 | | /// Discards cached values that satisfy a predicate. |
529 | | /// |
530 | | /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The |
531 | | /// closure is called against each cached entry inserted before or at the time |
532 | | /// when this method was called. If the closure returns `true` that entry will be |
533 | | /// evicted from the cache. |
534 | | /// |
535 | | /// This method returns immediately by not actually removing the invalidated |
536 | | /// entries. Instead, it just sets the predicate to the cache with the time when |
537 | | /// this method was called. The actual removal of the invalidated entries is done |
538 | | /// as a maintenance task driven by a user thread. For more details, see |
539 | | /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate |
540 | | /// level documentation. |
541 | | /// |
542 | | /// Also the `get` and other retrieval methods will apply the closure to a cached |
543 | | /// entry to determine if it should have been invalidated. Therefore, it is |
544 | | /// guaranteed that these methods must not return invalidated values. |
545 | | /// |
546 | | /// Note that you must call |
547 | | /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures] |
548 | | /// at the cache creation time as the cache needs to maintain additional internal |
549 | | /// data structures to support this method. Otherwise, calling this method will |
550 | | /// fail with a |
551 | | /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error]. |
552 | | /// |
553 | | /// Like the `invalidate` method, this method does not clear the historic |
554 | | /// popularity estimator of keys so that it retains the client activities of |
555 | | /// trying to retrieve an item. |
556 | | /// |
557 | | /// [support-invalidation-closures]: |
558 | | /// ./struct.CacheBuilder.html#method.support_invalidation_closures |
559 | | /// [invalidation-disabled-error]: |
560 | | /// ../enum.PredicateError.html#variant.InvalidationClosuresDisabled |
561 | 0 | pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<(), PredicateError> |
562 | 0 | where |
563 | 0 | F: Fn(&K, &V) -> bool + Send + Sync + 'static, |
564 | | { |
565 | 0 | let pred = Arc::new(predicate); |
566 | 0 | for segment in self.inner.segments.iter() { |
567 | 0 | segment.invalidate_entries_with_arc_fun(Arc::clone(&pred))?; |
568 | | } |
569 | 0 | Ok(()) |
570 | 0 | } |
571 | | |
572 | | /// Creates an iterator visiting all key-value pairs in arbitrary order. The |
573 | | /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored |
574 | | /// value. |
575 | | /// |
576 | | /// Iterators do not block concurrent reads and writes on the cache. An entry can |
577 | | /// be inserted to, invalidated or evicted from a cache while iterators are alive |
578 | | /// on the same cache. |
579 | | /// |
580 | | /// Unlike the `get` method, visiting entries via an iterator do not update the |
581 | | /// historic popularity estimator or reset idle timers for keys. |
582 | | /// |
583 | | /// # Guarantees |
584 | | /// |
585 | | /// In order to allow concurrent access to the cache, iterator's `next` method |
586 | | /// does _not_ guarantee the following: |
587 | | /// |
588 | | /// - It does not guarantee to return a key-value pair (an entry) if its key has |
589 | | /// been inserted to the cache _after_ the iterator was created. |
590 | | /// - Such an entry may or may not be returned depending on key's hash and |
591 | | /// timing. |
592 | | /// |
593 | | /// and the `next` method guarantees the followings: |
594 | | /// |
595 | | /// - It guarantees not to return the same entry more than once. |
596 | | /// - It guarantees not to return an entry if it has been removed from the cache |
597 | | /// after the iterator was created. |
598 | | /// - Note: An entry can be removed by following reasons: |
599 | | /// - Manually invalidated. |
600 | | /// - Expired (e.g. time-to-live). |
601 | | /// - Evicted as the cache capacity exceeded. |
602 | | /// |
603 | | /// # Examples |
604 | | /// |
605 | | /// ```rust |
606 | | /// use moka::sync::SegmentedCache; |
607 | | /// |
608 | | /// let cache = SegmentedCache::new(100, 4); |
609 | | /// cache.insert("Julia", 14); |
610 | | /// |
611 | | /// let mut iter = cache.iter(); |
612 | | /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V) |
613 | | /// assert_eq!(*k, "Julia"); |
614 | | /// assert_eq!(v, 14); |
615 | | /// |
616 | | /// assert!(iter.next().is_none()); |
617 | | /// ``` |
618 | | /// |
619 | 0 | pub fn iter(&self) -> Iter<'_, K, V> { |
620 | 0 | let num_cht_segments = self.inner.segments[0].num_cht_segments(); |
621 | 0 | let segments = self |
622 | 0 | .inner |
623 | 0 | .segments |
624 | 0 | .iter() |
625 | 0 | .map(|c| c as &dyn ScanningGet<_, _>) |
626 | 0 | .collect::<Vec<_>>() |
627 | 0 | .into_boxed_slice(); |
628 | 0 | Iter::with_multiple_cache_segments(segments, num_cht_segments) |
629 | 0 | } |
630 | | |
631 | | /// Performs any pending maintenance operations needed by the cache. |
632 | 0 | pub fn run_pending_tasks(&self) { |
633 | 0 | for segment in self.inner.segments.iter() { |
634 | 0 | segment.run_pending_tasks(); |
635 | 0 | } |
636 | 0 | } |
637 | | |
638 | | // /// This is used by unit tests to get consistent result. |
639 | | // #[cfg(test)] |
640 | | // pub(crate) fn reconfigure_for_testing(&mut self) { |
641 | | // // Stop the housekeeping job that may cause sync() method to return earlier. |
642 | | // for segment in self.inner.segments.iter_mut() { |
643 | | // segment.reconfigure_for_testing() |
644 | | // } |
645 | | // } |
646 | | } |
647 | | |
648 | | impl<'a, K, V, S> IntoIterator for &'a SegmentedCache<K, V, S> |
649 | | where |
650 | | K: Hash + Eq + Send + Sync + 'static, |
651 | | V: Clone + Send + Sync + 'static, |
652 | | S: BuildHasher + Clone + Send + Sync + 'static, |
653 | | { |
654 | | type Item = (Arc<K>, V); |
655 | | |
656 | | type IntoIter = Iter<'a, K, V>; |
657 | | |
658 | 0 | fn into_iter(self) -> Self::IntoIter { |
659 | 0 | self.iter() |
660 | 0 | } |
661 | | } |
662 | | |
663 | | // For unit tests. |
664 | | #[cfg(test)] |
665 | | impl<K, V, S> SegmentedCache<K, V, S> { |
666 | | fn is_waiter_map_empty(&self) -> bool { |
667 | | self.inner.segments.iter().all(Cache::is_waiter_map_empty) |
668 | | } |
669 | | } |
670 | | |
671 | | #[cfg(test)] |
672 | | impl<K, V, S> SegmentedCache<K, V, S> |
673 | | where |
674 | | K: Hash + Eq + Send + Sync + 'static, |
675 | | V: Clone + Send + Sync + 'static, |
676 | | S: BuildHasher + Clone + Send + Sync + 'static, |
677 | | { |
678 | | fn invalidation_predicate_count(&self) -> usize { |
679 | | self.inner |
680 | | .segments |
681 | | .iter() |
682 | | .map(|seg| seg.invalidation_predicate_count()) |
683 | | .sum() |
684 | | } |
685 | | |
686 | | fn reconfigure_for_testing(&mut self) { |
687 | | let inner = Arc::get_mut(&mut self.inner) |
688 | | .expect("There are other strong reference to self.inner Arc"); |
689 | | |
690 | | for segment in inner.segments.iter_mut() { |
691 | | segment.reconfigure_for_testing(); |
692 | | } |
693 | | } |
694 | | |
695 | | fn key_locks_map_is_empty(&self) -> bool { |
696 | | self.inner |
697 | | .segments |
698 | | .iter() |
699 | | .all(|seg| seg.key_locks_map_is_empty()) |
700 | | } |
701 | | } |
702 | | |
703 | | struct Inner<K, V, S> { |
704 | | desired_capacity: Option<u64>, |
705 | | segments: Box<[Cache<K, V, S>]>, |
706 | | build_hasher: S, |
707 | | segment_shift: u32, |
708 | | } |
709 | | |
710 | | impl<K, V, S> Inner<K, V, S> |
711 | | where |
712 | | K: Hash + Eq + Send + Sync + 'static, |
713 | | V: Clone + Send + Sync + 'static, |
714 | | S: BuildHasher + Clone + Send + Sync + 'static, |
715 | | { |
716 | | /// # Panics |
717 | | /// |
718 | | /// Panics if `num_segments` is 0. |
719 | | #[allow(clippy::too_many_arguments)] |
720 | 0 | fn new( |
721 | 0 | name: Option<String>, |
722 | 0 | max_capacity: Option<u64>, |
723 | 0 | initial_capacity: Option<usize>, |
724 | 0 | num_segments: usize, |
725 | 0 | build_hasher: S, |
726 | 0 | weigher: Option<Weigher<K, V>>, |
727 | 0 | eviction_policy: EvictionPolicy, |
728 | 0 | eviction_listener: Option<EvictionListener<K, V>>, |
729 | 0 | expiration_policy: ExpirationPolicy<K, V>, |
730 | 0 | housekeeper_config: HousekeeperConfig, |
731 | 0 | invalidator_enabled: bool, |
732 | 0 | clock: Clock, |
733 | 0 | ) -> Self { |
734 | 0 | assert!(num_segments > 0); |
735 | | |
736 | 0 | let actual_num_segments = num_segments.next_power_of_two(); |
737 | 0 | let segment_shift = 64 - actual_num_segments.trailing_zeros(); |
738 | 0 | let seg_max_capacity = |
739 | 0 | max_capacity.map(|n| (n as f64 / actual_num_segments as f64).ceil() as u64); |
740 | 0 | let seg_init_capacity = |
741 | 0 | initial_capacity.map(|cap| (cap as f64 / actual_num_segments as f64).ceil() as usize); |
742 | | // NOTE: We cannot initialize the segments as `vec![cache; actual_num_segments]` |
743 | | // because Cache::clone() does not clone its inner but shares the same inner. |
744 | 0 | let segments = (0..actual_num_segments) |
745 | 0 | .map(|_| { |
746 | 0 | Cache::with_everything( |
747 | 0 | name.clone(), |
748 | 0 | seg_max_capacity, |
749 | 0 | seg_init_capacity, |
750 | 0 | build_hasher.clone(), |
751 | 0 | weigher.clone(), |
752 | 0 | eviction_policy.clone(), |
753 | 0 | eviction_listener.clone(), |
754 | 0 | expiration_policy.clone(), |
755 | 0 | housekeeper_config.clone(), |
756 | 0 | invalidator_enabled, |
757 | 0 | clock.clone(), |
758 | | ) |
759 | 0 | }) |
760 | 0 | .collect::<Vec<_>>(); |
761 | | |
762 | 0 | Self { |
763 | 0 | desired_capacity: max_capacity, |
764 | 0 | segments: segments.into_boxed_slice(), |
765 | 0 | build_hasher, |
766 | 0 | segment_shift, |
767 | 0 | } |
768 | 0 | } |
769 | | |
770 | | #[inline] |
771 | 0 | fn hash<Q>(&self, key: &Q) -> u64 |
772 | 0 | where |
773 | 0 | K: Borrow<Q>, |
774 | 0 | Q: Hash + Eq + ?Sized, |
775 | | { |
776 | 0 | let mut hasher = self.build_hasher.build_hasher(); |
777 | 0 | key.hash(&mut hasher); |
778 | 0 | hasher.finish() |
779 | 0 | } |
780 | | |
781 | | #[inline] |
782 | 0 | fn select(&self, hash: u64) -> &Cache<K, V, S> { |
783 | 0 | let index = self.segment_index_from_hash(hash); |
784 | 0 | &self.segments[index] |
785 | 0 | } |
786 | | |
787 | | #[inline] |
788 | 0 | fn segment_index_from_hash(&self, hash: u64) -> usize { |
789 | 0 | if self.segment_shift == 64 { |
790 | 0 | 0 |
791 | | } else { |
792 | 0 | (hash >> self.segment_shift) as usize |
793 | | } |
794 | 0 | } |
795 | | } |
796 | | |
797 | | #[cfg(test)] |
798 | | mod tests { |
799 | | use super::SegmentedCache; |
800 | | use crate::notification::RemovalCause; |
801 | | use parking_lot::Mutex; |
802 | | use std::{sync::Arc, time::Duration}; |
803 | | |
804 | | #[test] |
805 | | fn max_capacity_zero() { |
806 | | let mut cache = SegmentedCache::new(0, 1); |
807 | | cache.reconfigure_for_testing(); |
808 | | |
809 | | // Make the cache exterior immutable. |
810 | | let cache = cache; |
811 | | |
812 | | cache.insert(0, ()); |
813 | | |
814 | | assert!(!cache.contains_key(&0)); |
815 | | assert!(cache.get(&0).is_none()); |
816 | | cache.run_pending_tasks(); |
817 | | assert!(!cache.contains_key(&0)); |
818 | | assert!(cache.get(&0).is_none()); |
819 | | assert_eq!(cache.entry_count(), 0) |
820 | | } |
821 | | |
822 | | #[test] |
823 | | fn basic_single_thread() { |
824 | | // The following `Vec`s will hold actual and expected notifications. |
825 | | let actual = Arc::new(Mutex::new(Vec::new())); |
826 | | let mut expected = Vec::new(); |
827 | | |
828 | | // Create an eviction listener. |
829 | | let a1 = Arc::clone(&actual); |
830 | | let listener = move |k, v, cause| a1.lock().push((k, v, cause)); |
831 | | |
832 | | // Create a cache with the eviction listener. |
833 | | let mut cache = SegmentedCache::builder(1) |
834 | | .max_capacity(3) |
835 | | .eviction_listener(listener) |
836 | | .build(); |
837 | | cache.reconfigure_for_testing(); |
838 | | |
839 | | // Make the cache exterior immutable. |
840 | | let cache = cache; |
841 | | |
842 | | cache.insert("a", "alice"); |
843 | | cache.insert("b", "bob"); |
844 | | assert_eq!(cache.get(&"a"), Some("alice")); |
845 | | assert!(cache.contains_key(&"a")); |
846 | | assert!(cache.contains_key(&"b")); |
847 | | assert_eq!(cache.get(&"b"), Some("bob")); |
848 | | cache.run_pending_tasks(); |
849 | | // counts: a -> 1, b -> 1 |
850 | | |
851 | | cache.insert("c", "cindy"); |
852 | | assert_eq!(cache.get(&"c"), Some("cindy")); |
853 | | assert!(cache.contains_key(&"c")); |
854 | | // counts: a -> 1, b -> 1, c -> 1 |
855 | | cache.run_pending_tasks(); |
856 | | |
857 | | assert!(cache.contains_key(&"a")); |
858 | | assert_eq!(cache.get(&"a"), Some("alice")); |
859 | | assert_eq!(cache.get(&"b"), Some("bob")); |
860 | | assert!(cache.contains_key(&"b")); |
861 | | cache.run_pending_tasks(); |
862 | | // counts: a -> 2, b -> 2, c -> 1 |
863 | | |
864 | | // "d" should not be admitted because its frequency is too low. |
865 | | cache.insert("d", "david"); // count: d -> 0 |
866 | | expected.push((Arc::new("d"), "david", RemovalCause::Size)); |
867 | | cache.run_pending_tasks(); |
868 | | assert_eq!(cache.get(&"d"), None); // d -> 1 |
869 | | assert!(!cache.contains_key(&"d")); |
870 | | |
871 | | cache.insert("d", "david"); |
872 | | expected.push((Arc::new("d"), "david", RemovalCause::Size)); |
873 | | cache.run_pending_tasks(); |
874 | | assert!(!cache.contains_key(&"d")); |
875 | | assert_eq!(cache.get(&"d"), None); // d -> 2 |
876 | | |
877 | | // "d" should be admitted and "c" should be evicted |
878 | | // because d's frequency is higher than c's. |
879 | | cache.insert("d", "dennis"); |
880 | | expected.push((Arc::new("c"), "cindy", RemovalCause::Size)); |
881 | | cache.run_pending_tasks(); |
882 | | assert_eq!(cache.get(&"a"), Some("alice")); |
883 | | assert_eq!(cache.get(&"b"), Some("bob")); |
884 | | assert_eq!(cache.get(&"c"), None); |
885 | | assert_eq!(cache.get(&"d"), Some("dennis")); |
886 | | assert!(cache.contains_key(&"a")); |
887 | | assert!(cache.contains_key(&"b")); |
888 | | assert!(!cache.contains_key(&"c")); |
889 | | assert!(cache.contains_key(&"d")); |
890 | | |
891 | | cache.invalidate(&"b"); |
892 | | expected.push((Arc::new("b"), "bob", RemovalCause::Explicit)); |
893 | | cache.run_pending_tasks(); |
894 | | assert_eq!(cache.get(&"b"), None); |
895 | | assert!(!cache.contains_key(&"b")); |
896 | | |
897 | | assert!(cache.remove(&"b").is_none()); |
898 | | assert_eq!(cache.remove(&"d"), Some("dennis")); |
899 | | expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit)); |
900 | | cache.run_pending_tasks(); |
901 | | assert_eq!(cache.get(&"d"), None); |
902 | | assert!(!cache.contains_key(&"d")); |
903 | | |
904 | | verify_notification_vec(&cache, actual, &expected); |
905 | | assert!(cache.key_locks_map_is_empty()); |
906 | | } |
907 | | |
908 | | #[test] |
909 | | fn non_power_of_two_segments() { |
910 | | let mut cache = SegmentedCache::new(100, 5); |
911 | | cache.reconfigure_for_testing(); |
912 | | |
913 | | // Make the cache exterior immutable. |
914 | | let cache = cache; |
915 | | |
916 | | assert_eq!(cache.iter().count(), 0); |
917 | | |
918 | | cache.insert("a", "alice"); |
919 | | cache.insert("b", "bob"); |
920 | | cache.insert("c", "cindy"); |
921 | | |
922 | | assert_eq!(cache.iter().count(), 3); |
923 | | cache.run_pending_tasks(); |
924 | | assert_eq!(cache.iter().count(), 3); |
925 | | } |
926 | | |
927 | | #[test] |
928 | | fn size_aware_eviction() { |
929 | | let weigher = |_k: &&str, v: &(&str, u32)| v.1; |
930 | | |
931 | | let alice = ("alice", 10); |
932 | | let bob = ("bob", 15); |
933 | | let bill = ("bill", 20); |
934 | | let cindy = ("cindy", 5); |
935 | | let david = ("david", 15); |
936 | | let dennis = ("dennis", 15); |
937 | | |
938 | | // The following `Vec`s will hold actual and expected notifications. |
939 | | let actual = Arc::new(Mutex::new(Vec::new())); |
940 | | let mut expected = Vec::new(); |
941 | | |
942 | | // Create an eviction listener. |
943 | | let a1 = Arc::clone(&actual); |
944 | | let listener = move |k, v, cause| a1.lock().push((k, v, cause)); |
945 | | |
946 | | // Create a cache with the eviction listener. |
947 | | let mut cache = SegmentedCache::builder(1) |
948 | | .max_capacity(31) |
949 | | .weigher(weigher) |
950 | | .eviction_listener(listener) |
951 | | .build(); |
952 | | cache.reconfigure_for_testing(); |
953 | | |
954 | | // Make the cache exterior immutable. |
955 | | let cache = cache; |
956 | | |
957 | | cache.insert("a", alice); |
958 | | cache.insert("b", bob); |
959 | | assert_eq!(cache.get(&"a"), Some(alice)); |
960 | | assert!(cache.contains_key(&"a")); |
961 | | assert!(cache.contains_key(&"b")); |
962 | | assert_eq!(cache.get(&"b"), Some(bob)); |
963 | | cache.run_pending_tasks(); |
964 | | // order (LRU -> MRU) and counts: a -> 1, b -> 1 |
965 | | |
966 | | cache.insert("c", cindy); |
967 | | assert_eq!(cache.get(&"c"), Some(cindy)); |
968 | | assert!(cache.contains_key(&"c")); |
969 | | // order and counts: a -> 1, b -> 1, c -> 1 |
970 | | cache.run_pending_tasks(); |
971 | | |
972 | | assert!(cache.contains_key(&"a")); |
973 | | assert_eq!(cache.get(&"a"), Some(alice)); |
974 | | assert_eq!(cache.get(&"b"), Some(bob)); |
975 | | assert!(cache.contains_key(&"b")); |
976 | | cache.run_pending_tasks(); |
977 | | // order and counts: c -> 1, a -> 2, b -> 2 |
978 | | |
979 | | // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10). |
980 | | // "d" must have higher count than 3, which is the aggregated count |
981 | | // of "a" and "c". |
982 | | cache.insert("d", david); // count: d -> 0 |
983 | | expected.push((Arc::new("d"), david, RemovalCause::Size)); |
984 | | cache.run_pending_tasks(); |
985 | | assert_eq!(cache.get(&"d"), None); // d -> 1 |
986 | | assert!(!cache.contains_key(&"d")); |
987 | | |
988 | | cache.insert("d", david); |
989 | | expected.push((Arc::new("d"), david, RemovalCause::Size)); |
990 | | cache.run_pending_tasks(); |
991 | | assert!(!cache.contains_key(&"d")); |
992 | | assert_eq!(cache.get(&"d"), None); // d -> 2 |
993 | | |
994 | | cache.insert("d", david); |
995 | | expected.push((Arc::new("d"), david, RemovalCause::Size)); |
996 | | cache.run_pending_tasks(); |
997 | | assert_eq!(cache.get(&"d"), None); // d -> 3 |
998 | | assert!(!cache.contains_key(&"d")); |
999 | | |
1000 | | cache.insert("d", david); |
1001 | | expected.push((Arc::new("d"), david, RemovalCause::Size)); |
1002 | | cache.run_pending_tasks(); |
1003 | | assert!(!cache.contains_key(&"d")); |
1004 | | assert_eq!(cache.get(&"d"), None); // d -> 4 |
1005 | | |
1006 | | // Finally "d" should be admitted by evicting "c" and "a". |
1007 | | cache.insert("d", dennis); |
1008 | | expected.push((Arc::new("c"), cindy, RemovalCause::Size)); |
1009 | | expected.push((Arc::new("a"), alice, RemovalCause::Size)); |
1010 | | cache.run_pending_tasks(); |
1011 | | assert_eq!(cache.get(&"a"), None); |
1012 | | assert_eq!(cache.get(&"b"), Some(bob)); |
1013 | | assert_eq!(cache.get(&"c"), None); |
1014 | | assert_eq!(cache.get(&"d"), Some(dennis)); |
1015 | | assert!(!cache.contains_key(&"a")); |
1016 | | assert!(cache.contains_key(&"b")); |
1017 | | assert!(!cache.contains_key(&"c")); |
1018 | | assert!(cache.contains_key(&"d")); |
1019 | | |
1020 | | // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15). |
1021 | | cache.insert("b", bill); |
1022 | | expected.push((Arc::new("b"), bob, RemovalCause::Replaced)); |
1023 | | expected.push((Arc::new("d"), dennis, RemovalCause::Size)); |
1024 | | cache.run_pending_tasks(); |
1025 | | assert_eq!(cache.get(&"b"), Some(bill)); |
1026 | | assert_eq!(cache.get(&"d"), None); |
1027 | | assert!(cache.contains_key(&"b")); |
1028 | | assert!(!cache.contains_key(&"d")); |
1029 | | |
1030 | | // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15). |
1031 | | cache.insert("a", alice); |
1032 | | cache.insert("b", bob); |
1033 | | expected.push((Arc::new("b"), bill, RemovalCause::Replaced)); |
1034 | | cache.run_pending_tasks(); |
1035 | | assert_eq!(cache.get(&"a"), Some(alice)); |
1036 | | assert_eq!(cache.get(&"b"), Some(bob)); |
1037 | | assert_eq!(cache.get(&"d"), None); |
1038 | | assert!(cache.contains_key(&"a")); |
1039 | | assert!(cache.contains_key(&"b")); |
1040 | | assert!(!cache.contains_key(&"d")); |
1041 | | |
1042 | | // Verify the sizes. |
1043 | | assert_eq!(cache.entry_count(), 2); |
1044 | | assert_eq!(cache.weighted_size(), 25); |
1045 | | |
1046 | | verify_notification_vec(&cache, actual, &expected); |
1047 | | assert!(cache.key_locks_map_is_empty()); |
1048 | | } |
1049 | | |
1050 | | #[test] |
1051 | | fn basic_multi_threads() { |
1052 | | let num_threads = 4; |
1053 | | |
1054 | | let mut cache = SegmentedCache::new(100, num_threads); |
1055 | | cache.reconfigure_for_testing(); |
1056 | | |
1057 | | // Make the cache exterior immutable. |
1058 | | let cache = cache; |
1059 | | |
1060 | | // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect |
1061 | | #[allow(clippy::needless_collect)] |
1062 | | let handles = (0..num_threads) |
1063 | | .map(|id| { |
1064 | | let cache = cache.clone(); |
1065 | | std::thread::spawn(move || { |
1066 | | cache.insert(10, format!("{id}-100")); |
1067 | | cache.get(&10); |
1068 | | cache.run_pending_tasks(); |
1069 | | cache.insert(20, format!("{id}-200")); |
1070 | | cache.invalidate(&10); |
1071 | | }) |
1072 | | }) |
1073 | | .collect::<Vec<_>>(); |
1074 | | |
1075 | | handles.into_iter().for_each(|h| h.join().expect("Failed")); |
1076 | | |
1077 | | cache.run_pending_tasks(); |
1078 | | |
1079 | | assert!(cache.get(&10).is_none()); |
1080 | | assert!(cache.get(&20).is_some()); |
1081 | | assert!(!cache.contains_key(&10)); |
1082 | | assert!(cache.contains_key(&20)); |
1083 | | } |
1084 | | |
1085 | | #[test] |
1086 | | fn invalidate_all() { |
1087 | | use std::collections::HashMap; |
1088 | | |
1089 | | // The following `HashMap`s will hold actual and expected notifications. |
1090 | | // Note: We use `HashMap` here as the order of invalidations is non-deterministic. |
1091 | | let actual = Arc::new(Mutex::new(HashMap::new())); |
1092 | | let mut expected = HashMap::new(); |
1093 | | |
1094 | | // Create an eviction listener. |
1095 | | let a1 = Arc::clone(&actual); |
1096 | | let listener = move |k, v, cause| { |
1097 | | a1.lock().insert(k, (v, cause)); |
1098 | | }; |
1099 | | |
1100 | | // Create a cache with the eviction listener. |
1101 | | let mut cache = SegmentedCache::builder(4) |
1102 | | .max_capacity(100) |
1103 | | .eviction_listener(listener) |
1104 | | .build(); |
1105 | | cache.reconfigure_for_testing(); |
1106 | | |
1107 | | // Make the cache exterior immutable. |
1108 | | let cache = cache; |
1109 | | |
1110 | | cache.insert("a", "alice"); |
1111 | | cache.insert("b", "bob"); |
1112 | | cache.insert("c", "cindy"); |
1113 | | assert_eq!(cache.get(&"a"), Some("alice")); |
1114 | | assert_eq!(cache.get(&"b"), Some("bob")); |
1115 | | assert_eq!(cache.get(&"c"), Some("cindy")); |
1116 | | assert!(cache.contains_key(&"a")); |
1117 | | assert!(cache.contains_key(&"b")); |
1118 | | assert!(cache.contains_key(&"c")); |
1119 | | |
1120 | | // `cache.run_pending_tasks()` is no longer needed here before invalidating. The last |
1121 | | // modified timestamp of the entries were updated when they were inserted. |
1122 | | // https://github.com/moka-rs/moka/issues/155 |
1123 | | |
1124 | | cache.invalidate_all(); |
1125 | | expected.insert(Arc::new("a"), ("alice", RemovalCause::Explicit)); |
1126 | | expected.insert(Arc::new("b"), ("bob", RemovalCause::Explicit)); |
1127 | | expected.insert(Arc::new("c"), ("cindy", RemovalCause::Explicit)); |
1128 | | cache.run_pending_tasks(); |
1129 | | |
1130 | | cache.insert("d", "david"); |
1131 | | cache.run_pending_tasks(); |
1132 | | |
1133 | | assert!(cache.get(&"a").is_none()); |
1134 | | assert!(cache.get(&"b").is_none()); |
1135 | | assert!(cache.get(&"c").is_none()); |
1136 | | assert_eq!(cache.get(&"d"), Some("david")); |
1137 | | assert!(!cache.contains_key(&"a")); |
1138 | | assert!(!cache.contains_key(&"b")); |
1139 | | assert!(!cache.contains_key(&"c")); |
1140 | | assert!(cache.contains_key(&"d")); |
1141 | | |
1142 | | verify_notification_map(&cache, actual, &expected); |
1143 | | } |
1144 | | |
1145 | | #[test] |
1146 | | fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> { |
1147 | | use std::collections::{HashMap, HashSet}; |
1148 | | |
1149 | | const SEGMENTS: usize = 4; |
1150 | | |
1151 | | // The following `HashMap`s will hold actual and expected notifications. |
1152 | | // Note: We use `HashMap` here as the order of invalidations is non-deterministic. |
1153 | | let actual = Arc::new(Mutex::new(HashMap::new())); |
1154 | | let mut expected = HashMap::new(); |
1155 | | |
1156 | | // Create an eviction listener. |
1157 | | let a1 = Arc::clone(&actual); |
1158 | | let listener = move |k, v, cause| { |
1159 | | a1.lock().insert(k, (v, cause)); |
1160 | | }; |
1161 | | |
1162 | | let (clock, mock) = crate::common::time::Clock::mock(); |
1163 | | |
1164 | | // Create a cache with the eviction listener. |
1165 | | let mut cache = SegmentedCache::builder(SEGMENTS) |
1166 | | .max_capacity(100) |
1167 | | .support_invalidation_closures() |
1168 | | .eviction_listener(listener) |
1169 | | .clock(clock) |
1170 | | .build(); |
1171 | | cache.reconfigure_for_testing(); |
1172 | | |
1173 | | // Make the cache exterior immutable. |
1174 | | let cache = cache; |
1175 | | |
1176 | | cache.insert(0, "alice"); |
1177 | | cache.insert(1, "bob"); |
1178 | | cache.insert(2, "alex"); |
1179 | | cache.run_pending_tasks(); |
1180 | | mock.increment(Duration::from_secs(5)); // 5 secs from the start. |
1181 | | cache.run_pending_tasks(); |
1182 | | |
1183 | | assert_eq!(cache.get(&0), Some("alice")); |
1184 | | assert_eq!(cache.get(&1), Some("bob")); |
1185 | | assert_eq!(cache.get(&2), Some("alex")); |
1186 | | assert!(cache.contains_key(&0)); |
1187 | | assert!(cache.contains_key(&1)); |
1188 | | assert!(cache.contains_key(&2)); |
1189 | | |
1190 | | let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>(); |
1191 | | cache.invalidate_entries_if(move |_k, &v| names.contains(v))?; |
1192 | | assert_eq!(cache.invalidation_predicate_count(), SEGMENTS); |
1193 | | expected.insert(Arc::new(0), ("alice", RemovalCause::Explicit)); |
1194 | | expected.insert(Arc::new(2), ("alex", RemovalCause::Explicit)); |
1195 | | |
1196 | | mock.increment(Duration::from_secs(5)); // 10 secs from the start. |
1197 | | |
1198 | | cache.insert(3, "alice"); |
1199 | | |
1200 | | // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping) |
1201 | | cache.run_pending_tasks(); // To submit the invalidation task. |
1202 | | std::thread::sleep(Duration::from_millis(200)); |
1203 | | cache.run_pending_tasks(); // To process the task result. |
1204 | | std::thread::sleep(Duration::from_millis(200)); |
1205 | | |
1206 | | assert!(cache.get(&0).is_none()); |
1207 | | assert!(cache.get(&2).is_none()); |
1208 | | assert_eq!(cache.get(&1), Some("bob")); |
1209 | | // This should survive as it was inserted after calling invalidate_entries_if. |
1210 | | assert_eq!(cache.get(&3), Some("alice")); |
1211 | | |
1212 | | assert!(!cache.contains_key(&0)); |
1213 | | assert!(cache.contains_key(&1)); |
1214 | | assert!(!cache.contains_key(&2)); |
1215 | | assert!(cache.contains_key(&3)); |
1216 | | |
1217 | | assert_eq!(cache.entry_count(), 2); |
1218 | | assert_eq!(cache.invalidation_predicate_count(), 0); |
1219 | | |
1220 | | mock.increment(Duration::from_secs(5)); // 15 secs from the start. |
1221 | | |
1222 | | cache.invalidate_entries_if(|_k, &v| v == "alice")?; |
1223 | | cache.invalidate_entries_if(|_k, &v| v == "bob")?; |
1224 | | assert_eq!(cache.invalidation_predicate_count(), SEGMENTS * 2); |
1225 | | expected.insert(Arc::new(1), ("bob", RemovalCause::Explicit)); |
1226 | | expected.insert(Arc::new(3), ("alice", RemovalCause::Explicit)); |
1227 | | |
1228 | | // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping) |
1229 | | cache.run_pending_tasks(); // To submit the invalidation task. |
1230 | | std::thread::sleep(Duration::from_millis(200)); |
1231 | | cache.run_pending_tasks(); // To process the task result. |
1232 | | std::thread::sleep(Duration::from_millis(200)); |
1233 | | |
1234 | | assert!(cache.get(&1).is_none()); |
1235 | | assert!(cache.get(&3).is_none()); |
1236 | | |
1237 | | assert!(!cache.contains_key(&1)); |
1238 | | assert!(!cache.contains_key(&3)); |
1239 | | |
1240 | | assert_eq!(cache.entry_count(), 0); |
1241 | | assert_eq!(cache.invalidation_predicate_count(), 0); |
1242 | | |
1243 | | verify_notification_map(&cache, actual, &expected); |
1244 | | |
1245 | | Ok(()) |
1246 | | } |
1247 | | |
1248 | | #[test] |
1249 | | fn test_iter() { |
1250 | | const NUM_KEYS: usize = 50; |
1251 | | |
1252 | | fn make_value(key: usize) -> String { |
1253 | | format!("val: {key}") |
1254 | | } |
1255 | | |
1256 | | // let cache = SegmentedCache::builder(5) |
1257 | | let cache = SegmentedCache::builder(4) |
1258 | | .max_capacity(100) |
1259 | | .time_to_idle(Duration::from_secs(10)) |
1260 | | .build(); |
1261 | | |
1262 | | for key in 0..NUM_KEYS { |
1263 | | cache.insert(key, make_value(key)); |
1264 | | } |
1265 | | |
1266 | | let mut key_set = std::collections::HashSet::new(); |
1267 | | |
1268 | | for (key, value) in &cache { |
1269 | | assert_eq!(value, make_value(*key)); |
1270 | | |
1271 | | key_set.insert(*key); |
1272 | | } |
1273 | | |
1274 | | // Ensure there are no missing or duplicate keys in the iteration. |
1275 | | assert_eq!(key_set.len(), NUM_KEYS); |
1276 | | } |
1277 | | |
1278 | | /// Runs 16 threads at the same time and ensures no deadlock occurs. |
1279 | | /// |
1280 | | /// - Eight of the threads will update key-values in the cache. |
1281 | | /// - Eight others will iterate the cache. |
1282 | | /// |
1283 | | #[test] |
1284 | | fn test_iter_multi_threads() { |
1285 | | use std::collections::HashSet; |
1286 | | |
1287 | | const NUM_KEYS: usize = 1024; |
1288 | | const NUM_THREADS: usize = 16; |
1289 | | |
1290 | | fn make_value(key: usize) -> String { |
1291 | | format!("val: {key}") |
1292 | | } |
1293 | | |
1294 | | let cache = SegmentedCache::builder(4) |
1295 | | .max_capacity(2048) |
1296 | | .time_to_idle(Duration::from_secs(10)) |
1297 | | .build(); |
1298 | | |
1299 | | // Initialize the cache. |
1300 | | for key in 0..NUM_KEYS { |
1301 | | cache.insert(key, make_value(key)); |
1302 | | } |
1303 | | |
1304 | | let rw_lock = Arc::new(std::sync::RwLock::<()>::default()); |
1305 | | let write_lock = rw_lock.write().unwrap(); |
1306 | | |
1307 | | // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect |
1308 | | #[allow(clippy::needless_collect)] |
1309 | | let handles = (0..NUM_THREADS) |
1310 | | .map(|n| { |
1311 | | let cache = cache.clone(); |
1312 | | let rw_lock = Arc::clone(&rw_lock); |
1313 | | |
1314 | | if n % 2 == 0 { |
1315 | | // This thread will update the cache. |
1316 | | std::thread::spawn(move || { |
1317 | | let read_lock = rw_lock.read().unwrap(); |
1318 | | for key in 0..NUM_KEYS { |
1319 | | // TODO: Update keys in a random order? |
1320 | | cache.insert(key, make_value(key)); |
1321 | | } |
1322 | | std::mem::drop(read_lock); |
1323 | | }) |
1324 | | } else { |
1325 | | // This thread will iterate the cache. |
1326 | | std::thread::spawn(move || { |
1327 | | let read_lock = rw_lock.read().unwrap(); |
1328 | | let mut key_set = HashSet::new(); |
1329 | | for (key, value) in &cache { |
1330 | | assert_eq!(value, make_value(*key)); |
1331 | | key_set.insert(*key); |
1332 | | } |
1333 | | // Ensure there are no missing or duplicate keys in the iteration. |
1334 | | assert_eq!(key_set.len(), NUM_KEYS); |
1335 | | std::mem::drop(read_lock); |
1336 | | }) |
1337 | | } |
1338 | | }) |
1339 | | .collect::<Vec<_>>(); |
1340 | | |
1341 | | // Let these threads to run by releasing the write lock. |
1342 | | std::mem::drop(write_lock); |
1343 | | |
1344 | | handles.into_iter().for_each(|h| h.join().expect("Failed")); |
1345 | | |
1346 | | // Ensure there are no missing or duplicate keys in the iteration. |
1347 | | let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>(); |
1348 | | assert_eq!(key_set.len(), NUM_KEYS); |
1349 | | } |
1350 | | |
1351 | | #[test] |
1352 | | fn get_with() { |
1353 | | use std::thread::{sleep, spawn}; |
1354 | | |
1355 | | let cache = SegmentedCache::new(100, 4); |
1356 | | const KEY: u32 = 0; |
1357 | | |
1358 | | // This test will run five threads: |
1359 | | // |
1360 | | // Thread1 will be the first thread to call `get_with` for a key, so its init |
1361 | | // closure will be evaluated and then a &str value "thread1" will be inserted |
1362 | | // to the cache. |
1363 | | let thread1 = { |
1364 | | let cache1 = cache.clone(); |
1365 | | spawn(move || { |
1366 | | // Call `get_with` immediately. |
1367 | | let v = cache1.get_with(KEY, || { |
1368 | | // Wait for 300 ms and return a &str value. |
1369 | | sleep(Duration::from_millis(300)); |
1370 | | "thread1" |
1371 | | }); |
1372 | | assert_eq!(v, "thread1"); |
1373 | | }) |
1374 | | }; |
1375 | | |
1376 | | // Thread2 will be the second thread to call `get_with` for the same key, so |
1377 | | // its init closure will not be evaluated. Once thread1's init closure |
1378 | | // finishes, it will get the value inserted by thread1's init closure. |
1379 | | let thread2 = { |
1380 | | let cache2 = cache.clone(); |
1381 | | spawn(move || { |
1382 | | // Wait for 100 ms before calling `get_with`. |
1383 | | sleep(Duration::from_millis(100)); |
1384 | | let v = cache2.get_with(KEY, || unreachable!()); |
1385 | | assert_eq!(v, "thread1"); |
1386 | | }) |
1387 | | }; |
1388 | | |
1389 | | // Thread3 will be the third thread to call `get_with` for the same key. By |
1390 | | // the time it calls, thread1's init closure should have finished already and |
1391 | | // the value should be already inserted to the cache. So its init closure |
1392 | | // will not be evaluated and will get the value insert by thread1's init |
1393 | | // closure immediately. |
1394 | | let thread3 = { |
1395 | | let cache3 = cache.clone(); |
1396 | | spawn(move || { |
1397 | | // Wait for 400 ms before calling `get_with`. |
1398 | | sleep(Duration::from_millis(400)); |
1399 | | let v = cache3.get_with(KEY, || unreachable!()); |
1400 | | assert_eq!(v, "thread1"); |
1401 | | }) |
1402 | | }; |
1403 | | |
1404 | | // Thread4 will call `get` for the same key. It will call when thread1's init |
1405 | | // closure is still running, so it will get none for the key. |
1406 | | let thread4 = { |
1407 | | let cache4 = cache.clone(); |
1408 | | spawn(move || { |
1409 | | // Wait for 200 ms before calling `get`. |
1410 | | sleep(Duration::from_millis(200)); |
1411 | | let maybe_v = cache4.get(&KEY); |
1412 | | assert!(maybe_v.is_none()); |
1413 | | }) |
1414 | | }; |
1415 | | |
1416 | | // Thread5 will call `get` for the same key. It will call after thread1's init |
1417 | | // closure finished, so it will get the value insert by thread1's init closure. |
1418 | | let thread5 = { |
1419 | | let cache5 = cache.clone(); |
1420 | | spawn(move || { |
1421 | | // Wait for 400 ms before calling `get`. |
1422 | | sleep(Duration::from_millis(400)); |
1423 | | let maybe_v = cache5.get(&KEY); |
1424 | | assert_eq!(maybe_v, Some("thread1")); |
1425 | | }) |
1426 | | }; |
1427 | | |
1428 | | for t in [thread1, thread2, thread3, thread4, thread5] { |
1429 | | t.join().expect("Failed to join"); |
1430 | | } |
1431 | | |
1432 | | assert!(cache.is_waiter_map_empty()); |
1433 | | } |
1434 | | |
1435 | | #[test] |
1436 | | fn get_with_if() { |
1437 | | use std::thread::{sleep, spawn}; |
1438 | | |
1439 | | let cache = SegmentedCache::new(100, 4); |
1440 | | const KEY: u32 = 0; |
1441 | | |
1442 | | // This test will run seven threads: |
1443 | | // |
1444 | | // Thread1 will be the first thread to call `get_with_if` for a key, so its |
1445 | | // init closure will be evaluated and then a &str value "thread1" will be |
1446 | | // inserted to the cache. |
1447 | | let thread1 = { |
1448 | | let cache1 = cache.clone(); |
1449 | | spawn(move || { |
1450 | | // Call `get_with` immediately. |
1451 | | let v = cache1.get_with_if( |
1452 | | KEY, |
1453 | | || { |
1454 | | // Wait for 300 ms and return a &str value. |
1455 | | sleep(Duration::from_millis(300)); |
1456 | | "thread1" |
1457 | | }, |
1458 | | |_v| unreachable!(), |
1459 | | ); |
1460 | | assert_eq!(v, "thread1"); |
1461 | | }) |
1462 | | }; |
1463 | | |
1464 | | // Thread2 will be the second thread to call `get_with_if` for the same key, |
1465 | | // so its init closure will not be evaluated. Once thread1's init closure |
1466 | | // finishes, it will get the value inserted by thread1's init closure. |
1467 | | let thread2 = { |
1468 | | let cache2 = cache.clone(); |
1469 | | spawn(move || { |
1470 | | // Wait for 100 ms before calling `get_with`. |
1471 | | sleep(Duration::from_millis(100)); |
1472 | | let v = cache2.get_with_if(KEY, || unreachable!(), |_v| unreachable!()); |
1473 | | assert_eq!(v, "thread1"); |
1474 | | }) |
1475 | | }; |
1476 | | |
1477 | | // Thread3 will be the third thread to call `get_with_if` for the same |
1478 | | // key. By the time it calls, thread1's init closure should have finished |
1479 | | // already and the value should be already inserted to the cache. Also |
1480 | | // thread3's `replace_if` closure returns `false`. So its init closure will |
1481 | | // not be evaluated and will get the value inserted by thread1's init closure |
1482 | | // immediately. |
1483 | | let thread3 = { |
1484 | | let cache3 = cache.clone(); |
1485 | | spawn(move || { |
1486 | | // Wait for 350 ms before calling `get_with_if`. |
1487 | | sleep(Duration::from_millis(350)); |
1488 | | let v = cache3.get_with_if( |
1489 | | KEY, |
1490 | | || unreachable!(), |
1491 | | |v| { |
1492 | | assert_eq!(v, &"thread1"); |
1493 | | false |
1494 | | }, |
1495 | | ); |
1496 | | assert_eq!(v, "thread1"); |
1497 | | }) |
1498 | | }; |
1499 | | |
1500 | | // Thread4 will be the fourth thread to call `get_with_if` for the same |
1501 | | // key. The value should have been already inserted to the cache by |
1502 | | // thread1. However thread4's `replace_if` closure returns `true`. So its |
1503 | | // init closure will be evaluated to replace the current value. |
1504 | | let thread4 = { |
1505 | | let cache4 = cache.clone(); |
1506 | | spawn(move || { |
1507 | | // Wait for 400 ms before calling `get_with_if`. |
1508 | | sleep(Duration::from_millis(400)); |
1509 | | let v = cache4.get_with_if( |
1510 | | KEY, |
1511 | | || "thread4", |
1512 | | |v| { |
1513 | | assert_eq!(v, &"thread1"); |
1514 | | true |
1515 | | }, |
1516 | | ); |
1517 | | assert_eq!(v, "thread4"); |
1518 | | }) |
1519 | | }; |
1520 | | |
1521 | | // Thread5 will call `get` for the same key. It will call when thread1's init |
1522 | | // closure is still running, so it will get none for the key. |
1523 | | let thread5 = { |
1524 | | let cache5 = cache.clone(); |
1525 | | spawn(move || { |
1526 | | // Wait for 200 ms before calling `get`. |
1527 | | sleep(Duration::from_millis(200)); |
1528 | | let maybe_v = cache5.get(&KEY); |
1529 | | assert!(maybe_v.is_none()); |
1530 | | }) |
1531 | | }; |
1532 | | |
1533 | | // Thread6 will call `get` for the same key. It will call when thread1's init |
1534 | | // closure is still running, so it will get none for the key. |
1535 | | let thread6 = { |
1536 | | let cache6 = cache.clone(); |
1537 | | spawn(move || { |
1538 | | // Wait for 200 ms before calling `get`. |
1539 | | sleep(Duration::from_millis(350)); |
1540 | | let maybe_v = cache6.get(&KEY); |
1541 | | assert_eq!(maybe_v, Some("thread1")); |
1542 | | }) |
1543 | | }; |
1544 | | |
1545 | | // Thread7 will call `get` for the same key. It will call after thread1's init |
1546 | | // closure finished, so it will get the value insert by thread1's init closure. |
1547 | | let thread7 = { |
1548 | | let cache7 = cache.clone(); |
1549 | | spawn(move || { |
1550 | | // Wait for 400 ms before calling `get`. |
1551 | | sleep(Duration::from_millis(450)); |
1552 | | let maybe_v = cache7.get(&KEY); |
1553 | | assert_eq!(maybe_v, Some("thread4")); |
1554 | | }) |
1555 | | }; |
1556 | | |
1557 | | for t in [ |
1558 | | thread1, thread2, thread3, thread4, thread5, thread6, thread7, |
1559 | | ] { |
1560 | | t.join().expect("Failed to join"); |
1561 | | } |
1562 | | |
1563 | | assert!(cache.is_waiter_map_empty()); |
1564 | | } |
1565 | | |
1566 | | #[test] |
1567 | | fn try_get_with() { |
1568 | | use std::{ |
1569 | | sync::Arc, |
1570 | | thread::{sleep, spawn}, |
1571 | | }; |
1572 | | |
1573 | | #[derive(thiserror::Error, Debug)] |
1574 | | #[error("{}", _0)] |
1575 | | pub struct MyError(String); |
1576 | | |
1577 | | type MyResult<T> = Result<T, Arc<MyError>>; |
1578 | | |
1579 | | let cache = SegmentedCache::new(100, 4); |
1580 | | const KEY: u32 = 0; |
1581 | | |
1582 | | // This test will run eight threads: |
1583 | | // |
1584 | | // Thread1 will be the first thread to call `try_get_with` for a key, so its |
1585 | | // init closure will be evaluated and then an error will be returned. Nothing |
1586 | | // will be inserted to the cache. |
1587 | | let thread1 = { |
1588 | | let cache1 = cache.clone(); |
1589 | | spawn(move || { |
1590 | | // Call `try_get_with` immediately. |
1591 | | let v = cache1.try_get_with(KEY, || { |
1592 | | // Wait for 300 ms and return an error. |
1593 | | sleep(Duration::from_millis(300)); |
1594 | | Err(MyError("thread1 error".into())) |
1595 | | }); |
1596 | | assert!(v.is_err()); |
1597 | | }) |
1598 | | }; |
1599 | | |
1600 | | // Thread2 will be the second thread to call `try_get_with` for the same key, |
1601 | | // so its init closure will not be evaluated. Once thread1's init closure |
1602 | | // finishes, it will get the same error value returned by thread1's init |
1603 | | // closure. |
1604 | | let thread2 = { |
1605 | | let cache2 = cache.clone(); |
1606 | | spawn(move || { |
1607 | | // Wait for 100 ms before calling `try_get_with`. |
1608 | | sleep(Duration::from_millis(100)); |
1609 | | let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!()); |
1610 | | assert!(v.is_err()); |
1611 | | }) |
1612 | | }; |
1613 | | |
1614 | | // Thread3 will be the third thread to call `get_with` for the same key. By |
1615 | | // the time it calls, thread1's init closure should have finished already, |
1616 | | // but the key still does not exist in the cache. So its init closure will be |
1617 | | // evaluated and then an okay &str value will be returned. That value will be |
1618 | | // inserted to the cache. |
1619 | | let thread3 = { |
1620 | | let cache3 = cache.clone(); |
1621 | | spawn(move || { |
1622 | | // Wait for 400 ms before calling `try_get_with`. |
1623 | | sleep(Duration::from_millis(400)); |
1624 | | let v: MyResult<_> = cache3.try_get_with(KEY, || { |
1625 | | // Wait for 300 ms and return an Ok(&str) value. |
1626 | | sleep(Duration::from_millis(300)); |
1627 | | Ok("thread3") |
1628 | | }); |
1629 | | assert_eq!(v.unwrap(), "thread3"); |
1630 | | }) |
1631 | | }; |
1632 | | |
1633 | | // thread4 will be the fourth thread to call `try_get_with` for the same |
1634 | | // key. So its init closure will not be evaluated. Once thread3's init |
1635 | | // closure finishes, it will get the same okay &str value. |
1636 | | let thread4 = { |
1637 | | let cache4 = cache.clone(); |
1638 | | spawn(move || { |
1639 | | // Wait for 500 ms before calling `try_get_with`. |
1640 | | sleep(Duration::from_millis(500)); |
1641 | | let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!()); |
1642 | | assert_eq!(v.unwrap(), "thread3"); |
1643 | | }) |
1644 | | }; |
1645 | | |
1646 | | // Thread5 will be the fifth thread to call `try_get_with` for the same |
1647 | | // key. So its init closure will not be evaluated. By the time it calls, |
1648 | | // thread3's init closure should have finished already, so its init closure |
1649 | | // will not be evaluated and will get the value insert by thread3's init |
1650 | | // closure immediately. |
1651 | | let thread5 = { |
1652 | | let cache5 = cache.clone(); |
1653 | | spawn(move || { |
1654 | | // Wait for 800 ms before calling `try_get_with`. |
1655 | | sleep(Duration::from_millis(800)); |
1656 | | let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!()); |
1657 | | assert_eq!(v.unwrap(), "thread3"); |
1658 | | }) |
1659 | | }; |
1660 | | |
1661 | | // Thread6 will call `get` for the same key. It will call when thread1's init |
1662 | | // closure is still running, so it will get none for the key. |
1663 | | let thread6 = { |
1664 | | let cache6 = cache.clone(); |
1665 | | spawn(move || { |
1666 | | // Wait for 200 ms before calling `get`. |
1667 | | sleep(Duration::from_millis(200)); |
1668 | | let maybe_v = cache6.get(&KEY); |
1669 | | assert!(maybe_v.is_none()); |
1670 | | }) |
1671 | | }; |
1672 | | |
1673 | | // Thread7 will call `get` for the same key. It will call after thread1's init |
1674 | | // closure finished with an error. So it will get none for the key. |
1675 | | let thread7 = { |
1676 | | let cache7 = cache.clone(); |
1677 | | spawn(move || { |
1678 | | // Wait for 400 ms before calling `get`. |
1679 | | sleep(Duration::from_millis(400)); |
1680 | | let maybe_v = cache7.get(&KEY); |
1681 | | assert!(maybe_v.is_none()); |
1682 | | }) |
1683 | | }; |
1684 | | |
1685 | | // Thread8 will call `get` for the same key. It will call after thread3's init |
1686 | | // closure finished, so it will get the value insert by thread3's init closure. |
1687 | | let thread8 = { |
1688 | | let cache8 = cache.clone(); |
1689 | | spawn(move || { |
1690 | | // Wait for 800 ms before calling `get`. |
1691 | | sleep(Duration::from_millis(800)); |
1692 | | let maybe_v = cache8.get(&KEY); |
1693 | | assert_eq!(maybe_v, Some("thread3")); |
1694 | | }) |
1695 | | }; |
1696 | | |
1697 | | for t in [ |
1698 | | thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8, |
1699 | | ] { |
1700 | | t.join().expect("Failed to join"); |
1701 | | } |
1702 | | |
1703 | | assert!(cache.is_waiter_map_empty()); |
1704 | | } |
1705 | | |
1706 | | #[test] |
1707 | | fn optionally_get_with() { |
1708 | | use std::thread::{sleep, spawn}; |
1709 | | |
1710 | | let cache = SegmentedCache::new(100, 4); |
1711 | | const KEY: u32 = 0; |
1712 | | |
1713 | | // This test will run eight threads: |
1714 | | // |
1715 | | // Thread1 will be the first thread to call `optionally_get_with` for a key, so its |
1716 | | // init closure will be evaluated and then an error will be returned. Nothing |
1717 | | // will be inserted to the cache. |
1718 | | let thread1 = { |
1719 | | let cache1 = cache.clone(); |
1720 | | spawn(move || { |
1721 | | // Call `optionally_get_with` immediately. |
1722 | | let v = cache1.optionally_get_with(KEY, || { |
1723 | | // Wait for 300 ms and return an error. |
1724 | | sleep(Duration::from_millis(300)); |
1725 | | None |
1726 | | }); |
1727 | | assert!(v.is_none()); |
1728 | | }) |
1729 | | }; |
1730 | | |
1731 | | // Thread2 will be the second thread to call `optionally_get_with` for the same key, |
1732 | | // so its init closure will not be evaluated. Once thread1's init closure |
1733 | | // finishes, it will get the same error value returned by thread1's init |
1734 | | // closure. |
1735 | | let thread2 = { |
1736 | | let cache2 = cache.clone(); |
1737 | | spawn(move || { |
1738 | | // Wait for 100 ms before calling `optionally_get_with`. |
1739 | | sleep(Duration::from_millis(100)); |
1740 | | let v = cache2.optionally_get_with(KEY, || unreachable!()); |
1741 | | assert!(v.is_none()); |
1742 | | }) |
1743 | | }; |
1744 | | |
1745 | | // Thread3 will be the third thread to call `get_with` for the same key. By |
1746 | | // the time it calls, thread1's init closure should have finished already, |
1747 | | // but the key still does not exist in the cache. So its init closure will be |
1748 | | // evaluated and then an okay &str value will be returned. That value will be |
1749 | | // inserted to the cache. |
1750 | | let thread3 = { |
1751 | | let cache3 = cache.clone(); |
1752 | | spawn(move || { |
1753 | | // Wait for 400 ms before calling `optionally_get_with`. |
1754 | | sleep(Duration::from_millis(400)); |
1755 | | let v = cache3.optionally_get_with(KEY, || { |
1756 | | // Wait for 300 ms and return an Ok(&str) value. |
1757 | | sleep(Duration::from_millis(300)); |
1758 | | Some("thread3") |
1759 | | }); |
1760 | | assert_eq!(v.unwrap(), "thread3"); |
1761 | | }) |
1762 | | }; |
1763 | | |
1764 | | // thread4 will be the fourth thread to call `optionally_get_with` for the same |
1765 | | // key. So its init closure will not be evaluated. Once thread3's init |
1766 | | // closure finishes, it will get the same okay &str value. |
1767 | | let thread4 = { |
1768 | | let cache4 = cache.clone(); |
1769 | | spawn(move || { |
1770 | | // Wait for 500 ms before calling `optionally_get_with`. |
1771 | | sleep(Duration::from_millis(500)); |
1772 | | let v = cache4.optionally_get_with(KEY, || unreachable!()); |
1773 | | assert_eq!(v.unwrap(), "thread3"); |
1774 | | }) |
1775 | | }; |
1776 | | |
1777 | | // Thread5 will be the fifth thread to call `optionally_get_with` for the same |
1778 | | // key. So its init closure will not be evaluated. By the time it calls, |
1779 | | // thread3's init closure should have finished already, so its init closure |
1780 | | // will not be evaluated and will get the value insert by thread3's init |
1781 | | // closure immediately. |
1782 | | let thread5 = { |
1783 | | let cache5 = cache.clone(); |
1784 | | spawn(move || { |
1785 | | // Wait for 800 ms before calling `optionally_get_with`. |
1786 | | sleep(Duration::from_millis(800)); |
1787 | | let v = cache5.optionally_get_with(KEY, || unreachable!()); |
1788 | | assert_eq!(v.unwrap(), "thread3"); |
1789 | | }) |
1790 | | }; |
1791 | | |
1792 | | // Thread6 will call `get` for the same key. It will call when thread1's init |
1793 | | // closure is still running, so it will get none for the key. |
1794 | | let thread6 = { |
1795 | | let cache6 = cache.clone(); |
1796 | | spawn(move || { |
1797 | | // Wait for 200 ms before calling `get`. |
1798 | | sleep(Duration::from_millis(200)); |
1799 | | let maybe_v = cache6.get(&KEY); |
1800 | | assert!(maybe_v.is_none()); |
1801 | | }) |
1802 | | }; |
1803 | | |
1804 | | // Thread7 will call `get` for the same key. It will call after thread1's init |
1805 | | // closure finished with an error. So it will get none for the key. |
1806 | | let thread7 = { |
1807 | | let cache7 = cache.clone(); |
1808 | | spawn(move || { |
1809 | | // Wait for 400 ms before calling `get`. |
1810 | | sleep(Duration::from_millis(400)); |
1811 | | let maybe_v = cache7.get(&KEY); |
1812 | | assert!(maybe_v.is_none()); |
1813 | | }) |
1814 | | }; |
1815 | | |
1816 | | // Thread8 will call `get` for the same key. It will call after thread3's init |
1817 | | // closure finished, so it will get the value insert by thread3's init closure. |
1818 | | let thread8 = { |
1819 | | let cache8 = cache.clone(); |
1820 | | spawn(move || { |
1821 | | // Wait for 800 ms before calling `get`. |
1822 | | sleep(Duration::from_millis(800)); |
1823 | | let maybe_v = cache8.get(&KEY); |
1824 | | assert_eq!(maybe_v, Some("thread3")); |
1825 | | }) |
1826 | | }; |
1827 | | |
1828 | | for t in [ |
1829 | | thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8, |
1830 | | ] { |
1831 | | t.join().expect("Failed to join"); |
1832 | | } |
1833 | | |
1834 | | assert!(cache.is_waiter_map_empty()); |
1835 | | } |
1836 | | |
1837 | | // This test ensures that the `contains_key`, `get` and `invalidate` can use |
1838 | | // borrowed form `&[u8]` for key with type `Vec<u8>`. |
1839 | | // https://github.com/moka-rs/moka/issues/166 |
1840 | | #[test] |
1841 | | fn borrowed_forms_of_key() { |
1842 | | let cache: SegmentedCache<Vec<u8>, ()> = SegmentedCache::new(1, 2); |
1843 | | |
1844 | | let key = vec![1_u8]; |
1845 | | cache.insert(key.clone(), ()); |
1846 | | |
1847 | | // key as &Vec<u8> |
1848 | | let key_v: &Vec<u8> = &key; |
1849 | | assert!(cache.contains_key(key_v)); |
1850 | | assert_eq!(cache.get(key_v), Some(())); |
1851 | | cache.invalidate(key_v); |
1852 | | |
1853 | | cache.insert(key, ()); |
1854 | | |
1855 | | // key as &[u8] |
1856 | | let key_s: &[u8] = &[1_u8]; |
1857 | | assert!(cache.contains_key(key_s)); |
1858 | | assert_eq!(cache.get(key_s), Some(())); |
1859 | | cache.invalidate(key_s); |
1860 | | } |
1861 | | |
1862 | | // Ignored by default. This test becomes unstable when run in parallel with |
1863 | | // other tests. |
1864 | | #[test] |
1865 | | #[ignore] |
1866 | | fn drop_value_immediately_after_eviction() { |
1867 | | use crate::common::test_utils::{Counters, Value}; |
1868 | | |
1869 | | const NUM_SEGMENTS: usize = 1; |
1870 | | const MAX_CAPACITY: u32 = 500; |
1871 | | const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32; |
1872 | | |
1873 | | let counters = Arc::new(Counters::default()); |
1874 | | let counters1 = Arc::clone(&counters); |
1875 | | |
1876 | | let listener = move |_k, _v, cause| match cause { |
1877 | | RemovalCause::Size => counters1.incl_evicted(), |
1878 | | RemovalCause::Explicit => counters1.incl_invalidated(), |
1879 | | _ => (), |
1880 | | }; |
1881 | | |
1882 | | let mut cache = SegmentedCache::builder(NUM_SEGMENTS) |
1883 | | .max_capacity(MAX_CAPACITY as u64) |
1884 | | .eviction_listener(listener) |
1885 | | .build(); |
1886 | | cache.reconfigure_for_testing(); |
1887 | | |
1888 | | // Make the cache exterior immutable. |
1889 | | let cache = cache; |
1890 | | |
1891 | | for key in 0..KEYS { |
1892 | | let value = Arc::new(Value::new(vec![0u8; 1024], &counters)); |
1893 | | cache.insert(key, value); |
1894 | | counters.incl_inserted(); |
1895 | | cache.run_pending_tasks(); |
1896 | | } |
1897 | | |
1898 | | let eviction_count = KEYS - MAX_CAPACITY; |
1899 | | |
1900 | | cache.run_pending_tasks(); |
1901 | | assert_eq!(counters.inserted(), KEYS, "inserted"); |
1902 | | assert_eq!(counters.value_created(), KEYS, "value_created"); |
1903 | | assert_eq!(counters.evicted(), eviction_count, "evicted"); |
1904 | | assert_eq!(counters.invalidated(), 0, "invalidated"); |
1905 | | assert_eq!(counters.value_dropped(), eviction_count, "value_dropped"); |
1906 | | |
1907 | | for key in 0..KEYS { |
1908 | | cache.invalidate(&key); |
1909 | | cache.run_pending_tasks(); |
1910 | | } |
1911 | | |
1912 | | cache.run_pending_tasks(); |
1913 | | assert_eq!(counters.inserted(), KEYS, "inserted"); |
1914 | | assert_eq!(counters.value_created(), KEYS, "value_created"); |
1915 | | assert_eq!(counters.evicted(), eviction_count, "evicted"); |
1916 | | assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated"); |
1917 | | assert_eq!(counters.value_dropped(), KEYS, "value_dropped"); |
1918 | | |
1919 | | std::mem::drop(cache); |
1920 | | assert_eq!(counters.value_dropped(), KEYS, "value_dropped"); |
1921 | | } |
1922 | | |
1923 | | #[test] |
1924 | | fn test_debug_format() { |
1925 | | let cache = SegmentedCache::new(10, 4); |
1926 | | cache.insert('a', "alice"); |
1927 | | cache.insert('b', "bob"); |
1928 | | cache.insert('c', "cindy"); |
1929 | | |
1930 | | let debug_str = format!("{cache:?}"); |
1931 | | assert!(debug_str.starts_with('{')); |
1932 | | assert!(debug_str.contains(r#"'a': "alice""#)); |
1933 | | assert!(debug_str.contains(r#"'b': "bob""#)); |
1934 | | assert!(debug_str.contains(r#"'c': "cindy""#)); |
1935 | | assert!(debug_str.ends_with('}')); |
1936 | | } |
1937 | | |
1938 | | type NotificationPair<V> = (V, RemovalCause); |
1939 | | type NotificationTriple<K, V> = (Arc<K>, V, RemovalCause); |
1940 | | |
1941 | | fn verify_notification_vec<K, V, S>( |
1942 | | cache: &SegmentedCache<K, V, S>, |
1943 | | actual: Arc<Mutex<Vec<NotificationTriple<K, V>>>>, |
1944 | | expected: &[NotificationTriple<K, V>], |
1945 | | ) where |
1946 | | K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static, |
1947 | | V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static, |
1948 | | S: std::hash::BuildHasher + Clone + Send + Sync + 'static, |
1949 | | { |
1950 | | // Retries will be needed when testing in a QEMU VM. |
1951 | | const MAX_RETRIES: usize = 5; |
1952 | | let mut retries = 0; |
1953 | | loop { |
1954 | | // Ensure all scheduled notifications have been processed. |
1955 | | std::thread::sleep(Duration::from_millis(500)); |
1956 | | |
1957 | | let actual = &*actual.lock(); |
1958 | | if actual.len() != expected.len() { |
1959 | | if retries <= MAX_RETRIES { |
1960 | | retries += 1; |
1961 | | cache.run_pending_tasks(); |
1962 | | continue; |
1963 | | } else { |
1964 | | assert_eq!(actual.len(), expected.len(), "Retries exhausted"); |
1965 | | } |
1966 | | } |
1967 | | |
1968 | | for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() { |
1969 | | assert_eq!(actual, expected, "expected[{i}]"); |
1970 | | } |
1971 | | |
1972 | | break; |
1973 | | } |
1974 | | } |
1975 | | |
1976 | | fn verify_notification_map<K, V, S>( |
1977 | | cache: &SegmentedCache<K, V, S>, |
1978 | | actual: Arc<Mutex<std::collections::HashMap<Arc<K>, NotificationPair<V>>>>, |
1979 | | expected: &std::collections::HashMap<Arc<K>, NotificationPair<V>>, |
1980 | | ) where |
1981 | | K: std::hash::Hash + Eq + std::fmt::Display + Send + Sync + 'static, |
1982 | | V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static, |
1983 | | S: std::hash::BuildHasher + Clone + Send + Sync + 'static, |
1984 | | { |
1985 | | // Retries will be needed when testing in a QEMU VM. |
1986 | | const MAX_RETRIES: usize = 5; |
1987 | | let mut retries = 0; |
1988 | | loop { |
1989 | | // Ensure all scheduled notifications have been processed. |
1990 | | std::thread::sleep(Duration::from_millis(500)); |
1991 | | |
1992 | | let actual = &*actual.lock(); |
1993 | | if actual.len() != expected.len() { |
1994 | | if retries <= MAX_RETRIES { |
1995 | | retries += 1; |
1996 | | cache.run_pending_tasks(); |
1997 | | continue; |
1998 | | } else { |
1999 | | assert_eq!(actual.len(), expected.len(), "Retries exhausted"); |
2000 | | } |
2001 | | } |
2002 | | |
2003 | | for actual_key in actual.keys() { |
2004 | | assert_eq!( |
2005 | | actual.get(actual_key), |
2006 | | expected.get(actual_key), |
2007 | | "expected[{actual_key}]", |
2008 | | ); |
2009 | | } |
2010 | | |
2011 | | break; |
2012 | | } |
2013 | | } |
2014 | | } |