Coverage Report

Created: 2025-07-23 07:17

/src/rocksdb/cache/secondary_cache_adapter.cc
//  Copyright (c) Meta Platforms, Inc. and affiliates.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "cache/secondary_cache_adapter.h"

#include <atomic>

#include "cache/tiered_secondary_cache.h"
#include "monitoring/perf_context_imp.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {

namespace {
// A distinct pointer value for marking "dummy" cache entries
struct Dummy {
  char val[7] = "kDummy";
};
const Dummy kDummy{};
Cache::ObjectPtr const kDummyObj = const_cast<Dummy*>(&kDummy);
const char* kTieredCacheName = "TieredCache";
}  // namespace

// When CacheWithSecondaryAdapter is constructed with the distribute_cache_res
// parameter set to true, it manages the entire memory budget across the
// primary and secondary cache. The secondary cache is assumed to be in
// memory, such as the CompressedSecondaryCache. When a placeholder entry
// is inserted by a CacheReservationManager instance to reserve memory,
// the CacheWithSecondaryAdapter ensures that the reservation is distributed
// proportionally across the primary/secondary caches.
//
// The primary block cache is initially sized to the sum of the primary cache
// budget + the secondary cache budget, as follows -
//   |---------    Primary Cache Configured Capacity  -----------|
//   |---Secondary Cache Budget----|----Primary Cache Budget-----|
//
// A ConcurrentCacheReservationManager member in the CacheWithSecondaryAdapter,
// pri_cache_res_, is used to help with tracking the distribution of memory
// reservations. Initially, it accounts for the entire secondary cache budget
// as a reservation against the primary cache. This shrinks the usable
// capacity of the primary cache to the budget that the user originally
// desired.
//
//   |--Reservation for Sec Cache--|-Pri Cache Usable Capacity---|
//
// When a reservation placeholder is inserted into the adapter, it is inserted
// directly into the primary cache. This means the entire charge of the
// placeholder is counted against the primary cache. To compensate, and to
// count a portion of it against the secondary cache, the secondary cache
// Deflate() method is called to shrink it. Since Deflate() causes the
// secondary cache's actual usage to shrink, that is reflected here by
// releasing an equal amount from the pri_cache_res_ reservation. The
// Deflate() in the secondary cache can be, but is not required to be,
// implemented using its own cache reservation manager.
//
// For example, if the pri/sec ratio is 70/30, and the combined capacity is
// 100MB, the intermediate and final state after inserting a reservation
// placeholder for 10MB would be as follows -
//
//   |-Reservation for Sec Cache-|-Pri Cache Usable Capacity-|---R---|
// 1. After inserting the placeholder in primary
//   |-------  30MB -------------|------- 60MB -------------|-10MB--|
// 2. After deflating the secondary and adjusting the reservation for
//    secondary against the primary
//   |-------  27MB -------------|------- 63MB -------------|-10MB--|
//
// Likewise, when the user-inserted placeholder is released, the secondary
// cache Inflate() method is called to grow it, and the pri_cache_res_
// reservation is increased by an equal amount.
//
// Another way of implementing this would have been to simply split the user
// reservation into primary and secondary components. However, this would
// require allocating a structure to track the associated secondary cache
// reservation, which adds some complexity and overhead.
//
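// To make the initial sizing concrete (numbers are illustrative only): with
// a 100MB total budget and a secondary ratio of 0.3, the primary cache is
// created with a 100MB capacity, the secondary cache reports a 30MB capacity,
// and the constructor below immediately books a 30MB reservation through
// pri_cache_res_, leaving 70MB of usable primary capacity and setting
// sec_cache_res_ratio_ = 30MB / 100MB = 0.3.
//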
CacheWithSecondaryAdapter::CacheWithSecondaryAdapter(
    std::shared_ptr<Cache> target,
    std::shared_ptr<SecondaryCache> secondary_cache,
    TieredAdmissionPolicy adm_policy, bool distribute_cache_res)
    : CacheWrapper(std::move(target)),
      secondary_cache_(std::move(secondary_cache)),
      adm_policy_(adm_policy),
      distribute_cache_res_(distribute_cache_res),
      placeholder_usage_(0),
      reserved_usage_(0),
      sec_reserved_(0) {
  target_->SetEvictionCallback(
      [this](const Slice& key, Handle* handle, bool was_hit) {
        return EvictionHandler(key, handle, was_hit);
      });
  if (distribute_cache_res_) {
    size_t sec_capacity = 0;
    pri_cache_res_ = std::make_shared<ConcurrentCacheReservationManager>(
        std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
            target_));
    Status s = secondary_cache_->GetCapacity(sec_capacity);
    assert(s.ok());
    // Initially, the primary cache is sized to the uncompressed cache budget
    // plus the compressed secondary cache budget. The secondary cache budget
    // is then taken away from the primary cache through cache reservations.
    // Later, when a placeholder entry is inserted by the caller, it's inserted
    // into the primary cache and the portion that should be assigned to the
    // secondary cache is freed from the reservation.
    s = pri_cache_res_->UpdateCacheReservation(sec_capacity);
    assert(s.ok());
    sec_cache_res_ratio_ = (double)sec_capacity / target_->GetCapacity();
  }
}

CacheWithSecondaryAdapter::~CacheWithSecondaryAdapter() {
  // `*this` will be destroyed before `*target_`, so we have to prevent
  // use after free
  target_->SetEvictionCallback({});
#ifndef NDEBUG
  if (distribute_cache_res_) {
    size_t sec_capacity = 0;
    Status s = secondary_cache_->GetCapacity(sec_capacity);
    assert(s.ok());
    assert(placeholder_usage_ == 0);
    assert(reserved_usage_ == 0);
    bool pri_cache_res_mismatch =
        pri_cache_res_->GetTotalMemoryUsed() != sec_capacity;
    if (pri_cache_res_mismatch) {
      fprintf(stderr,
              "~CacheWithSecondaryAdapter: Primary cache reservation: "
              "%zu, Secondary cache capacity: %zu, "
              "Secondary cache reserved: %zu\n",
              pri_cache_res_->GetTotalMemoryUsed(), sec_capacity,
              sec_reserved_);
      assert(!pri_cache_res_mismatch);
    }
  }
#endif  // NDEBUG
}

bool CacheWithSecondaryAdapter::EvictionHandler(const Slice& key,
                                                Handle* handle, bool was_hit) {
  auto helper = GetCacheItemHelper(handle);
  if (helper->IsSecondaryCacheCompatible() &&
      adm_policy_ != TieredAdmissionPolicy::kAdmPolicyThreeQueue) {
    auto obj = target_->Value(handle);
    // Ignore dummy entry
    if (obj != kDummyObj) {
      bool force = false;
      if (adm_policy_ == TieredAdmissionPolicy::kAdmPolicyAllowCacheHits) {
        force = was_hit;
      } else if (adm_policy_ == TieredAdmissionPolicy::kAdmPolicyAllowAll) {
        force = true;
      }
      // Spill into secondary cache.
      secondary_cache_->Insert(key, obj, helper, force).PermitUncheckedError();
    }
  }
  // Never takes ownership of obj
  return false;
}

bool CacheWithSecondaryAdapter::ProcessDummyResult(Cache::Handle** handle,
                                                   bool erase) {
  if (*handle && target_->Value(*handle) == kDummyObj) {
    target_->Release(*handle, erase);
    *handle = nullptr;
    return true;
  } else {
    return false;
  }
}

void CacheWithSecondaryAdapter::CleanupCacheObject(
    ObjectPtr obj, const CacheItemHelper* helper) {
  if (helper->del_cb) {
    helper->del_cb(obj, memory_allocator());
  }
}

Cache::Handle* CacheWithSecondaryAdapter::Promote(
    std::unique_ptr<SecondaryCacheResultHandle>&& secondary_handle,
    const Slice& key, const CacheItemHelper* helper, Priority priority,
    Statistics* stats, bool found_dummy_entry, bool kept_in_sec_cache) {
  assert(secondary_handle->IsReady());

  ObjectPtr obj = secondary_handle->Value();
  if (!obj) {
    // Nothing found.
    return nullptr;
  }
  // Found something.
  switch (helper->role) {
    case CacheEntryRole::kFilterBlock:
      RecordTick(stats, SECONDARY_CACHE_FILTER_HITS);
      break;
    case CacheEntryRole::kIndexBlock:
      RecordTick(stats, SECONDARY_CACHE_INDEX_HITS);
      break;
    case CacheEntryRole::kDataBlock:
      RecordTick(stats, SECONDARY_CACHE_DATA_HITS);
      break;
    default:
      break;
  }
  PERF_COUNTER_ADD(secondary_cache_hit_count, 1);
  RecordTick(stats, SECONDARY_CACHE_HITS);

  // Note: SecondaryCache::Size() is really charge (from the CreateCallback)
  size_t charge = secondary_handle->Size();
  Handle* result = nullptr;
  // Insert into primary cache, possibly as standalone+dummy entries.
  if (secondary_cache_->SupportForceErase() && !found_dummy_entry) {
    // Create standalone and insert dummy
    // Allow standalone to be created even if cache is full, to avoid
    // reading the entry from storage.
    result =
        CreateStandalone(key, obj, helper, charge, /*allow_uncharged*/ true);
    assert(result);
    PERF_COUNTER_ADD(block_cache_standalone_handle_count, 1);

    // Insert dummy to record recent use
    // TODO: try to avoid case where inserting this dummy could overwrite a
    // regular entry
    Status s = Insert(key, kDummyObj, &kNoopCacheItemHelper, /*charge=*/0,
                      /*handle=*/nullptr, priority);
    s.PermitUncheckedError();
    // Nothing to do or clean up on dummy insertion failure
  } else {
    // Insert regular entry into primary cache.
    // Don't allow it to spill into secondary cache again if it was kept there.
    Status s = Insert(
        key, obj, kept_in_sec_cache ? helper->without_secondary_compat : helper,
        charge, &result, priority);
    if (s.ok()) {
      assert(result);
      PERF_COUNTER_ADD(block_cache_real_handle_count, 1);
    } else {
      // Create standalone result instead, even if cache is full, to avoid
      // reading the entry from storage.
      result =
          CreateStandalone(key, obj, helper, charge, /*allow_uncharged*/ true);
      assert(result);
      PERF_COUNTER_ADD(block_cache_standalone_handle_count, 1);
    }
  }
  return result;
}

Status CacheWithSecondaryAdapter::Insert(const Slice& key, ObjectPtr value,
                                         const CacheItemHelper* helper,
                                         size_t charge, Handle** handle,
                                         Priority priority,
                                         const Slice& compressed_value,
                                         CompressionType type) {
  Status s = target_->Insert(key, value, helper, charge, handle, priority);
  if (s.ok() && value == nullptr && distribute_cache_res_ && handle) {
    charge = target_->GetCharge(*handle);

    MutexLock l(&cache_res_mutex_);
    placeholder_usage_ += charge;
    // Check if the total placeholder reservation is more than the overall
    // cache capacity. If it is, then we don't try to charge the
    // secondary cache because we don't want to overcharge it (beyond
    // its capacity).
    // In order to make this a bit more lightweight, we also check if
    // the difference between placeholder_usage_ and reserved_usage_ is
    // at least kReservationChunkSize and avoid any adjustments if not.
    if ((placeholder_usage_ <= target_->GetCapacity()) &&
        ((placeholder_usage_ - reserved_usage_) >= kReservationChunkSize)) {
      reserved_usage_ = placeholder_usage_ & ~(kReservationChunkSize - 1);
      size_t new_sec_reserved =
          static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_);
      size_t sec_charge = new_sec_reserved - sec_reserved_;
      s = secondary_cache_->Deflate(sec_charge);
      assert(s.ok());
      s = pri_cache_res_->UpdateCacheReservation(sec_charge,
                                                 /*increase=*/false);
      assert(s.ok());
      sec_reserved_ += sec_charge;
    }
  }
  // Warm up the secondary cache with the compressed block. The secondary
  // cache may choose to ignore it based on the admission policy.
  if (value != nullptr && !compressed_value.empty() &&
      adm_policy_ == TieredAdmissionPolicy::kAdmPolicyThreeQueue &&
      helper->IsSecondaryCacheCompatible()) {
    Status status = secondary_cache_->InsertSaved(key, compressed_value, type);
    assert(status.ok() || status.IsNotSupported());
  }

  return s;
}
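
// A worked example of the chunked accounting in Insert() above, with
// illustrative numbers only (the real kReservationChunkSize is defined in
// the header; assume here it is 1MB and sec_cache_res_ratio_ is 0.3):
// - reserved_usage_ starts at 0. A placeholder insertion that brings
//   placeholder_usage_ to 2.5MB rounds reserved_usage_ down to 2MB, so
//   new_sec_reserved = 0.6MB; the secondary cache is deflated by 0.6MB and
//   an equal amount is released from the pri_cache_res_ reservation.
// - A further insertion bringing placeholder_usage_ to 2.9MB changes
//   nothing, because it is less than one chunk above reserved_usage_.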

Cache::Handle* CacheWithSecondaryAdapter::Lookup(const Slice& key,
                                                 const CacheItemHelper* helper,
                                                 CreateContext* create_context,
                                                 Priority priority,
                                                 Statistics* stats) {
  // NOTE: we could just StartAsyncLookup() and Wait(), but this should be a bit
  // more efficient
  Handle* result =
      target_->Lookup(key, helper, create_context, priority, stats);
  bool secondary_compatible = helper && helper->IsSecondaryCacheCompatible();
  bool found_dummy_entry =
      ProcessDummyResult(&result, /*erase=*/secondary_compatible);
  if (!result && secondary_compatible) {
    // Try our secondary cache
    bool kept_in_sec_cache = false;
    std::unique_ptr<SecondaryCacheResultHandle> secondary_handle =
        secondary_cache_->Lookup(key, helper, create_context, /*wait*/ true,
                                 found_dummy_entry, stats,
                                 /*out*/ kept_in_sec_cache);
    if (secondary_handle) {
      result = Promote(std::move(secondary_handle), key, helper, priority,
                       stats, found_dummy_entry, kept_in_sec_cache);
    }
  }
  return result;
}

bool CacheWithSecondaryAdapter::Release(Handle* handle,
                                        bool erase_if_last_ref) {
  if (erase_if_last_ref) {
    ObjectPtr v = target_->Value(handle);
    if (v == nullptr && distribute_cache_res_) {
      size_t charge = target_->GetCharge(handle);

      MutexLock l(&cache_res_mutex_);
      placeholder_usage_ -= charge;
      // Check if the total placeholder reservation is more than the overall
      // cache capacity. If it is, then we do nothing, as reserved_usage_ must
      // already be maxed out.
      if ((placeholder_usage_ <= target_->GetCapacity()) &&
          (placeholder_usage_ < reserved_usage_)) {
        // Adjust reserved_usage_ in chunks of kReservationChunkSize, so
        // we don't hit this slow path too often.
        reserved_usage_ = placeholder_usage_ & ~(kReservationChunkSize - 1);
        size_t new_sec_reserved =
            static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_);
        size_t sec_charge = sec_reserved_ - new_sec_reserved;
        Status s = secondary_cache_->Inflate(sec_charge);
        assert(s.ok());
        s = pri_cache_res_->UpdateCacheReservation(sec_charge,
                                                   /*increase=*/true);
        assert(s.ok());
        sec_reserved_ -= sec_charge;
      }
    }
  }
  return target_->Release(handle, erase_if_last_ref);
}

Cache::ObjectPtr CacheWithSecondaryAdapter::Value(Handle* handle) {
  ObjectPtr v = target_->Value(handle);
  // TODO with stacked secondaries: might fail in EvictionHandler
  assert(v != kDummyObj);
  return v;
}

void CacheWithSecondaryAdapter::StartAsyncLookupOnMySecondary(
    AsyncLookupHandle& async_handle) {
  assert(!async_handle.IsPending());
  assert(async_handle.result_handle == nullptr);

  std::unique_ptr<SecondaryCacheResultHandle> secondary_handle =
      secondary_cache_->Lookup(
          async_handle.key, async_handle.helper, async_handle.create_context,
          /*wait*/ false, async_handle.found_dummy_entry, async_handle.stats,
          /*out*/ async_handle.kept_in_sec_cache);
  if (secondary_handle) {
    // TODO with stacked secondaries: Check & process if already ready?
    async_handle.pending_handle = secondary_handle.release();
    async_handle.pending_cache = secondary_cache_.get();
  }
}

void CacheWithSecondaryAdapter::StartAsyncLookup(
    AsyncLookupHandle& async_handle) {
  target_->StartAsyncLookup(async_handle);
  if (!async_handle.IsPending()) {
    bool secondary_compatible =
        async_handle.helper &&
        async_handle.helper->IsSecondaryCacheCompatible();
    async_handle.found_dummy_entry |= ProcessDummyResult(
        &async_handle.result_handle, /*erase=*/secondary_compatible);

    if (async_handle.Result() == nullptr && secondary_compatible) {
      // Not found and not pending on another secondary cache
      StartAsyncLookupOnMySecondary(async_handle);
    }
  }
}

void CacheWithSecondaryAdapter::WaitAll(AsyncLookupHandle* async_handles,
                                        size_t count) {
  if (count == 0) {
    // Nothing to do
    return;
  }
  // Requests that are pending on *my* secondary cache, at the start of this
  // function
  std::vector<AsyncLookupHandle*> my_pending;
  // Requests that are pending on an "inner" secondary cache (managed somewhere
  // under target_), as of the start of this function
  std::vector<AsyncLookupHandle*> inner_pending;

  // Initial accounting of pending handles, excluding those already handled
  // by "outer" secondary caches. (See cur->pending_cache = nullptr.)
  for (size_t i = 0; i < count; ++i) {
    AsyncLookupHandle* cur = async_handles + i;
    if (cur->pending_cache) {
      assert(cur->IsPending());
      assert(cur->helper);
      assert(cur->helper->IsSecondaryCacheCompatible());
      if (cur->pending_cache == secondary_cache_.get()) {
        my_pending.push_back(cur);
        // Mark as "to be handled by this caller"
        cur->pending_cache = nullptr;
      } else {
        // Remember as potentially needing a lookup in my secondary
        inner_pending.push_back(cur);
      }
    }
  }

  // Wait on inner-most cache lookups first
  // TODO with stacked secondaries: because we are not using proper
  // async/await constructs here yet, there is a false synchronization point
  // here where all the results at one level are needed before initiating
  // any lookups at the next level. Probably not a big deal, but worth noting.
  if (!inner_pending.empty()) {
    target_->WaitAll(async_handles, count);
  }

  // For those that failed to find something, convert to lookup in my
  // secondary cache.
  for (AsyncLookupHandle* cur : inner_pending) {
    if (cur->Result() == nullptr) {
      // Not found, try my secondary
      StartAsyncLookupOnMySecondary(*cur);
      if (cur->IsPending()) {
        assert(cur->pending_cache == secondary_cache_.get());
        my_pending.push_back(cur);
        // Mark as "to be handled by this caller"
        cur->pending_cache = nullptr;
      }
    }
  }

  // Wait on all lookups on my secondary cache
  {
    std::vector<SecondaryCacheResultHandle*> my_secondary_handles;
    for (AsyncLookupHandle* cur : my_pending) {
      my_secondary_handles.push_back(cur->pending_handle);
    }
    secondary_cache_->WaitAll(std::move(my_secondary_handles));
  }

  // Process results
  for (AsyncLookupHandle* cur : my_pending) {
    std::unique_ptr<SecondaryCacheResultHandle> secondary_handle(
        cur->pending_handle);
    cur->pending_handle = nullptr;
    cur->result_handle = Promote(
        std::move(secondary_handle), cur->key, cur->helper, cur->priority,
        cur->stats, cur->found_dummy_entry, cur->kept_in_sec_cache);
    assert(cur->pending_cache == nullptr);
  }
}

std::string CacheWithSecondaryAdapter::GetPrintableOptions() const {
  std::string str = target_->GetPrintableOptions();
  str.append("  secondary_cache:\n");
  str.append(secondary_cache_->GetPrintableOptions());
  return str;
}

const char* CacheWithSecondaryAdapter::Name() const {
  if (distribute_cache_res_) {
    return kTieredCacheName;
  } else {
    // To the user, at least for now, this looks like the underlying cache
    // configured with a secondary cache. So we pretend to be that cache.
    return target_->Name();
  }
}

// Update the total cache capacity. If we're distributing cache reservations
// to both primary and secondary, then update the pri_cache_res_ reservation
// as well. At the moment, we don't have a good way of handling the case
// where the new capacity < total cache reservations.
void CacheWithSecondaryAdapter::SetCapacity(size_t capacity) {
  if (distribute_cache_res_) {
    MutexLock m(&cache_res_mutex_);
    size_t sec_capacity = static_cast<size_t>(capacity * sec_cache_res_ratio_);
    size_t old_sec_capacity = 0;

    Status s = secondary_cache_->GetCapacity(old_sec_capacity);
    if (!s.ok()) {
      return;
    }
    if (old_sec_capacity > sec_capacity) {
      // We're shrinking the cache. We do things in the following order to
      // avoid a temporary spike in usage over the configured capacity -
      // 1. Lower the secondary cache capacity
      // 2. Credit an equal amount (by decreasing pri_cache_res_) to the
      //    primary cache
      // 3. Decrease the primary cache capacity to the total budget
      s = secondary_cache_->SetCapacity(sec_capacity);
      if (s.ok()) {
        if (placeholder_usage_ > capacity) {
          // Adjust reserved_usage_ down
          reserved_usage_ = capacity & ~(kReservationChunkSize - 1);
        }
        size_t new_sec_reserved =
            static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_);
        s = pri_cache_res_->UpdateCacheReservation(
            (old_sec_capacity - sec_capacity) -
                (sec_reserved_ - new_sec_reserved),
            /*increase=*/false);
        sec_reserved_ = new_sec_reserved;
        assert(s.ok());
        target_->SetCapacity(capacity);
      }
    } else {
      // We're expanding the cache. Do it in the following order to avoid
      // unnecessary evictions -
      // 1. Increase the primary cache capacity to total budget
      // 2. Reserve additional memory in primary on behalf of secondary (by
      //    increasing pri_cache_res_ reservation)
      // 3. Increase secondary cache capacity
      target_->SetCapacity(capacity);
      s = pri_cache_res_->UpdateCacheReservation(
          sec_capacity - old_sec_capacity,
          /*increase=*/true);
      assert(s.ok());
      s = secondary_cache_->SetCapacity(sec_capacity);
      assert(s.ok());
    }
  } else {
    // No cache reservation distribution. Just set the primary cache capacity.
    target_->SetCapacity(capacity);
  }
}
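
// For example (illustrative numbers): shrinking a 100MB tiered cache with
// sec_cache_res_ratio_ = 0.3 down to 80MB first lowers the secondary cache
// from 30MB to 24MB, then credits the 6MB difference (adjusted for any
// change in the secondary's share of reservations) back to the primary by
// shrinking pri_cache_res_, and only then lowers the primary capacity to
// 80MB, so total usage never spikes above the old budget mid-transition.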

Status CacheWithSecondaryAdapter::GetSecondaryCacheCapacity(
    size_t& size) const {
  return secondary_cache_->GetCapacity(size);
}

Status CacheWithSecondaryAdapter::GetSecondaryCachePinnedUsage(
    size_t& size) const {
  Status s;
  if (distribute_cache_res_) {
    MutexLock m(&cache_res_mutex_);
    size_t capacity = 0;
    s = secondary_cache_->GetCapacity(capacity);
    if (s.ok()) {
      size = capacity - pri_cache_res_->GetTotalMemoryUsed();
    } else {
      size = 0;
    }
  } else {
    size = 0;
  }
  return s;
}

// Update the secondary/primary allocation ratio (remember, the primary
// capacity is the total memory budget when distribute_cache_res_ is true).
// When the ratio changes, we may accumulate some error in the calculations
// for secondary cache inflate/deflate and pri_cache_res_ reservations.
// This is due to the rounding of the reservation amount.
//
// We rely on the current pri_cache_res_ total memory used to estimate the
// new secondary cache reservation after the ratio change. For this reason,
// once the ratio is lowered to 0.0 (effectively disabling the secondary
// cache and pri_cache_res_ total mem used going down to 0), we cannot
// increase the ratio and re-enable it. We might remove this limitation
// in the future.
Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio(
    double compressed_secondary_ratio) {
  if (!distribute_cache_res_) {
    return Status::NotSupported();
  }

  MutexLock m(&cache_res_mutex_);
  size_t pri_capacity = target_->GetCapacity();
  size_t sec_capacity =
      static_cast<size_t>(pri_capacity * compressed_secondary_ratio);
  size_t old_sec_capacity;
  Status s = secondary_cache_->GetCapacity(old_sec_capacity);
  if (!s.ok()) {
    return s;
  }

  // Calculate the new secondary cache reservation
  // reserved_usage_ will never be > the cache capacity, so we don't
  // have to worry about adjusting it here.
  sec_cache_res_ratio_ = compressed_secondary_ratio;
  size_t new_sec_reserved =
      static_cast<size_t>(reserved_usage_ * sec_cache_res_ratio_);
  if (sec_capacity > old_sec_capacity) {
    // We're increasing the ratio, thus ending up with a larger secondary
    // cache and a smaller usable primary cache capacity. Similar to
    // SetCapacity(), we try to avoid a temporary increase in total usage
    // beyond the configured capacity -
    // 1. A higher secondary cache ratio means it gets a higher share of
    //    cache reservations. So first account for that by deflating the
    //    secondary cache
    // 2. Increase pri_cache_res_ reservation to reflect the new secondary
    //    cache utilization (increase in capacity - increase in share of cache
    //    reservation)
    // 3. Increase secondary cache capacity
    assert(new_sec_reserved >= sec_reserved_);
    s = secondary_cache_->Deflate(new_sec_reserved - sec_reserved_);
    assert(s.ok());
    s = pri_cache_res_->UpdateCacheReservation(
        (sec_capacity - old_sec_capacity) - (new_sec_reserved - sec_reserved_),
        /*increase=*/true);
    assert(s.ok());
    sec_reserved_ = new_sec_reserved;
    s = secondary_cache_->SetCapacity(sec_capacity);
    assert(s.ok());
  } else {
    // We're shrinking the ratio. Try to avoid unnecessary evictions -
    // 1. Lower the secondary cache capacity
    // 2. Decrease pri_cache_res_ reservation to reflect the lower secondary
    //    cache utilization (decrease in capacity - decrease in share of cache
    //    reservations)
    // 3. Inflate the secondary cache to give it back the reduction in its
    //    share of cache reservations
    s = secondary_cache_->SetCapacity(sec_capacity);
    if (s.ok()) {
      s = pri_cache_res_->UpdateCacheReservation(
          (old_sec_capacity - sec_capacity) -
              (sec_reserved_ - new_sec_reserved),
          /*increase=*/false);
      assert(s.ok());
      s = secondary_cache_->Inflate(sec_reserved_ - new_sec_reserved);
      assert(s.ok());
      sec_reserved_ = new_sec_reserved;
    }
  }

  return s;
}
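
// For example (illustrative numbers): with a 100MB budget, raising the ratio
// from 0.2 to 0.3 gives sec_capacity = 30MB vs old_sec_capacity = 20MB. If
// reserved_usage_ is 10MB, sec_reserved_ grows from 2MB to 3MB, so the
// secondary cache is first deflated by 1MB, the pri_cache_res_ reservation
// then grows by (10MB - 1MB) = 9MB, and finally the secondary cache capacity
// is raised to 30MB.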

Status CacheWithSecondaryAdapter::UpdateAdmissionPolicy(
    TieredAdmissionPolicy adm_policy) {
  adm_policy_ = adm_policy;
  return Status::OK();
}

std::shared_ptr<Cache> NewTieredCache(const TieredCacheOptions& _opts) {
  if (!_opts.cache_opts) {
    return nullptr;
  }

  TieredCacheOptions opts = _opts;
  {
    bool valid_adm_policy = true;

    switch (_opts.adm_policy) {
      case TieredAdmissionPolicy::kAdmPolicyAuto:
        // Select an appropriate default policy
        if (opts.adm_policy == TieredAdmissionPolicy::kAdmPolicyAuto) {
          if (opts.nvm_sec_cache) {
            opts.adm_policy = TieredAdmissionPolicy::kAdmPolicyThreeQueue;
          } else {
            opts.adm_policy = TieredAdmissionPolicy::kAdmPolicyPlaceholder;
          }
        }
        break;
      case TieredAdmissionPolicy::kAdmPolicyPlaceholder:
      case TieredAdmissionPolicy::kAdmPolicyAllowCacheHits:
      case TieredAdmissionPolicy::kAdmPolicyAllowAll:
        if (opts.nvm_sec_cache) {
          valid_adm_policy = false;
        }
        break;
      case TieredAdmissionPolicy::kAdmPolicyThreeQueue:
        if (!opts.nvm_sec_cache) {
          valid_adm_policy = false;
        }
        break;
      default:
        valid_adm_policy = false;
    }
    if (!valid_adm_policy) {
      return nullptr;
    }
  }

  std::shared_ptr<Cache> cache;
  if (opts.cache_type == PrimaryCacheType::kCacheTypeLRU) {
    LRUCacheOptions cache_opts =
        *(static_cast_with_check<LRUCacheOptions, ShardedCacheOptions>(
            opts.cache_opts));
    cache_opts.capacity = opts.total_capacity;
    cache_opts.secondary_cache = nullptr;
    cache = cache_opts.MakeSharedCache();
  } else if (opts.cache_type == PrimaryCacheType::kCacheTypeHCC) {
    HyperClockCacheOptions cache_opts =
        *(static_cast_with_check<HyperClockCacheOptions, ShardedCacheOptions>(
            opts.cache_opts));
    cache_opts.capacity = opts.total_capacity;
    cache_opts.secondary_cache = nullptr;
    cache = cache_opts.MakeSharedCache();
  } else {
    return nullptr;
  }
  std::shared_ptr<SecondaryCache> sec_cache;
  opts.comp_cache_opts.capacity = static_cast<size_t>(
      opts.total_capacity * opts.compressed_secondary_ratio);
  sec_cache = NewCompressedSecondaryCache(opts.comp_cache_opts);

  if (opts.nvm_sec_cache) {
    if (opts.adm_policy == TieredAdmissionPolicy::kAdmPolicyThreeQueue) {
      sec_cache = std::make_shared<TieredSecondaryCache>(
          sec_cache, opts.nvm_sec_cache,
          TieredAdmissionPolicy::kAdmPolicyThreeQueue);
    } else {
      return nullptr;
    }
  }

  return std::make_shared<CacheWithSecondaryAdapter>(
      cache, sec_cache, opts.adm_policy, /*distribute_cache_res=*/true);
}
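
// A minimal usage sketch (illustrative, not part of this file; sizes and the
// update values are arbitrary, and only fields referenced above are set):
//
//   LRUCacheOptions lru_opts;
//   TieredCacheOptions topts;
//   topts.cache_opts = &lru_opts;
//   topts.cache_type = PrimaryCacheType::kCacheTypeLRU;
//   topts.total_capacity = 100 << 20;        // 100MB combined budget
//   topts.compressed_secondary_ratio = 0.3;  // 30MB compressed secondary
//   topts.adm_policy = TieredAdmissionPolicy::kAdmPolicyAuto;
//   std::shared_ptr<Cache> tiered = NewTieredCache(topts);
//
// The returned cache can later be resized and re-balanced in place:
//
//   Status s = UpdateTieredCache(tiered, /*total_capacity=*/80 << 20,
//                                /*compressed_secondary_ratio=*/0.25,
//                                TieredAdmissionPolicy::kAdmPolicyPlaceholder);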

Status UpdateTieredCache(const std::shared_ptr<Cache>& cache,
                         int64_t total_capacity,
                         double compressed_secondary_ratio,
                         TieredAdmissionPolicy adm_policy) {
  if (!cache || strcmp(cache->Name(), kTieredCacheName)) {
    return Status::InvalidArgument();
  }
  CacheWithSecondaryAdapter* tiered_cache =
      static_cast<CacheWithSecondaryAdapter*>(cache.get());

  Status s;
  if (total_capacity > 0) {
    tiered_cache->SetCapacity(total_capacity);
  }
  if (compressed_secondary_ratio >= 0.0 && compressed_secondary_ratio <= 1.0) {
    s = tiered_cache->UpdateCacheReservationRatio(compressed_secondary_ratio);
  }
  if (adm_policy < TieredAdmissionPolicy::kAdmPolicyMax) {
    s = tiered_cache->UpdateAdmissionPolicy(adm_policy);
  }
  return s;
}
}  // namespace ROCKSDB_NAMESPACE