/src/rocksdb/cache/secondary_cache_adapter.cc
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "cache/secondary_cache_adapter.h" |
7 | | |
8 | | #include <atomic> |
9 | | |
10 | | #include "cache/tiered_secondary_cache.h" |
11 | | #include "monitoring/perf_context_imp.h" |
12 | | #include "test_util/sync_point.h" |
13 | | #include "util/cast_util.h" |
14 | | |
15 | | namespace ROCKSDB_NAMESPACE { |
16 | | |
17 | | namespace { |
18 | | // A distinct pointer value for marking "dummy" cache entries |
19 | | struct Dummy { |
20 | | char val[7] = "kDummy"; |
21 | | }; |
22 | | const Dummy kDummy{}; |
23 | | Cache::ObjectPtr const kDummyObj = const_cast<Dummy*>(&kDummy); |
24 | | const char* kTieredCacheName = "TieredCache"; |
25 | | } // namespace |
26 | | |
27 | | // When CacheWithSecondaryAdapter is constructed with the distribute_cache_res |
28 | | // parameter set to true, it manages the entire memory budget across the |
29 | | // primary and secondary cache. The secondary cache is assumed to be in |
30 | | // memory, such as the CompressedSecondaryCache. When a placeholder entry |
31 | | // is inserted by a CacheReservationManager instance to reserve memory, |
32 | | // the CacheWithSecondaryAdapter ensures that the reservation is distributed |
33 | | // proportionally across the primary/secondary caches. |
34 | | // |
35 | | // The primary block cache is initially sized to the sum of the primary cache |
36 | | // budget + the secondary cache budget, as follows - |
37 | | // |--------- Primary Cache Configured Capacity -----------| |
38 | | // |---Secondary Cache Budget----|----Primary Cache Budget-----| |
39 | | // |
40 | | // A ConcurrentCacheReservationManager member in the |
41 | | // CacheWithSecondaryAdapter, pri_cache_res_, is used to help track |
42 | | // the distribution of memory reservations. |
43 | | // Initially, it accounts for the entire secondary cache budget as a |
44 | | // reservation against the primary cache. This shrinks the usable capacity of |
45 | | // the primary cache to the budget that the user originally desired. |
46 | | // |
47 | | // |--Reservation for Sec Cache--|-Pri Cache Usable Capacity---| |
48 | | // |
49 | | // When a reservation placeholder is inserted into the adapter, it is inserted |
50 | | // directly into the primary cache. This means the entire charge of the |
51 | | // placeholder is counted against the primary cache. To compensate and count |
52 | | // a portion of it against the secondary cache, the secondary cache Deflate() |
53 | | // method is called to shrink it. Since Deflate() causes the secondary |
54 | | // cache's actual usage to shrink, this is reflected by releasing an equal amount |
55 | | // from the pri_cache_res_ reservation. The Deflate() in the secondary cache |
56 | | // can be, but is not required to be, implemented using its own cache |
57 | | // reservation manager. |
58 | | // |
59 | | // For example, if the pri/sec ratio is 70/30, and the combined capacity is |
60 | | // 100MB, the intermediate and final state after inserting a reservation |
61 | | // placeholder for 10MB would be as follows - |
62 | | // |
63 | | // |-Reservation for Sec Cache-|-Pri Cache Usable Capacity-|---R---| |
64 | | // 1. After inserting the placeholder in primary |
65 | | // |------- 30MB -------------|------- 60MB -------------|-10MB--| |
66 | | // 2. After deflating the secondary and adjusting the reservation for |
67 | | // secondary against the primary |
68 | | // |------- 27MB -------------|------- 63MB -------------|-10MB--| |
69 | | // |
70 | | // Likewise, when the user-inserted placeholder is released, the secondary |
71 | | // cache Inflate() method is called to grow it, and the pri_cache_res_ |
72 | | // reservation is increased by an equal amount. |
73 | | // |
74 | | // Another way of implementing this would have been to simply split the user |
75 | | // reservation into primary and secondary components. However, this would |
76 | | // require allocating a structure to track the associated secondary cache |
77 | | // reservation, which adds some complexity and overhead. |
78 | | // |
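
The worked example above can be checked with a few lines of arithmetic. Below is
a minimal standalone sketch (illustrative only, not part of this file) that
reproduces the 70/30 split with a 100MB budget and a 10MB placeholder; it uses
integer math for the 30% share, whereas the real code uses a double ratio:

    #include <cstddef>
    #include <cstdio>

    int main() {
      const size_t kMB = 1024 * 1024;
      const size_t total_capacity = 100 * kMB;  // combined pri+sec budget

      // Initial reservation against the primary on behalf of the secondary:
      // 30% of the total budget.
      size_t sec_reservation = total_capacity * 3 / 10;  // 30MB

      // 1. A 10MB placeholder is charged entirely to the primary cache.
      const size_t placeholder = 10 * kMB;

      // 2. Deflate the secondary by its share of the placeholder, and release
      //    the same amount from the reservation held against the primary.
      const size_t sec_share = placeholder * 3 / 10;  // 3MB
      sec_reservation -= sec_share;                   // 30MB -> 27MB

      const size_t pri_usable = total_capacity - sec_reservation - placeholder;
      std::printf("sec: %zuMB  pri usable: %zuMB  placeholder: %zuMB\n",
                  sec_reservation / kMB, pri_usable / kMB, placeholder / kMB);
      // Prints: sec: 27MB  pri usable: 63MB  placeholder: 10MB
      return 0;
    }
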
79 | | CacheWithSecondaryAdapter::CacheWithSecondaryAdapter( |
80 | | std::shared_ptr<Cache> target, |
81 | | std::shared_ptr<SecondaryCache> secondary_cache, |
82 | | TieredAdmissionPolicy adm_policy, bool distribute_cache_res) |
83 | 0 | : CacheWrapper(std::move(target)), |
84 | 0 | secondary_cache_(std::move(secondary_cache)), |
85 | 0 | adm_policy_(adm_policy), |
86 | 0 | distribute_cache_res_(distribute_cache_res), |
87 | 0 | placeholder_usage_(0), |
88 | 0 | reserved_usage_(0), |
89 | 0 | sec_reserved_(0) { |
90 | 0 | target_->SetEvictionCallback( |
91 | 0 | [this](const Slice& key, Handle* handle, bool was_hit) { |
92 | 0 | return EvictionHandler(key, handle, was_hit); |
93 | 0 | }); |
94 | 0 | if (distribute_cache_res_) { |
95 | 0 | size_t sec_capacity = 0; |
96 | 0 | pri_cache_res_ = std::make_shared<ConcurrentCacheReservationManager>( |
97 | 0 | std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>( |
98 | 0 | target_)); |
99 | 0 | Status s = secondary_cache_->GetCapacity(sec_capacity); |
100 | 0 | assert(s.ok()); |
101 | | // Initially, the primary cache is sized to the uncompressed cache budget |
102 | | // plus the compressed secondary cache budget. The secondary cache budget |
103 | | // is then taken away from the primary cache through cache reservations. |
104 | | // Later, when a placeholder entry is inserted by the caller, it's inserted |
105 | | // into the primary cache and the portion that should be assigned to the |
106 | | // secondary cache is freed from the reservation. |
107 | 0 | s = pri_cache_res_->UpdateCacheReservation(sec_capacity); |
108 | 0 | assert(s.ok()); |
109 | 0 | sec_cache_res_ratio_ = (double)sec_capacity / target_->GetCapacity(); |
110 | 0 | } |
111 | 0 | } |
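
A hedged sketch of the sizing math this constructor performs, with made-up
numbers (a 64MB primary budget combined with a 32MB compressed secondary
budget); the real code uses the capacities reported by target_ and
secondary_cache_:

    const size_t pri_budget = 64 << 20;
    const size_t sec_budget = 32 << 20;

    // The primary cache is created at the combined budget (96MB)...
    const size_t configured_capacity = pri_budget + sec_budget;

    // ...then the secondary budget is immediately reserved against it,
    // leaving the 64MB the user asked for as usable primary capacity.
    const double sec_cache_res_ratio =
        static_cast<double>(sec_budget) / configured_capacity;  // one third
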
112 | | |
113 | 0 | CacheWithSecondaryAdapter::~CacheWithSecondaryAdapter() { |
114 | | // `*this` will be destroyed before `*target_`, so we have to prevent |
115 | | // use after free |
116 | 0 | target_->SetEvictionCallback({}); |
117 | | #ifndef NDEBUG |
118 | | if (distribute_cache_res_) { |
119 | | size_t sec_capacity = 0; |
120 | | Status s = secondary_cache_->GetCapacity(sec_capacity); |
121 | | assert(s.ok()); |
122 | | assert(placeholder_usage_ == 0); |
123 | | assert(reserved_usage_ == 0); |
124 | | bool pri_cache_res_mismatch = |
125 | | pri_cache_res_->GetTotalMemoryUsed() != sec_capacity; |
126 | | if (pri_cache_res_mismatch) { |
127 | | fprintf(stderr, |
128 | | "~CacheWithSecondaryAdapter: Primary cache reservation: " |
129 | | "%zu, Secondary cache capacity: %zu, " |
130 | | "Secondary cache reserved: %zu\n", |
131 | | pri_cache_res_->GetTotalMemoryUsed(), sec_capacity, |
132 | | sec_reserved_); |
133 | | assert(!pri_cache_res_mismatch); |
134 | | } |
135 | | } |
136 | | #endif // NDEBUG |
137 | 0 | } |
138 | | |
139 | | bool CacheWithSecondaryAdapter::EvictionHandler(const Slice& key, |
140 | 0 | Handle* handle, bool was_hit) { |
141 | 0 | auto helper = GetCacheItemHelper(handle); |
142 | 0 | if (helper->IsSecondaryCacheCompatible() && |
143 | 0 | adm_policy_ != TieredAdmissionPolicy::kAdmPolicyThreeQueue) { |
144 | 0 | auto obj = target_->Value(handle); |
145 | | // Ignore dummy entry |
146 | 0 | if (obj != kDummyObj) { |
147 | 0 | bool force = false; |
148 | 0 | if (adm_policy_ == TieredAdmissionPolicy::kAdmPolicyAllowCacheHits) { |
149 | 0 | force = was_hit; |
150 | 0 | } else if (adm_policy_ == TieredAdmissionPolicy::kAdmPolicyAllowAll) { |
151 | 0 | force = true; |
152 | 0 | } |
153 | | // Spill into secondary cache. |
154 | 0 | secondary_cache_->Insert(key, obj, helper, force).PermitUncheckedError(); |
155 | 0 | } |
156 | 0 | } |
157 | | // Never takes ownership of obj |
158 | 0 | return false; |
159 | 0 | } |
160 | | |
161 | | bool CacheWithSecondaryAdapter::ProcessDummyResult(Cache::Handle** handle, |
162 | 0 | bool erase) { |
163 | 0 | if (*handle && target_->Value(*handle) == kDummyObj) { |
164 | 0 | target_->Release(*handle, erase); |
165 | 0 | *handle = nullptr; |
166 | 0 | return true; |
167 | 0 | } else { |
168 | 0 | return false; |
169 | 0 | } |
170 | 0 | } |
171 | | |
172 | | void CacheWithSecondaryAdapter::CleanupCacheObject( |
173 | 0 | ObjectPtr obj, const CacheItemHelper* helper) { |
174 | 0 | if (helper->del_cb) { |
175 | 0 | helper->del_cb(obj, memory_allocator()); |
176 | 0 | } |
177 | 0 | } |
178 | | |
179 | | Cache::Handle* CacheWithSecondaryAdapter::Promote( |
180 | | std::unique_ptr<SecondaryCacheResultHandle>&& secondary_handle, |
181 | | const Slice& key, const CacheItemHelper* helper, Priority priority, |
182 | 0 | Statistics* stats, bool found_dummy_entry, bool kept_in_sec_cache) { |
183 | 0 | assert(secondary_handle->IsReady()); |
184 | |
185 | 0 | ObjectPtr obj = secondary_handle->Value(); |
186 | 0 | if (!obj) { |
187 | | // Nothing found. |
188 | 0 | return nullptr; |
189 | 0 | } |
190 | | // Found something. |
191 | 0 | switch (helper->role) { |
192 | 0 | case CacheEntryRole::kFilterBlock: |
193 | 0 | RecordTick(stats, SECONDARY_CACHE_FILTER_HITS); |
194 | 0 | break; |
195 | 0 | case CacheEntryRole::kIndexBlock: |
196 | 0 | RecordTick(stats, SECONDARY_CACHE_INDEX_HITS); |
197 | 0 | break; |
198 | 0 | case CacheEntryRole::kDataBlock: |
199 | 0 | RecordTick(stats, SECONDARY_CACHE_DATA_HITS); |
200 | 0 | break; |
201 | 0 | default: |
202 | 0 | break; |
203 | 0 | } |
204 | 0 | PERF_COUNTER_ADD(secondary_cache_hit_count, 1); |
205 | 0 | RecordTick(stats, SECONDARY_CACHE_HITS); |
206 | | |
207 | | // Note: SecondaryCache::Size() is really charge (from the CreateCallback) |
208 | 0 | size_t charge = secondary_handle->Size(); |
209 | 0 | Handle* result = nullptr; |
210 | | // Insert into primary cache, possibly as a standalone+dummy entries. |
211 | 0 | if (secondary_cache_->SupportForceErase() && !found_dummy_entry) { |
212 | | // Create standalone and insert dummy |
213 | | // Allow standalone to be created even if cache is full, to avoid |
214 | | // reading the entry from storage. |
215 | 0 | result = |
216 | 0 | CreateStandalone(key, obj, helper, charge, /*allow_uncharged*/ true); |
217 | 0 | assert(result); |
218 | 0 | PERF_COUNTER_ADD(block_cache_standalone_handle_count, 1); |
219 | | |
220 | | // Insert dummy to record recent use |
221 | | // TODO: try to avoid case where inserting this dummy could overwrite a |
222 | | // regular entry |
223 | 0 | Status s = Insert(key, kDummyObj, &kNoopCacheItemHelper, /*charge=*/0, |
224 | 0 | /*handle=*/nullptr, priority); |
225 | 0 | s.PermitUncheckedError(); |
226 | | // Nothing to do or clean up on dummy insertion failure |
227 | 0 | } else { |
228 | | // Insert regular entry into primary cache. |
229 | | // Don't allow it to spill into secondary cache again if it was kept there. |
230 | 0 | Status s = Insert( |
231 | 0 | key, obj, kept_in_sec_cache ? helper->without_secondary_compat : helper, |
232 | 0 | charge, &result, priority); |
233 | 0 | if (s.ok()) { |
234 | 0 | assert(result); |
235 | 0 | PERF_COUNTER_ADD(block_cache_real_handle_count, 1); |
236 | 0 | } else { |
237 | | // Create standalone result instead, even if cache is full, to avoid |
238 | | // reading the entry from storage. |
239 | 0 | result = |
240 | 0 | CreateStandalone(key, obj, helper, charge, /*allow_uncharged*/ true); |
241 | 0 | assert(result); |
242 | 0 | PERF_COUNTER_ADD(block_cache_standalone_handle_count, 1); |
243 | 0 | } |
244 | 0 | } |
245 | 0 | return result; |
246 | 0 | } |
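
The standalone+dummy scheme above plays out over two lookups. A sketch of the
sequence, restating the comments in this function rather than introducing any
new API:

    // Lookup #1: miss in primary, hit in secondary (SupportForceErase() case):
    //   - return a standalone handle, not inserted into the primary cache
    //   - insert a zero-charge dummy entry to record the recent use
    // Lookup #2: the dummy is found (found_dummy_entry == true):
    //   - insert the real entry into the primary cache this time, since the
    //     repeated access suggests the entry is worth caching
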
247 | | |
248 | | Status CacheWithSecondaryAdapter::Insert(const Slice& key, ObjectPtr value, |
249 | | const CacheItemHelper* helper, |
250 | | size_t charge, Handle** handle, |
251 | | Priority priority, |
252 | | const Slice& compressed_value, |
253 | 0 | CompressionType type) { |
254 | 0 | Status s = target_->Insert(key, value, helper, charge, handle, priority); |
255 | 0 | if (s.ok() && value == nullptr && distribute_cache_res_ && handle) { |
256 | 0 | charge = target_->GetCharge(*handle); |
257 | |
258 | 0 | MutexLock l(&cache_res_mutex_); |
259 | 0 | placeholder_usage_ += charge; |
260 | | // Check if total placeholder reservation is more than the overall |
261 | | // cache capacity. If it is, then we don't try to charge the |
262 | | // secondary cache because we don't want to overcharge it (beyond |
263 | | // its capacity). |
264 | | // In order to make this a bit more lightweight, we also check if |
265 | | // the difference between placeholder_usage_ and reserved_usage_ is |
266 | | // at least kReservationChunkSize and avoid any adjustments if not. |
267 | 0 | if ((placeholder_usage_ <= target_->GetCapacity()) && |
268 | 0 | ((placeholder_usage_ - reserved_usage_) >= kReservationChunkSize)) { |
269 | 0 | reserved_usage_ = placeholder_usage_ & ~(kReservationChunkSize - 1); |
270 | 0 | size_t new_sec_reserved = |
271 | 0 | static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_); |
272 | 0 | size_t sec_charge = new_sec_reserved - sec_reserved_; |
273 | 0 | s = secondary_cache_->Deflate(sec_charge); |
274 | 0 | assert(s.ok()); |
275 | 0 | s = pri_cache_res_->UpdateCacheReservation(sec_charge, |
276 | 0 | /*increase=*/false); |
277 | 0 | assert(s.ok()); |
278 | 0 | sec_reserved_ += sec_charge; |
279 | 0 | } |
280 | 0 | } |
281 | | // Warm up the secondary cache with the compressed block. The secondary |
282 | | // cache may choose to ignore it based on the admission policy. |
283 | 0 | if (value != nullptr && !compressed_value.empty() && |
284 | 0 | adm_policy_ == TieredAdmissionPolicy::kAdmPolicyThreeQueue && |
285 | 0 | helper->IsSecondaryCacheCompatible()) { |
286 | 0 | Status status = secondary_cache_->InsertSaved(key, compressed_value, type); |
287 | 0 | assert(status.ok() || status.IsNotSupported()); |
288 | 0 | } |
289 | |
290 | 0 | return s; |
291 | 0 | } |
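
The chunked adjustment above rounds placeholder_usage_ down to a multiple of
kReservationChunkSize with a bit mask, which assumes the chunk size is a power
of two. A small sketch with a hypothetical 1MB chunk size:

    constexpr size_t kChunk = size_t{1} << 20;       // assumed 1MB, power of two
    size_t placeholder_usage = (5 << 20) + 123456;   // 5MB and change
    size_t reserved = placeholder_usage & ~(kChunk - 1);  // rounds down to 5MB
    // Equivalent to (placeholder_usage / kChunk) * kChunk.
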
292 | | |
293 | | Cache::Handle* CacheWithSecondaryAdapter::Lookup(const Slice& key, |
294 | | const CacheItemHelper* helper, |
295 | | CreateContext* create_context, |
296 | | Priority priority, |
297 | 0 | Statistics* stats) { |
298 | | // NOTE: we could just StartAsyncLookup() and Wait(), but this should be a bit |
299 | | // more efficient |
300 | 0 | Handle* result = |
301 | 0 | target_->Lookup(key, helper, create_context, priority, stats); |
302 | 0 | bool secondary_compatible = helper && helper->IsSecondaryCacheCompatible(); |
303 | 0 | bool found_dummy_entry = |
304 | 0 | ProcessDummyResult(&result, /*erase=*/secondary_compatible); |
305 | 0 | if (!result && secondary_compatible) { |
306 | | // Try our secondary cache |
307 | 0 | bool kept_in_sec_cache = false; |
308 | 0 | std::unique_ptr<SecondaryCacheResultHandle> secondary_handle = |
309 | 0 | secondary_cache_->Lookup(key, helper, create_context, /*wait*/ true, |
310 | 0 | found_dummy_entry, stats, |
311 | 0 | /*out*/ kept_in_sec_cache); |
312 | 0 | if (secondary_handle) { |
313 | 0 | result = Promote(std::move(secondary_handle), key, helper, priority, |
314 | 0 | stats, found_dummy_entry, kept_in_sec_cache); |
315 | 0 | } |
316 | 0 | } |
317 | 0 | return result; |
318 | 0 | } |
319 | | |
320 | | bool CacheWithSecondaryAdapter::Release(Handle* handle, |
321 | 0 | bool erase_if_last_ref) { |
322 | 0 | if (erase_if_last_ref) { |
323 | 0 | ObjectPtr v = target_->Value(handle); |
324 | 0 | if (v == nullptr && distribute_cache_res_) { |
325 | 0 | size_t charge = target_->GetCharge(handle); |
326 | |
327 | 0 | MutexLock l(&cache_res_mutex_); |
328 | 0 | placeholder_usage_ -= charge; |
329 | | // Check if total placeholder reservation is more than the overall |
330 | | // cache capacity. If it is, then we do nothing, as reserved_usage_ must |
331 | | // already be maxed out. |
332 | 0 | if ((placeholder_usage_ <= target_->GetCapacity()) && |
333 | 0 | (placeholder_usage_ < reserved_usage_)) { |
334 | | // Adjust reserved_usage_ in chunks of kReservationChunkSize, so |
335 | | // we don't hit this slow path too often. |
336 | 0 | reserved_usage_ = placeholder_usage_ & ~(kReservationChunkSize - 1); |
337 | 0 | size_t new_sec_reserved = |
338 | 0 | static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_); |
339 | 0 | size_t sec_charge = sec_reserved_ - new_sec_reserved; |
340 | 0 | Status s = secondary_cache_->Inflate(sec_charge); |
341 | 0 | assert(s.ok()); |
342 | 0 | s = pri_cache_res_->UpdateCacheReservation(sec_charge, |
343 | 0 | /*increase=*/true); |
344 | 0 | assert(s.ok()); |
345 | 0 | sec_reserved_ -= sec_charge; |
346 | 0 | } |
347 | 0 | } |
348 | 0 | } |
349 | 0 | return target_->Release(handle, erase_if_last_ref); |
350 | 0 | } |
351 | | |
352 | 0 | Cache::ObjectPtr CacheWithSecondaryAdapter::Value(Handle* handle) { |
353 | 0 | ObjectPtr v = target_->Value(handle); |
354 | | // TODO with stacked secondaries: might fail in EvictionHandler |
355 | 0 | assert(v != kDummyObj); |
356 | 0 | return v; |
357 | 0 | } |
358 | | |
359 | | void CacheWithSecondaryAdapter::StartAsyncLookupOnMySecondary( |
360 | 0 | AsyncLookupHandle& async_handle) { |
361 | 0 | assert(!async_handle.IsPending()); |
362 | 0 | assert(async_handle.result_handle == nullptr); |
363 | |
364 | 0 | std::unique_ptr<SecondaryCacheResultHandle> secondary_handle = |
365 | 0 | secondary_cache_->Lookup( |
366 | 0 | async_handle.key, async_handle.helper, async_handle.create_context, |
367 | 0 | /*wait*/ false, async_handle.found_dummy_entry, async_handle.stats, |
368 | 0 | /*out*/ async_handle.kept_in_sec_cache); |
369 | 0 | if (secondary_handle) { |
370 | | // TODO with stacked secondaries: Check & process if already ready? |
371 | 0 | async_handle.pending_handle = secondary_handle.release(); |
372 | 0 | async_handle.pending_cache = secondary_cache_.get(); |
373 | 0 | } |
374 | 0 | } |
375 | | |
376 | | void CacheWithSecondaryAdapter::StartAsyncLookup( |
377 | 0 | AsyncLookupHandle& async_handle) { |
378 | 0 | target_->StartAsyncLookup(async_handle); |
379 | 0 | if (!async_handle.IsPending()) { |
380 | 0 | bool secondary_compatible = |
381 | 0 | async_handle.helper && |
382 | 0 | async_handle.helper->IsSecondaryCacheCompatible(); |
383 | 0 | async_handle.found_dummy_entry |= ProcessDummyResult( |
384 | 0 | &async_handle.result_handle, /*erase=*/secondary_compatible); |
385 | |
386 | 0 | if (async_handle.Result() == nullptr && secondary_compatible) { |
387 | | // Not found and not pending on another secondary cache |
388 | 0 | StartAsyncLookupOnMySecondary(async_handle); |
389 | 0 | } |
390 | 0 | } |
391 | 0 | } |
392 | | |
393 | | void CacheWithSecondaryAdapter::WaitAll(AsyncLookupHandle* async_handles, |
394 | 0 | size_t count) { |
395 | 0 | if (count == 0) { |
396 | | // Nothing to do |
397 | 0 | return; |
398 | 0 | } |
399 | | // Requests that are pending on *my* secondary cache, at the start of this |
400 | | // function |
401 | 0 | std::vector<AsyncLookupHandle*> my_pending; |
402 | | // Requests that are pending on an "inner" secondary cache (managed somewhere |
403 | | // under target_), as of the start of this function |
404 | 0 | std::vector<AsyncLookupHandle*> inner_pending; |
405 | | |
406 | | // Initial accounting of pending handles, excluding those already handled |
407 | | // by "outer" secondary caches. (See cur->pending_cache = nullptr.) |
408 | 0 | for (size_t i = 0; i < count; ++i) { |
409 | 0 | AsyncLookupHandle* cur = async_handles + i; |
410 | 0 | if (cur->pending_cache) { |
411 | 0 | assert(cur->IsPending()); |
412 | 0 | assert(cur->helper); |
413 | 0 | assert(cur->helper->IsSecondaryCacheCompatible()); |
414 | 0 | if (cur->pending_cache == secondary_cache_.get()) { |
415 | 0 | my_pending.push_back(cur); |
416 | | // Mark as "to be handled by this caller" |
417 | 0 | cur->pending_cache = nullptr; |
418 | 0 | } else { |
419 | | // Remember as potentially needing a lookup in my secondary |
420 | 0 | inner_pending.push_back(cur); |
421 | 0 | } |
422 | 0 | } |
423 | 0 | } |
424 | | |
425 | | // Wait on inner-most cache lookups first |
426 | | // TODO with stacked secondaries: because we are not using proper |
427 | | // async/await constructs here yet, there is a false synchronization point |
428 | | // here where all the results at one level are needed before initiating |
429 | | // any lookups at the next level. Probably not a big deal, but worth noting. |
430 | 0 | if (!inner_pending.empty()) { |
431 | 0 | target_->WaitAll(async_handles, count); |
432 | 0 | } |
433 | | |
434 | | // For those that failed to find something, convert to lookup in my |
435 | | // secondary cache. |
436 | 0 | for (AsyncLookupHandle* cur : inner_pending) { |
437 | 0 | if (cur->Result() == nullptr) { |
438 | | // Not found, try my secondary |
439 | 0 | StartAsyncLookupOnMySecondary(*cur); |
440 | 0 | if (cur->IsPending()) { |
441 | 0 | assert(cur->pending_cache == secondary_cache_.get()); |
442 | 0 | my_pending.push_back(cur); |
443 | | // Mark as "to be handled by this caller" |
444 | 0 | cur->pending_cache = nullptr; |
445 | 0 | } |
446 | 0 | } |
447 | 0 | } |
448 | | |
449 | | // Wait on all lookups on my secondary cache |
450 | 0 | { |
451 | 0 | std::vector<SecondaryCacheResultHandle*> my_secondary_handles; |
452 | 0 | for (AsyncLookupHandle* cur : my_pending) { |
453 | 0 | my_secondary_handles.push_back(cur->pending_handle); |
454 | 0 | } |
455 | 0 | secondary_cache_->WaitAll(std::move(my_secondary_handles)); |
456 | 0 | } |
457 | | |
458 | | // Process results |
459 | 0 | for (AsyncLookupHandle* cur : my_pending) { |
460 | 0 | std::unique_ptr<SecondaryCacheResultHandle> secondary_handle( |
461 | 0 | cur->pending_handle); |
462 | 0 | cur->pending_handle = nullptr; |
463 | 0 | cur->result_handle = Promote( |
464 | 0 | std::move(secondary_handle), cur->key, cur->helper, cur->priority, |
465 | 0 | cur->stats, cur->found_dummy_entry, cur->kept_in_sec_cache); |
466 | 0 | assert(cur->pending_cache == nullptr); |
467 | 0 | } |
468 | 0 | } |
469 | | |
470 | 0 | std::string CacheWithSecondaryAdapter::GetPrintableOptions() const { |
471 | 0 | std::string str = target_->GetPrintableOptions(); |
472 | 0 | str.append(" secondary_cache:\n"); |
473 | 0 | str.append(secondary_cache_->GetPrintableOptions()); |
474 | 0 | return str; |
475 | 0 | } |
476 | | |
477 | 0 | const char* CacheWithSecondaryAdapter::Name() const { |
478 | 0 | if (distribute_cache_res_) { |
479 | 0 | return kTieredCacheName; |
480 | 0 | } else { |
481 | | // To the user, at least for now, this looks like the underlying cache |
482 | | // configured with a secondary cache. So we pretend to be that cache. |
483 | 0 | return target_->Name(); |
484 | 0 | } |
485 | 0 | } |
486 | | |
487 | | // Update the total cache capacity. If we're distributing cache reservations |
488 | | // to both primary and secondary, then update the pri_cache_res_ reservation |
489 | | // as well. At the moment, we don't have a good way of handling the case |
490 | | // where the new capacity < total cache reservations. |
491 | 0 | void CacheWithSecondaryAdapter::SetCapacity(size_t capacity) { |
492 | 0 | if (distribute_cache_res_) { |
493 | 0 | MutexLock m(&cache_res_mutex_); |
494 | 0 | size_t sec_capacity = static_cast<size_t>(capacity * sec_cache_res_ratio_); |
495 | 0 | size_t old_sec_capacity = 0; |
496 | |
497 | 0 | Status s = secondary_cache_->GetCapacity(old_sec_capacity); |
498 | 0 | if (!s.ok()) { |
499 | 0 | return; |
500 | 0 | } |
501 | 0 | if (old_sec_capacity > sec_capacity) { |
502 | | // We're shrinking the cache. We do things in the following order to |
503 | | // avoid a temporary spike in usage over the configured capacity - |
504 | | // 1. Lower the secondary cache capacity |
505 | | // 2. Credit an equal amount (by decreasing pri_cache_res_) to the |
506 | | // primary cache |
507 | | // 3. Decrease the primary cache capacity to the total budget |
508 | 0 | s = secondary_cache_->SetCapacity(sec_capacity); |
509 | 0 | if (s.ok()) { |
510 | 0 | if (placeholder_usage_ > capacity) { |
511 | | // Adjust reserved_usage_ down |
512 | 0 | reserved_usage_ = capacity & ~(kReservationChunkSize - 1); |
513 | 0 | } |
514 | 0 | size_t new_sec_reserved = |
515 | 0 | static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_); |
516 | 0 | s = pri_cache_res_->UpdateCacheReservation( |
517 | 0 | (old_sec_capacity - sec_capacity) - |
518 | 0 | (sec_reserved_ - new_sec_reserved), |
519 | 0 | /*increase=*/false); |
520 | 0 | sec_reserved_ = new_sec_reserved; |
521 | 0 | assert(s.ok()); |
522 | 0 | target_->SetCapacity(capacity); |
523 | 0 | } |
524 | 0 | } else { |
525 | | // We're expanding the cache. Do it in the following order to avoid |
526 | | // unnecessary evictions - |
527 | | // 1. Increase the primary cache capacity to total budget |
528 | | // 2. Reserve additional memory in primary on behalf of secondary (by |
529 | | // increasing pri_cache_res_ reservation) |
530 | | // 3. Increase secondary cache capacity |
531 | 0 | target_->SetCapacity(capacity); |
532 | 0 | s = pri_cache_res_->UpdateCacheReservation( |
533 | 0 | sec_capacity - old_sec_capacity, |
534 | 0 | /*increase=*/true); |
535 | 0 | assert(s.ok()); |
536 | 0 | s = secondary_cache_->SetCapacity(sec_capacity); |
537 | 0 | assert(s.ok()); |
538 | 0 | } |
539 | 0 | } else { |
540 | | // No cache reservation distribution. Just set the primary cache capacity. |
541 | 0 | target_->SetCapacity(capacity); |
542 | 0 | } |
543 | 0 | } |
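
A worked example of the shrink path above, with made-up numbers and no
outstanding placeholder reservations:

    const size_t old_cap = 100 << 20, new_cap = 80 << 20;  // 100MB -> 80MB
    const size_t old_sec = old_cap * 3 / 10;  // 30MB at a 0.3 ratio
    const size_t new_sec = new_cap * 3 / 10;  // 24MB
    // Order matters when shrinking: the secondary drops to 24MB first, then
    // the 6MB difference is credited back to the primary by shrinking
    // pri_cache_res_, and only then does the primary drop to 80MB. Usable
    // primary capacity ends at 80 - 24 = 56MB, i.e. 0.7 * 80MB.
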
544 | | |
545 | | Status CacheWithSecondaryAdapter::GetSecondaryCacheCapacity( |
546 | 0 | size_t& size) const { |
547 | 0 | return secondary_cache_->GetCapacity(size); |
548 | 0 | } |
549 | | |
550 | | Status CacheWithSecondaryAdapter::GetSecondaryCachePinnedUsage( |
551 | 0 | size_t& size) const { |
552 | 0 | Status s; |
553 | 0 | if (distribute_cache_res_) { |
554 | 0 | MutexLock m(&cache_res_mutex_); |
555 | 0 | size_t capacity = 0; |
556 | 0 | s = secondary_cache_->GetCapacity(capacity); |
557 | 0 | if (s.ok()) { |
558 | 0 | size = capacity - pri_cache_res_->GetTotalMemoryUsed(); |
559 | 0 | } else { |
560 | 0 | size = 0; |
561 | 0 | } |
562 | 0 | } else { |
563 | 0 | size = 0; |
564 | 0 | } |
565 | 0 | return s; |
566 | 0 | } |
567 | | |
568 | | // Update the secondary/primary allocation ratio (remember, the primary |
569 | | // capacity is the total memory budget when distribute_cache_res_ is true). |
570 | | // When the ratio changes, we may accumulate some error in the calculations |
571 | | // for secondary cache inflate/deflate and pri_cache_res_ reservations. |
572 | | // This is due to the rounding of the reservation amount. |
573 | | // |
574 | | // We rely on the current pri_cache_res_ total memory used to estimate the |
575 | | // new secondary cache reservation after the ratio change. For this reason, |
576 | | // once the ratio is lowered to 0.0 (effectively disabling the secondary |
577 | | // cache and pri_cache_res_ total mem used going down to 0), we cannot |
578 | | // increase the ratio and re-enable it, We might remove this limitation |
579 | | // in the future. |
580 | | Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio( |
581 | 0 | double compressed_secondary_ratio) { |
582 | 0 | if (!distribute_cache_res_) { |
583 | 0 | return Status::NotSupported(); |
584 | 0 | } |
585 | | |
586 | 0 | MutexLock m(&cache_res_mutex_); |
587 | 0 | size_t pri_capacity = target_->GetCapacity(); |
588 | 0 | size_t sec_capacity = |
589 | 0 | static_cast<size_t>(pri_capacity * compressed_secondary_ratio); |
590 | 0 | size_t old_sec_capacity; |
591 | 0 | Status s = secondary_cache_->GetCapacity(old_sec_capacity); |
592 | 0 | if (!s.ok()) { |
593 | 0 | return s; |
594 | 0 | } |
595 | | |
596 | | // Calculate the new secondary cache reservation. |
597 | | // reserved_usage_ will never be > the cache capacity, so we don't |
598 | | // have to worry about adjusting it here. |
599 | 0 | sec_cache_res_ratio_ = compressed_secondary_ratio; |
600 | 0 | size_t new_sec_reserved = |
601 | 0 | static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_); |
602 | 0 | if (sec_capacity > old_sec_capacity) { |
603 | | // We're increasing the ratio, thus ending up with a larger secondary |
604 | | // cache and a smaller usable primary cache capacity. Similar to |
605 | | // SetCapacity(), we try to avoid a temporary increase in total usage |
606 | | // beyond the configured capacity - |
607 | | // 1. A higher secondary cache ratio means it gets a higher share of |
608 | | // cache reservations. So first account for that by deflating the |
609 | | // secondary cache |
610 | | // 2. Increase pri_cache_res_ reservation to reflect the new secondary |
611 | | // cache utilization (increase in capacity - increase in share of cache |
612 | | // reservation) |
613 | | // 3. Increase secondary cache capacity |
614 | 0 | assert(new_sec_reserved >= sec_reserved_); |
615 | 0 | s = secondary_cache_->Deflate(new_sec_reserved - sec_reserved_); |
616 | 0 | assert(s.ok()); |
617 | 0 | s = pri_cache_res_->UpdateCacheReservation( |
618 | 0 | (sec_capacity - old_sec_capacity) - (new_sec_reserved - sec_reserved_), |
619 | 0 | /*increase=*/true); |
620 | 0 | assert(s.ok()); |
621 | 0 | sec_reserved_ = new_sec_reserved; |
622 | 0 | s = secondary_cache_->SetCapacity(sec_capacity); |
623 | 0 | assert(s.ok()); |
624 | 0 | } else { |
625 | | // We're shrinking the ratio. Try to avoid unnecessary evictions - |
626 | | // 1. Lower the secondary cache capacity |
627 | | // 2. Decrease pri_cache_res_ reservation to reflect lower secondary |
628 | | // cache utilization (decrease in capacity - decrease in share of cache |
629 | | // reservations) |
630 | | // 3. Inflate the secondary cache to give it back the reduction in its |
631 | | // share of cache reservations |
632 | 0 | s = secondary_cache_->SetCapacity(sec_capacity); |
633 | 0 | if (s.ok()) { |
634 | 0 | s = pri_cache_res_->UpdateCacheReservation( |
635 | 0 | (old_sec_capacity - sec_capacity) - |
636 | 0 | (sec_reserved_ - new_sec_reserved), |
637 | 0 | /*increase=*/false); |
638 | 0 | assert(s.ok()); |
639 | 0 | s = secondary_cache_->Inflate(sec_reserved_ - new_sec_reserved); |
640 | 0 | assert(s.ok()); |
641 | 0 | sec_reserved_ = new_sec_reserved; |
642 | 0 | } |
643 | 0 | } |
644 | |
645 | 0 | return s; |
646 | 0 | } |
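
A worked example of the ratio-increase path above, with made-up numbers: the
ratio goes from 0.3 to 0.4 with a fixed 100MB total budget and 10MB of
outstanding placeholder reservations:

    // new secondary capacity:          100MB * 0.4 = 40MB  (was 30MB)
    // new secondary reservation share:  10MB * 0.4 =  4MB  (was 3MB)
    // Steps, in the order taken above:
    //   1. Deflate the secondary by 4 - 3 = 1MB (its larger share of the
    //      placeholder reservations)
    //   2. Grow pri_cache_res_ by (40 - 30) - (4 - 3) = 9MB
    //   3. Raise the secondary cache capacity to 40MB
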
647 | | |
648 | | Status CacheWithSecondaryAdapter::UpdateAdmissionPolicy( |
649 | 0 | TieredAdmissionPolicy adm_policy) { |
650 | 0 | adm_policy_ = adm_policy; |
651 | 0 | return Status::OK(); |
652 | 0 | } |
653 | | |
654 | 0 | std::shared_ptr<Cache> NewTieredCache(const TieredCacheOptions& _opts) { |
655 | 0 | if (!_opts.cache_opts) { |
656 | 0 | return nullptr; |
657 | 0 | } |
658 | | |
659 | 0 | TieredCacheOptions opts = _opts; |
660 | 0 | { |
661 | 0 | bool valid_adm_policy = true; |
662 | |
663 | 0 | switch (_opts.adm_policy) { |
664 | 0 | case TieredAdmissionPolicy::kAdmPolicyAuto: |
665 | | // Select an appropriate default policy |
666 | 0 | if (opts.adm_policy == TieredAdmissionPolicy::kAdmPolicyAuto) { |
667 | 0 | if (opts.nvm_sec_cache) { |
668 | 0 | opts.adm_policy = TieredAdmissionPolicy::kAdmPolicyThreeQueue; |
669 | 0 | } else { |
670 | 0 | opts.adm_policy = TieredAdmissionPolicy::kAdmPolicyPlaceholder; |
671 | 0 | } |
672 | 0 | } |
673 | 0 | break; |
674 | 0 | case TieredAdmissionPolicy::kAdmPolicyPlaceholder: |
675 | 0 | case TieredAdmissionPolicy::kAdmPolicyAllowCacheHits: |
676 | 0 | case TieredAdmissionPolicy::kAdmPolicyAllowAll: |
677 | 0 | if (opts.nvm_sec_cache) { |
678 | 0 | valid_adm_policy = false; |
679 | 0 | } |
680 | 0 | break; |
681 | 0 | case TieredAdmissionPolicy::kAdmPolicyThreeQueue: |
682 | 0 | if (!opts.nvm_sec_cache) { |
683 | 0 | valid_adm_policy = false; |
684 | 0 | } |
685 | 0 | break; |
686 | 0 | default: |
687 | 0 | valid_adm_policy = false; |
688 | 0 | } |
689 | 0 | if (!valid_adm_policy) { |
690 | 0 | return nullptr; |
691 | 0 | } |
692 | 0 | } |
693 | | |
694 | 0 | std::shared_ptr<Cache> cache; |
695 | 0 | if (opts.cache_type == PrimaryCacheType::kCacheTypeLRU) { |
696 | 0 | LRUCacheOptions cache_opts = |
697 | 0 | *(static_cast_with_check<LRUCacheOptions, ShardedCacheOptions>( |
698 | 0 | opts.cache_opts)); |
699 | 0 | cache_opts.capacity = opts.total_capacity; |
700 | 0 | cache_opts.secondary_cache = nullptr; |
701 | 0 | cache = cache_opts.MakeSharedCache(); |
702 | 0 | } else if (opts.cache_type == PrimaryCacheType::kCacheTypeHCC) { |
703 | 0 | HyperClockCacheOptions cache_opts = |
704 | 0 | *(static_cast_with_check<HyperClockCacheOptions, ShardedCacheOptions>( |
705 | 0 | opts.cache_opts)); |
706 | 0 | cache_opts.capacity = opts.total_capacity; |
707 | 0 | cache_opts.secondary_cache = nullptr; |
708 | 0 | cache = cache_opts.MakeSharedCache(); |
709 | 0 | } else { |
710 | 0 | return nullptr; |
711 | 0 | } |
712 | 0 | std::shared_ptr<SecondaryCache> sec_cache; |
713 | 0 | opts.comp_cache_opts.capacity = static_cast<size_t>( |
714 | 0 | opts.total_capacity * opts.compressed_secondary_ratio); |
715 | 0 | sec_cache = NewCompressedSecondaryCache(opts.comp_cache_opts); |
716 | |
717 | 0 | if (opts.nvm_sec_cache) { |
718 | 0 | if (opts.adm_policy == TieredAdmissionPolicy::kAdmPolicyThreeQueue) { |
719 | 0 | sec_cache = std::make_shared<TieredSecondaryCache>( |
720 | 0 | sec_cache, opts.nvm_sec_cache, |
721 | 0 | TieredAdmissionPolicy::kAdmPolicyThreeQueue); |
722 | 0 | } else { |
723 | 0 | return nullptr; |
724 | 0 | } |
725 | 0 | } |
726 | | |
727 | 0 | return std::make_shared<CacheWithSecondaryAdapter>( |
728 | 0 | cache, sec_cache, opts.adm_policy, /*distribute_cache_res=*/true); |
729 | 0 | } |
730 | | |
731 | | Status UpdateTieredCache(const std::shared_ptr<Cache>& cache, |
732 | | int64_t total_capacity, |
733 | | double compressed_secondary_ratio, |
734 | 0 | TieredAdmissionPolicy adm_policy) { |
735 | 0 | if (!cache || strcmp(cache->Name(), kTieredCacheName)) { |
736 | 0 | return Status::InvalidArgument(); |
737 | 0 | } |
738 | 0 | CacheWithSecondaryAdapter* tiered_cache = |
739 | 0 | static_cast<CacheWithSecondaryAdapter*>(cache.get()); |
740 | |
741 | 0 | Status s; |
742 | 0 | if (total_capacity > 0) { |
743 | 0 | tiered_cache->SetCapacity(total_capacity); |
744 | 0 | } |
745 | 0 | if (compressed_secondary_ratio >= 0.0 && compressed_secondary_ratio <= 1.0) { |
746 | 0 | s = tiered_cache->UpdateCacheReservationRatio(compressed_secondary_ratio); |
747 | 0 | } |
748 | 0 | if (adm_policy < TieredAdmissionPolicy::kAdmPolicyMax) { |
749 | 0 | s = tiered_cache->UpdateAdmissionPolicy(adm_policy); |
750 | 0 | } |
751 | 0 | return s; |
752 | 0 | } |
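
A hedged usage sketch for these entry points, using only the option fields
referenced in this file; all values are illustrative:

    LRUCacheOptions lru_opts;  // primary cache settings; its capacity field is
                               // overwritten with total_capacity by NewTieredCache
    TieredCacheOptions topts;
    topts.cache_opts = &lru_opts;
    topts.cache_type = PrimaryCacheType::kCacheTypeLRU;
    topts.total_capacity = 100 << 20;        // combined pri+sec budget (100MB)
    topts.compressed_secondary_ratio = 0.3;  // 30MB compressed secondary
    topts.adm_policy = TieredAdmissionPolicy::kAdmPolicyAuto;
    std::shared_ptr<Cache> cache = NewTieredCache(topts);

    // Later: grow the budget to 200MB and trim the secondary share to 0.25.
    // Passing kAdmPolicyMax skips the admission policy update (see the guard
    // in UpdateTieredCache above).
    Status s = UpdateTieredCache(cache, 200 << 20, 0.25,
                                 TieredAdmissionPolicy::kAdmPolicyMax);
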
753 | | } // namespace ROCKSDB_NAMESPACE |