Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/cache/compressed_secondary_cache.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "cache/compressed_secondary_cache.h"
7
8
#include <algorithm>
9
#include <cstdint>
10
#include <memory>
11
12
#include "memory/memory_allocator_impl.h"
13
#include "monitoring/perf_context_imp.h"
14
#include "util/coding.h"
15
#include "util/compression.h"
16
#include "util/string_util.h"
17
18
namespace ROCKSDB_NAMESPACE {
19
20
CompressedSecondaryCache::CompressedSecondaryCache(
21
    const CompressedSecondaryCacheOptions& opts)
22
    : cache_(opts.LRUCacheOptions::MakeSharedCache()),
23
      cache_options_(opts),
24
      cache_res_mgr_(std::make_shared<ConcurrentCacheReservationManager>(
25
          std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
26
              cache_))),
27
0
      disable_cache_(opts.capacity == 0) {}
28
29
0
CompressedSecondaryCache::~CompressedSecondaryCache() = default;
30
31
std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
32
    const Slice& key, const Cache::CacheItemHelper* helper,
33
    Cache::CreateContext* create_context, bool /*wait*/, bool advise_erase,
34
0
    Statistics* stats, bool& kept_in_sec_cache) {
35
0
  assert(helper);
36
  // This is a minor optimization. Its ok to skip it in TSAN in order to
37
  // avoid a false positive.
38
0
#ifndef __SANITIZE_THREAD__
39
0
  if (disable_cache_) {
40
0
    return nullptr;
41
0
  }
42
0
#endif
43
44
0
  std::unique_ptr<SecondaryCacheResultHandle> handle;
45
0
  kept_in_sec_cache = false;
46
0
  Cache::Handle* lru_handle = cache_->Lookup(key);
47
0
  if (lru_handle == nullptr) {
48
0
    return nullptr;
49
0
  }
50
51
0
  void* handle_value = cache_->Value(lru_handle);
52
0
  if (handle_value == nullptr) {
53
0
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
54
0
    RecordTick(stats, COMPRESSED_SECONDARY_CACHE_DUMMY_HITS);
55
0
    return nullptr;
56
0
  }
57
58
0
  CacheAllocationPtr* ptr{nullptr};
59
0
  CacheAllocationPtr merged_value;
60
0
  size_t handle_value_charge{0};
61
0
  const char* data_ptr = nullptr;
62
0
  CacheTier source = CacheTier::kVolatileCompressedTier;
63
0
  CompressionType type = cache_options_.compression_type;
64
0
  if (cache_options_.enable_custom_split_merge) {
65
0
    CacheValueChunk* value_chunk_ptr =
66
0
        reinterpret_cast<CacheValueChunk*>(handle_value);
67
0
    merged_value = MergeChunksIntoValue(value_chunk_ptr, handle_value_charge);
68
0
    ptr = &merged_value;
69
0
    data_ptr = ptr->get();
70
0
  } else {
71
0
    uint32_t type_32 = static_cast<uint32_t>(type);
72
0
    uint32_t source_32 = static_cast<uint32_t>(source);
73
0
    ptr = reinterpret_cast<CacheAllocationPtr*>(handle_value);
74
0
    handle_value_charge = cache_->GetCharge(lru_handle);
75
0
    data_ptr = ptr->get();
76
0
    data_ptr = GetVarint32Ptr(data_ptr, data_ptr + 1,
77
0
                              static_cast<uint32_t*>(&type_32));
78
0
    type = static_cast<CompressionType>(type_32);
79
0
    data_ptr = GetVarint32Ptr(data_ptr, data_ptr + 1,
80
0
                              static_cast<uint32_t*>(&source_32));
81
0
    source = static_cast<CacheTier>(source_32);
82
0
    handle_value_charge -= (data_ptr - ptr->get());
83
0
  }
84
0
  MemoryAllocator* allocator = cache_options_.memory_allocator.get();
85
86
0
  Status s;
87
0
  Cache::ObjectPtr value{nullptr};
88
0
  size_t charge{0};
89
0
  if (source == CacheTier::kVolatileCompressedTier) {
90
0
    if (cache_options_.compression_type == kNoCompression ||
91
0
        cache_options_.do_not_compress_roles.Contains(helper->role)) {
92
0
      s = helper->create_cb(Slice(data_ptr, handle_value_charge),
93
0
                            kNoCompression, CacheTier::kVolatileTier,
94
0
                            create_context, allocator, &value, &charge);
95
0
    } else {
96
0
      UncompressionContext uncompression_context(
97
0
          cache_options_.compression_type);
98
0
      UncompressionInfo uncompression_info(uncompression_context,
99
0
                                           UncompressionDict::GetEmptyDict(),
100
0
                                           cache_options_.compression_type);
101
102
0
      size_t uncompressed_size{0};
103
0
      CacheAllocationPtr uncompressed =
104
0
          UncompressData(uncompression_info, (char*)data_ptr,
105
0
                         handle_value_charge, &uncompressed_size,
106
0
                         cache_options_.compress_format_version, allocator);
107
108
0
      if (!uncompressed) {
109
0
        cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
110
0
        return nullptr;
111
0
      }
112
0
      s = helper->create_cb(Slice(uncompressed.get(), uncompressed_size),
113
0
                            kNoCompression, CacheTier::kVolatileTier,
114
0
                            create_context, allocator, &value, &charge);
115
0
    }
116
0
  } else {
117
    // The item was not compressed by us. Let the helper create_cb
118
    // uncompress it
119
0
    s = helper->create_cb(Slice(data_ptr, handle_value_charge), type, source,
120
0
                          create_context, allocator, &value, &charge);
121
0
  }
122
123
0
  if (!s.ok()) {
124
0
    cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
125
0
    return nullptr;
126
0
  }
127
128
0
  if (advise_erase) {
129
0
    cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
130
    // Insert a dummy handle.
131
0
    cache_
132
0
        ->Insert(key, /*obj=*/nullptr,
133
0
                 GetHelper(cache_options_.enable_custom_split_merge),
134
0
                 /*charge=*/0)
135
0
        .PermitUncheckedError();
136
0
  } else {
137
0
    kept_in_sec_cache = true;
138
0
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
139
0
  }
140
0
  handle.reset(new CompressedSecondaryCacheResultHandle(value, charge));
141
0
  RecordTick(stats, COMPRESSED_SECONDARY_CACHE_HITS);
142
0
  return handle;
143
0
}
144
145
0
bool CompressedSecondaryCache::MaybeInsertDummy(const Slice& key) {
146
0
  auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge);
147
0
  Cache::Handle* lru_handle = cache_->Lookup(key);
148
0
  if (lru_handle == nullptr) {
149
0
    PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1);
150
    // Insert a dummy handle if the handle is evicted for the first time.
151
0
    cache_->Insert(key, /*obj=*/nullptr, internal_helper, /*charge=*/0)
152
0
        .PermitUncheckedError();
153
0
    return true;
154
0
  } else {
155
0
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
156
0
  }
157
158
0
  return false;
159
0
}
160
161
Status CompressedSecondaryCache::InsertInternal(
162
    const Slice& key, Cache::ObjectPtr value,
163
    const Cache::CacheItemHelper* helper, CompressionType type,
164
0
    CacheTier source) {
165
0
  if (source != CacheTier::kVolatileCompressedTier &&
166
0
      cache_options_.enable_custom_split_merge) {
167
    // We don't support custom split/merge for the tiered case
168
0
    return Status::OK();
169
0
  }
170
171
0
  auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge);
172
0
  char header[10];
173
0
  char* payload = header;
174
0
  payload = EncodeVarint32(payload, static_cast<uint32_t>(type));
175
0
  payload = EncodeVarint32(payload, static_cast<uint32_t>(source));
176
177
0
  size_t header_size = payload - header;
178
0
  size_t data_size = (*helper->size_cb)(value);
179
0
  size_t total_size = data_size + header_size;
180
0
  CacheAllocationPtr ptr =
181
0
      AllocateBlock(total_size, cache_options_.memory_allocator.get());
182
0
  char* data_ptr = ptr.get() + header_size;
183
184
0
  Status s = (*helper->saveto_cb)(value, 0, data_size, data_ptr);
185
0
  if (!s.ok()) {
186
0
    return s;
187
0
  }
188
0
  Slice val(data_ptr, data_size);
189
190
0
  std::string compressed_val;
191
0
  if (cache_options_.compression_type != kNoCompression &&
192
0
      type == kNoCompression &&
193
0
      !cache_options_.do_not_compress_roles.Contains(helper->role)) {
194
0
    PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, data_size);
195
0
    CompressionContext compression_context(cache_options_.compression_type,
196
0
                                           cache_options_.compression_opts);
197
0
    uint64_t sample_for_compression{0};
198
0
    CompressionInfo compression_info(
199
0
        cache_options_.compression_opts, compression_context,
200
0
        CompressionDict::GetEmptyDict(), cache_options_.compression_type,
201
0
        sample_for_compression);
202
203
0
    bool success =
204
0
        CompressData(val, compression_info,
205
0
                     cache_options_.compress_format_version, &compressed_val);
206
207
0
    if (!success) {
208
0
      return Status::Corruption("Error compressing value.");
209
0
    }
210
211
0
    val = Slice(compressed_val);
212
0
    data_size = compressed_val.size();
213
0
    total_size = header_size + data_size;
214
0
    PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, data_size);
215
216
0
    if (!cache_options_.enable_custom_split_merge) {
217
0
      ptr = AllocateBlock(total_size, cache_options_.memory_allocator.get());
218
0
      data_ptr = ptr.get() + header_size;
219
0
      memcpy(data_ptr, compressed_val.data(), data_size);
220
0
    }
221
0
  }
222
223
0
  PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1);
224
0
  if (cache_options_.enable_custom_split_merge) {
225
0
    size_t charge{0};
226
0
    CacheValueChunk* value_chunks_head =
227
0
        SplitValueIntoChunks(val, cache_options_.compression_type, charge);
228
0
    return cache_->Insert(key, value_chunks_head, internal_helper, charge);
229
0
  } else {
230
0
    std::memcpy(ptr.get(), header, header_size);
231
0
    CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr));
232
0
    return cache_->Insert(key, buf, internal_helper, total_size);
233
0
  }
234
0
}
235
236
Status CompressedSecondaryCache::Insert(const Slice& key,
237
                                        Cache::ObjectPtr value,
238
                                        const Cache::CacheItemHelper* helper,
239
0
                                        bool force_insert) {
240
0
  if (value == nullptr) {
241
0
    return Status::InvalidArgument();
242
0
  }
243
244
0
  if (!force_insert && MaybeInsertDummy(key)) {
245
0
    return Status::OK();
246
0
  }
247
248
0
  return InsertInternal(key, value, helper, kNoCompression,
249
0
                        CacheTier::kVolatileCompressedTier);
250
0
}
251
252
Status CompressedSecondaryCache::InsertSaved(
253
    const Slice& key, const Slice& saved, CompressionType type = kNoCompression,
254
0
    CacheTier source = CacheTier::kVolatileTier) {
255
0
  if (type == kNoCompression) {
256
0
    return Status::OK();
257
0
  }
258
259
0
  auto slice_helper = &kSliceCacheItemHelper;
260
0
  if (MaybeInsertDummy(key)) {
261
0
    return Status::OK();
262
0
  }
263
264
0
  return InsertInternal(
265
0
      key, static_cast<Cache::ObjectPtr>(const_cast<Slice*>(&saved)),
266
0
      slice_helper, type, source);
267
0
}
268
269
0
void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }
270
271
0
Status CompressedSecondaryCache::SetCapacity(size_t capacity) {
272
0
  MutexLock l(&capacity_mutex_);
273
0
  cache_options_.capacity = capacity;
274
0
  cache_->SetCapacity(capacity);
275
0
  disable_cache_ = capacity == 0;
276
0
  return Status::OK();
277
0
}
278
279
0
Status CompressedSecondaryCache::GetCapacity(size_t& capacity) {
280
0
  MutexLock l(&capacity_mutex_);
281
0
  capacity = cache_options_.capacity;
282
0
  return Status::OK();
283
0
}
284
285
0
std::string CompressedSecondaryCache::GetPrintableOptions() const {
286
0
  std::string ret;
287
0
  ret.reserve(20000);
288
0
  const int kBufferSize{200};
289
0
  char buffer[kBufferSize];
290
0
  ret.append(cache_->GetPrintableOptions());
291
0
  snprintf(buffer, kBufferSize, "    compression_type : %s\n",
292
0
           CompressionTypeToString(cache_options_.compression_type).c_str());
293
0
  ret.append(buffer);
294
0
  snprintf(buffer, kBufferSize, "    compression_opts : %s\n",
295
0
           CompressionOptionsToString(
296
0
               const_cast<CompressionOptions&>(cache_options_.compression_opts))
297
0
               .c_str());
298
0
  ret.append(buffer);
299
0
  snprintf(buffer, kBufferSize, "    compress_format_version : %d\n",
300
0
           cache_options_.compress_format_version);
301
0
  ret.append(buffer);
302
0
  return ret;
303
0
}
304
305
CompressedSecondaryCache::CacheValueChunk*
306
CompressedSecondaryCache::SplitValueIntoChunks(const Slice& value,
307
                                               CompressionType compression_type,
308
0
                                               size_t& charge) {
309
0
  assert(!value.empty());
310
0
  const char* src_ptr = value.data();
311
0
  size_t src_size{value.size()};
312
313
0
  CacheValueChunk dummy_head = CacheValueChunk();
314
0
  CacheValueChunk* current_chunk = &dummy_head;
315
  // Do not split when value size is large or there is no compression.
316
0
  size_t predicted_chunk_size{0};
317
0
  size_t actual_chunk_size{0};
318
0
  size_t tmp_size{0};
319
0
  while (src_size > 0) {
320
0
    predicted_chunk_size = sizeof(CacheValueChunk) - 1 + src_size;
321
0
    auto upper =
322
0
        std::upper_bound(malloc_bin_sizes_.begin(), malloc_bin_sizes_.end(),
323
0
                         predicted_chunk_size);
324
    // Do not split when value size is too small, too large, close to a bin
325
    // size, or there is no compression.
326
0
    if (upper == malloc_bin_sizes_.begin() ||
327
0
        upper == malloc_bin_sizes_.end() ||
328
0
        *upper - predicted_chunk_size < malloc_bin_sizes_.front() ||
329
0
        compression_type == kNoCompression) {
330
0
      tmp_size = predicted_chunk_size;
331
0
    } else {
332
0
      tmp_size = *(--upper);
333
0
    }
334
335
0
    CacheValueChunk* new_chunk =
336
0
        reinterpret_cast<CacheValueChunk*>(new char[tmp_size]);
337
0
    current_chunk->next = new_chunk;
338
0
    current_chunk = current_chunk->next;
339
0
    actual_chunk_size = tmp_size - sizeof(CacheValueChunk) + 1;
340
0
    memcpy(current_chunk->data, src_ptr, actual_chunk_size);
341
0
    current_chunk->size = actual_chunk_size;
342
0
    src_ptr += actual_chunk_size;
343
0
    src_size -= actual_chunk_size;
344
0
    charge += tmp_size;
345
0
  }
346
0
  current_chunk->next = nullptr;
347
348
0
  return dummy_head.next;
349
0
}
350
351
CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue(
352
0
    const void* chunks_head, size_t& charge) {
353
0
  const CacheValueChunk* head =
354
0
      reinterpret_cast<const CacheValueChunk*>(chunks_head);
355
0
  const CacheValueChunk* current_chunk = head;
356
0
  charge = 0;
357
0
  while (current_chunk != nullptr) {
358
0
    charge += current_chunk->size;
359
0
    current_chunk = current_chunk->next;
360
0
  }
361
362
0
  CacheAllocationPtr ptr =
363
0
      AllocateBlock(charge, cache_options_.memory_allocator.get());
364
0
  current_chunk = head;
365
0
  size_t pos{0};
366
0
  while (current_chunk != nullptr) {
367
0
    memcpy(ptr.get() + pos, current_chunk->data, current_chunk->size);
368
0
    pos += current_chunk->size;
369
0
    current_chunk = current_chunk->next;
370
0
  }
371
372
0
  return ptr;
373
0
}
374
375
const Cache::CacheItemHelper* CompressedSecondaryCache::GetHelper(
376
0
    bool enable_custom_split_merge) const {
377
0
  if (enable_custom_split_merge) {
378
0
    static const Cache::CacheItemHelper kHelper{
379
0
        CacheEntryRole::kMisc,
380
0
        [](Cache::ObjectPtr obj, MemoryAllocator* /*alloc*/) {
381
0
          CacheValueChunk* chunks_head = static_cast<CacheValueChunk*>(obj);
382
0
          while (chunks_head != nullptr) {
383
0
            CacheValueChunk* tmp_chunk = chunks_head;
384
0
            chunks_head = chunks_head->next;
385
0
            tmp_chunk->Free();
386
0
            obj = nullptr;
387
0
          }
388
0
        }};
389
0
    return &kHelper;
390
0
  } else {
391
0
    static const Cache::CacheItemHelper kHelper{
392
0
        CacheEntryRole::kMisc,
393
0
        [](Cache::ObjectPtr obj, MemoryAllocator* /*alloc*/) {
394
0
          delete static_cast<CacheAllocationPtr*>(obj);
395
0
          obj = nullptr;
396
0
        }};
397
0
    return &kHelper;
398
0
  }
399
0
}
400
401
std::shared_ptr<SecondaryCache>
402
0
CompressedSecondaryCacheOptions::MakeSharedSecondaryCache() const {
403
0
  return std::make_shared<CompressedSecondaryCache>(*this);
404
0
}
405
406
0
Status CompressedSecondaryCache::Deflate(size_t decrease) {
407
0
  return cache_res_mgr_->UpdateCacheReservation(decrease, /*increase=*/true);
408
0
}
409
410
0
Status CompressedSecondaryCache::Inflate(size_t increase) {
411
0
  return cache_res_mgr_->UpdateCacheReservation(increase, /*increase=*/false);
412
0
}
413
414
}  // namespace ROCKSDB_NAMESPACE