/rust/registry/src/index.crates.io-1949cf8c6b5b557f/pingora-pool-0.8.0/src/lru.rs
Line | Count | Source |
1 | | // Copyright 2026 Cloudflare, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::hash::Hash; |
16 | | use lru::LruCache; |
17 | | use parking_lot::RwLock; |
18 | | use std::cell::RefCell; |
19 | | use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; |
20 | | use std::sync::Arc; |
21 | | use thread_local::ThreadLocal; |
22 | | use tokio::sync::Notify; |
23 | | |
24 | | pub struct Node<T> { |
25 | | pub close_notifier: Arc<Notify>, |
26 | | pub meta: T, |
27 | | } |
28 | | |
29 | | impl<T> Node<T> { |
30 | 0 | pub fn new(meta: T) -> Self { |
31 | 0 | Node { |
32 | 0 | close_notifier: Arc::new(Notify::new()), |
33 | 0 | meta, |
34 | 0 | } |
35 | 0 | } Unexecuted instantiation: <pingora_pool::lru::Node<pingora_pool::connection::ConnectionMeta>>::new Unexecuted instantiation: <pingora_pool::lru::Node<_>>::new |
36 | | |
37 | 0 | pub fn notify_close(&self) { |
38 | 0 | self.close_notifier.notify_one(); |
39 | 0 | } Unexecuted instantiation: <pingora_pool::lru::Node<pingora_pool::connection::ConnectionMeta>>::notify_close Unexecuted instantiation: <pingora_pool::lru::Node<_>>::notify_close |
40 | | } |
41 | | |
42 | | pub struct Lru<K, T> |
43 | | where |
44 | | K: Send, |
45 | | T: Send, |
46 | | { |
47 | | lru: RwLock<ThreadLocal<RefCell<LruCache<K, Node<T>>>>>, |
48 | | size: usize, |
49 | | drain: AtomicBool, |
50 | | } |
51 | | |
52 | | impl<K, T> Lru<K, T> |
53 | | where |
54 | | K: Hash + Eq + Send, |
55 | | T: Send, |
56 | | { |
57 | 0 | pub fn new(size: usize) -> Self { |
58 | 0 | Lru { |
59 | 0 | lru: RwLock::new(ThreadLocal::new()), |
60 | 0 | size, |
61 | 0 | drain: AtomicBool::new(false), |
62 | 0 | } |
63 | 0 | } Unexecuted instantiation: <pingora_pool::lru::Lru<i32, pingora_pool::connection::ConnectionMeta>>::new Unexecuted instantiation: <pingora_pool::lru::Lru<_, _>>::new |
64 | | |
65 | | // put a node in and return the meta of the replaced node |
66 | 0 | pub fn put(&self, key: K, value: Node<T>) -> Option<T> { |
67 | 0 | if self.drain.load(Relaxed) { |
68 | 0 | value.notify_close(); // sort of hack to simulate being evicted right away |
69 | 0 | return None; |
70 | 0 | } |
71 | 0 | let lru = self.lru.read(); /* read lock */ |
72 | 0 | let lru_cache = &mut *(lru |
73 | 0 | .get_or(|| RefCell::new(LruCache::unbounded())) Unexecuted instantiation: <pingora_pool::lru::Lru<i32, pingora_pool::connection::ConnectionMeta>>::put::{closure#0}Unexecuted instantiation: <pingora_pool::lru::Lru<_, _>>::put::{closure#0} |
74 | 0 | .borrow_mut()); |
75 | 0 | lru_cache.put(key, value); |
76 | 0 | if lru_cache.len() > self.size { |
77 | 0 | match lru_cache.pop_lru() { |
78 | 0 | Some((_, v)) => { |
79 | | // TODO: drop the lock here? |
80 | 0 | v.notify_close(); |
81 | 0 | return Some(v.meta); |
82 | | } |
83 | 0 | None => return None, |
84 | | } |
85 | 0 | } |
86 | 0 | None |
87 | | /* read lock dropped */ |
88 | 0 | } Unexecuted instantiation: <pingora_pool::lru::Lru<i32, pingora_pool::connection::ConnectionMeta>>::put Unexecuted instantiation: <pingora_pool::lru::Lru<_, _>>::put |
89 | | |
90 | 0 | pub fn add(&self, key: K, meta: T) -> (Arc<Notify>, Option<T>) { |
91 | 0 | let node = Node::new(meta); |
92 | 0 | let notifier = node.close_notifier.clone(); |
93 | | // TODO: check if the key is already in it |
94 | 0 | (notifier, self.put(key, node)) |
95 | 0 | } Unexecuted instantiation: <pingora_pool::lru::Lru<i32, pingora_pool::connection::ConnectionMeta>>::add Unexecuted instantiation: <pingora_pool::lru::Lru<_, _>>::add |
96 | | |
97 | 0 | pub fn pop(&self, key: &K) -> Option<Node<T>> { |
98 | 0 | let lru = self.lru.read(); /* read lock */ |
99 | 0 | let lru_cache = &mut *(lru |
100 | 0 | .get_or(|| RefCell::new(LruCache::unbounded())) Unexecuted instantiation: <pingora_pool::lru::Lru<i32, pingora_pool::connection::ConnectionMeta>>::pop::{closure#0}Unexecuted instantiation: <pingora_pool::lru::Lru<_, _>>::pop::{closure#0} |
101 | 0 | .borrow_mut()); |
102 | 0 | lru_cache.pop(key) |
103 | | /* read lock dropped */ |
104 | 0 | } Unexecuted instantiation: <pingora_pool::lru::Lru<i32, pingora_pool::connection::ConnectionMeta>>::pop Unexecuted instantiation: <pingora_pool::lru::Lru<_, _>>::pop |
105 | | |
106 | | #[allow(dead_code)] |
107 | 0 | pub fn drain(&self) { |
108 | 0 | self.drain.store(true, Relaxed); |
109 | | |
110 | | /* drain need to go through all the local lru cache objects |
111 | | * acquire an exclusive write lock to make it safe */ |
112 | 0 | let mut lru = self.lru.write(); /* write lock */ |
113 | 0 | let lru_cache_iter = lru.iter_mut(); |
114 | 0 | for lru_cache_rc in lru_cache_iter { |
115 | 0 | let mut lru_cache = lru_cache_rc.borrow_mut(); |
116 | 0 | for (_, item) in lru_cache.iter() { |
117 | 0 | item.notify_close(); |
118 | 0 | } |
119 | 0 | lru_cache.clear(); |
120 | | } |
121 | | /* write lock dropped */ |
122 | 0 | } |
123 | | } |
124 | | |
#[cfg(test)]
mod tests {
    use super::*;
    use log::debug;

    /// Overflowing a capacity-2 pool must close the oldest entry.
    #[tokio::test]
    async fn test_evict_close() {
        let pool: Lru<i32, ()> = Lru::new(2);
        let (first, _) = pool.add(1, ());
        let (second, _) = pool.add(2, ());
        // The third insertion exceeds the capacity and evicts key 1.
        let (third, _) = pool.add(3, ());

        // Only the evicted entry's notifier should be ready.
        let closed_item = tokio::select! {
            _ = first.notified() => {debug!("notifier1"); 1},
            _ = second.notified() => {debug!("notifier2"); 2},
            _ = third.notified() => {debug!("notifier3"); 3},
        };
        assert_eq!(closed_item, 1);
    }

    /// A popped entry is not a candidate for eviction; the next oldest is.
    #[tokio::test]
    async fn test_evict_close_with_pop() {
        let pool: Lru<i32, ()> = Lru::new(2);
        let (first, _) = pool.add(1, ());
        let (second, _) = pool.add(2, ());
        // Key 1 is taken out by hand, so it is never notified.
        pool.pop(&1);
        let (third, _) = pool.add(3, ());
        // This insertion overflows the pool; key 2 is now the oldest.
        let (fourth, _) = pool.add(4, ());

        let closed_item = tokio::select! {
            _ = first.notified() => {debug!("notifier1"); 1},
            _ = second.notified() => {debug!("notifier2"); 2},
            _ = third.notified() => {debug!("notifier3"); 3},
            _ = fourth.notified() => {debug!("notifier4"); 4},
        };
        assert_eq!(closed_item, 2);
    }

    /// Draining closes every stored entry, and anything added afterwards is
    /// closed right away.
    #[tokio::test]
    async fn test_drain() {
        let pool: Lru<i32, ()> = Lru::new(4);
        let (first, _) = pool.add(1, ());
        let (second, _) = pool.add(2, ());
        let (third, _) = pool.add(3, ());
        pool.drain();
        // Added after the drain: expected to be notified immediately.
        let (fourth, _) = pool.add(4, ());

        // The test only finishes if all four notifiers fire.
        tokio::join!(
            first.notified(),
            second.notified(),
            third.notified(),
            fourth.notified()
        );
    }
}