Coverage Report

Created: 2025-10-29 07:05

/rust/registry/src/index.crates.io-1949cf8c6b5b557f/moka-0.12.10/src/sync/segment.rs
Line
Count
Source
1
use super::{cache::Cache, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector};
2
use crate::common::concurrent::Weigher;
3
use crate::common::time::Clock;
4
use crate::{
5
    common::HousekeeperConfig,
6
    notification::EvictionListener,
7
    policy::{EvictionPolicy, ExpirationPolicy},
8
    sync_base::iter::{Iter, ScanningGet},
9
    Entry, Policy, PredicateError,
10
};
11
12
use std::{
13
    borrow::Borrow,
14
    collections::hash_map::RandomState,
15
    fmt,
16
    hash::{BuildHasher, Hash, Hasher},
17
    sync::Arc,
18
};
19
20
/// A thread-safe concurrent in-memory cache, with multiple internal segments.
21
///
22
/// `SegmentedCache` has multiple internal [`Cache`][cache-struct] instances for
23
    /// increased concurrent update performance. However, it adds only a little overhead to
24
/// retrievals and updates for managing these segments.
25
///
26
    /// For usage examples, see the documentation of the [`Cache`][cache-struct].
27
///
28
/// [cache-struct]: ./struct.Cache.html
29
///
30
pub struct SegmentedCache<K, V, S = RandomState> {
31
    inner: Arc<Inner<K, V, S>>,
32
}
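As a quick orientation (a minimal, editor-added sketch that only uses constructors and methods defined later in this file), a `SegmentedCache` is used exactly like `Cache`; the segmentation is an internal detail:

```rust
use moka::sync::SegmentedCache;

fn main() {
    // Up to 100 entries, spread over 4 internal segments.
    let cache = SegmentedCache::new(100, 4);

    cache.insert("key".to_string(), "value".to_string());
    assert_eq!(cache.get("key"), Some("value".to_string()));
}
```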
33
34
// TODO: https://github.com/moka-rs/moka/issues/54
35
#[allow(clippy::non_send_fields_in_send_ty)]
36
unsafe impl<K, V, S> Send for SegmentedCache<K, V, S>
37
where
38
    K: Send + Sync,
39
    V: Send + Sync,
40
    S: Send,
41
{
42
}
43
44
unsafe impl<K, V, S> Sync for SegmentedCache<K, V, S>
45
where
46
    K: Send + Sync,
47
    V: Send + Sync,
48
    S: Sync,
49
{
50
}
51
52
impl<K, V, S> Clone for SegmentedCache<K, V, S> {
53
    /// Makes a clone of this shared cache.
54
    ///
55
    /// This operation is cheap as it only creates thread-safe reference counted
56
    /// pointers to the shared internal data structures.
57
0
    fn clone(&self) -> Self {
58
0
        Self {
59
0
            inner: Arc::clone(&self.inner),
60
0
        }
61
0
    }
62
}
63
64
impl<K, V, S> fmt::Debug for SegmentedCache<K, V, S>
65
where
66
    K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
67
    V: fmt::Debug + Clone + Send + Sync + 'static,
68
    // TODO: Remove these bounds from S.
69
    S: BuildHasher + Clone + Send + Sync + 'static,
70
{
71
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72
0
        let mut d_map = f.debug_map();
73
74
0
        for (k, v) in self {
75
0
            d_map.entry(&k, &v);
76
0
        }
77
78
0
        d_map.finish()
79
0
    }
80
}
81
82
impl<K, V> SegmentedCache<K, V, RandomState>
83
where
84
    K: Hash + Eq + Send + Sync + 'static,
85
    V: Clone + Send + Sync + 'static,
86
{
87
    /// Constructs a new `SegmentedCache<K, V>` that has multiple internal
88
    /// segments and will store up to the `max_capacity`.
89
    ///
90
    /// To adjust various configuration knobs such as `initial_capacity` or
91
    /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
92
    ///
93
    /// [builder-struct]: ./struct.CacheBuilder.html
94
    ///
95
    /// # Panics
96
    ///
97
    /// Panics if `num_segments` is 0.
98
0
    pub fn new(max_capacity: u64, num_segments: usize) -> Self {
99
0
        let build_hasher = RandomState::default();
100
0
        Self::with_everything(
101
0
            None,
102
0
            Some(max_capacity),
103
0
            None,
104
0
            num_segments,
105
0
            build_hasher,
106
0
            None,
107
0
            EvictionPolicy::default(),
108
0
            None,
109
0
            ExpirationPolicy::default(),
110
0
            HousekeeperConfig::default(),
111
            false,
112
0
            Clock::default(),
113
        )
114
0
    }
115
116
    /// Returns a [`CacheBuilder`][builder-struct], which can build a
117
    /// `SegmentedCache` with various configuration knobs.
118
    ///
119
    /// [builder-struct]: ./struct.CacheBuilder.html
120
0
    pub fn builder(num_segments: usize) -> CacheBuilder<K, V, SegmentedCache<K, V, RandomState>> {
121
0
        CacheBuilder::default().segments(num_segments)
122
0
    }
123
}
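For configuration beyond `max_capacity`, `builder` is the entry point. A hedged sketch (the `max_capacity` and `time_to_idle` knobs are the same ones exercised by the tests at the bottom of this file):

```rust
use moka::sync::SegmentedCache;
use std::time::Duration;

fn main() {
    // Four internal segments, a bounded capacity, and an idle timeout.
    let cache: SegmentedCache<u64, String> = SegmentedCache::builder(4)
        .max_capacity(10_000)
        .time_to_idle(Duration::from_secs(30))
        .build();

    cache.insert(1, "one".to_string());
    assert_eq!(cache.get(&1), Some("one".to_string()));
}
```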
124
125
impl<K, V, S> SegmentedCache<K, V, S> {
126
    /// Returns the cache’s name.
127
0
    pub fn name(&self) -> Option<&str> {
128
0
        self.inner.segments[0].name()
129
0
    }
130
131
    /// Returns a read-only cache policy of this cache.
132
    ///
133
    /// At this time, cache policy cannot be modified after cache creation.
134
    /// A future version may support modifying it.
135
0
    pub fn policy(&self) -> Policy {
136
0
        let mut policy = self.inner.segments[0].policy();
137
0
        policy.set_max_capacity(self.inner.desired_capacity);
138
0
        policy.set_num_segments(self.inner.segments.len());
139
0
        policy
140
0
    }
141
142
    /// Returns an approximate number of entries in this cache.
143
    ///
144
    /// The value returned is _an estimate_; the actual count may differ if there are
145
    /// concurrent insertions or removals, or if some entries are pending removal due
146
    /// to expiration. This inaccuracy can be mitigated by calling `run_pending_tasks`
147
    /// first.
148
    ///
149
    /// # Example
150
    ///
151
    /// ```rust
152
    /// use moka::sync::SegmentedCache;
153
    ///
154
    /// let cache = SegmentedCache::new(10, 4);
155
    /// cache.insert('n', "Netherland Dwarf");
156
    /// cache.insert('l', "Lop Eared");
157
    /// cache.insert('d', "Dutch");
158
    ///
159
    /// // Ensure an entry exists.
160
    /// assert!(cache.contains_key(&'n'));
161
    ///
162
    /// // However, the following may print stale zeros instead of threes.
163
    /// println!("{}", cache.entry_count());   // -> 0
164
    /// println!("{}", cache.weighted_size()); // -> 0
165
    ///
166
    /// // To mitigate the inaccuracy, call `run_pending_tasks` method to run
167
    /// // pending internal tasks.
168
    /// cache.run_pending_tasks();
169
    ///
170
    /// // The following will print the actual numbers.
171
    /// println!("{}", cache.entry_count());   // -> 3
172
    /// println!("{}", cache.weighted_size()); // -> 3
173
    /// ```
174
    ///
175
0
    pub fn entry_count(&self) -> u64 {
176
0
        self.inner
177
0
            .segments
178
0
            .iter()
179
0
            .map(|seg| seg.entry_count())
180
0
            .sum()
181
0
    }
182
183
    /// Returns an approximate total weighted size of entries in this cache.
184
    ///
185
    /// The value returned is _an estimate_; the actual size may differ if there are
186
    /// concurrent insertions or removals, or if some entries are pending removal due
187
    /// to expiration. This inaccuracy can be mitigated by calling `run_pending_tasks`
188
    /// first. See [`entry_count`](#method.entry_count) for sample code.
189
0
    pub fn weighted_size(&self) -> u64 {
190
0
        self.inner
191
0
            .segments
192
0
            .iter()
193
0
            .map(|seg| seg.weighted_size())
194
0
            .sum()
195
0
    }
196
}
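`entry_count` and `weighted_size` both sum per-segment estimates; they only diverge when a weigher is installed. A small sketch (the `weigher` builder knob is the same one used by the `size_aware_eviction` test below; the exact numbers assume the entry has been admitted and pending tasks have run):

```rust
use moka::sync::SegmentedCache;

fn main() {
    let cache: SegmentedCache<&str, String> = SegmentedCache::builder(4)
        .max_capacity(1_000)
        // Weigh each entry by the byte length of its value.
        .weigher(|_key, value: &String| value.len() as u32)
        .build();

    cache.insert("a", "hello".to_string());
    cache.run_pending_tasks();

    assert_eq!(cache.entry_count(), 1);
    assert_eq!(cache.weighted_size(), 5); // "hello" weighs 5
}
```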
197
198
impl<K, V, S> SegmentedCache<K, V, S>
199
where
200
    K: Hash + Eq + Send + Sync + 'static,
201
    V: Clone + Send + Sync + 'static,
202
    S: BuildHasher + Clone + Send + Sync + 'static,
203
{
204
    /// # Panics
205
    ///
206
    /// Panics if `num_segments` is 0.
207
    #[allow(clippy::too_many_arguments)]
208
0
    pub(crate) fn with_everything(
209
0
        name: Option<String>,
210
0
        max_capacity: Option<u64>,
211
0
        initial_capacity: Option<usize>,
212
0
        num_segments: usize,
213
0
        build_hasher: S,
214
0
        weigher: Option<Weigher<K, V>>,
215
0
        eviction_policy: EvictionPolicy,
216
0
        eviction_listener: Option<EvictionListener<K, V>>,
217
0
        expiration_policy: ExpirationPolicy<K, V>,
218
0
        housekeeper_config: HousekeeperConfig,
219
0
        invalidator_enabled: bool,
220
0
        clock: Clock,
221
0
    ) -> Self {
222
0
        Self {
223
0
            inner: Arc::new(Inner::new(
224
0
                name,
225
0
                max_capacity,
226
0
                initial_capacity,
227
0
                num_segments,
228
0
                build_hasher,
229
0
                weigher,
230
0
                eviction_policy,
231
0
                eviction_listener,
232
0
                expiration_policy,
233
0
                housekeeper_config,
234
0
                invalidator_enabled,
235
0
                clock,
236
0
            )),
237
0
        }
238
0
    }
239
240
    /// Returns `true` if the cache contains a value for the key.
241
    ///
242
    /// Unlike the `get` method, this method is not considered a cache read operation,
243
    /// so it does not update the historic popularity estimator or reset the idle
244
    /// timer for the key.
245
    ///
246
    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
247
    /// on the borrowed form _must_ match those for the key type.
248
0
    pub fn contains_key<Q>(&self, key: &Q) -> bool
249
0
    where
250
0
        K: Borrow<Q>,
251
0
        Q: Hash + Eq + ?Sized,
252
    {
253
0
        let hash = self.inner.hash(key);
254
0
        self.inner.select(hash).contains_key_with_hash(key, hash)
255
0
    }
256
257
    /// Returns a _clone_ of the value corresponding to the key.
258
    ///
259
    /// If you want to store values that will be expensive to clone, wrap them by
260
    /// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
261
    /// thread-safe reference-counted pointer and its `clone()` method is cheap.
262
    ///
263
    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
264
    /// on the borrowed form _must_ match those for the key type.
265
    ///
266
    /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
267
0
    pub fn get<Q>(&self, key: &Q) -> Option<V>
268
0
    where
269
0
        K: Borrow<Q>,
270
0
        Q: Hash + Eq + ?Sized,
271
    {
272
0
        let hash = self.inner.hash(key);
273
0
        self.inner
274
0
            .select(hash)
275
0
            .get_with_hash(key, hash, false)
276
0
            .map(Entry::into_value)
277
0
    }
278
279
0
    pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
280
0
    where
281
0
        K: Hash + Eq,
282
    {
283
0
        let hash = self.inner.hash(&key);
284
0
        let cache = self.inner.select(hash);
285
0
        OwnedKeyEntrySelector::new(key, hash, cache)
286
0
    }
287
288
0
    pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
289
0
    where
290
0
        K: Borrow<Q>,
291
0
        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
292
    {
293
0
        let hash = self.inner.hash(key);
294
0
        let cache = self.inner.select(hash);
295
0
        RefKeyEntrySelector::new(key, hash, cache)
296
0
    }
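The two selector methods above carry no doc comments here; they mirror `Cache::entry` and `Cache::entry_by_ref`. A hedged sketch, assuming the selectors expose the same `or_insert_with` / `Entry::is_fresh` API that the non-segmented `Cache` documents:

```rust
use moka::sync::SegmentedCache;

fn main() {
    // Assumes the same entry-selector API documented for `Cache`.
    let cache: SegmentedCache<String, u32> = SegmentedCache::new(100, 4);

    // Owned-key form: the key is missing, so the init closure runs and inserts 1.
    let entry = cache.entry("counter".to_string()).or_insert_with(|| 1);
    assert!(entry.is_fresh());

    // Borrowed-key form: the key is only cloned when an insert is actually needed.
    let entry = cache.entry_by_ref("counter").or_insert_with(|| 99);
    assert!(!entry.is_fresh());
    assert_eq!(entry.into_value(), 1);
}
```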
297
298
    /// TODO: Remove this in v0.13.0.
299
    /// Deprecated, replaced with [`get_with`](#method.get_with)
300
    #[deprecated(since = "0.8.0", note = "Replaced with `get_with`")]
301
0
    pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V {
302
0
        self.get_with(key, init)
303
0
    }
304
305
    /// TODO: Remove this in v0.13.0.
306
    /// Deprecated, replaced with [`try_get_with`](#method.try_get_with)
307
    #[deprecated(since = "0.8.0", note = "Replaced with `try_get_with`")]
308
0
    pub fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
309
0
    where
310
0
        F: FnOnce() -> Result<V, E>,
311
0
        E: Send + Sync + 'static,
312
    {
313
0
        self.try_get_with(key, init)
314
0
    }
315
316
    /// Returns a _clone_ of the value corresponding to the key. If the value does
317
    /// not exist, evaluates the `init` closure and inserts the output.
318
    ///
319
    /// # Concurrent calls on the same key
320
    ///
321
    /// This method guarantees that concurrent calls on the same non-existent key are
322
    /// coalesced into one evaluation of the `init` closure. Only one of the calls
323
    /// evaluates its closure, and other calls wait for that closure to complete. See
324
    /// [`Cache::get_with`][get-with-method] for more details.
325
    ///
326
    /// [get-with-method]: ./struct.Cache.html#method.get_with
327
0
    pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
328
0
        let hash = self.inner.hash(&key);
329
0
        let key = Arc::new(key);
330
0
        let replace_if = None as Option<fn(&V) -> bool>;
331
0
        self.inner
332
0
            .select(hash)
333
0
            .get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
334
0
            .into_value()
335
0
    }
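A small concurrency sketch of the coalescing guarantee described above: several threads race on the same missing key, yet the `init` closure runs exactly once (the atomic counter is a made-up helper for the demonstration):

```rust
use moka::sync::SegmentedCache;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

fn main() {
    let cache: SegmentedCache<u32, String> = SegmentedCache::new(100, 4);
    let init_calls = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..8)
        .map(|_| {
            let cache = cache.clone();
            let init_calls = Arc::clone(&init_calls);
            std::thread::spawn(move || {
                // Every caller receives the same value; only one closure is evaluated.
                cache.get_with(42, || {
                    init_calls.fetch_add(1, Ordering::SeqCst);
                    "computed once".to_string()
                })
            })
        })
        .collect();

    for handle in handles {
        assert_eq!(handle.join().unwrap(), "computed once");
    }
    assert_eq!(init_calls.load(Ordering::SeqCst), 1);
}
```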
336
337
    /// Similar to [`get_with`](#method.get_with), but instead of passing an owned
338
    /// key, you can pass a reference to the key. If the key does not exist in the
339
    /// cache, the key will be cloned to create a new entry in the cache.
340
0
    pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
341
0
    where
342
0
        K: Borrow<Q>,
343
0
        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
344
    {
345
0
        let hash = self.inner.hash(key);
346
0
        let replace_if = None as Option<fn(&V) -> bool>;
347
0
        self.inner
348
0
            .select(hash)
349
0
            .get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
350
0
            .into_value()
351
0
    }
352
353
    /// Works like [`get_with`](#method.get_with), but takes an additional
354
    /// `replace_if` closure.
355
    ///
356
    /// This method will evaluate the `init` closure and insert the output to the
357
    /// cache when:
358
    ///
359
    /// - The key does not exist.
360
    /// - Or, `replace_if` closure returns `true`.
361
0
    pub fn get_with_if(
362
0
        &self,
363
0
        key: K,
364
0
        init: impl FnOnce() -> V,
365
0
        replace_if: impl FnMut(&V) -> bool,
366
0
    ) -> V {
367
0
        let hash = self.inner.hash(&key);
368
0
        let key = Arc::new(key);
369
0
        self.inner
370
0
            .select(hash)
371
0
            .get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
372
0
            .into_value()
373
0
    }
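A brief sketch of the `replace_if` path: the cached value is kept while the predicate returns `false`, and recomputed once it returns `true` (the version-bump scenario is purely illustrative):

```rust
use moka::sync::SegmentedCache;

fn main() {
    let cache: SegmentedCache<&str, u64> = SegmentedCache::new(100, 4);

    cache.insert("config-version", 1);

    // Predicate is false for the current value, so `init` is skipped.
    let v = cache.get_with_if("config-version", || 2, |current| *current == 0);
    assert_eq!(v, 1);

    // Predicate is true, so `init` runs and its output replaces the entry.
    let v = cache.get_with_if("config-version", || 2, |current| *current == 1);
    assert_eq!(v, 2);
}
```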
374
375
    /// Returns a _clone_ of the value corresponding to the key. If the value does
376
    /// not exist, evaluates the `init` closure, and inserts the value if
377
    /// `Some(value)` was returned. If `None` was returned from the closure, this
378
    /// method does not insert a value and returns `None`.
379
    ///
380
    /// # Concurrent calls on the same key
381
    ///
382
    /// This method guarantees that concurrent calls on the same non-existent key are
383
    /// coalesced into one evaluation of the `init` closure. Only one of the calls
384
    /// evaluates its closure, and other calls wait for that closure to complete.
385
    /// See [`Cache::optionally_get_with`][opt-get-with-method] for more details.
386
    ///
387
    /// [opt-get-with-method]: ./struct.Cache.html#method.optionally_get_with
388
0
    pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
389
0
    where
390
0
        F: FnOnce() -> Option<V>,
391
    {
392
0
        let hash = self.inner.hash(&key);
393
0
        let key = Arc::new(key);
394
0
        self.inner
395
0
            .select(hash)
396
0
            .get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
397
0
            .map(Entry::into_value)
398
0
    }
399
400
    /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
401
    /// of passing an owned key, you can pass a reference to the key. If the key does
402
    /// not exist in the cache, the key will be cloned to create a new entry in the
403
    /// cache.
404
0
    pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
405
0
    where
406
0
        F: FnOnce() -> Option<V>,
407
0
        K: Borrow<Q>,
408
0
        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
409
    {
410
0
        let hash = self.inner.hash(key);
411
0
        self.inner
412
0
            .select(hash)
413
0
            .get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
414
0
            .map(Entry::into_value)
415
0
    }
416
417
    /// Returns a _clone_ of the value corresponding to the key. If the value does
418
    /// not exist, evaluates the `init` closure, and inserts the value if `Ok(value)`
419
    /// was returned. If `Err(_)` was returned from the closure, this method does not
420
    /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
421
    ///
422
    /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
423
    ///
424
    /// # Concurrent calls on the same key
425
    ///
426
    /// This method guarantees that concurrent calls on the same non-existent key are
427
    /// coalesced into one evaluation of the `init` closure (as long as these
428
    /// closures return the same error type). Only one of the calls evaluates its
429
    /// closure, and other calls wait for that closure to complete. See
430
    /// [`Cache::try_get_with`][try-get-with-method] for more details.
431
    ///
432
    /// [try-get-with-method]: ./struct.Cache.html#method.try_get_with
433
0
    pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
434
0
    where
435
0
        F: FnOnce() -> Result<V, E>,
436
0
        E: Send + Sync + 'static,
437
    {
438
0
        let hash = self.inner.hash(&key);
439
0
        let key = Arc::new(key);
440
0
        self.inner
441
0
            .select(hash)
442
0
            .get_or_try_insert_with_hash_and_fun(key, hash, init, false)
443
0
            .map(Entry::into_value)
444
0
    }
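A hedged sketch of the error path: a failing `init` is not cached, and the error comes back wrapped in `Arc` (`load_user` and its error type are made up for this example):

```rust
use moka::sync::SegmentedCache;
use std::sync::Arc;

// Hypothetical fallible loader, used only for this example.
fn load_user(id: u32) -> Result<String, String> {
    if id == 0 {
        Err("id 0 is reserved".to_string())
    } else {
        Ok(format!("user-{id}"))
    }
}

fn main() {
    let cache: SegmentedCache<u32, String> = SegmentedCache::new(100, 4);

    // The error is returned wrapped in `Arc`, and nothing is inserted.
    let err: Result<String, Arc<String>> = cache.try_get_with(0, || load_user(0));
    assert!(err.is_err());
    assert!(!cache.contains_key(&0));

    // A successful `init` inserts the value and returns a clone of it.
    let ok = cache.try_get_with(7, || load_user(7));
    assert_eq!(ok.unwrap(), "user-7");
}
```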
445
446
    /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
447
    /// owned key, you can pass a reference to the key. If the key does not exist in
448
    /// the cache, the key will be cloned to create a new entry in the cache.
449
0
    pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
450
0
    where
451
0
        F: FnOnce() -> Result<V, E>,
452
0
        E: Send + Sync + 'static,
453
0
        K: Borrow<Q>,
454
0
        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
455
    {
456
0
        let hash = self.inner.hash(key);
457
0
        self.inner
458
0
            .select(hash)
459
0
            .get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
460
0
            .map(Entry::into_value)
461
0
    }
462
463
    /// Inserts a key-value pair into the cache.
464
    ///
465
    /// If the cache has this key present, the value is updated.
466
0
    pub fn insert(&self, key: K, value: V) {
467
0
        let hash = self.inner.hash(&key);
468
0
        let key = Arc::new(key);
469
0
        self.inner.select(hash).insert_with_hash(key, hash, value);
470
0
    }
471
472
    /// Discards any cached value for the key.
473
    ///
474
    /// If you need to get the value that has been discarded, use the
475
    /// [`remove`](#method.remove) method instead.
476
    ///
477
    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
478
    /// on the borrowed form _must_ match those for the key type.
479
0
    pub fn invalidate<Q>(&self, key: &Q)
480
0
    where
481
0
        K: Borrow<Q>,
482
0
        Q: Hash + Eq + ?Sized,
483
    {
484
0
        let hash = self.inner.hash(key);
485
0
        self.inner
486
0
            .select(hash)
487
0
            .invalidate_with_hash(key, hash, false);
488
0
    }
489
490
    /// Discards any cached value for the key and returns a clone of the value.
491
    ///
492
    /// If you do not need to get the value that has been discarded, use the
493
    /// [`invalidate`](#method.invalidate) method instead.
494
    ///
495
    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
496
    /// on the borrowed form _must_ match those for the key type.
497
0
    pub fn remove<Q>(&self, key: &Q) -> Option<V>
498
0
    where
499
0
        K: Borrow<Q>,
500
0
        Q: Hash + Eq + ?Sized,
501
    {
502
0
        let hash = self.inner.hash(key);
503
0
        self.inner
504
0
            .select(hash)
505
0
            .invalidate_with_hash(key, hash, true)
506
0
    }
507
508
    /// Discards all cached values.
509
    ///
510
    /// This method returns immediately; it just sets the current time as the
511
    /// invalidation time. `get` and other retrieval methods are guaranteed not to
512
    /// return the entries inserted before or at the invalidation time.
513
    ///
514
    /// The actual removal of the invalidated entries is done as a maintenance task
515
    /// driven by a user thread. For more details, see
516
    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
517
    /// level documentation.
518
    ///
519
    /// Like the `invalidate` method, this method does not clear the historic
520
    /// popularity estimator of keys, so it retains the history of clients
521
    /// trying to retrieve an item.
522
0
    pub fn invalidate_all(&self) {
523
0
        for segment in self.inner.segments.iter() {
524
0
            segment.invalidate_all();
525
0
        }
526
0
    }
527
528
    /// Discards cached values that satisfy a predicate.
529
    ///
530
    /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The
531
    /// closure is called against each cached entry inserted before or at the time
532
    /// when this method was called. If the closure returns `true` that entry will be
533
    /// evicted from the cache.
534
    ///
535
    /// This method returns immediately; it does not actually remove the invalidated
536
    /// entries. Instead, it registers the predicate with the cache, along with the time when
537
    /// this method was called. The actual removal of the invalidated entries is done
538
    /// as a maintenance task driven by a user thread. For more details, see
539
    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
540
    /// level documentation.
541
    ///
542
    /// Also the `get` and other retrieval methods will apply the closure to a cached
543
    /// entry to determine if it should have been invalidated. Therefore, it is
544
    /// guaranteed that these methods will not return invalidated values.
545
    ///
546
    /// Note that you must call
547
    /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
548
    /// at the cache creation time as the cache needs to maintain additional internal
549
    /// data structures to support this method. Otherwise, calling this method will
550
    /// fail with a
551
    /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
552
    ///
553
    /// Like the `invalidate` method, this method does not clear the historic
554
    /// popularity estimator of keys, so it retains the history of clients
555
    /// trying to retrieve an item.
556
    ///
557
    /// [support-invalidation-closures]:
558
    ///     ./struct.CacheBuilder.html#method.support_invalidation_closures
559
    /// [invalidation-disabled-error]:
560
    ///     ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
561
0
    pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<(), PredicateError>
562
0
    where
563
0
        F: Fn(&K, &V) -> bool + Send + Sync + 'static,
564
    {
565
0
        let pred = Arc::new(predicate);
566
0
        for segment in self.inner.segments.iter() {
567
0
            segment.invalidate_entries_with_arc_fun(Arc::clone(&pred))?;
568
        }
569
0
        Ok(())
570
0
    }
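A sketch of predicate-based invalidation as documented above; `support_invalidation_closures` must be enabled on the builder (the same combination the `invalidate_entries_if` test below uses). Physical removal happens during maintenance, but reads are already guaranteed not to return invalidated entries:

```rust
use moka::sync::SegmentedCache;

fn main() -> Result<(), moka::PredicateError> {
    let cache: SegmentedCache<u32, String> = SegmentedCache::builder(4)
        .max_capacity(100)
        .support_invalidation_closures()
        .build();

    cache.insert(1, "alice".to_string());
    cache.insert(2, "bob".to_string());

    // Register the predicate; matching entries inserted before this call are
    // hidden from reads and removed by the maintenance tasks.
    cache.invalidate_entries_if(|_key, value| value.as_str() == "alice")?;
    cache.run_pending_tasks();

    assert!(cache.get(&1).is_none());
    assert_eq!(cache.get(&2), Some("bob".to_string()));
    Ok(())
}
```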
571
572
    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
573
    /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
574
    /// value.
575
    ///
576
    /// Iterators do not block concurrent reads and writes on the cache. An entry can
577
    /// be inserted into, invalidated, or evicted from the cache while iterators are alive
578
    /// on the same cache.
579
    ///
580
    /// Unlike the `get` method, visiting entries via an iterator does not update the
581
    /// historic popularity estimator or reset idle timers for keys.
582
    ///
583
    /// # Guarantees
584
    ///
585
    /// In order to allow concurrent access to the cache, iterator's `next` method
586
    /// does _not_ guarantee the following:
587
    ///
588
    /// - To return a key-value pair (an entry) if its key has
589
    ///   been inserted into the cache _after_ the iterator was created.
590
    ///   - Such an entry may or may not be returned depending on the key's hash and
591
    ///     timing.
592
    ///
593
    /// and the `next` method guarantees the following:
594
    ///
595
    /// - It guarantees not to return the same entry more than once.
596
    /// - It guarantees not to return an entry if it has been removed from the cache
597
    ///   after the iterator was created.
598
    ///     - Note: An entry can be removed for the following reasons:
599
    ///         - Manually invalidated.
600
    ///         - Expired (e.g. time-to-live).
601
    ///         - Evicted because the cache capacity was exceeded.
602
    ///
603
    /// # Examples
604
    ///
605
    /// ```rust
606
    /// use moka::sync::SegmentedCache;
607
    ///
608
    /// let cache = SegmentedCache::new(100, 4);
609
    /// cache.insert("Julia", 14);
610
    ///
611
    /// let mut iter = cache.iter();
612
    /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
613
    /// assert_eq!(*k, "Julia");
614
    /// assert_eq!(v, 14);
615
    ///
616
    /// assert!(iter.next().is_none());
617
    /// ```
618
    ///
619
0
    pub fn iter(&self) -> Iter<'_, K, V> {
620
0
        let num_cht_segments = self.inner.segments[0].num_cht_segments();
621
0
        let segments = self
622
0
            .inner
623
0
            .segments
624
0
            .iter()
625
0
            .map(|c| c as &dyn ScanningGet<_, _>)
626
0
            .collect::<Vec<_>>()
627
0
            .into_boxed_slice();
628
0
        Iter::with_multiple_cache_segments(segments, num_cht_segments)
629
0
    }
630
631
    /// Performs any pending maintenance operations needed by the cache.
632
0
    pub fn run_pending_tasks(&self) {
633
0
        for segment in self.inner.segments.iter() {
634
0
            segment.run_pending_tasks();
635
0
        }
636
0
    }
637
638
    // /// This is used by unit tests to get consistent results.
639
    // #[cfg(test)]
640
    // pub(crate) fn reconfigure_for_testing(&mut self) {
641
    //     // Stop the housekeeping job that may cause sync() method to return earlier.
642
    //     for segment in self.inner.segments.iter_mut() {
643
    //         segment.reconfigure_for_testing()
644
    //     }
645
    // }
646
}
647
648
impl<'a, K, V, S> IntoIterator for &'a SegmentedCache<K, V, S>
649
where
650
    K: Hash + Eq + Send + Sync + 'static,
651
    V: Clone + Send + Sync + 'static,
652
    S: BuildHasher + Clone + Send + Sync + 'static,
653
{
654
    type Item = (Arc<K>, V);
655
656
    type IntoIter = Iter<'a, K, V>;
657
658
0
    fn into_iter(self) -> Self::IntoIter {
659
0
        self.iter()
660
0
    }
661
}
662
663
// For unit tests.
664
#[cfg(test)]
665
impl<K, V, S> SegmentedCache<K, V, S> {
666
    fn is_waiter_map_empty(&self) -> bool {
667
        self.inner.segments.iter().all(Cache::is_waiter_map_empty)
668
    }
669
}
670
671
#[cfg(test)]
672
impl<K, V, S> SegmentedCache<K, V, S>
673
where
674
    K: Hash + Eq + Send + Sync + 'static,
675
    V: Clone + Send + Sync + 'static,
676
    S: BuildHasher + Clone + Send + Sync + 'static,
677
{
678
    fn invalidation_predicate_count(&self) -> usize {
679
        self.inner
680
            .segments
681
            .iter()
682
            .map(|seg| seg.invalidation_predicate_count())
683
            .sum()
684
    }
685
686
    fn reconfigure_for_testing(&mut self) {
687
        let inner = Arc::get_mut(&mut self.inner)
688
            .expect("There are other strong reference to self.inner Arc");
689
690
        for segment in inner.segments.iter_mut() {
691
            segment.reconfigure_for_testing();
692
        }
693
    }
694
695
    fn key_locks_map_is_empty(&self) -> bool {
696
        self.inner
697
            .segments
698
            .iter()
699
            .all(|seg| seg.key_locks_map_is_empty())
700
    }
701
}
702
703
struct Inner<K, V, S> {
704
    desired_capacity: Option<u64>,
705
    segments: Box<[Cache<K, V, S>]>,
706
    build_hasher: S,
707
    segment_shift: u32,
708
}
709
710
impl<K, V, S> Inner<K, V, S>
711
where
712
    K: Hash + Eq + Send + Sync + 'static,
713
    V: Clone + Send + Sync + 'static,
714
    S: BuildHasher + Clone + Send + Sync + 'static,
715
{
716
    /// # Panics
717
    ///
718
    /// Panics if `num_segments` is 0.
719
    #[allow(clippy::too_many_arguments)]
720
0
    fn new(
721
0
        name: Option<String>,
722
0
        max_capacity: Option<u64>,
723
0
        initial_capacity: Option<usize>,
724
0
        num_segments: usize,
725
0
        build_hasher: S,
726
0
        weigher: Option<Weigher<K, V>>,
727
0
        eviction_policy: EvictionPolicy,
728
0
        eviction_listener: Option<EvictionListener<K, V>>,
729
0
        expiration_policy: ExpirationPolicy<K, V>,
730
0
        housekeeper_config: HousekeeperConfig,
731
0
        invalidator_enabled: bool,
732
0
        clock: Clock,
733
0
    ) -> Self {
734
0
        assert!(num_segments > 0);
735
736
0
        let actual_num_segments = num_segments.next_power_of_two();
737
0
        let segment_shift = 64 - actual_num_segments.trailing_zeros();
738
0
        let seg_max_capacity =
739
0
            max_capacity.map(|n| (n as f64 / actual_num_segments as f64).ceil() as u64);
740
0
        let seg_init_capacity =
741
0
            initial_capacity.map(|cap| (cap as f64 / actual_num_segments as f64).ceil() as usize);
742
        // NOTE: We cannot initialize the segments as `vec![cache; actual_num_segments]`
743
        // because Cache::clone() does not clone its inner but shares the same inner.
744
0
        let segments = (0..actual_num_segments)
745
0
            .map(|_| {
746
0
                Cache::with_everything(
747
0
                    name.clone(),
748
0
                    seg_max_capacity,
749
0
                    seg_init_capacity,
750
0
                    build_hasher.clone(),
751
0
                    weigher.clone(),
752
0
                    eviction_policy.clone(),
753
0
                    eviction_listener.clone(),
754
0
                    expiration_policy.clone(),
755
0
                    housekeeper_config.clone(),
756
0
                    invalidator_enabled,
757
0
                    clock.clone(),
758
                )
759
0
            })
760
0
            .collect::<Vec<_>>();
761
762
0
        Self {
763
0
            desired_capacity: max_capacity,
764
0
            segments: segments.into_boxed_slice(),
765
0
            build_hasher,
766
0
            segment_shift,
767
0
        }
768
0
    }
769
770
    #[inline]
771
0
    fn hash<Q>(&self, key: &Q) -> u64
772
0
    where
773
0
        K: Borrow<Q>,
774
0
        Q: Hash + Eq + ?Sized,
775
    {
776
0
        let mut hasher = self.build_hasher.build_hasher();
777
0
        key.hash(&mut hasher);
778
0
        hasher.finish()
779
0
    }
780
781
    #[inline]
782
0
    fn select(&self, hash: u64) -> &Cache<K, V, S> {
783
0
        let index = self.segment_index_from_hash(hash);
784
0
        &self.segments[index]
785
0
    }
786
787
    #[inline]
788
0
    fn segment_index_from_hash(&self, hash: u64) -> usize {
789
0
        if self.segment_shift == 64 {
790
0
            0
791
        } else {
792
0
            (hash >> self.segment_shift) as usize
793
        }
794
0
    }
795
}
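To make the routing above concrete: `num_segments` is rounded up to the next power of two, and the top `log2(actual_num_segments)` bits of the key's hash pick the segment. A standalone sketch of the same arithmetic (a reproduction for illustration, not the crate's code):

```rust
// Mirrors Inner::new + Inner::segment_index_from_hash.
fn segment_index(hash: u64, num_segments: usize) -> usize {
    let actual = num_segments.next_power_of_two();
    let shift = 64 - actual.trailing_zeros();
    if shift == 64 {
        0 // a single segment: every hash maps to index 0
    } else {
        (hash >> shift) as usize
    }
}

fn main() {
    // Asking for 5 segments actually creates 8 (next power of two),
    // so the top 3 bits of the hash select the segment.
    assert_eq!(segment_index(0x0000_0000_0000_0000, 5), 0);
    assert_eq!(segment_index(0xE000_0000_0000_0000, 5), 7);
    // With 1 segment the shift would be 64, so the index is forced to 0.
    assert_eq!(segment_index(u64::MAX, 1), 0);
}
```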
796
797
#[cfg(test)]
798
mod tests {
799
    use super::SegmentedCache;
800
    use crate::notification::RemovalCause;
801
    use parking_lot::Mutex;
802
    use std::{sync::Arc, time::Duration};
803
804
    #[test]
805
    fn max_capacity_zero() {
806
        let mut cache = SegmentedCache::new(0, 1);
807
        cache.reconfigure_for_testing();
808
809
        // Make the cache exterior immutable.
810
        let cache = cache;
811
812
        cache.insert(0, ());
813
814
        assert!(!cache.contains_key(&0));
815
        assert!(cache.get(&0).is_none());
816
        cache.run_pending_tasks();
817
        assert!(!cache.contains_key(&0));
818
        assert!(cache.get(&0).is_none());
819
        assert_eq!(cache.entry_count(), 0)
820
    }
821
822
    #[test]
823
    fn basic_single_thread() {
824
        // The following `Vec`s will hold actual and expected notifications.
825
        let actual = Arc::new(Mutex::new(Vec::new()));
826
        let mut expected = Vec::new();
827
828
        // Create an eviction listener.
829
        let a1 = Arc::clone(&actual);
830
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
831
832
        // Create a cache with the eviction listener.
833
        let mut cache = SegmentedCache::builder(1)
834
            .max_capacity(3)
835
            .eviction_listener(listener)
836
            .build();
837
        cache.reconfigure_for_testing();
838
839
        // Make the cache exterior immutable.
840
        let cache = cache;
841
842
        cache.insert("a", "alice");
843
        cache.insert("b", "bob");
844
        assert_eq!(cache.get(&"a"), Some("alice"));
845
        assert!(cache.contains_key(&"a"));
846
        assert!(cache.contains_key(&"b"));
847
        assert_eq!(cache.get(&"b"), Some("bob"));
848
        cache.run_pending_tasks();
849
        // counts: a -> 1, b -> 1
850
851
        cache.insert("c", "cindy");
852
        assert_eq!(cache.get(&"c"), Some("cindy"));
853
        assert!(cache.contains_key(&"c"));
854
        // counts: a -> 1, b -> 1, c -> 1
855
        cache.run_pending_tasks();
856
857
        assert!(cache.contains_key(&"a"));
858
        assert_eq!(cache.get(&"a"), Some("alice"));
859
        assert_eq!(cache.get(&"b"), Some("bob"));
860
        assert!(cache.contains_key(&"b"));
861
        cache.run_pending_tasks();
862
        // counts: a -> 2, b -> 2, c -> 1
863
864
        // "d" should not be admitted because its frequency is too low.
865
        cache.insert("d", "david"); //   count: d -> 0
866
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
867
        cache.run_pending_tasks();
868
        assert_eq!(cache.get(&"d"), None); //   d -> 1
869
        assert!(!cache.contains_key(&"d"));
870
871
        cache.insert("d", "david");
872
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
873
        cache.run_pending_tasks();
874
        assert!(!cache.contains_key(&"d"));
875
        assert_eq!(cache.get(&"d"), None); //   d -> 2
876
877
        // "d" should be admitted and "c" should be evicted
878
        // because d's frequency is higher than c's.
879
        cache.insert("d", "dennis");
880
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
881
        cache.run_pending_tasks();
882
        assert_eq!(cache.get(&"a"), Some("alice"));
883
        assert_eq!(cache.get(&"b"), Some("bob"));
884
        assert_eq!(cache.get(&"c"), None);
885
        assert_eq!(cache.get(&"d"), Some("dennis"));
886
        assert!(cache.contains_key(&"a"));
887
        assert!(cache.contains_key(&"b"));
888
        assert!(!cache.contains_key(&"c"));
889
        assert!(cache.contains_key(&"d"));
890
891
        cache.invalidate(&"b");
892
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
893
        cache.run_pending_tasks();
894
        assert_eq!(cache.get(&"b"), None);
895
        assert!(!cache.contains_key(&"b"));
896
897
        assert!(cache.remove(&"b").is_none());
898
        assert_eq!(cache.remove(&"d"), Some("dennis"));
899
        expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
900
        cache.run_pending_tasks();
901
        assert_eq!(cache.get(&"d"), None);
902
        assert!(!cache.contains_key(&"d"));
903
904
        verify_notification_vec(&cache, actual, &expected);
905
        assert!(cache.key_locks_map_is_empty());
906
    }
907
908
    #[test]
909
    fn non_power_of_two_segments() {
910
        let mut cache = SegmentedCache::new(100, 5);
911
        cache.reconfigure_for_testing();
912
913
        // Make the cache exterior immutable.
914
        let cache = cache;
915
916
        assert_eq!(cache.iter().count(), 0);
917
918
        cache.insert("a", "alice");
919
        cache.insert("b", "bob");
920
        cache.insert("c", "cindy");
921
922
        assert_eq!(cache.iter().count(), 3);
923
        cache.run_pending_tasks();
924
        assert_eq!(cache.iter().count(), 3);
925
    }
926
927
    #[test]
928
    fn size_aware_eviction() {
929
        let weigher = |_k: &&str, v: &(&str, u32)| v.1;
930
931
        let alice = ("alice", 10);
932
        let bob = ("bob", 15);
933
        let bill = ("bill", 20);
934
        let cindy = ("cindy", 5);
935
        let david = ("david", 15);
936
        let dennis = ("dennis", 15);
937
938
        // The following `Vec`s will hold actual and expected notifications.
939
        let actual = Arc::new(Mutex::new(Vec::new()));
940
        let mut expected = Vec::new();
941
942
        // Create an eviction listener.
943
        let a1 = Arc::clone(&actual);
944
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
945
946
        // Create a cache with the eviction listener.
947
        let mut cache = SegmentedCache::builder(1)
948
            .max_capacity(31)
949
            .weigher(weigher)
950
            .eviction_listener(listener)
951
            .build();
952
        cache.reconfigure_for_testing();
953
954
        // Make the cache exterior immutable.
955
        let cache = cache;
956
957
        cache.insert("a", alice);
958
        cache.insert("b", bob);
959
        assert_eq!(cache.get(&"a"), Some(alice));
960
        assert!(cache.contains_key(&"a"));
961
        assert!(cache.contains_key(&"b"));
962
        assert_eq!(cache.get(&"b"), Some(bob));
963
        cache.run_pending_tasks();
964
        // order (LRU -> MRU) and counts: a -> 1, b -> 1
965
966
        cache.insert("c", cindy);
967
        assert_eq!(cache.get(&"c"), Some(cindy));
968
        assert!(cache.contains_key(&"c"));
969
        // order and counts: a -> 1, b -> 1, c -> 1
970
        cache.run_pending_tasks();
971
972
        assert!(cache.contains_key(&"a"));
973
        assert_eq!(cache.get(&"a"), Some(alice));
974
        assert_eq!(cache.get(&"b"), Some(bob));
975
        assert!(cache.contains_key(&"b"));
976
        cache.run_pending_tasks();
977
        // order and counts: c -> 1, a -> 2, b -> 2
978
979
        // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
980
        // "d" must have higher count than 3, which is the aggregated count
981
        // of "a" and "c".
982
        cache.insert("d", david); //   count: d -> 0
983
        expected.push((Arc::new("d"), david, RemovalCause::Size));
984
        cache.run_pending_tasks();
985
        assert_eq!(cache.get(&"d"), None); //   d -> 1
986
        assert!(!cache.contains_key(&"d"));
987
988
        cache.insert("d", david);
989
        expected.push((Arc::new("d"), david, RemovalCause::Size));
990
        cache.run_pending_tasks();
991
        assert!(!cache.contains_key(&"d"));
992
        assert_eq!(cache.get(&"d"), None); //   d -> 2
993
994
        cache.insert("d", david);
995
        expected.push((Arc::new("d"), david, RemovalCause::Size));
996
        cache.run_pending_tasks();
997
        assert_eq!(cache.get(&"d"), None); //   d -> 3
998
        assert!(!cache.contains_key(&"d"));
999
1000
        cache.insert("d", david);
1001
        expected.push((Arc::new("d"), david, RemovalCause::Size));
1002
        cache.run_pending_tasks();
1003
        assert!(!cache.contains_key(&"d"));
1004
        assert_eq!(cache.get(&"d"), None); //   d -> 4
1005
1006
        // Finally "d" should be admitted by evicting "c" and "a".
1007
        cache.insert("d", dennis);
1008
        expected.push((Arc::new("c"), cindy, RemovalCause::Size));
1009
        expected.push((Arc::new("a"), alice, RemovalCause::Size));
1010
        cache.run_pending_tasks();
1011
        assert_eq!(cache.get(&"a"), None);
1012
        assert_eq!(cache.get(&"b"), Some(bob));
1013
        assert_eq!(cache.get(&"c"), None);
1014
        assert_eq!(cache.get(&"d"), Some(dennis));
1015
        assert!(!cache.contains_key(&"a"));
1016
        assert!(cache.contains_key(&"b"));
1017
        assert!(!cache.contains_key(&"c"));
1018
        assert!(cache.contains_key(&"d"));
1019
1020
        // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
1021
        cache.insert("b", bill);
1022
        expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
1023
        expected.push((Arc::new("d"), dennis, RemovalCause::Size));
1024
        cache.run_pending_tasks();
1025
        assert_eq!(cache.get(&"b"), Some(bill));
1026
        assert_eq!(cache.get(&"d"), None);
1027
        assert!(cache.contains_key(&"b"));
1028
        assert!(!cache.contains_key(&"d"));
1029
1030
        // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
1031
        cache.insert("a", alice);
1032
        cache.insert("b", bob);
1033
        expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
1034
        cache.run_pending_tasks();
1035
        assert_eq!(cache.get(&"a"), Some(alice));
1036
        assert_eq!(cache.get(&"b"), Some(bob));
1037
        assert_eq!(cache.get(&"d"), None);
1038
        assert!(cache.contains_key(&"a"));
1039
        assert!(cache.contains_key(&"b"));
1040
        assert!(!cache.contains_key(&"d"));
1041
1042
        // Verify the sizes.
1043
        assert_eq!(cache.entry_count(), 2);
1044
        assert_eq!(cache.weighted_size(), 25);
1045
1046
        verify_notification_vec(&cache, actual, &expected);
1047
        assert!(cache.key_locks_map_is_empty());
1048
    }
1049
1050
    #[test]
1051
    fn basic_multi_threads() {
1052
        let num_threads = 4;
1053
1054
        let mut cache = SegmentedCache::new(100, num_threads);
1055
        cache.reconfigure_for_testing();
1056
1057
        // Make the cache exterior immutable.
1058
        let cache = cache;
1059
1060
        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
1061
        #[allow(clippy::needless_collect)]
1062
        let handles = (0..num_threads)
1063
            .map(|id| {
1064
                let cache = cache.clone();
1065
                std::thread::spawn(move || {
1066
                    cache.insert(10, format!("{id}-100"));
1067
                    cache.get(&10);
1068
                    cache.run_pending_tasks();
1069
                    cache.insert(20, format!("{id}-200"));
1070
                    cache.invalidate(&10);
1071
                })
1072
            })
1073
            .collect::<Vec<_>>();
1074
1075
        handles.into_iter().for_each(|h| h.join().expect("Failed"));
1076
1077
        cache.run_pending_tasks();
1078
1079
        assert!(cache.get(&10).is_none());
1080
        assert!(cache.get(&20).is_some());
1081
        assert!(!cache.contains_key(&10));
1082
        assert!(cache.contains_key(&20));
1083
    }
1084
1085
    #[test]
1086
    fn invalidate_all() {
1087
        use std::collections::HashMap;
1088
1089
        // The following `HashMap`s will hold actual and expected notifications.
1090
        // Note: We use `HashMap` here as the order of invalidations is non-deterministic.
1091
        let actual = Arc::new(Mutex::new(HashMap::new()));
1092
        let mut expected = HashMap::new();
1093
1094
        // Create an eviction listener.
1095
        let a1 = Arc::clone(&actual);
1096
        let listener = move |k, v, cause| {
1097
            a1.lock().insert(k, (v, cause));
1098
        };
1099
1100
        // Create a cache with the eviction listener.
1101
        let mut cache = SegmentedCache::builder(4)
1102
            .max_capacity(100)
1103
            .eviction_listener(listener)
1104
            .build();
1105
        cache.reconfigure_for_testing();
1106
1107
        // Make the cache exterior immutable.
1108
        let cache = cache;
1109
1110
        cache.insert("a", "alice");
1111
        cache.insert("b", "bob");
1112
        cache.insert("c", "cindy");
1113
        assert_eq!(cache.get(&"a"), Some("alice"));
1114
        assert_eq!(cache.get(&"b"), Some("bob"));
1115
        assert_eq!(cache.get(&"c"), Some("cindy"));
1116
        assert!(cache.contains_key(&"a"));
1117
        assert!(cache.contains_key(&"b"));
1118
        assert!(cache.contains_key(&"c"));
1119
1120
        // `cache.run_pending_tasks()` is no longer needed here before invalidating. The last
1121
        // modified timestamps of the entries were updated when they were inserted.
1122
        // https://github.com/moka-rs/moka/issues/155
1123
1124
        cache.invalidate_all();
1125
        expected.insert(Arc::new("a"), ("alice", RemovalCause::Explicit));
1126
        expected.insert(Arc::new("b"), ("bob", RemovalCause::Explicit));
1127
        expected.insert(Arc::new("c"), ("cindy", RemovalCause::Explicit));
1128
        cache.run_pending_tasks();
1129
1130
        cache.insert("d", "david");
1131
        cache.run_pending_tasks();
1132
1133
        assert!(cache.get(&"a").is_none());
1134
        assert!(cache.get(&"b").is_none());
1135
        assert!(cache.get(&"c").is_none());
1136
        assert_eq!(cache.get(&"d"), Some("david"));
1137
        assert!(!cache.contains_key(&"a"));
1138
        assert!(!cache.contains_key(&"b"));
1139
        assert!(!cache.contains_key(&"c"));
1140
        assert!(cache.contains_key(&"d"));
1141
1142
        verify_notification_map(&cache, actual, &expected);
1143
    }
1144
1145
    #[test]
1146
    fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
1147
        use std::collections::{HashMap, HashSet};
1148
1149
        const SEGMENTS: usize = 4;
1150
1151
        // The following `HashMap`s will hold actual and expected notifications.
1152
        // Note: We use `HashMap` here as the order of invalidations is non-deterministic.
1153
        let actual = Arc::new(Mutex::new(HashMap::new()));
1154
        let mut expected = HashMap::new();
1155
1156
        // Create an eviction listener.
1157
        let a1 = Arc::clone(&actual);
1158
        let listener = move |k, v, cause| {
1159
            a1.lock().insert(k, (v, cause));
1160
        };
1161
1162
        let (clock, mock) = crate::common::time::Clock::mock();
1163
1164
        // Create a cache with the eviction listener.
1165
        let mut cache = SegmentedCache::builder(SEGMENTS)
1166
            .max_capacity(100)
1167
            .support_invalidation_closures()
1168
            .eviction_listener(listener)
1169
            .clock(clock)
1170
            .build();
1171
        cache.reconfigure_for_testing();
1172
1173
        // Make the cache exterior immutable.
1174
        let cache = cache;
1175
1176
        cache.insert(0, "alice");
1177
        cache.insert(1, "bob");
1178
        cache.insert(2, "alex");
1179
        cache.run_pending_tasks();
1180
        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
1181
        cache.run_pending_tasks();
1182
1183
        assert_eq!(cache.get(&0), Some("alice"));
1184
        assert_eq!(cache.get(&1), Some("bob"));
1185
        assert_eq!(cache.get(&2), Some("alex"));
1186
        assert!(cache.contains_key(&0));
1187
        assert!(cache.contains_key(&1));
1188
        assert!(cache.contains_key(&2));
1189
1190
        let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
1191
        cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
1192
        assert_eq!(cache.invalidation_predicate_count(), SEGMENTS);
1193
        expected.insert(Arc::new(0), ("alice", RemovalCause::Explicit));
1194
        expected.insert(Arc::new(2), ("alex", RemovalCause::Explicit));
1195
1196
        mock.increment(Duration::from_secs(5)); // 10 secs from the start.
1197
1198
        cache.insert(3, "alice");
1199
1200
        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
1201
        cache.run_pending_tasks(); // To submit the invalidation task.
1202
        std::thread::sleep(Duration::from_millis(200));
1203
        cache.run_pending_tasks(); // To process the task result.
1204
        std::thread::sleep(Duration::from_millis(200));
1205
1206
        assert!(cache.get(&0).is_none());
1207
        assert!(cache.get(&2).is_none());
1208
        assert_eq!(cache.get(&1), Some("bob"));
1209
        // This should survive as it was inserted after calling invalidate_entries_if.
1210
        assert_eq!(cache.get(&3), Some("alice"));
1211
1212
        assert!(!cache.contains_key(&0));
1213
        assert!(cache.contains_key(&1));
1214
        assert!(!cache.contains_key(&2));
1215
        assert!(cache.contains_key(&3));
1216
1217
        assert_eq!(cache.entry_count(), 2);
1218
        assert_eq!(cache.invalidation_predicate_count(), 0);
1219
1220
        mock.increment(Duration::from_secs(5)); // 15 secs from the start.
1221
1222
        cache.invalidate_entries_if(|_k, &v| v == "alice")?;
1223
        cache.invalidate_entries_if(|_k, &v| v == "bob")?;
1224
        assert_eq!(cache.invalidation_predicate_count(), SEGMENTS * 2);
1225
        expected.insert(Arc::new(1), ("bob", RemovalCause::Explicit));
1226
        expected.insert(Arc::new(3), ("alice", RemovalCause::Explicit));
1227
1228
        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
1229
        cache.run_pending_tasks(); // To submit the invalidation task.
1230
        std::thread::sleep(Duration::from_millis(200));
1231
        cache.run_pending_tasks(); // To process the task result.
1232
        std::thread::sleep(Duration::from_millis(200));
1233
1234
        assert!(cache.get(&1).is_none());
1235
        assert!(cache.get(&3).is_none());
1236
1237
        assert!(!cache.contains_key(&1));
1238
        assert!(!cache.contains_key(&3));
1239
1240
        assert_eq!(cache.entry_count(), 0);
1241
        assert_eq!(cache.invalidation_predicate_count(), 0);
1242
1243
        verify_notification_map(&cache, actual, &expected);
1244
1245
        Ok(())
1246
    }
1247
1248
    #[test]
1249
    fn test_iter() {
1250
        const NUM_KEYS: usize = 50;
1251
1252
        fn make_value(key: usize) -> String {
1253
            format!("val: {key}")
1254
        }
1255
1256
        // let cache = SegmentedCache::builder(5)
1257
        let cache = SegmentedCache::builder(4)
1258
            .max_capacity(100)
1259
            .time_to_idle(Duration::from_secs(10))
1260
            .build();
1261
1262
        for key in 0..NUM_KEYS {
1263
            cache.insert(key, make_value(key));
1264
        }
1265
1266
        let mut key_set = std::collections::HashSet::new();
1267
1268
        for (key, value) in &cache {
1269
            assert_eq!(value, make_value(*key));
1270
1271
            key_set.insert(*key);
1272
        }
1273
1274
        // Ensure there are no missing or duplicate keys in the iteration.
1275
        assert_eq!(key_set.len(), NUM_KEYS);
1276
    }
1277
1278
    /// Runs 16 threads at the same time and ensures no deadlock occurs.
1279
    ///
1280
    /// - Eight of the threads will update key-values in the cache.
1281
    /// - Eight others will iterate the cache.
1282
    ///
1283
    #[test]
1284
    fn test_iter_multi_threads() {
1285
        use std::collections::HashSet;
1286
1287
        const NUM_KEYS: usize = 1024;
1288
        const NUM_THREADS: usize = 16;
1289
1290
        fn make_value(key: usize) -> String {
1291
            format!("val: {key}")
1292
        }
1293
1294
        let cache = SegmentedCache::builder(4)
1295
            .max_capacity(2048)
1296
            .time_to_idle(Duration::from_secs(10))
1297
            .build();
1298
1299
        // Initialize the cache.
1300
        for key in 0..NUM_KEYS {
1301
            cache.insert(key, make_value(key));
1302
        }
1303
1304
        let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
1305
        let write_lock = rw_lock.write().unwrap();
1306
1307
        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
1308
        #[allow(clippy::needless_collect)]
1309
        let handles = (0..NUM_THREADS)
1310
            .map(|n| {
1311
                let cache = cache.clone();
1312
                let rw_lock = Arc::clone(&rw_lock);
1313
1314
                if n % 2 == 0 {
1315
                    // This thread will update the cache.
1316
                    std::thread::spawn(move || {
1317
                        let read_lock = rw_lock.read().unwrap();
1318
                        for key in 0..NUM_KEYS {
1319
                            // TODO: Update keys in a random order?
1320
                            cache.insert(key, make_value(key));
1321
                        }
1322
                        std::mem::drop(read_lock);
1323
                    })
1324
                } else {
1325
                    // This thread will iterate the cache.
1326
                    std::thread::spawn(move || {
1327
                        let read_lock = rw_lock.read().unwrap();
1328
                        let mut key_set = HashSet::new();
1329
                        for (key, value) in &cache {
1330
                            assert_eq!(value, make_value(*key));
1331
                            key_set.insert(*key);
1332
                        }
1333
                        // Ensure there are no missing or duplicate keys in the iteration.
1334
                        assert_eq!(key_set.len(), NUM_KEYS);
1335
                        std::mem::drop(read_lock);
1336
                    })
1337
                }
1338
            })
1339
            .collect::<Vec<_>>();
1340
1341
        // Let these threads run by releasing the write lock.
1342
        std::mem::drop(write_lock);
1343
1344
        handles.into_iter().for_each(|h| h.join().expect("Failed"));
1345
1346
        // Ensure there are no missing or duplicate keys in the iteration.
1347
        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
1348
        assert_eq!(key_set.len(), NUM_KEYS);
1349
    }
1350
1351
    #[test]
1352
    fn get_with() {
1353
        use std::thread::{sleep, spawn};
1354
1355
        let cache = SegmentedCache::new(100, 4);
1356
        const KEY: u32 = 0;
1357
1358
        // This test will run five threads:
1359
        //
1360
        // Thread1 will be the first thread to call `get_with` for a key, so its init
1361
        // closure will be evaluated and then a &str value "thread1" will be inserted
1362
        // to the cache.
1363
        let thread1 = {
1364
            let cache1 = cache.clone();
1365
            spawn(move || {
1366
                // Call `get_with` immediately.
1367
                let v = cache1.get_with(KEY, || {
1368
                    // Wait for 300 ms and return a &str value.
1369
                    sleep(Duration::from_millis(300));
1370
                    "thread1"
1371
                });
1372
                assert_eq!(v, "thread1");
1373
            })
1374
        };
1375
1376
        // Thread2 will be the second thread to call `get_with` for the same key, so
1377
        // its init closure will not be evaluated. Once thread1's init closure
1378
        // finishes, it will get the value inserted by thread1's init closure.
1379
        let thread2 = {
1380
            let cache2 = cache.clone();
1381
            spawn(move || {
1382
                // Wait for 100 ms before calling `get_with`.
1383
                sleep(Duration::from_millis(100));
1384
                let v = cache2.get_with(KEY, || unreachable!());
1385
                assert_eq!(v, "thread1");
1386
            })
1387
        };
1388
1389
        // Thread3 will be the third thread to call `get_with` for the same key. By
1390
        // the time it calls, thread1's init closure should have finished already and
1391
        // the value should be already inserted to the cache. So its init closure
1392
        // will not be evaluated and it will get the value inserted by thread1's init
1393
        // closure immediately.
1394
        let thread3 = {
1395
            let cache3 = cache.clone();
1396
            spawn(move || {
1397
                // Wait for 400 ms before calling `get_with`.
1398
                sleep(Duration::from_millis(400));
1399
                let v = cache3.get_with(KEY, || unreachable!());
1400
                assert_eq!(v, "thread1");
1401
            })
1402
        };
1403
1404
        // Thread4 will call `get` for the same key. It will call when thread1's init
1405
        // closure is still running, so it will get none for the key.
1406
        let thread4 = {
1407
            let cache4 = cache.clone();
1408
            spawn(move || {
1409
                // Wait for 200 ms before calling `get`.
1410
                sleep(Duration::from_millis(200));
1411
                let maybe_v = cache4.get(&KEY);
1412
                assert!(maybe_v.is_none());
1413
            })
1414
        };
1415
1416
        // Thread5 will call `get` for the same key. It will call after thread1's init
1417
        // closure finished, so it will get the value inserted by thread1's init closure.
1418
        let thread5 = {
1419
            let cache5 = cache.clone();
1420
            spawn(move || {
1421
                // Wait for 400 ms before calling `get`.
1422
                sleep(Duration::from_millis(400));
1423
                let maybe_v = cache5.get(&KEY);
1424
                assert_eq!(maybe_v, Some("thread1"));
1425
            })
1426
        };
1427
1428
        for t in [thread1, thread2, thread3, thread4, thread5] {
1429
            t.join().expect("Failed to join");
1430
        }
1431
1432
        assert!(cache.is_waiter_map_empty());
1433
    }
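    // A condensed sketch of the call-coalescing behaviour exercised above,
    // written against the public `moka::sync::SegmentedCache` API; the key and
    // the value strings are illustrative only.
    fn get_with_sketch() {
        use moka::sync::SegmentedCache;
        use std::thread;

        let cache: SegmentedCache<u32, String> = SegmentedCache::new(100, 4);

        let handles: Vec<_> = (0..4)
            .map(|i| {
                let cache = cache.clone();
                thread::spawn(move || {
                    // All four threads race on the same key; the init closure runs
                    // at most once and the other callers wait for its result.
                    cache.get_with(0, || format!("computed by thread {i}"))
                })
            })
            .collect();

        let values: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        // Every caller observes the single value produced by the winning closure.
        assert!(values.windows(2).all(|w| w[0] == w[1]));
    }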
1434
1435
    #[test]
1436
    fn get_with_if() {
1437
        use std::thread::{sleep, spawn};
1438
1439
        let cache = SegmentedCache::new(100, 4);
1440
        const KEY: u32 = 0;
1441
1442
        // This test will run seven threads:
1443
        //
1444
        // Thread1 will be the first thread to call `get_with_if` for a key, so its
1445
        // init closure will be evaluated and then a &str value "thread1" will be
1446
        // inserted to the cache.
1447
        let thread1 = {
1448
            let cache1 = cache.clone();
1449
            spawn(move || {
1450
                // Call `get_with_if` immediately.
1451
                let v = cache1.get_with_if(
1452
                    KEY,
1453
                    || {
1454
                        // Wait for 300 ms and return a &str value.
1455
                        sleep(Duration::from_millis(300));
1456
                        "thread1"
1457
                    },
1458
                    |_v| unreachable!(),
1459
                );
1460
                assert_eq!(v, "thread1");
1461
            })
1462
        };
1463
1464
        // Thread2 will be the second thread to call `get_with_if` for the same key,
1465
        // so its init closure will not be evaluated. Once thread1's init closure
1466
        // finishes, it will get the value inserted by thread1's init closure.
1467
        let thread2 = {
1468
            let cache2 = cache.clone();
1469
            spawn(move || {
1470
                // Wait for 100 ms before calling `get_with_if`.
1471
                sleep(Duration::from_millis(100));
1472
                let v = cache2.get_with_if(KEY, || unreachable!(), |_v| unreachable!());
1473
                assert_eq!(v, "thread1");
1474
            })
1475
        };
1476
1477
        // Thread3 will be the third thread to call `get_with_if` for the same
1478
        // key. By the time it calls, thread1's init closure should have finished
1479
        // already and the value should be already inserted to the cache. Also
1480
        // thread3's `replace_if` closure returns `false`. So its init closure will
1481
        // not be evaluated and will get the value inserted by thread1's init closure
1482
        // immediately.
1483
        let thread3 = {
1484
            let cache3 = cache.clone();
1485
            spawn(move || {
1486
                // Wait for 350 ms before calling `get_with_if`.
1487
                sleep(Duration::from_millis(350));
1488
                let v = cache3.get_with_if(
1489
                    KEY,
1490
                    || unreachable!(),
1491
                    |v| {
1492
                        assert_eq!(v, &"thread1");
1493
                        false
1494
                    },
1495
                );
1496
                assert_eq!(v, "thread1");
1497
            })
1498
        };
1499
1500
        // Thread4 will be the fourth thread to call `get_with_if` for the same
1501
        // key. The value should have been already inserted to the cache by
1502
        // thread1. However thread4's `replace_if` closure returns `true`. So its
1503
        // init closure will be evaluated to replace the current value.
1504
        let thread4 = {
1505
            let cache4 = cache.clone();
1506
            spawn(move || {
1507
                // Wait for 400 ms before calling `get_with_if`.
1508
                sleep(Duration::from_millis(400));
1509
                let v = cache4.get_with_if(
1510
                    KEY,
1511
                    || "thread4",
1512
                    |v| {
1513
                        assert_eq!(v, &"thread1");
1514
                        true
1515
                    },
1516
                );
1517
                assert_eq!(v, "thread4");
1518
            })
1519
        };
1520
1521
        // Thread5 will call `get` for the same key. It will call when thread1's init
1522
        // closure is still running, so it will get none for the key.
1523
        let thread5 = {
1524
            let cache5 = cache.clone();
1525
            spawn(move || {
1526
                // Wait for 200 ms before calling `get`.
1527
                sleep(Duration::from_millis(200));
1528
                let maybe_v = cache5.get(&KEY);
1529
                assert!(maybe_v.is_none());
1530
            })
1531
        };
1532
1533
        // Thread6 will call `get` for the same key. It will call after thread1's init
1534
        // closure finished, but before thread4's update, so it will get "thread1".
1535
        let thread6 = {
1536
            let cache6 = cache.clone();
1537
            spawn(move || {
1538
                // Wait for 350 ms before calling `get`.
1539
                sleep(Duration::from_millis(350));
1540
                let maybe_v = cache6.get(&KEY);
1541
                assert_eq!(maybe_v, Some("thread1"));
1542
            })
1543
        };
1544
1545
        // Thread7 will call `get` for the same key. It will call after thread4's init
1546
        // closure replaced the value, so it will get the value inserted by thread4.
1547
        let thread7 = {
1548
            let cache7 = cache.clone();
1549
            spawn(move || {
1550
                // Wait for 450 ms before calling `get`.
1551
                sleep(Duration::from_millis(450));
1552
                let maybe_v = cache7.get(&KEY);
1553
                assert_eq!(maybe_v, Some("thread4"));
1554
            })
1555
        };
1556
1557
        for t in [
1558
            thread1, thread2, thread3, thread4, thread5, thread6, thread7,
1559
        ] {
1560
            t.join().expect("Failed to join");
1561
        }
1562
1563
        assert!(cache.is_waiter_map_empty());
1564
    }
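    // A condensed sketch of the `replace_if` behaviour exercised above, using
    // the same `get_with_if(key, init, replace_if)` call shape as this test;
    // the literal values are illustrative only.
    fn get_with_if_sketch() {
        let cache: SegmentedCache<u32, &'static str> = SegmentedCache::new(100, 4);
        cache.insert(0, "old");

        // `replace_if` returns `false`: the cached value is kept and `init` is
        // never evaluated.
        let kept = cache.get_with_if(0, || unreachable!(), |v| *v != "old");
        assert_eq!(kept, "old");

        // `replace_if` returns `true`: `init` runs and its result replaces the
        // cached value.
        let replaced = cache.get_with_if(0, || "new", |v| *v == "old");
        assert_eq!(replaced, "new");
    }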
1565
1566
    #[test]
1567
    fn try_get_with() {
1568
        use std::{
1569
            sync::Arc,
1570
            thread::{sleep, spawn},
1571
        };
1572
1573
        #[derive(thiserror::Error, Debug)]
1574
        #[error("{}", _0)]
1575
        pub struct MyError(String);
1576
1577
        type MyResult<T> = Result<T, Arc<MyError>>;
1578
1579
        let cache = SegmentedCache::new(100, 4);
1580
        const KEY: u32 = 0;
1581
1582
        // This test will run eight threads:
1583
        //
1584
        // Thread1 will be the first thread to call `try_get_with` for a key, so its
1585
        // init closure will be evaluated and then an error will be returned. Nothing
1586
        // will be inserted to the cache.
1587
        let thread1 = {
1588
            let cache1 = cache.clone();
1589
            spawn(move || {
1590
                // Call `try_get_with` immediately.
1591
                let v = cache1.try_get_with(KEY, || {
1592
                    // Wait for 300 ms and return an error.
1593
                    sleep(Duration::from_millis(300));
1594
                    Err(MyError("thread1 error".into()))
1595
                });
1596
                assert!(v.is_err());
1597
            })
1598
        };
1599
1600
        // Thread2 will be the second thread to call `try_get_with` for the same key,
1601
        // so its init closure will not be evaluated. Once thread1's init closure
1602
        // finishes, it will get the same error value returned by thread1's init
1603
        // closure.
1604
        let thread2 = {
1605
            let cache2 = cache.clone();
1606
            spawn(move || {
1607
                // Wait for 100 ms before calling `try_get_with`.
1608
                sleep(Duration::from_millis(100));
1609
                let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!());
1610
                assert!(v.is_err());
1611
            })
1612
        };
1613
1614
        // Thread3 will be the third thread to call `try_get_with` for the same key. By
1615
        // the time it calls, thread1's init closure should have finished already,
1616
        // but the key still does not exist in the cache. So its init closure will be
1617
        // evaluated and then an `Ok(&str)` value will be returned. That value will be
1618
        // inserted to the cache.
1619
        let thread3 = {
1620
            let cache3 = cache.clone();
1621
            spawn(move || {
1622
                // Wait for 400 ms before calling `try_get_with`.
1623
                sleep(Duration::from_millis(400));
1624
                let v: MyResult<_> = cache3.try_get_with(KEY, || {
1625
                    // Wait for 300 ms and return an Ok(&str) value.
1626
                    sleep(Duration::from_millis(300));
1627
                    Ok("thread3")
1628
                });
1629
                assert_eq!(v.unwrap(), "thread3");
1630
            })
1631
        };
1632
1633
        // Thread4 will be the fourth thread to call `try_get_with` for the same
1634
        // key. So its init closure will not be evaluated. Once thread3's init
1635
        // closure finishes, it will get the same `Ok(&str)` value.
1636
        let thread4 = {
1637
            let cache4 = cache.clone();
1638
            spawn(move || {
1639
                // Wait for 500 ms before calling `try_get_with`.
1640
                sleep(Duration::from_millis(500));
1641
                let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!());
1642
                assert_eq!(v.unwrap(), "thread3");
1643
            })
1644
        };
1645
1646
        // Thread5 will be the fifth thread to call `try_get_with` for the same
1647
        // key. So its init closure will not be evaluated. By the time it calls,
1648
        // thread3's init closure should have finished already, so its init closure
1649
        // will not be evaluated and it will get the value inserted by thread3's init
1650
        // closure immediately.
1651
        let thread5 = {
1652
            let cache5 = cache.clone();
1653
            spawn(move || {
1654
                // Wait for 800 ms before calling `try_get_with`.
1655
                sleep(Duration::from_millis(800));
1656
                let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!());
1657
                assert_eq!(v.unwrap(), "thread3");
1658
            })
1659
        };
1660
1661
        // Thread6 will call `get` for the same key. It will call when thread1's init
1662
        // closure is still running, so it will get none for the key.
1663
        let thread6 = {
1664
            let cache6 = cache.clone();
1665
            spawn(move || {
1666
                // Wait for 200 ms before calling `get`.
1667
                sleep(Duration::from_millis(200));
1668
                let maybe_v = cache6.get(&KEY);
1669
                assert!(maybe_v.is_none());
1670
            })
1671
        };
1672
1673
        // Thread7 will call `get` for the same key. It will call after thread1's init
1674
        // closure finished with an error. So it will get none for the key.
1675
        let thread7 = {
1676
            let cache7 = cache.clone();
1677
            spawn(move || {
1678
                // Wait for 400 ms before calling `get`.
1679
                sleep(Duration::from_millis(400));
1680
                let maybe_v = cache7.get(&KEY);
1681
                assert!(maybe_v.is_none());
1682
            })
1683
        };
1684
1685
        // Thread8 will call `get` for the same key. It will call after thread3's init
1686
        // closure finished, so it will get the value inserted by thread3's init closure.
1687
        let thread8 = {
1688
            let cache8 = cache.clone();
1689
            spawn(move || {
1690
                // Wait for 800 ms before calling `get`.
1691
                sleep(Duration::from_millis(800));
1692
                let maybe_v = cache8.get(&KEY);
1693
                assert_eq!(maybe_v, Some("thread3"));
1694
            })
1695
        };
1696
1697
        for t in [
1698
            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
1699
        ] {
1700
            t.join().expect("Failed to join");
1701
        }
1702
1703
        assert!(cache.is_waiter_map_empty());
1704
    }
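    // A minimal sketch of the behaviour exercised above, written against the
    // public `moka::sync::SegmentedCache` API: a failed init is not cached, so
    // a later call can retry and cache a successful value; callers receive the
    // error wrapped in an `Arc`. The key and error text are illustrative only.
    fn try_get_with_sketch() {
        use moka::sync::SegmentedCache;
        use std::io::{Error, ErrorKind};
        use std::sync::Arc;

        let cache: SegmentedCache<u32, String> = SegmentedCache::new(100, 4);

        // The init closure fails, so nothing is stored for the key.
        let failed: Result<String, Arc<Error>> =
            cache.try_get_with(0, || Err(Error::new(ErrorKind::Other, "boom")));
        assert!(failed.is_err());
        assert!(cache.get(&0).is_none());

        // A later call retries the init closure and caches the successful value.
        let ok: Result<String, Arc<Error>> =
            cache.try_get_with(0, || Ok("ready".to_string()));
        assert_eq!(ok.unwrap(), "ready");
        assert_eq!(cache.get(&0), Some("ready".to_string()));
    }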
1705
1706
    #[test]
1707
    fn optionally_get_with() {
1708
        use std::thread::{sleep, spawn};
1709
1710
        let cache = SegmentedCache::new(100, 4);
1711
        const KEY: u32 = 0;
1712
1713
        // This test will run eight threads:
1714
        //
1715
        // Thread1 will be the first thread to call `optionally_get_with` for a key, so its
1716
        // init closure will be evaluated and then `None` will be returned. Nothing
1717
        // will be inserted to the cache.
1718
        let thread1 = {
1719
            let cache1 = cache.clone();
1720
            spawn(move || {
1721
                // Call `optionally_get_with` immediately.
1722
                let v = cache1.optionally_get_with(KEY, || {
1723
                    // Wait for 300 ms and return `None`.
1724
                    sleep(Duration::from_millis(300));
1725
                    None
1726
                });
1727
                assert!(v.is_none());
1728
            })
1729
        };
1730
1731
        // Thread2 will be the second thread to call `optionally_get_with` for the same key,
1732
        // so its init closure will not be evaluated. Once thread1's init closure
1733
        // finishes, it will get the same `None` returned by thread1's init
1734
        // closure.
1735
        let thread2 = {
1736
            let cache2 = cache.clone();
1737
            spawn(move || {
1738
                // Wait for 100 ms before calling `optionally_get_with`.
1739
                sleep(Duration::from_millis(100));
1740
                let v = cache2.optionally_get_with(KEY, || unreachable!());
1741
                assert!(v.is_none());
1742
            })
1743
        };
1744
1745
        // Thread3 will be the third thread to call `optionally_get_with` for the same key. By
1746
        // the time it calls, thread1's init closure should have finished already,
1747
        // but the key still does not exist in the cache. So its init closure will be
1748
        // evaluated and then a `Some(&str)` value will be returned. That value will be
1749
        // inserted to the cache.
1750
        let thread3 = {
1751
            let cache3 = cache.clone();
1752
            spawn(move || {
1753
                // Wait for 400 ms before calling `optionally_get_with`.
1754
                sleep(Duration::from_millis(400));
1755
                let v = cache3.optionally_get_with(KEY, || {
1756
                    // Wait for 300 ms and return a `Some(&str)` value.
1757
                    sleep(Duration::from_millis(300));
1758
                    Some("thread3")
1759
                });
1760
                assert_eq!(v.unwrap(), "thread3");
1761
            })
1762
        };
1763
1764
        // Thread4 will be the fourth thread to call `optionally_get_with` for the same
1765
        // key. So its init closure will not be evaluated. Once thread3's init
1766
        // closure finishes, it will get the same `Some(&str)` value.
1767
        let thread4 = {
1768
            let cache4 = cache.clone();
1769
            spawn(move || {
1770
                // Wait for 500 ms before calling `optionally_get_with`.
1771
                sleep(Duration::from_millis(500));
1772
                let v = cache4.optionally_get_with(KEY, || unreachable!());
1773
                assert_eq!(v.unwrap(), "thread3");
1774
            })
1775
        };
1776
1777
        // Thread5 will be the fifth thread to call `optionally_get_with` for the same
1778
        // key. So its init closure will not be evaluated. By the time it calls,
1779
        // thread3's init closure should have finished already, so its init closure
1780
        // will not be evaluated and it will get the value inserted by thread3's init
1781
        // closure immediately.
1782
        let thread5 = {
1783
            let cache5 = cache.clone();
1784
            spawn(move || {
1785
                // Wait for 800 ms before calling `optionally_get_with`.
1786
                sleep(Duration::from_millis(800));
1787
                let v = cache5.optionally_get_with(KEY, || unreachable!());
1788
                assert_eq!(v.unwrap(), "thread3");
1789
            })
1790
        };
1791
1792
        // Thread6 will call `get` for the same key. It will call when thread1's init
1793
        // closure is still running, so it will get none for the key.
1794
        let thread6 = {
1795
            let cache6 = cache.clone();
1796
            spawn(move || {
1797
                // Wait for 200 ms before calling `get`.
1798
                sleep(Duration::from_millis(200));
1799
                let maybe_v = cache6.get(&KEY);
1800
                assert!(maybe_v.is_none());
1801
            })
1802
        };
1803
1804
        // Thread7 will call `get` for the same key. It will call after thread1's init
1805
        // closure finished with `None`, so it will get none for the key.
1806
        let thread7 = {
1807
            let cache7 = cache.clone();
1808
            spawn(move || {
1809
                // Wait for 400 ms before calling `get`.
1810
                sleep(Duration::from_millis(400));
1811
                let maybe_v = cache7.get(&KEY);
1812
                assert!(maybe_v.is_none());
1813
            })
1814
        };
1815
1816
        // Thread8 will call `get` for the same key. It will call after thread3's init
1817
        // closure finished, so it will get the value inserted by thread3's init closure.
1818
        let thread8 = {
1819
            let cache8 = cache.clone();
1820
            spawn(move || {
1821
                // Wait for 800 ms before calling `get`.
1822
                sleep(Duration::from_millis(800));
1823
                let maybe_v = cache8.get(&KEY);
1824
                assert_eq!(maybe_v, Some("thread3"));
1825
            })
1826
        };
1827
1828
        for t in [
1829
            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
1830
        ] {
1831
            t.join().expect("Failed to join");
1832
        }
1833
1834
        assert!(cache.is_waiter_map_empty());
1835
    }
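    // A minimal sketch of the behaviour exercised above, written against the
    // public `moka::sync::SegmentedCache` API: an init closure returning `None`
    // caches nothing, while `Some(v)` inserts `v` and returns it. The key and
    // values are illustrative only.
    fn optionally_get_with_sketch() {
        use moka::sync::SegmentedCache;

        let cache: SegmentedCache<u32, &'static str> = SegmentedCache::new(100, 4);

        // `None` from the init closure leaves the key absent.
        assert!(cache.optionally_get_with(0, || None).is_none());
        assert!(cache.get(&0).is_none());

        // `Some(..)` from the init closure populates the entry.
        assert_eq!(cache.optionally_get_with(0, || Some("ready")), Some("ready"));
        assert_eq!(cache.get(&0), Some("ready"));
    }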
1836
1837
    // This test ensures that the `contains_key`, `get` and `invalidate` can use
1838
    // the borrowed form `&[u8]` for keys of type `Vec<u8>`.
1839
    // https://github.com/moka-rs/moka/issues/166
1840
    #[test]
1841
    fn borrowed_forms_of_key() {
1842
        let cache: SegmentedCache<Vec<u8>, ()> = SegmentedCache::new(1, 2);
1843
1844
        let key = vec![1_u8];
1845
        cache.insert(key.clone(), ());
1846
1847
        // key as &Vec<u8>
1848
        let key_v: &Vec<u8> = &key;
1849
        assert!(cache.contains_key(key_v));
1850
        assert_eq!(cache.get(key_v), Some(()));
1851
        cache.invalidate(key_v);
1852
1853
        cache.insert(key, ());
1854
1855
        // key as &[u8]
1856
        let key_s: &[u8] = &[1_u8];
1857
        assert!(cache.contains_key(key_s));
1858
        assert_eq!(cache.get(key_s), Some(()));
1859
        cache.invalidate(key_s);
1860
    }
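    // A minimal sketch of the same `Borrow`-based lookups with a different key
    // type, written against the public `moka::sync::SegmentedCache` API:
    // `String` keys can be queried with plain `&str`, just as `Vec<u8>` keys
    // can be queried with `&[u8]` above.
    fn borrowed_str_key_sketch() {
        use moka::sync::SegmentedCache;

        let cache: SegmentedCache<String, u32> = SegmentedCache::new(10, 2);
        cache.insert("alice".to_string(), 1);

        // `str` is the `Borrow` target of `String`, so no owned key is needed.
        assert!(cache.contains_key("alice"));
        assert_eq!(cache.get("alice"), Some(1));
        cache.invalidate("alice");
        assert!(!cache.contains_key("alice"));
    }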
1861
1862
    // Ignored by default. This test becomes unstable when run in parallel with
1863
    // other tests.
1864
    #[test]
1865
    #[ignore]
1866
    fn drop_value_immediately_after_eviction() {
1867
        use crate::common::test_utils::{Counters, Value};
1868
1869
        const NUM_SEGMENTS: usize = 1;
1870
        const MAX_CAPACITY: u32 = 500;
1871
        const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
1872
1873
        let counters = Arc::new(Counters::default());
1874
        let counters1 = Arc::clone(&counters);
1875
1876
        let listener = move |_k, _v, cause| match cause {
1877
            RemovalCause::Size => counters1.incl_evicted(),
1878
            RemovalCause::Explicit => counters1.incl_invalidated(),
1879
            _ => (),
1880
        };
1881
1882
        let mut cache = SegmentedCache::builder(NUM_SEGMENTS)
1883
            .max_capacity(MAX_CAPACITY as u64)
1884
            .eviction_listener(listener)
1885
            .build();
1886
        cache.reconfigure_for_testing();
1887
1888
        // Make the cache exterior immutable.
1889
        let cache = cache;
1890
1891
        for key in 0..KEYS {
1892
            let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
1893
            cache.insert(key, value);
1894
            counters.incl_inserted();
1895
            cache.run_pending_tasks();
1896
        }
1897
1898
        let eviction_count = KEYS - MAX_CAPACITY;
1899
1900
        cache.run_pending_tasks();
1901
        assert_eq!(counters.inserted(), KEYS, "inserted");
1902
        assert_eq!(counters.value_created(), KEYS, "value_created");
1903
        assert_eq!(counters.evicted(), eviction_count, "evicted");
1904
        assert_eq!(counters.invalidated(), 0, "invalidated");
1905
        assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
1906
1907
        for key in 0..KEYS {
1908
            cache.invalidate(&key);
1909
            cache.run_pending_tasks();
1910
        }
1911
1912
        cache.run_pending_tasks();
1913
        assert_eq!(counters.inserted(), KEYS, "inserted");
1914
        assert_eq!(counters.value_created(), KEYS, "value_created");
1915
        assert_eq!(counters.evicted(), eviction_count, "evicted");
1916
        assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
1917
        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
1918
1919
        std::mem::drop(cache);
1920
        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
1921
    }
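    // A minimal sketch of wiring up an eviction listener as the test above does,
    // written against the public `moka::sync::SegmentedCache` builder API; the
    // atomic counter and capacity numbers are illustrative only.
    fn eviction_listener_sketch() {
        use moka::{notification::RemovalCause, sync::SegmentedCache};
        use std::sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        };

        let size_evictions = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&size_evictions);

        let cache: SegmentedCache<u32, u32> = SegmentedCache::builder(2)
            .max_capacity(100)
            .eviction_listener(move |_key, _value, cause| match cause {
                // Count only size-based evictions, mirroring the test above.
                RemovalCause::Size => {
                    counter.fetch_add(1, Ordering::Relaxed);
                }
                _ => (),
            })
            .build();

        for key in 0..200_u32 {
            cache.insert(key, key);
        }
        // Eviction and notification are deferred maintenance; flush them as the
        // test above does before inspecting the counter.
        cache.run_pending_tasks();
        let _observed = size_evictions.load(Ordering::Relaxed);
    }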
1922
1923
    #[test]
1924
    fn test_debug_format() {
1925
        let cache = SegmentedCache::new(10, 4);
1926
        cache.insert('a', "alice");
1927
        cache.insert('b', "bob");
1928
        cache.insert('c', "cindy");
1929
1930
        let debug_str = format!("{cache:?}");
1931
        assert!(debug_str.starts_with('{'));
1932
        assert!(debug_str.contains(r#"'a': "alice""#));
1933
        assert!(debug_str.contains(r#"'b': "bob""#));
1934
        assert!(debug_str.contains(r#"'c': "cindy""#));
1935
        assert!(debug_str.ends_with('}'));
1936
    }
1937
1938
    type NotificationPair<V> = (V, RemovalCause);
1939
    type NotificationTriple<K, V> = (Arc<K>, V, RemovalCause);
1940
1941
    fn verify_notification_vec<K, V, S>(
1942
        cache: &SegmentedCache<K, V, S>,
1943
        actual: Arc<Mutex<Vec<NotificationTriple<K, V>>>>,
1944
        expected: &[NotificationTriple<K, V>],
1945
    ) where
1946
        K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
1947
        V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1948
        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1949
    {
1950
        // Retries will be needed when testing in a QEMU VM.
1951
        const MAX_RETRIES: usize = 5;
1952
        let mut retries = 0;
1953
        loop {
1954
            // Ensure all scheduled notifications have been processed.
1955
            std::thread::sleep(Duration::from_millis(500));
1956
1957
            let actual = &*actual.lock();
1958
            if actual.len() != expected.len() {
1959
                if retries <= MAX_RETRIES {
1960
                    retries += 1;
1961
                    cache.run_pending_tasks();
1962
                    continue;
1963
                } else {
1964
                    assert_eq!(actual.len(), expected.len(), "Retries exhausted");
1965
                }
1966
            }
1967
1968
            for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
1969
                assert_eq!(actual, expected, "expected[{i}]");
1970
            }
1971
1972
            break;
1973
        }
1974
    }
1975
1976
    fn verify_notification_map<K, V, S>(
1977
        cache: &SegmentedCache<K, V, S>,
1978
        actual: Arc<Mutex<std::collections::HashMap<Arc<K>, NotificationPair<V>>>>,
1979
        expected: &std::collections::HashMap<Arc<K>, NotificationPair<V>>,
1980
    ) where
1981
        K: std::hash::Hash + Eq + std::fmt::Display + Send + Sync + 'static,
1982
        V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
1983
        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
1984
    {
1985
        // Retries will be needed when testing in a QEMU VM.
1986
        const MAX_RETRIES: usize = 5;
1987
        let mut retries = 0;
1988
        loop {
1989
            // Ensure all scheduled notifications have been processed.
1990
            std::thread::sleep(Duration::from_millis(500));
1991
1992
            let actual = &*actual.lock();
1993
            if actual.len() != expected.len() {
1994
                if retries <= MAX_RETRIES {
1995
                    retries += 1;
1996
                    cache.run_pending_tasks();
1997
                    continue;
1998
                } else {
1999
                    assert_eq!(actual.len(), expected.len(), "Retries exhausted");
2000
                }
2001
            }
2002
2003
            for actual_key in actual.keys() {
2004
                assert_eq!(
2005
                    actual.get(actual_key),
2006
                    expected.get(actual_key),
2007
                    "expected[{actual_key}]",
2008
                );
2009
            }
2010
2011
            break;
2012
        }
2013
    }
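    // A minimal sketch of the bounded-retry pattern the two helpers above use:
    // eviction notifications are delivered by deferred maintenance, so flush
    // pending tasks and re-check a condition a few times (slow emulated hosts
    // such as a QEMU VM may need every retry). The helper name and bounds are
    // illustrative only.
    fn retry_until<K, V, S>(cache: &SegmentedCache<K, V, S>, mut condition: impl FnMut() -> bool)
    where
        K: std::hash::Hash + Eq + Send + Sync + 'static,
        V: Clone + Send + Sync + 'static,
        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
    {
        const MAX_RETRIES: usize = 5;
        for _ in 0..=MAX_RETRIES {
            // Give scheduled notifications a chance to be processed.
            std::thread::sleep(Duration::from_millis(500));
            if condition() {
                return;
            }
            cache.run_pending_tasks();
        }
        panic!("Retries exhausted");
    }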
2014
}