/src/rocksdb/cache/compressed_secondary_cache.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "cache/compressed_secondary_cache.h" |
7 | | |
8 | | #include <algorithm> |
9 | | #include <cstdint> |
10 | | #include <memory> |
11 | | |
12 | | #include "memory/memory_allocator_impl.h" |
13 | | #include "monitoring/perf_context_imp.h" |
14 | | #include "util/coding.h" |
15 | | #include "util/compression.h" |
16 | | #include "util/string_util.h" |
17 | | |
18 | | namespace ROCKSDB_NAMESPACE { |
19 | | |
20 | | CompressedSecondaryCache::CompressedSecondaryCache( |
21 | | const CompressedSecondaryCacheOptions& opts) |
22 | | : cache_(opts.LRUCacheOptions::MakeSharedCache()), |
23 | | cache_options_(opts), |
24 | | cache_res_mgr_(std::make_shared<ConcurrentCacheReservationManager>( |
25 | | std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>( |
26 | | cache_))), |
27 | 0 | disable_cache_(opts.capacity == 0) {} |
28 | | |
29 | 0 | CompressedSecondaryCache::~CompressedSecondaryCache() = default; |
30 | | |
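// Editor's note (not part of the original file): a minimal usage sketch. The
// compressed secondary cache is normally built from its options and attached
// as the secondary tier of a primary block cache. Field names follow the
// public headers; the capacities and compression type below are illustrative
// assumptions only.
//
//   CompressedSecondaryCacheOptions sec_opts;
//   sec_opts.capacity = 64 << 20;                 // 64 MiB compressed tier
//   sec_opts.compression_type = kLZ4Compression;  // assumes LZ4 is available
//   LRUCacheOptions lru_opts;
//   lru_opts.capacity = 128 << 20;                // primary (uncompressed) tier
//   lru_opts.secondary_cache = sec_opts.MakeSharedSecondaryCache();
//   std::shared_ptr<Cache> block_cache = NewLRUCache(lru_opts);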
31 | | std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup( |
32 | | const Slice& key, const Cache::CacheItemHelper* helper, |
33 | | Cache::CreateContext* create_context, bool /*wait*/, bool advise_erase, |
34 | 0 | Statistics* stats, bool& kept_in_sec_cache) { |
35 | 0 | assert(helper); |
36 | | // This is a minor optimization. It's OK to skip it in TSAN in order to |
37 | | // avoid a false positive. |
38 | 0 | #ifndef __SANITIZE_THREAD__ |
39 | 0 | if (disable_cache_) { |
40 | 0 | return nullptr; |
41 | 0 | } |
42 | 0 | #endif |
43 | | |
44 | 0 | std::unique_ptr<SecondaryCacheResultHandle> handle; |
45 | 0 | kept_in_sec_cache = false; |
46 | 0 | Cache::Handle* lru_handle = cache_->Lookup(key); |
47 | 0 | if (lru_handle == nullptr) { |
48 | 0 | return nullptr; |
49 | 0 | } |
50 | | |
51 | 0 | void* handle_value = cache_->Value(lru_handle); |
52 | 0 | if (handle_value == nullptr) { |
53 | 0 | cache_->Release(lru_handle, /*erase_if_last_ref=*/false); |
54 | 0 | RecordTick(stats, COMPRESSED_SECONDARY_CACHE_DUMMY_HITS); |
55 | 0 | return nullptr; |
56 | 0 | } |
57 | | |
58 | 0 | CacheAllocationPtr* ptr{nullptr}; |
59 | 0 | CacheAllocationPtr merged_value; |
60 | 0 | size_t handle_value_charge{0}; |
61 | 0 | const char* data_ptr = nullptr; |
62 | 0 | CacheTier source = CacheTier::kVolatileCompressedTier; |
63 | 0 | CompressionType type = cache_options_.compression_type; |
64 | 0 | if (cache_options_.enable_custom_split_merge) { |
65 | 0 | CacheValueChunk* value_chunk_ptr = |
66 | 0 | reinterpret_cast<CacheValueChunk*>(handle_value); |
67 | 0 | merged_value = MergeChunksIntoValue(value_chunk_ptr, handle_value_charge); |
68 | 0 | ptr = &merged_value; |
69 | 0 | data_ptr = ptr->get(); |
70 | 0 | } else { |
71 | 0 | uint32_t type_32 = static_cast<uint32_t>(type); |
72 | 0 | uint32_t source_32 = static_cast<uint32_t>(source); |
73 | 0 | ptr = reinterpret_cast<CacheAllocationPtr*>(handle_value); |
74 | 0 | handle_value_charge = cache_->GetCharge(lru_handle); |
75 | 0 | data_ptr = ptr->get(); |
76 | 0 | data_ptr = GetVarint32Ptr(data_ptr, data_ptr + 1, |
77 | 0 | static_cast<uint32_t*>(&type_32)); |
78 | 0 | type = static_cast<CompressionType>(type_32); |
79 | 0 | data_ptr = GetVarint32Ptr(data_ptr, data_ptr + 1, |
80 | 0 | static_cast<uint32_t*>(&source_32)); |
81 | 0 | source = static_cast<CacheTier>(source_32); |
82 | 0 | handle_value_charge -= (data_ptr - ptr->get()); |
83 | 0 | } |
84 | 0 | MemoryAllocator* allocator = cache_options_.memory_allocator.get(); |
85 | |
86 | 0 | Status s; |
87 | 0 | Cache::ObjectPtr value{nullptr}; |
88 | 0 | size_t charge{0}; |
89 | 0 | if (source == CacheTier::kVolatileCompressedTier) { |
90 | 0 | if (cache_options_.compression_type == kNoCompression || |
91 | 0 | cache_options_.do_not_compress_roles.Contains(helper->role)) { |
92 | 0 | s = helper->create_cb(Slice(data_ptr, handle_value_charge), |
93 | 0 | kNoCompression, CacheTier::kVolatileTier, |
94 | 0 | create_context, allocator, &value, &charge); |
95 | 0 | } else { |
96 | 0 | UncompressionContext uncompression_context( |
97 | 0 | cache_options_.compression_type); |
98 | 0 | UncompressionInfo uncompression_info(uncompression_context, |
99 | 0 | UncompressionDict::GetEmptyDict(), |
100 | 0 | cache_options_.compression_type); |
101 | |
102 | 0 | size_t uncompressed_size{0}; |
103 | 0 | CacheAllocationPtr uncompressed = |
104 | 0 | UncompressData(uncompression_info, (char*)data_ptr, |
105 | 0 | handle_value_charge, &uncompressed_size, |
106 | 0 | cache_options_.compress_format_version, allocator); |
107 | |
108 | 0 | if (!uncompressed) { |
109 | 0 | cache_->Release(lru_handle, /*erase_if_last_ref=*/true); |
110 | 0 | return nullptr; |
111 | 0 | } |
112 | 0 | s = helper->create_cb(Slice(uncompressed.get(), uncompressed_size), |
113 | 0 | kNoCompression, CacheTier::kVolatileTier, |
114 | 0 | create_context, allocator, &value, &charge); |
115 | 0 | } |
116 | 0 | } else { |
117 | | // The item was not compressed by us. Let the helper create_cb |
118 | | // uncompress it |
119 | 0 | s = helper->create_cb(Slice(data_ptr, handle_value_charge), type, source, |
120 | 0 | create_context, allocator, &value, &charge); |
121 | 0 | } |
122 | | |
123 | 0 | if (!s.ok()) { |
124 | 0 | cache_->Release(lru_handle, /*erase_if_last_ref=*/true); |
125 | 0 | return nullptr; |
126 | 0 | } |
127 | | |
128 | 0 | if (advise_erase) { |
129 | 0 | cache_->Release(lru_handle, /*erase_if_last_ref=*/true); |
130 | | // Insert a dummy handle. |
131 | 0 | cache_ |
132 | 0 | ->Insert(key, /*obj=*/nullptr, |
133 | 0 | GetHelper(cache_options_.enable_custom_split_merge), |
134 | 0 | /*charge=*/0) |
135 | 0 | .PermitUncheckedError(); |
136 | 0 | } else { |
137 | 0 | kept_in_sec_cache = true; |
138 | 0 | cache_->Release(lru_handle, /*erase_if_last_ref=*/false); |
139 | 0 | } |
140 | 0 | handle.reset(new CompressedSecondaryCacheResultHandle(value, charge)); |
141 | 0 | RecordTick(stats, COMPRESSED_SECONDARY_CACHE_HITS); |
142 | 0 | return handle; |
143 | 0 | } |
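// Editor's note (not part of the original file): in the non-split path the
// cached buffer carries a small header that InsertInternal() writes and
// Lookup() above strips again:
//
//   [varint32 compression_type][varint32 source CacheTier][payload bytes ...]
//
// handle_value_charge is reduced by the decoded header size so that only the
// payload is handed to create_cb or UncompressData. In the custom split/merge
// path the chunks hold the raw payload and no header is stored.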
144 | | |
145 | 0 | bool CompressedSecondaryCache::MaybeInsertDummy(const Slice& key) { |
146 | 0 | auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge); |
147 | 0 | Cache::Handle* lru_handle = cache_->Lookup(key); |
148 | 0 | if (lru_handle == nullptr) { |
149 | 0 | PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1); |
150 | | // Insert a dummy handle the first time a key is inserted; only a repeat insertion admits the real value. |
151 | 0 | cache_->Insert(key, /*obj=*/nullptr, internal_helper, /*charge=*/0) |
152 | 0 | .PermitUncheckedError(); |
153 | 0 | return true; |
154 | 0 | } else { |
155 | 0 | cache_->Release(lru_handle, /*erase_if_last_ref=*/false); |
156 | 0 | } |
157 | | |
158 | 0 | return false; |
159 | 0 | } |
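// Editor's note (not part of the original file): MaybeInsertDummy() acts as a
// simple admission filter. The first time a key reaches this cache only a
// zero-charge dummy handle is inserted; the full value is admitted only if the
// key is inserted again while the dummy is still resident, which keeps
// single-use blocks from consuming compressed-cache capacity.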
160 | | |
161 | | Status CompressedSecondaryCache::InsertInternal( |
162 | | const Slice& key, Cache::ObjectPtr value, |
163 | | const Cache::CacheItemHelper* helper, CompressionType type, |
164 | 0 | CacheTier source) { |
165 | 0 | if (source != CacheTier::kVolatileCompressedTier && |
166 | 0 | cache_options_.enable_custom_split_merge) { |
167 | | // We don't support custom split/merge for the tiered case |
168 | 0 | return Status::OK(); |
169 | 0 | } |
170 | | |
171 | 0 | auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge); |
172 | 0 | char header[10]; |
173 | 0 | char* payload = header; |
174 | 0 | payload = EncodeVarint32(payload, static_cast<uint32_t>(type)); |
175 | 0 | payload = EncodeVarint32(payload, static_cast<uint32_t>(source)); |
176 | |
177 | 0 | size_t header_size = payload - header; |
178 | 0 | size_t data_size = (*helper->size_cb)(value); |
179 | 0 | size_t total_size = data_size + header_size; |
180 | 0 | CacheAllocationPtr ptr = |
181 | 0 | AllocateBlock(total_size, cache_options_.memory_allocator.get()); |
182 | 0 | char* data_ptr = ptr.get() + header_size; |
183 | |
184 | 0 | Status s = (*helper->saveto_cb)(value, 0, data_size, data_ptr); |
185 | 0 | if (!s.ok()) { |
186 | 0 | return s; |
187 | 0 | } |
188 | 0 | Slice val(data_ptr, data_size); |
189 | |
190 | 0 | std::string compressed_val; |
191 | 0 | if (cache_options_.compression_type != kNoCompression && |
192 | 0 | type == kNoCompression && |
193 | 0 | !cache_options_.do_not_compress_roles.Contains(helper->role)) { |
194 | 0 | PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, data_size); |
195 | 0 | CompressionContext compression_context(cache_options_.compression_type, |
196 | 0 | cache_options_.compression_opts); |
197 | 0 | uint64_t sample_for_compression{0}; |
198 | 0 | CompressionInfo compression_info( |
199 | 0 | cache_options_.compression_opts, compression_context, |
200 | 0 | CompressionDict::GetEmptyDict(), cache_options_.compression_type, |
201 | 0 | sample_for_compression); |
202 | |
203 | 0 | bool success = |
204 | 0 | CompressData(val, compression_info, |
205 | 0 | cache_options_.compress_format_version, &compressed_val); |
206 | |
207 | 0 | if (!success) { |
208 | 0 | return Status::Corruption("Error compressing value."); |
209 | 0 | } |
210 | | |
211 | 0 | val = Slice(compressed_val); |
212 | 0 | data_size = compressed_val.size(); |
213 | 0 | total_size = header_size + data_size; |
214 | 0 | PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, data_size); |
215 | |
216 | 0 | if (!cache_options_.enable_custom_split_merge) { |
217 | 0 | ptr = AllocateBlock(total_size, cache_options_.memory_allocator.get()); |
218 | 0 | data_ptr = ptr.get() + header_size; |
219 | 0 | memcpy(data_ptr, compressed_val.data(), data_size); |
220 | 0 | } |
221 | 0 | } |
222 | | |
223 | 0 | PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1); |
224 | 0 | if (cache_options_.enable_custom_split_merge) { |
225 | 0 | size_t charge{0}; |
226 | 0 | CacheValueChunk* value_chunks_head = |
227 | 0 | SplitValueIntoChunks(val, cache_options_.compression_type, charge); |
228 | 0 | return cache_->Insert(key, value_chunks_head, internal_helper, charge); |
229 | 0 | } else { |
230 | 0 | std::memcpy(ptr.get(), header, header_size); |
231 | 0 | CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr)); |
232 | 0 | return cache_->Insert(key, buf, internal_helper, total_size); |
233 | 0 | } |
234 | 0 | } |
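// Editor's note (not part of the original file): the 10-byte header buffer in
// InsertInternal() is sized for two varint32 fields of at most 5 bytes each
// (compression type and source tier). With custom split/merge enabled the
// header is dropped and the (possibly compressed) payload is stored as a chunk
// list whose summed allocation sizes become the charge; otherwise header and
// payload share one allocation and total_size is used as the charge.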
235 | | |
236 | | Status CompressedSecondaryCache::Insert(const Slice& key, |
237 | | Cache::ObjectPtr value, |
238 | | const Cache::CacheItemHelper* helper, |
239 | 0 | bool force_insert) { |
240 | 0 | if (value == nullptr) { |
241 | 0 | return Status::InvalidArgument(); |
242 | 0 | } |
243 | | |
244 | 0 | if (!force_insert && MaybeInsertDummy(key)) { |
245 | 0 | return Status::OK(); |
246 | 0 | } |
247 | | |
248 | 0 | return InsertInternal(key, value, helper, kNoCompression, |
249 | 0 | CacheTier::kVolatileCompressedTier); |
250 | 0 | } |
251 | | |
252 | | Status CompressedSecondaryCache::InsertSaved( |
253 | | const Slice& key, const Slice& saved, CompressionType type = kNoCompression, |
254 | 0 | CacheTier source = CacheTier::kVolatileTier) { |
255 | 0 | if (type == kNoCompression) { |
256 | 0 | return Status::OK(); |
257 | 0 | } |
258 | | |
259 | 0 | auto slice_helper = &kSliceCacheItemHelper; |
260 | 0 | if (MaybeInsertDummy(key)) { |
261 | 0 | return Status::OK(); |
262 | 0 | } |
263 | | |
264 | 0 | return InsertInternal( |
265 | 0 | key, static_cast<Cache::ObjectPtr>(const_cast<Slice*>(&saved)), |
266 | 0 | slice_helper, type, source); |
267 | 0 | } |
268 | | |
269 | 0 | void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } |
270 | | |
271 | 0 | Status CompressedSecondaryCache::SetCapacity(size_t capacity) { |
272 | 0 | MutexLock l(&capacity_mutex_); |
273 | 0 | cache_options_.capacity = capacity; |
274 | 0 | cache_->SetCapacity(capacity); |
275 | 0 | disable_cache_ = capacity == 0; |
276 | 0 | return Status::OK(); |
277 | 0 | } |
278 | | |
279 | 0 | Status CompressedSecondaryCache::GetCapacity(size_t& capacity) { |
280 | 0 | MutexLock l(&capacity_mutex_); |
281 | 0 | capacity = cache_options_.capacity; |
282 | 0 | return Status::OK(); |
283 | 0 | } |
284 | | |
285 | 0 | std::string CompressedSecondaryCache::GetPrintableOptions() const { |
286 | 0 | std::string ret; |
287 | 0 | ret.reserve(20000); |
288 | 0 | const int kBufferSize{200}; |
289 | 0 | char buffer[kBufferSize]; |
290 | 0 | ret.append(cache_->GetPrintableOptions()); |
291 | 0 | snprintf(buffer, kBufferSize, " compression_type : %s\n", |
292 | 0 | CompressionTypeToString(cache_options_.compression_type).c_str()); |
293 | 0 | ret.append(buffer); |
294 | 0 | snprintf(buffer, kBufferSize, " compression_opts : %s\n", |
295 | 0 | CompressionOptionsToString( |
296 | 0 | const_cast<CompressionOptions&>(cache_options_.compression_opts)) |
297 | 0 | .c_str()); |
298 | 0 | ret.append(buffer); |
299 | 0 | snprintf(buffer, kBufferSize, " compress_format_version : %d\n", |
300 | 0 | cache_options_.compress_format_version); |
301 | 0 | ret.append(buffer); |
302 | 0 | return ret; |
303 | 0 | } |
304 | | |
305 | | CompressedSecondaryCache::CacheValueChunk* |
306 | | CompressedSecondaryCache::SplitValueIntoChunks(const Slice& value, |
307 | | CompressionType compression_type, |
308 | 0 | size_t& charge) { |
309 | 0 | assert(!value.empty()); |
310 | 0 | const char* src_ptr = value.data(); |
311 | 0 | size_t src_size{value.size()}; |
312 | |
313 | 0 | CacheValueChunk dummy_head = CacheValueChunk(); |
314 | 0 | CacheValueChunk* current_chunk = &dummy_head; |
315 | | // Do not split when value size is large or there is no compression. |
316 | 0 | size_t predicted_chunk_size{0}; |
317 | 0 | size_t actual_chunk_size{0}; |
318 | 0 | size_t tmp_size{0}; |
319 | 0 | while (src_size > 0) { |
320 | 0 | predicted_chunk_size = sizeof(CacheValueChunk) - 1 + src_size; |
321 | 0 | auto upper = |
322 | 0 | std::upper_bound(malloc_bin_sizes_.begin(), malloc_bin_sizes_.end(), |
323 | 0 | predicted_chunk_size); |
324 | | // Do not split when value size is too small, too large, close to a bin |
325 | | // size, or there is no compression. |
326 | 0 | if (upper == malloc_bin_sizes_.begin() || |
327 | 0 | upper == malloc_bin_sizes_.end() || |
328 | 0 | *upper - predicted_chunk_size < malloc_bin_sizes_.front() || |
329 | 0 | compression_type == kNoCompression) { |
330 | 0 | tmp_size = predicted_chunk_size; |
331 | 0 | } else { |
332 | 0 | tmp_size = *(--upper); |
333 | 0 | } |
334 | |
335 | 0 | CacheValueChunk* new_chunk = |
336 | 0 | reinterpret_cast<CacheValueChunk*>(new char[tmp_size]); |
337 | 0 | current_chunk->next = new_chunk; |
338 | 0 | current_chunk = current_chunk->next; |
339 | 0 | actual_chunk_size = tmp_size - sizeof(CacheValueChunk) + 1; |
340 | 0 | memcpy(current_chunk->data, src_ptr, actual_chunk_size); |
341 | 0 | current_chunk->size = actual_chunk_size; |
342 | 0 | src_ptr += actual_chunk_size; |
343 | 0 | src_size -= actual_chunk_size; |
344 | 0 | charge += tmp_size; |
345 | 0 | } |
346 | 0 | current_chunk->next = nullptr; |
347 | |
348 | 0 | return dummy_head.next; |
349 | 0 | } |
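// Editor's note (not part of the original file): a sketch of the chunking rule
// above, assuming CacheValueChunk ends in a one-byte data[1] array (which the
// `sizeof(CacheValueChunk) - 1` bookkeeping accounts for). For each iteration:
//
//   predicted = sizeof(CacheValueChunk) - 1 + remaining_src_size;
//   // upper_bound() finds the first malloc bin strictly larger than predicted.
//   // Unless splitting is pointless (see below), the chunk is trimmed to the
//   // largest bin that does not exceed predicted, so the allocation fills a
//   // malloc bin exactly; the leftover bytes spill into the next chunk.
//
// Splitting is skipped (the chunk takes the whole remainder) when the value is
// smaller than the first bin, at least as large as the last bin, already just
// under a bin size, or when the data is not compressed.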
350 | | |
351 | | CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue( |
352 | 0 | const void* chunks_head, size_t& charge) { |
353 | 0 | const CacheValueChunk* head = |
354 | 0 | reinterpret_cast<const CacheValueChunk*>(chunks_head); |
355 | 0 | const CacheValueChunk* current_chunk = head; |
356 | 0 | charge = 0; |
357 | 0 | while (current_chunk != nullptr) { |
358 | 0 | charge += current_chunk->size; |
359 | 0 | current_chunk = current_chunk->next; |
360 | 0 | } |
361 | |
362 | 0 | CacheAllocationPtr ptr = |
363 | 0 | AllocateBlock(charge, cache_options_.memory_allocator.get()); |
364 | 0 | current_chunk = head; |
365 | 0 | size_t pos{0}; |
366 | 0 | while (current_chunk != nullptr) { |
367 | 0 | memcpy(ptr.get() + pos, current_chunk->data, current_chunk->size); |
368 | 0 | pos += current_chunk->size; |
369 | 0 | current_chunk = current_chunk->next; |
370 | 0 | } |
371 | |
372 | 0 | return ptr; |
373 | 0 | } |
374 | | |
375 | | const Cache::CacheItemHelper* CompressedSecondaryCache::GetHelper( |
376 | 0 | bool enable_custom_split_merge) const { |
377 | 0 | if (enable_custom_split_merge) { |
378 | 0 | static const Cache::CacheItemHelper kHelper{ |
379 | 0 | CacheEntryRole::kMisc, |
380 | 0 | [](Cache::ObjectPtr obj, MemoryAllocator* /*alloc*/) { |
381 | 0 | CacheValueChunk* chunks_head = static_cast<CacheValueChunk*>(obj); |
382 | 0 | while (chunks_head != nullptr) { |
383 | 0 | CacheValueChunk* tmp_chunk = chunks_head; |
384 | 0 | chunks_head = chunks_head->next; |
385 | 0 | tmp_chunk->Free(); |
386 | 0 | obj = nullptr; |
387 | 0 | } |
388 | 0 | }}; |
389 | 0 | return &kHelper; |
390 | 0 | } else { |
391 | 0 | static const Cache::CacheItemHelper kHelper{ |
392 | 0 | CacheEntryRole::kMisc, |
393 | 0 | [](Cache::ObjectPtr obj, MemoryAllocator* /*alloc*/) { |
394 | 0 | delete static_cast<CacheAllocationPtr*>(obj); |
395 | 0 | obj = nullptr; |
396 | 0 | }}; |
397 | 0 | return &kHelper; |
398 | 0 | } |
399 | 0 | } |
400 | | |
401 | | std::shared_ptr<SecondaryCache> |
402 | 0 | CompressedSecondaryCacheOptions::MakeSharedSecondaryCache() const { |
403 | 0 | return std::make_shared<CompressedSecondaryCache>(*this); |
404 | 0 | } |
405 | | |
406 | 0 | Status CompressedSecondaryCache::Deflate(size_t decrease) { |
407 | 0 | return cache_res_mgr_->UpdateCacheReservation(decrease, /*increase=*/true); |
408 | 0 | } |
409 | | |
410 | 0 | Status CompressedSecondaryCache::Inflate(size_t increase) { |
411 | 0 | return cache_res_mgr_->UpdateCacheReservation(increase, /*increase=*/false); |
412 | 0 | } |
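// Editor's note (not part of the original file): Deflate()/Inflate() resize the
// usable portion of this cache through cache_res_mgr_ rather than by changing
// the LRU capacity. Deflate() grows the reservation, squeezing out real
// entries, and Inflate() releases it; a tiered setup can presumably use this
// pair to shift capacity between the primary and compressed tiers at runtime.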
413 | | |
414 | | } // namespace ROCKSDB_NAMESPACE |