/src/rocksdb/cache/secondary_cache_adapter.cc
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "cache/secondary_cache_adapter.h"

#include <atomic>

#include "cache/tiered_secondary_cache.h"
#include "monitoring/perf_context_imp.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {

namespace {
// A distinct pointer value for marking "dummy" cache entries
struct Dummy {
  char val[7] = "kDummy";
};
const Dummy kDummy{};
Cache::ObjectPtr const kDummyObj = const_cast<Dummy*>(&kDummy);
const char* kTieredCacheName = "TieredCache";
}  // namespace
// When CacheWithSecondaryAdapter is constructed with the distribute_cache_res
// parameter set to true, it manages the entire memory budget across the
// primary and secondary cache. The secondary cache is assumed to be in
// memory, such as the CompressedSecondaryCache. When a placeholder entry
// is inserted by a CacheReservationManager instance to reserve memory,
// the CacheWithSecondaryAdapter ensures that the reservation is distributed
// proportionally across the primary/secondary caches.
//
// The primary block cache is initially sized to the sum of the primary cache
// budget + the secondary cache budget, as follows -
// |--------- Primary Cache Configured Capacity -----------|
// |---Secondary Cache Budget----|----Primary Cache Budget-----|
//
// A ConcurrentCacheReservationManager member in the CacheWithSecondaryAdapter,
// pri_cache_res_, is used to help track the distribution of memory
// reservations. Initially, it accounts for the entire secondary cache budget
// as a reservation against the primary cache. This shrinks the usable
// capacity of the primary cache to the budget that the user originally
// desired.
//
// |--Reservation for Sec Cache--|-Pri Cache Usable Capacity---|
//
// When a reservation placeholder is inserted into the adapter, it is inserted
// directly into the primary cache. This means the entire charge of the
// placeholder is counted against the primary cache. To compensate and count
// a portion of it against the secondary cache, the secondary cache Deflate()
// method is called to shrink it. Since the Deflate() causes the secondary
// cache's actual usage to shrink, it is reflected here by releasing an equal
// amount from the pri_cache_res_ reservation. The Deflate() in the secondary
// cache can be, but is not required to be, implemented using its own cache
// reservation manager.
//
// For example, if the pri/sec ratio is 70/30, and the combined capacity is
// 100MB, the intermediate and final state after inserting a reservation
// placeholder for 10MB would be as follows -
//
// |-Reservation for Sec Cache-|-Pri Cache Usable Capacity-|---R---|
// 1. After inserting the placeholder in primary
// |------- 30MB -------------|------- 60MB -------------|-10MB--|
// 2. After deflating the secondary and adjusting the reservation for
//    secondary against the primary
// |------- 27MB -------------|------- 63MB -------------|-10MB--|
//
// Likewise, when the user-inserted placeholder is released, the secondary
// cache Inflate() method is called to grow it, and the pri_cache_res_
// reservation is increased by an equal amount.
//
// Another way of implementing this would have been to simply split the user
// reservation into primary and secondary components. However, this would
// require allocating a structure to track the associated secondary cache
// reservation, which adds some complexity and overhead.
//
CacheWithSecondaryAdapter::CacheWithSecondaryAdapter(
    std::shared_ptr<Cache> target,
    std::shared_ptr<SecondaryCache> secondary_cache,
    TieredAdmissionPolicy adm_policy, bool distribute_cache_res)
    : CacheWrapper(std::move(target)),
      secondary_cache_(std::move(secondary_cache)),
      adm_policy_(adm_policy),
      distribute_cache_res_(distribute_cache_res),
      placeholder_usage_(0),
      reserved_usage_(0),
      sec_reserved_(0) {
  target_->SetEvictionCallback(
      [this](const Slice& key, Handle* handle, bool was_hit) {
        return EvictionHandler(key, handle, was_hit);
      });
  if (distribute_cache_res_) {
    size_t sec_capacity = 0;
    pri_cache_res_ = std::make_shared<ConcurrentCacheReservationManager>(
        std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
            target_));
    Status s = secondary_cache_->GetCapacity(sec_capacity);
    assert(s.ok());
    // Initially, the primary cache is sized to the uncompressed cache budget
    // plus the compressed secondary cache budget. The secondary cache budget
    // is then taken away from the primary cache through cache reservations.
    // Later, when a placeholder entry is inserted by the caller, it's
    // inserted into the primary cache and the portion that should be
    // assigned to the secondary cache is freed from the reservation.
    s = pri_cache_res_->UpdateCacheReservation(sec_capacity);
    assert(s.ok());
    sec_cache_res_ratio_ = (double)sec_capacity / target_->GetCapacity();
  }
}
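
// Worked example of the sizing above (illustrative numbers): if the user
// asks for a 60MB primary budget and a 40MB compressed secondary cache,
// target_ is created with a 100MB capacity. The constructor then reserves
// the 40MB secondary budget through pri_cache_res_, leaving 60MB usable in
// the primary cache, and records sec_cache_res_ratio_ = 40.0 / 100.0 = 0.4
// for the distribution arithmetic in Insert() and Release().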

CacheWithSecondaryAdapter::~CacheWithSecondaryAdapter() {
  // `*this` will be destroyed before `*target_`, so we have to prevent
  // use after free
  target_->SetEvictionCallback({});
#ifndef NDEBUG
  if (distribute_cache_res_) {
    size_t sec_capacity = 0;
    Status s = secondary_cache_->GetCapacity(sec_capacity);
    assert(s.ok());
    assert(placeholder_usage_ == 0);
    assert(reserved_usage_ == 0);
    if (pri_cache_res_->GetTotalMemoryUsed() != sec_capacity) {
      fprintf(stdout,
              "~CacheWithSecondaryAdapter: Primary cache reservation: "
              "%zu, Secondary cache capacity: %zu, "
              "Secondary cache reserved: %zu\n",
              pri_cache_res_->GetTotalMemoryUsed(), sec_capacity,
              sec_reserved_);
    }
  }
#endif  // NDEBUG
}

bool CacheWithSecondaryAdapter::EvictionHandler(const Slice& key,
                                                Handle* handle, bool was_hit) {
  auto helper = GetCacheItemHelper(handle);
  if (helper->IsSecondaryCacheCompatible() &&
      adm_policy_ != TieredAdmissionPolicy::kAdmPolicyThreeQueue) {
    auto obj = target_->Value(handle);
    // Ignore dummy entry
    if (obj != kDummyObj) {
      bool force = false;
      if (adm_policy_ == TieredAdmissionPolicy::kAdmPolicyAllowCacheHits) {
        force = was_hit;
      } else if (adm_policy_ == TieredAdmissionPolicy::kAdmPolicyAllowAll) {
        force = true;
      }
      // Spill into secondary cache.
      secondary_cache_->Insert(key, obj, helper, force).PermitUncheckedError();
    }
  }
  // Never takes ownership of obj
  return false;
}

bool CacheWithSecondaryAdapter::ProcessDummyResult(Cache::Handle** handle,
                                                   bool erase) {
  if (*handle && target_->Value(*handle) == kDummyObj) {
    target_->Release(*handle, erase);
    *handle = nullptr;
    return true;
  } else {
    return false;
  }
}

void CacheWithSecondaryAdapter::CleanupCacheObject(
    ObjectPtr obj, const CacheItemHelper* helper) {
  if (helper->del_cb) {
    helper->del_cb(obj, memory_allocator());
  }
}

Cache::Handle* CacheWithSecondaryAdapter::Promote(
    std::unique_ptr<SecondaryCacheResultHandle>&& secondary_handle,
    const Slice& key, const CacheItemHelper* helper, Priority priority,
    Statistics* stats, bool found_dummy_entry, bool kept_in_sec_cache) {
  assert(secondary_handle->IsReady());

  ObjectPtr obj = secondary_handle->Value();
  if (!obj) {
    // Nothing found.
    return nullptr;
  }
  // Found something.
  switch (helper->role) {
    case CacheEntryRole::kFilterBlock:
      RecordTick(stats, SECONDARY_CACHE_FILTER_HITS);
      break;
    case CacheEntryRole::kIndexBlock:
      RecordTick(stats, SECONDARY_CACHE_INDEX_HITS);
      break;
    case CacheEntryRole::kDataBlock:
      RecordTick(stats, SECONDARY_CACHE_DATA_HITS);
      break;
    default:
      break;
  }
  PERF_COUNTER_ADD(secondary_cache_hit_count, 1);
  RecordTick(stats, SECONDARY_CACHE_HITS);

  // Note: SecondaryCache::Size() is really charge (from the CreateCallback)
  size_t charge = secondary_handle->Size();
  Handle* result = nullptr;
  // Insert into primary cache, possibly as standalone+dummy entries.
  if (secondary_cache_->SupportForceErase() && !found_dummy_entry) {
    // Create standalone and insert dummy
    // Allow standalone to be created even if cache is full, to avoid
    // reading the entry from storage.
    result =
        CreateStandalone(key, obj, helper, charge, /*allow_uncharged*/ true);
    assert(result);
    PERF_COUNTER_ADD(block_cache_standalone_handle_count, 1);

    // Insert dummy to record recent use
    // TODO: try to avoid case where inserting this dummy could overwrite a
    // regular entry
    Status s = Insert(key, kDummyObj, &kNoopCacheItemHelper, /*charge=*/0,
                      /*handle=*/nullptr, priority);
    s.PermitUncheckedError();
    // Nothing to do or clean up on dummy insertion failure
  } else {
    // Insert regular entry into primary cache.
    // Don't allow it to spill into secondary cache again if it was kept there.
    Status s = Insert(
        key, obj, kept_in_sec_cache ? helper->without_secondary_compat : helper,
        charge, &result, priority);
    if (s.ok()) {
      assert(result);
      PERF_COUNTER_ADD(block_cache_real_handle_count, 1);
    } else {
      // Create standalone result instead, even if cache is full, to avoid
      // reading the entry from storage.
      result =
          CreateStandalone(key, obj, helper, charge, /*allow_uncharged*/ true);
      assert(result);
      PERF_COUNTER_ADD(block_cache_standalone_handle_count, 1);
    }
  }
  return result;
}
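
// Illustrative sequence for the standalone+dummy scheme above (a sketch,
// not part of this file; `cache`, `key`, `helper`, `ctx`, and `stats` are
// assumed to exist, and the secondary cache supports force erase):
//
//   Cache::Handle* h1 = cache->Lookup(key, helper, ctx, Priority::LOW, stats);
//   // First secondary hit: h1 is a standalone handle, and a zero-charge
//   // dummy entry now records the recent use in the primary cache.
//   cache->Release(h1);
//   Cache::Handle* h2 = cache->Lookup(key, helper, ctx, Priority::LOW, stats);
//   // Second secondary hit: the dummy is found, so the value is inserted
//   // into the primary cache as a regular entry this time.
//   cache->Release(h2);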

Status CacheWithSecondaryAdapter::Insert(const Slice& key, ObjectPtr value,
                                         const CacheItemHelper* helper,
                                         size_t charge, Handle** handle,
                                         Priority priority,
                                         const Slice& compressed_value,
                                         CompressionType type) {
  Status s = target_->Insert(key, value, helper, charge, handle, priority);
  if (s.ok() && value == nullptr && distribute_cache_res_ && handle) {
    charge = target_->GetCharge(*handle);

    MutexLock l(&cache_res_mutex_);
    placeholder_usage_ += charge;
    // Check if the total placeholder reservation is more than the overall
    // cache capacity. If it is, then we don't try to charge the
    // secondary cache because we don't want to overcharge it (beyond
    // its capacity).
    // In order to make this a bit more lightweight, we also check if
    // the difference between placeholder_usage_ and reserved_usage_ is
    // at least kReservationChunkSize and avoid any adjustments if not.
    if ((placeholder_usage_ <= target_->GetCapacity()) &&
        ((placeholder_usage_ - reserved_usage_) >= kReservationChunkSize)) {
      reserved_usage_ = placeholder_usage_ & ~(kReservationChunkSize - 1);
      size_t new_sec_reserved =
          static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_);
      size_t sec_charge = new_sec_reserved - sec_reserved_;
      s = secondary_cache_->Deflate(sec_charge);
      assert(s.ok());
      s = pri_cache_res_->UpdateCacheReservation(sec_charge,
                                                 /*increase=*/false);
      assert(s.ok());
      sec_reserved_ += sec_charge;
    }
  }
  // Warm up the secondary cache with the compressed block. The secondary
  // cache may choose to ignore it based on the admission policy.
  if (value != nullptr && !compressed_value.empty() &&
      adm_policy_ == TieredAdmissionPolicy::kAdmPolicyThreeQueue &&
      helper->IsSecondaryCacheCompatible()) {
    Status status = secondary_cache_->InsertSaved(key, compressed_value, type);
    assert(status.ok() || status.IsNotSupported());
  }

  return s;
}
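
// Worked example of the chunked accounting above (illustrative; assumes
// kReservationChunkSize is a power of two, say 1MB, which the bitmask
// rounding requires, and sec_cache_res_ratio_ = 0.3):
//
//   placeholder_usage_ = 10MB + 1  // placeholder charges inserted so far
//   reserved_usage_    = placeholder_usage_ & ~(1MB - 1)  // = 10MB exactly
//   new_sec_reserved   = 10MB * 0.3 = 3MB
//
// The secondary cache is deflated by (new_sec_reserved - sec_reserved_)
// and the same amount is released from pri_cache_res_, shifting that share
// of the placeholder charge onto the secondary cache.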

Cache::Handle* CacheWithSecondaryAdapter::Lookup(const Slice& key,
                                                 const CacheItemHelper* helper,
                                                 CreateContext* create_context,
                                                 Priority priority,
                                                 Statistics* stats) {
  // NOTE: we could just StartAsyncLookup() and Wait(), but this should be a
  // bit more efficient
  Handle* result =
      target_->Lookup(key, helper, create_context, priority, stats);
  bool secondary_compatible = helper && helper->IsSecondaryCacheCompatible();
  bool found_dummy_entry =
      ProcessDummyResult(&result, /*erase=*/secondary_compatible);
  if (!result && secondary_compatible) {
    // Try our secondary cache
    bool kept_in_sec_cache = false;
    std::unique_ptr<SecondaryCacheResultHandle> secondary_handle =
        secondary_cache_->Lookup(key, helper, create_context, /*wait*/ true,
                                 found_dummy_entry, stats,
                                 /*out*/ kept_in_sec_cache);
    if (secondary_handle) {
      result = Promote(std::move(secondary_handle), key, helper, priority,
                       stats, found_dummy_entry, kept_in_sec_cache);
    }
  }
  return result;
}

bool CacheWithSecondaryAdapter::Release(Handle* handle,
                                        bool erase_if_last_ref) {
  if (erase_if_last_ref) {
    ObjectPtr v = target_->Value(handle);
    if (v == nullptr && distribute_cache_res_) {
      size_t charge = target_->GetCharge(handle);

      MutexLock l(&cache_res_mutex_);
      placeholder_usage_ -= charge;
      // Check if the total placeholder reservation is more than the overall
      // cache capacity. If it is, then we do nothing, as reserved_usage_
      // must already be maxed out.
      if ((placeholder_usage_ <= target_->GetCapacity()) &&
          (placeholder_usage_ < reserved_usage_)) {
        // Adjust reserved_usage_ in chunks of kReservationChunkSize, so
        // we don't hit this slow path too often.
        reserved_usage_ = placeholder_usage_ & ~(kReservationChunkSize - 1);
        size_t new_sec_reserved =
            static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_);
        size_t sec_charge = sec_reserved_ - new_sec_reserved;
        Status s = secondary_cache_->Inflate(sec_charge);
        assert(s.ok());
        s = pri_cache_res_->UpdateCacheReservation(sec_charge,
                                                   /*increase=*/true);
        assert(s.ok());
        sec_reserved_ -= sec_charge;
      }
    }
  }
  return target_->Release(handle, erase_if_last_ref);
}

Cache::ObjectPtr CacheWithSecondaryAdapter::Value(Handle* handle) {
  ObjectPtr v = target_->Value(handle);
  // TODO with stacked secondaries: might fail in EvictionHandler
  assert(v != kDummyObj);
  return v;
}

void CacheWithSecondaryAdapter::StartAsyncLookupOnMySecondary(
    AsyncLookupHandle& async_handle) {
  assert(!async_handle.IsPending());
  assert(async_handle.result_handle == nullptr);

  std::unique_ptr<SecondaryCacheResultHandle> secondary_handle =
      secondary_cache_->Lookup(
          async_handle.key, async_handle.helper, async_handle.create_context,
          /*wait*/ false, async_handle.found_dummy_entry, async_handle.stats,
          /*out*/ async_handle.kept_in_sec_cache);
  if (secondary_handle) {
    // TODO with stacked secondaries: Check & process if already ready?
    async_handle.pending_handle = secondary_handle.release();
    async_handle.pending_cache = secondary_cache_.get();
  }
}

void CacheWithSecondaryAdapter::StartAsyncLookup(
    AsyncLookupHandle& async_handle) {
  target_->StartAsyncLookup(async_handle);
  if (!async_handle.IsPending()) {
    bool secondary_compatible =
        async_handle.helper &&
        async_handle.helper->IsSecondaryCacheCompatible();
    async_handle.found_dummy_entry |= ProcessDummyResult(
        &async_handle.result_handle, /*erase=*/secondary_compatible);

    if (async_handle.Result() == nullptr && secondary_compatible) {
      // Not found and not pending on another secondary cache
      StartAsyncLookupOnMySecondary(async_handle);
    }
  }
}

void CacheWithSecondaryAdapter::WaitAll(AsyncLookupHandle* async_handles,
                                        size_t count) {
  if (count == 0) {
    // Nothing to do
    return;
  }
  // Requests that are pending on *my* secondary cache, at the start of this
  // function
  std::vector<AsyncLookupHandle*> my_pending;
  // Requests that are pending on an "inner" secondary cache (managed
  // somewhere under target_), as of the start of this function
  std::vector<AsyncLookupHandle*> inner_pending;

  // Initial accounting of pending handles, excluding those already handled
  // by "outer" secondary caches. (See cur->pending_cache = nullptr.)
  for (size_t i = 0; i < count; ++i) {
    AsyncLookupHandle* cur = async_handles + i;
    if (cur->pending_cache) {
      assert(cur->IsPending());
      assert(cur->helper);
      assert(cur->helper->IsSecondaryCacheCompatible());
      if (cur->pending_cache == secondary_cache_.get()) {
        my_pending.push_back(cur);
        // Mark as "to be handled by this caller"
        cur->pending_cache = nullptr;
      } else {
        // Remember as potentially needing a lookup in my secondary
        inner_pending.push_back(cur);
      }
    }
  }

  // Wait on inner-most cache lookups first
  // TODO with stacked secondaries: because we are not using proper
  // async/await constructs here yet, there is a false synchronization point
  // here where all the results at one level are needed before initiating
  // any lookups at the next level. Probably not a big deal, but worth noting.
  if (!inner_pending.empty()) {
    target_->WaitAll(async_handles, count);
  }

  // For those that failed to find something, convert to lookup in my
  // secondary cache.
  for (AsyncLookupHandle* cur : inner_pending) {
    if (cur->Result() == nullptr) {
      // Not found, try my secondary
      StartAsyncLookupOnMySecondary(*cur);
      if (cur->IsPending()) {
        assert(cur->pending_cache == secondary_cache_.get());
        my_pending.push_back(cur);
        // Mark as "to be handled by this caller"
        cur->pending_cache = nullptr;
      }
    }
  }

  // Wait on all lookups on my secondary cache
  {
    std::vector<SecondaryCacheResultHandle*> my_secondary_handles;
    for (AsyncLookupHandle* cur : my_pending) {
      my_secondary_handles.push_back(cur->pending_handle);
    }
    secondary_cache_->WaitAll(std::move(my_secondary_handles));
  }

  // Process results
  for (AsyncLookupHandle* cur : my_pending) {
    std::unique_ptr<SecondaryCacheResultHandle> secondary_handle(
        cur->pending_handle);
    cur->pending_handle = nullptr;
    cur->result_handle = Promote(
        std::move(secondary_handle), cur->key, cur->helper, cur->priority,
        cur->stats, cur->found_dummy_entry, cur->kept_in_sec_cache);
    assert(cur->pending_cache == nullptr);
  }
}
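
// Usage sketch for the async lookup path (illustrative, not part of this
// file; `cache`, `keys`, `helper`, `ctx`, and `stats` are assumed to exist):
//
//   std::vector<Cache::AsyncLookupHandle> handles(keys.size());
//   for (size_t i = 0; i < keys.size(); ++i) {
//     handles[i].key = keys[i];
//     handles[i].helper = helper;
//     handles[i].create_context = ctx;
//     handles[i].stats = stats;
//     cache->StartAsyncLookup(handles[i]);
//   }
//   cache->WaitAll(handles.data(), handles.size());
//   for (auto& h : handles) {
//     if (Cache::Handle* result = h.Result()) {
//       // Use the value, then cache->Release(result).
//     }
//   }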

std::string CacheWithSecondaryAdapter::GetPrintableOptions() const {
  std::string str = target_->GetPrintableOptions();
  str.append("  secondary_cache:\n");
  str.append(secondary_cache_->GetPrintableOptions());
  return str;
}

const char* CacheWithSecondaryAdapter::Name() const {
  if (distribute_cache_res_) {
    return kTieredCacheName;
  } else {
    // To the user, at least for now, this is just the underlying cache
    // configured with a secondary cache, so we pretend to be that cache.
    return target_->Name();
  }
}

// Update the total cache capacity. If we're distributing cache reservations
// to both primary and secondary, then update the pri_cache_res_ reservation
// as well. At the moment, we don't have a good way of handling the case
// where the new capacity < total cache reservations.
void CacheWithSecondaryAdapter::SetCapacity(size_t capacity) {
  if (distribute_cache_res_) {
    MutexLock m(&cache_res_mutex_);
    size_t sec_capacity = static_cast<size_t>(capacity * sec_cache_res_ratio_);
    size_t old_sec_capacity = 0;

    Status s = secondary_cache_->GetCapacity(old_sec_capacity);
    if (!s.ok()) {
      return;
    }
    if (old_sec_capacity > sec_capacity) {
      // We're shrinking the cache. We do things in the following order to
      // avoid a temporary spike in usage over the configured capacity -
      // 1. Lower the secondary cache capacity
      // 2. Credit an equal amount (by decreasing pri_cache_res_) to the
      //    primary cache
      // 3. Decrease the primary cache capacity to the total budget
      s = secondary_cache_->SetCapacity(sec_capacity);
      if (s.ok()) {
        if (placeholder_usage_ > capacity) {
          // Adjust reserved_usage_ down
          reserved_usage_ = capacity & ~(kReservationChunkSize - 1);
        }
        size_t new_sec_reserved =
            static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_);
        s = pri_cache_res_->UpdateCacheReservation(
            (old_sec_capacity - sec_capacity) -
                (sec_reserved_ - new_sec_reserved),
            /*increase=*/false);
        sec_reserved_ = new_sec_reserved;
        assert(s.ok());
        target_->SetCapacity(capacity);
      }
    } else {
      // We're expanding the cache. Do it in the following order to avoid
      // unnecessary evictions -
      // 1. Increase the primary cache capacity to total budget
      // 2. Reserve additional memory in primary on behalf of secondary (by
      //    increasing pri_cache_res_ reservation)
      // 3. Increase secondary cache capacity
      target_->SetCapacity(capacity);
      s = pri_cache_res_->UpdateCacheReservation(
          sec_capacity - old_sec_capacity,
          /*increase=*/true);
      assert(s.ok());
      s = secondary_cache_->SetCapacity(sec_capacity);
      assert(s.ok());
    }
  } else {
    // No cache reservation distribution. Just set the primary cache capacity.
    target_->SetCapacity(capacity);
  }
}
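
// Worked example of the shrink path above (illustrative numbers): shrinking
// total capacity from 100MB to 60MB with sec_cache_res_ratio_ = 0.3 and no
// placeholder reservations outstanding:
//   1. Secondary cache capacity drops 30MB -> 18MB.
//   2. pri_cache_res_ is reduced by (30MB - 18MB) = 12MB, crediting the
//      freed memory to the primary cache.
//   3. Primary capacity drops 100MB -> 60MB, leaving 60 - 18 = 42MB of
//      usable primary space (0.7 * 60MB, as intended).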

Status CacheWithSecondaryAdapter::GetSecondaryCacheCapacity(
    size_t& size) const {
  return secondary_cache_->GetCapacity(size);
}

Status CacheWithSecondaryAdapter::GetSecondaryCachePinnedUsage(
    size_t& size) const {
  Status s;
  if (distribute_cache_res_) {
    MutexLock m(&cache_res_mutex_);
    size_t capacity = 0;
    s = secondary_cache_->GetCapacity(capacity);
    if (s.ok()) {
      size = capacity - pri_cache_res_->GetTotalMemoryUsed();
    } else {
      size = 0;
    }
  } else {
    size = 0;
  }
  return s;
}

// Update the secondary/primary allocation ratio (remember, the primary
// capacity is the total memory budget when distribute_cache_res_ is true).
// When the ratio changes, we may accumulate some error in the calculations
// for secondary cache inflate/deflate and pri_cache_res_ reservations.
// This is due to the rounding of the reservation amount.
//
// We rely on the current pri_cache_res_ total memory used to estimate the
// new secondary cache reservation after the ratio change. For this reason,
// once the ratio is lowered to 0.0 (effectively disabling the secondary
// cache and driving pri_cache_res_ total memory used down to 0), we cannot
// increase the ratio and re-enable it. We might remove this limitation
// in the future.
Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio(
    double compressed_secondary_ratio) {
  if (!distribute_cache_res_) {
    return Status::NotSupported();
  }

  MutexLock m(&cache_res_mutex_);
  size_t pri_capacity = target_->GetCapacity();
  size_t sec_capacity =
      static_cast<size_t>(pri_capacity * compressed_secondary_ratio);
  size_t old_sec_capacity = 0;
  Status s = secondary_cache_->GetCapacity(old_sec_capacity);
  if (!s.ok()) {
    return s;
  }

  // Calculate the new secondary cache reservation
  // reserved_usage_ will never be > the cache capacity, so we don't
  // have to worry about adjusting it here.
  sec_cache_res_ratio_ = compressed_secondary_ratio;
  size_t new_sec_reserved =
      static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_);
  if (sec_capacity > old_sec_capacity) {
    // We're increasing the ratio, thus ending up with a larger secondary
    // cache and a smaller usable primary cache capacity. Similar to
    // SetCapacity(), we try to avoid a temporary increase in total usage
    // beyond the configured capacity -
    // 1. A higher secondary cache ratio means it gets a higher share of
    //    cache reservations. So first account for that by deflating the
    //    secondary cache
    // 2. Increase pri_cache_res_ reservation to reflect the new secondary
    //    cache utilization (increase in capacity - increase in share of
    //    cache reservation)
    // 3. Increase secondary cache capacity
    assert(new_sec_reserved >= sec_reserved_);
    s = secondary_cache_->Deflate(new_sec_reserved - sec_reserved_);
    assert(s.ok());
    s = pri_cache_res_->UpdateCacheReservation(
        (sec_capacity - old_sec_capacity) - (new_sec_reserved - sec_reserved_),
        /*increase=*/true);
    assert(s.ok());
    sec_reserved_ = new_sec_reserved;
    s = secondary_cache_->SetCapacity(sec_capacity);
    assert(s.ok());
  } else {
    // We're shrinking the ratio. Try to avoid unnecessary evictions -
    // 1. Lower the secondary cache capacity
    // 2. Decrease pri_cache_res_ reservation to reflect lower secondary
    //    cache utilization (decrease in capacity - decrease in share of
    //    cache reservations)
    // 3. Inflate the secondary cache to give it back the reduction in its
    //    share of cache reservations
    s = secondary_cache_->SetCapacity(sec_capacity);
    if (s.ok()) {
      s = pri_cache_res_->UpdateCacheReservation(
          (old_sec_capacity - sec_capacity) -
              (sec_reserved_ - new_sec_reserved),
          /*increase=*/false);
      assert(s.ok());
      s = secondary_cache_->Inflate(sec_reserved_ - new_sec_reserved);
      assert(s.ok());
      sec_reserved_ = new_sec_reserved;
    }
  }

  return s;
}
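
// Worked example of the ratio-increase path above (illustrative numbers):
// 100MB total, ratio 0.3 -> 0.5, with reserved_usage_ = 20MB and therefore
// sec_reserved_ = 6MB going in:
//   1. new_sec_reserved = 20MB * 0.5 = 10MB, so Deflate(10MB - 6MB = 4MB).
//   2. pri_cache_res_ grows by (50MB - 30MB) - 4MB = 16MB.
//   3. Secondary capacity is raised 30MB -> 50MB.
// Net effect: pri_cache_res_ tracks 50MB - 10MB = 40MB, the secondary
// cache's share net of its portion of the placeholder reservations.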

Status CacheWithSecondaryAdapter::UpdateAdmissionPolicy(
    TieredAdmissionPolicy adm_policy) {
  adm_policy_ = adm_policy;
  return Status::OK();
}

std::shared_ptr<Cache> NewTieredCache(const TieredCacheOptions& _opts) {
  if (!_opts.cache_opts) {
    return nullptr;
  }

  TieredCacheOptions opts = _opts;
  {
    bool valid_adm_policy = true;

    switch (_opts.adm_policy) {
      case TieredAdmissionPolicy::kAdmPolicyAuto:
        // Select an appropriate default policy
        if (opts.adm_policy == TieredAdmissionPolicy::kAdmPolicyAuto) {
          if (opts.nvm_sec_cache) {
            opts.adm_policy = TieredAdmissionPolicy::kAdmPolicyThreeQueue;
          } else {
            opts.adm_policy = TieredAdmissionPolicy::kAdmPolicyPlaceholder;
          }
        }
        break;
      case TieredAdmissionPolicy::kAdmPolicyPlaceholder:
      case TieredAdmissionPolicy::kAdmPolicyAllowCacheHits:
      case TieredAdmissionPolicy::kAdmPolicyAllowAll:
        if (opts.nvm_sec_cache) {
          valid_adm_policy = false;
        }
        break;
      case TieredAdmissionPolicy::kAdmPolicyThreeQueue:
        if (!opts.nvm_sec_cache) {
          valid_adm_policy = false;
        }
        break;
      default:
        valid_adm_policy = false;
    }
    if (!valid_adm_policy) {
      return nullptr;
    }
  }

  std::shared_ptr<Cache> cache;
  if (opts.cache_type == PrimaryCacheType::kCacheTypeLRU) {
    LRUCacheOptions cache_opts =
        *(static_cast_with_check<LRUCacheOptions, ShardedCacheOptions>(
            opts.cache_opts));
    cache_opts.capacity = opts.total_capacity;
    cache_opts.secondary_cache = nullptr;
    cache = cache_opts.MakeSharedCache();
  } else if (opts.cache_type == PrimaryCacheType::kCacheTypeHCC) {
    HyperClockCacheOptions cache_opts =
        *(static_cast_with_check<HyperClockCacheOptions, ShardedCacheOptions>(
            opts.cache_opts));
    cache_opts.capacity = opts.total_capacity;
    cache_opts.secondary_cache = nullptr;
    cache = cache_opts.MakeSharedCache();
  } else {
    return nullptr;
  }
  std::shared_ptr<SecondaryCache> sec_cache;
  opts.comp_cache_opts.capacity = static_cast<size_t>(
      opts.total_capacity * opts.compressed_secondary_ratio);
  sec_cache = NewCompressedSecondaryCache(opts.comp_cache_opts);

  if (opts.nvm_sec_cache) {
    if (opts.adm_policy == TieredAdmissionPolicy::kAdmPolicyThreeQueue) {
      sec_cache = std::make_shared<TieredSecondaryCache>(
          sec_cache, opts.nvm_sec_cache,
          TieredAdmissionPolicy::kAdmPolicyThreeQueue);
    } else {
      return nullptr;
    }
  }

  return std::make_shared<CacheWithSecondaryAdapter>(
      cache, sec_cache, opts.adm_policy, /*distribute_cache_res=*/true);
}
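
// Usage sketch (illustrative; the option values are assumptions, not
// recommendations):
//
//   LRUCacheOptions lru_opts;
//   TieredCacheOptions opts;
//   opts.cache_opts = &lru_opts;
//   opts.cache_type = PrimaryCacheType::kCacheTypeLRU;
//   opts.total_capacity = 100 << 20;        // 100MB across both tiers
//   opts.compressed_secondary_ratio = 0.3;  // 30MB compressed secondary
//   std::shared_ptr<Cache> cache = NewTieredCache(opts);
//   // Returns nullptr on invalid options, e.g. an admission policy that is
//   // incompatible with the nvm_sec_cache setting.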

Status UpdateTieredCache(const std::shared_ptr<Cache>& cache,
                         int64_t total_capacity,
                         double compressed_secondary_ratio,
                         TieredAdmissionPolicy adm_policy) {
  if (!cache || strcmp(cache->Name(), kTieredCacheName)) {
    return Status::InvalidArgument();
  }
  CacheWithSecondaryAdapter* tiered_cache =
      static_cast<CacheWithSecondaryAdapter*>(cache.get());

  Status s;
  if (total_capacity > 0) {
    tiered_cache->SetCapacity(total_capacity);
  }
  if (compressed_secondary_ratio >= 0.0 && compressed_secondary_ratio <= 1.0) {
    s = tiered_cache->UpdateCacheReservationRatio(compressed_secondary_ratio);
  }
  if (adm_policy < TieredAdmissionPolicy::kAdmPolicyMax) {
    s = tiered_cache->UpdateAdmissionPolicy(adm_policy);
  }
  return s;
}
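
// Usage sketch (illustrative): resizing the tiered cache created above at
// runtime and adjusting its compressed secondary share, while leaving the
// admission policy unchanged (per the check above, kAdmPolicyMax is the
// "no change" sentinel):
//
//   Status s = UpdateTieredCache(cache, /*total_capacity=*/200 << 20,
//                                /*compressed_secondary_ratio=*/0.4,
//                                TieredAdmissionPolicy::kAdmPolicyMax);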
}  // namespace ROCKSDB_NAMESPACE