Coverage Report

Created: 2026-04-14 06:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/rust/registry/src/index.crates.io-1949cf8c6b5b557f/pingora-pool-0.8.0/src/lru.rs
Line
Count
Source
1
// Copyright 2026 Cloudflare, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::hash::Hash;
16
use lru::LruCache;
17
use parking_lot::RwLock;
18
use std::cell::RefCell;
19
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
20
use std::sync::Arc;
21
use thread_local::ThreadLocal;
22
use tokio::sync::Notify;
23
24
pub struct Node<T> {
25
    pub close_notifier: Arc<Notify>,
26
    pub meta: T,
27
}
28
29
impl<T> Node<T> {
30
0
    pub fn new(meta: T) -> Self {
31
0
        Node {
32
0
            close_notifier: Arc::new(Notify::new()),
33
0
            meta,
34
0
        }
35
0
    }
Unexecuted instantiation: <pingora_pool::lru::Node<pingora_pool::connection::ConnectionMeta>>::new
Unexecuted instantiation: <pingora_pool::lru::Node<_>>::new
36
37
0
    pub fn notify_close(&self) {
38
0
        self.close_notifier.notify_one();
39
0
    }
Unexecuted instantiation: <pingora_pool::lru::Node<pingora_pool::connection::ConnectionMeta>>::notify_close
Unexecuted instantiation: <pingora_pool::lru::Node<_>>::notify_close
40
}
41
42
pub struct Lru<K, T>
43
where
44
    K: Send,
45
    T: Send,
46
{
47
    lru: RwLock<ThreadLocal<RefCell<LruCache<K, Node<T>>>>>,
48
    size: usize,
49
    drain: AtomicBool,
50
}
51
52
impl<K, T> Lru<K, T>
53
where
54
    K: Hash + Eq + Send,
55
    T: Send,
56
{
57
0
    pub fn new(size: usize) -> Self {
58
0
        Lru {
59
0
            lru: RwLock::new(ThreadLocal::new()),
60
0
            size,
61
0
            drain: AtomicBool::new(false),
62
0
        }
63
0
    }
Unexecuted instantiation: <pingora_pool::lru::Lru<i32, pingora_pool::connection::ConnectionMeta>>::new
Unexecuted instantiation: <pingora_pool::lru::Lru<_, _>>::new
64
65
    // put a node in and return the meta of the replaced node
66
0
    pub fn put(&self, key: K, value: Node<T>) -> Option<T> {
67
0
        if self.drain.load(Relaxed) {
68
0
            value.notify_close(); // sort of hack to simulate being evicted right away
69
0
            return None;
70
0
        }
71
0
        let lru = self.lru.read(); /* read lock */
72
0
        let lru_cache = &mut *(lru
73
0
            .get_or(|| RefCell::new(LruCache::unbounded()))
Unexecuted instantiation: <pingora_pool::lru::Lru<i32, pingora_pool::connection::ConnectionMeta>>::put::{closure#0}
Unexecuted instantiation: <pingora_pool::lru::Lru<_, _>>::put::{closure#0}
74
0
            .borrow_mut());
75
0
        lru_cache.put(key, value);
76
0
        if lru_cache.len() > self.size {
77
0
            match lru_cache.pop_lru() {
78
0
                Some((_, v)) => {
79
                    // TODO: drop the lock here?
80
0
                    v.notify_close();
81
0
                    return Some(v.meta);
82
                }
83
0
                None => return None,
84
            }
85
0
        }
86
0
        None
87
        /* read lock dropped */
88
0
    }
Unexecuted instantiation: <pingora_pool::lru::Lru<i32, pingora_pool::connection::ConnectionMeta>>::put
Unexecuted instantiation: <pingora_pool::lru::Lru<_, _>>::put
89
90
0
    pub fn add(&self, key: K, meta: T) -> (Arc<Notify>, Option<T>) {
91
0
        let node = Node::new(meta);
92
0
        let notifier = node.close_notifier.clone();
93
        // TODO: check if the key is already in it
94
0
        (notifier, self.put(key, node))
95
0
    }
Unexecuted instantiation: <pingora_pool::lru::Lru<i32, pingora_pool::connection::ConnectionMeta>>::add
Unexecuted instantiation: <pingora_pool::lru::Lru<_, _>>::add
96
97
0
    pub fn pop(&self, key: &K) -> Option<Node<T>> {
98
0
        let lru = self.lru.read(); /* read lock */
99
0
        let lru_cache = &mut *(lru
100
0
            .get_or(|| RefCell::new(LruCache::unbounded()))
Unexecuted instantiation: <pingora_pool::lru::Lru<i32, pingora_pool::connection::ConnectionMeta>>::pop::{closure#0}
Unexecuted instantiation: <pingora_pool::lru::Lru<_, _>>::pop::{closure#0}
101
0
            .borrow_mut());
102
0
        lru_cache.pop(key)
103
        /* read lock dropped */
104
0
    }
Unexecuted instantiation: <pingora_pool::lru::Lru<i32, pingora_pool::connection::ConnectionMeta>>::pop
Unexecuted instantiation: <pingora_pool::lru::Lru<_, _>>::pop
105
106
    #[allow(dead_code)]
107
0
    pub fn drain(&self) {
108
0
        self.drain.store(true, Relaxed);
109
110
        /* drain need to go through all the local lru cache objects
111
         * acquire an exclusive write lock to make it safe */
112
0
        let mut lru = self.lru.write(); /* write lock */
113
0
        let lru_cache_iter = lru.iter_mut();
114
0
        for lru_cache_rc in lru_cache_iter {
115
0
            let mut lru_cache = lru_cache_rc.borrow_mut();
116
0
            for (_, item) in lru_cache.iter() {
117
0
                item.notify_close();
118
0
            }
119
0
            lru_cache.clear();
120
        }
121
        /* write lock dropped */
122
0
    }
123
}
124
125
#[cfg(test)]
126
mod tests {
127
    use super::*;
128
    use log::debug;
129
130
    #[tokio::test]
131
    async fn test_evict_close() {
132
        let pool: Lru<i32, ()> = Lru::new(2);
133
        let (notifier1, _) = pool.add(1, ());
134
        let (notifier2, _) = pool.add(2, ());
135
        let (notifier3, _) = pool.add(3, ());
136
        let closed_item = tokio::select! {
137
            _ = notifier1.notified() => {debug!("notifier1"); 1},
138
            _ = notifier2.notified() => {debug!("notifier2"); 2},
139
            _ = notifier3.notified() => {debug!("notifier3"); 3},
140
        };
141
        assert_eq!(closed_item, 1);
142
    }
143
144
    #[tokio::test]
145
    async fn test_evict_close_with_pop() {
146
        let pool: Lru<i32, ()> = Lru::new(2);
147
        let (notifier1, _) = pool.add(1, ());
148
        let (notifier2, _) = pool.add(2, ());
149
        pool.pop(&1);
150
        let (notifier3, _) = pool.add(3, ());
151
        let (notifier4, _) = pool.add(4, ());
152
        let closed_item = tokio::select! {
153
            _ = notifier1.notified() => {debug!("notifier1"); 1},
154
            _ = notifier2.notified() => {debug!("notifier2"); 2},
155
            _ = notifier3.notified() => {debug!("notifier3"); 3},
156
            _ = notifier4.notified() => {debug!("notifier4"); 4},
157
        };
158
        assert_eq!(closed_item, 2);
159
    }
160
161
    #[tokio::test]
162
    async fn test_drain() {
163
        let pool: Lru<i32, ()> = Lru::new(4);
164
        let (notifier1, _) = pool.add(1, ());
165
        let (notifier2, _) = pool.add(2, ());
166
        let (notifier3, _) = pool.add(3, ());
167
        pool.drain();
168
        let (notifier4, _) = pool.add(4, ());
169
170
        tokio::join!(
171
            notifier1.notified(),
172
            notifier2.notified(),
173
            notifier3.notified(),
174
            notifier4.notified()
175
        );
176
    }
177
}