/src/rocksdb/db/blob/blob_source.cc
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "db/blob/blob_source.h" |
7 | | |
8 | | #include <cassert> |
9 | | #include <string> |
10 | | |
11 | | #include "cache/cache_reservation_manager.h" |
12 | | #include "cache/charged_cache.h" |
13 | | #include "db/blob/blob_contents.h" |
14 | | #include "db/blob/blob_file_reader.h" |
15 | | #include "db/blob/blob_log_format.h" |
16 | | #include "monitoring/statistics_impl.h" |
17 | | #include "options/cf_options.h" |
18 | | #include "table/get_context.h" |
19 | | #include "table/multiget_context.h" |
20 | | |
21 | | namespace ROCKSDB_NAMESPACE { |
22 | | |
23 | | BlobSource::BlobSource(const ImmutableOptions& immutable_options, |
24 | | const MutableCFOptions& mutable_cf_options, |
25 | | const std::string& db_id, |
26 | | const std::string& db_session_id, |
27 | | BlobFileCache* blob_file_cache) |
28 | 43.4k | : db_id_(db_id), |
29 | 43.4k | db_session_id_(db_session_id), |
30 | 43.4k | statistics_(immutable_options.statistics.get()), |
31 | 43.4k | blob_file_cache_(blob_file_cache), |
32 | 43.4k | blob_cache_(immutable_options.blob_cache), |
33 | 43.4k | lowest_used_cache_tier_(immutable_options.lowest_used_cache_tier) { |
34 | 43.4k | auto bbto = |
35 | 43.4k | mutable_cf_options.table_factory->GetOptions<BlockBasedTableOptions>(); |
36 | 43.4k | if (bbto && |
37 | 43.4k | bbto->cache_usage_options.options_overrides.at(CacheEntryRole::kBlobCache) |
38 | 43.4k | .charged == CacheEntryRoleOptions::Decision::kEnabled) { |
39 | 0 | blob_cache_ = SharedCacheInterface{std::make_shared<ChargedCache>( |
40 | 0 | immutable_options.blob_cache, bbto->block_cache)}; |
41 | 0 | } |
42 | 43.4k | } |
43 | | |
// Destructor: defaulted here in the implementation file, so it performs only
// the implicit member-wise destruction.
44 | 43.4k | BlobSource::~BlobSource() = default; |
45 | | |
46 | | Status BlobSource::GetBlobFromCache( |
47 | 0 | const Slice& cache_key, CacheHandleGuard<BlobContents>* cached_blob) const { |
48 | 0 | assert(blob_cache_); |
49 | 0 | assert(!cache_key.empty()); |
50 | 0 | assert(cached_blob); |
51 | 0 | assert(cached_blob->IsEmpty()); |
52 | |
|
53 | 0 | Cache::Handle* cache_handle = nullptr; |
54 | 0 | cache_handle = GetEntryFromCache(cache_key); |
55 | 0 | if (cache_handle != nullptr) { |
56 | 0 | *cached_blob = |
57 | 0 | CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle); |
58 | |
|
59 | 0 | assert(cached_blob->GetValue()); |
60 | |
|
61 | 0 | PERF_COUNTER_ADD(blob_cache_hit_count, 1); |
62 | 0 | RecordTick(statistics_, BLOB_DB_CACHE_HIT); |
63 | 0 | RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ, |
64 | 0 | cached_blob->GetValue()->size()); |
65 | |
|
66 | 0 | return Status::OK(); |
67 | 0 | } |
68 | | |
69 | 0 | RecordTick(statistics_, BLOB_DB_CACHE_MISS); |
70 | |
|
71 | 0 | return Status::NotFound("Blob not found in cache"); |
72 | 0 | } |
73 | | |
74 | | Status BlobSource::PutBlobIntoCache( |
75 | | const Slice& cache_key, std::unique_ptr<BlobContents>* blob, |
76 | 0 | CacheHandleGuard<BlobContents>* cached_blob) const { |
77 | 0 | assert(blob_cache_); |
78 | 0 | assert(!cache_key.empty()); |
79 | 0 | assert(blob); |
80 | 0 | assert(*blob); |
81 | 0 | assert(cached_blob); |
82 | 0 | assert(cached_blob->IsEmpty()); |
83 | |
|
84 | 0 | TypedHandle* cache_handle = nullptr; |
85 | 0 | const Status s = InsertEntryIntoCache(cache_key, blob->get(), &cache_handle, |
86 | 0 | Cache::Priority::BOTTOM); |
87 | 0 | if (s.ok()) { |
88 | 0 | blob->release(); |
89 | |
|
90 | 0 | assert(cache_handle != nullptr); |
91 | 0 | *cached_blob = |
92 | 0 | CacheHandleGuard<BlobContents>(blob_cache_.get(), cache_handle); |
93 | |
|
94 | 0 | assert(cached_blob->GetValue()); |
95 | |
|
96 | 0 | RecordTick(statistics_, BLOB_DB_CACHE_ADD); |
97 | 0 | RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE, |
98 | 0 | cached_blob->GetValue()->size()); |
99 | |
|
100 | 0 | } else { |
101 | 0 | RecordTick(statistics_, BLOB_DB_CACHE_ADD_FAILURES); |
102 | 0 | } |
103 | |
|
104 | 0 | return s; |
105 | 0 | } |
106 | | |
107 | 0 | BlobSource::TypedHandle* BlobSource::GetEntryFromCache(const Slice& key) const { |
108 | 0 | return blob_cache_.LookupFull(key, nullptr /* context */, |
109 | 0 | Cache::Priority::BOTTOM, statistics_, |
110 | 0 | lowest_used_cache_tier_); |
111 | 0 | } |
112 | | |
113 | | void BlobSource::PinCachedBlob(CacheHandleGuard<BlobContents>* cached_blob, |
114 | 0 | PinnableSlice* value) { |
115 | 0 | assert(cached_blob); |
116 | 0 | assert(cached_blob->GetValue()); |
117 | 0 | assert(value); |
118 | | |
119 | | // To avoid copying the cached blob into the buffer provided by the |
120 | | // application, we can simply transfer ownership of the cache handle to |
121 | | // the target PinnableSlice. This has the potential to save a lot of |
122 | | // CPU, especially with large blob values. |
123 | |
|
124 | 0 | value->Reset(); |
125 | |
|
126 | 0 | constexpr Cleanable* cleanable = nullptr; |
127 | 0 | value->PinSlice(cached_blob->GetValue()->data(), cleanable); |
128 | |
|
129 | 0 | cached_blob->TransferTo(value); |
130 | 0 | } |
131 | | |
132 | | void BlobSource::PinOwnedBlob(std::unique_ptr<BlobContents>* owned_blob, |
133 | 0 | PinnableSlice* value) { |
134 | 0 | assert(owned_blob); |
135 | 0 | assert(*owned_blob); |
136 | 0 | assert(value); |
137 | |
|
138 | 0 | BlobContents* const blob = owned_blob->release(); |
139 | 0 | assert(blob); |
140 | |
|
141 | 0 | value->Reset(); |
142 | 0 | value->PinSlice( |
143 | 0 | blob->data(), |
144 | 0 | [](void* arg1, void* /* arg2 */) { |
145 | 0 | delete static_cast<BlobContents*>(arg1); |
146 | 0 | }, |
147 | 0 | blob, nullptr); |
148 | 0 | } |
149 | | |
150 | | Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value, |
151 | | TypedHandle** cache_handle, |
152 | 0 | Cache::Priority priority) const { |
153 | 0 | return blob_cache_.InsertFull(key, value, value->ApproximateMemoryUsage(), |
154 | 0 | cache_handle, priority, |
155 | 0 | lowest_used_cache_tier_); |
156 | 0 | } |
157 | | |
158 | | Status BlobSource::GetBlob(const ReadOptions& read_options, |
159 | | const Slice& user_key, uint64_t file_number, |
160 | | uint64_t offset, uint64_t file_size, |
161 | | uint64_t value_size, |
162 | | CompressionType compression_type, |
163 | | FilePrefetchBuffer* prefetch_buffer, |
164 | 0 | PinnableSlice* value, uint64_t* bytes_read) { |
165 | 0 | assert(value); |
166 | |
|
167 | 0 | Status s; |
168 | |
|
169 | 0 | const CacheKey cache_key = GetCacheKey(file_number, file_size, offset); |
170 | |
|
171 | 0 | CacheHandleGuard<BlobContents> blob_handle; |
172 | | |
173 | | // First, try to get the blob from the cache |
174 | | // |
175 | | // If blob cache is enabled, we'll try to read from it. |
176 | 0 | if (blob_cache_) { |
177 | 0 | Slice key = cache_key.AsSlice(); |
178 | 0 | s = GetBlobFromCache(key, &blob_handle); |
179 | 0 | if (s.ok()) { |
180 | 0 | PinCachedBlob(&blob_handle, value); |
181 | | |
182 | | // For consistency, the size of on-disk (possibly compressed) blob record |
183 | | // is assigned to bytes_read. |
184 | 0 | uint64_t adjustment = |
185 | 0 | read_options.verify_checksums |
186 | 0 | ? BlobLogRecord::CalculateAdjustmentForRecordHeader( |
187 | 0 | user_key.size()) |
188 | 0 | : 0; |
189 | 0 | assert(offset >= adjustment); |
190 | |
|
191 | 0 | uint64_t record_size = value_size + adjustment; |
192 | 0 | if (bytes_read) { |
193 | 0 | *bytes_read = record_size; |
194 | 0 | } |
195 | 0 | return s; |
196 | 0 | } |
197 | 0 | } |
198 | | |
199 | 0 | assert(blob_handle.IsEmpty()); |
200 | |
|
201 | 0 | const bool no_io = read_options.read_tier == kBlockCacheTier; |
202 | 0 | if (no_io) { |
203 | 0 | s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed"); |
204 | 0 | return s; |
205 | 0 | } |
206 | | |
207 | | // Can't find the blob from the cache. Since I/O is allowed, read from the |
208 | | // file. |
209 | 0 | std::unique_ptr<BlobContents> blob_contents; |
210 | |
|
211 | 0 | { |
212 | 0 | CacheHandleGuard<BlobFileReader> blob_file_reader; |
213 | 0 | s = blob_file_cache_->GetBlobFileReader(read_options, file_number, |
214 | 0 | &blob_file_reader); |
215 | 0 | if (!s.ok()) { |
216 | 0 | return s; |
217 | 0 | } |
218 | | |
219 | 0 | assert(blob_file_reader.GetValue()); |
220 | |
|
221 | 0 | if (compression_type != blob_file_reader.GetValue()->GetCompressionType()) { |
222 | 0 | return Status::Corruption("Compression type mismatch when reading blob"); |
223 | 0 | } |
224 | | |
225 | 0 | MemoryAllocator* const allocator = |
226 | 0 | (blob_cache_ && read_options.fill_cache) |
227 | 0 | ? blob_cache_.get()->memory_allocator() |
228 | 0 | : nullptr; |
229 | |
|
230 | 0 | uint64_t read_size = 0; |
231 | 0 | s = blob_file_reader.GetValue()->GetBlob( |
232 | 0 | read_options, user_key, offset, value_size, compression_type, |
233 | 0 | prefetch_buffer, allocator, &blob_contents, &read_size); |
234 | 0 | if (!s.ok()) { |
235 | 0 | return s; |
236 | 0 | } |
237 | 0 | if (bytes_read) { |
238 | 0 | *bytes_read = read_size; |
239 | 0 | } |
240 | 0 | } |
241 | | |
242 | 0 | if (blob_cache_ && read_options.fill_cache) { |
243 | | // If filling cache is allowed and a cache is configured, try to put the |
244 | | // blob to the cache. |
245 | 0 | Slice key = cache_key.AsSlice(); |
246 | 0 | s = PutBlobIntoCache(key, &blob_contents, &blob_handle); |
247 | 0 | if (!s.ok()) { |
248 | 0 | return s; |
249 | 0 | } |
250 | | |
251 | 0 | PinCachedBlob(&blob_handle, value); |
252 | 0 | } else { |
253 | 0 | PinOwnedBlob(&blob_contents, value); |
254 | 0 | } |
255 | | |
256 | 0 | assert(s.ok()); |
257 | 0 | return s; |
258 | 0 | } |
259 | | |
260 | | void BlobSource::MultiGetBlob(const ReadOptions& read_options, |
261 | | autovector<BlobFileReadRequests>& blob_reqs, |
262 | 0 | uint64_t* bytes_read) { |
263 | 0 | assert(blob_reqs.size() > 0); |
264 | |
|
265 | 0 | uint64_t total_bytes_read = 0; |
266 | 0 | uint64_t bytes_read_in_file = 0; |
267 | |
|
268 | 0 | for (auto& [file_number, file_size, blob_reqs_in_file] : blob_reqs) { |
269 | | // sort blob_reqs_in_file by file offset. |
270 | 0 | std::sort( |
271 | 0 | blob_reqs_in_file.begin(), blob_reqs_in_file.end(), |
272 | 0 | [](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool { |
273 | 0 | return lhs.offset < rhs.offset; |
274 | 0 | }); |
275 | |
|
276 | 0 | MultiGetBlobFromOneFile(read_options, file_number, file_size, |
277 | 0 | blob_reqs_in_file, &bytes_read_in_file); |
278 | |
|
279 | 0 | total_bytes_read += bytes_read_in_file; |
280 | 0 | } |
281 | |
|
282 | 0 | if (bytes_read) { |
283 | 0 | *bytes_read = total_bytes_read; |
284 | 0 | } |
285 | 0 | } |
286 | | |
287 | | void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options, |
288 | | uint64_t file_number, |
289 | | uint64_t /*file_size*/, |
290 | | autovector<BlobReadRequest>& blob_reqs, |
291 | 0 | uint64_t* bytes_read) { |
292 | 0 | const size_t num_blobs = blob_reqs.size(); |
293 | 0 | assert(num_blobs > 0); |
294 | 0 | assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE); |
295 | |
|
296 | | #ifndef NDEBUG |
297 | | for (size_t i = 0; i < num_blobs - 1; ++i) { |
298 | | assert(blob_reqs[i].offset <= blob_reqs[i + 1].offset); |
299 | | } |
300 | | #endif // !NDEBUG |
301 | |
|
302 | 0 | using Mask = uint64_t; |
303 | 0 | Mask cache_hit_mask = 0; |
304 | |
|
305 | 0 | uint64_t total_bytes = 0; |
306 | 0 | const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number); |
307 | |
|
308 | 0 | if (blob_cache_) { |
309 | 0 | size_t cached_blob_count = 0; |
310 | 0 | for (size_t i = 0; i < num_blobs; ++i) { |
311 | 0 | auto& req = blob_reqs[i]; |
312 | |
|
313 | 0 | CacheHandleGuard<BlobContents> blob_handle; |
314 | 0 | const CacheKey cache_key = base_cache_key.WithOffset(req.offset); |
315 | 0 | const Slice key = cache_key.AsSlice(); |
316 | |
|
317 | 0 | const Status s = GetBlobFromCache(key, &blob_handle); |
318 | |
|
319 | 0 | if (s.ok()) { |
320 | 0 | assert(req.status); |
321 | 0 | *req.status = s; |
322 | |
|
323 | 0 | PinCachedBlob(&blob_handle, req.result); |
324 | | |
325 | | // Update the counter for the number of valid blobs read from the cache. |
326 | 0 | ++cached_blob_count; |
327 | | |
328 | | // For consistency, the size of each on-disk (possibly compressed) blob |
329 | | // record is accumulated to total_bytes. |
330 | 0 | uint64_t adjustment = |
331 | 0 | read_options.verify_checksums |
332 | 0 | ? BlobLogRecord::CalculateAdjustmentForRecordHeader( |
333 | 0 | req.user_key->size()) |
334 | 0 | : 0; |
335 | 0 | assert(req.offset >= adjustment); |
336 | 0 | total_bytes += req.len + adjustment; |
337 | 0 | cache_hit_mask |= (Mask{1} << i); // cache hit |
338 | 0 | } |
339 | 0 | } |
340 | | |
341 | | // All blobs were read from the cache. |
342 | 0 | if (cached_blob_count == num_blobs) { |
343 | 0 | if (bytes_read) { |
344 | 0 | *bytes_read = total_bytes; |
345 | 0 | } |
346 | 0 | return; |
347 | 0 | } |
348 | 0 | } |
349 | | |
350 | 0 | const bool no_io = read_options.read_tier == kBlockCacheTier; |
351 | 0 | if (no_io) { |
352 | 0 | for (size_t i = 0; i < num_blobs; ++i) { |
353 | 0 | if (!(cache_hit_mask & (Mask{1} << i))) { |
354 | 0 | BlobReadRequest& req = blob_reqs[i]; |
355 | 0 | assert(req.status); |
356 | |
|
357 | 0 | *req.status = |
358 | 0 | Status::Incomplete("Cannot read blob(s): no disk I/O allowed"); |
359 | 0 | } |
360 | 0 | } |
361 | 0 | return; |
362 | 0 | } |
363 | | |
364 | 0 | { |
365 | | // Find the rest of blobs from the file since I/O is allowed. |
366 | 0 | autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>> |
367 | 0 | _blob_reqs; |
368 | 0 | uint64_t _bytes_read = 0; |
369 | |
|
370 | 0 | for (size_t i = 0; i < num_blobs; ++i) { |
371 | 0 | if (!(cache_hit_mask & (Mask{1} << i))) { |
372 | 0 | _blob_reqs.emplace_back(&blob_reqs[i], std::unique_ptr<BlobContents>()); |
373 | 0 | } |
374 | 0 | } |
375 | |
|
376 | 0 | CacheHandleGuard<BlobFileReader> blob_file_reader; |
377 | 0 | Status s = blob_file_cache_->GetBlobFileReader(read_options, file_number, |
378 | 0 | &blob_file_reader); |
379 | 0 | if (!s.ok()) { |
380 | 0 | for (size_t i = 0; i < _blob_reqs.size(); ++i) { |
381 | 0 | BlobReadRequest* const req = _blob_reqs[i].first; |
382 | 0 | assert(req); |
383 | 0 | assert(req->status); |
384 | |
|
385 | 0 | *req->status = s; |
386 | 0 | } |
387 | 0 | return; |
388 | 0 | } |
389 | | |
390 | 0 | assert(blob_file_reader.GetValue()); |
391 | |
|
392 | 0 | MemoryAllocator* const allocator = |
393 | 0 | (blob_cache_ && read_options.fill_cache) |
394 | 0 | ? blob_cache_.get()->memory_allocator() |
395 | 0 | : nullptr; |
396 | |
|
397 | 0 | blob_file_reader.GetValue()->MultiGetBlob(read_options, allocator, |
398 | 0 | _blob_reqs, &_bytes_read); |
399 | |
|
400 | 0 | if (blob_cache_ && read_options.fill_cache) { |
401 | | // If filling cache is allowed and a cache is configured, try to put |
402 | | // the blob(s) to the cache. |
403 | 0 | for (auto& [req, blob_contents] : _blob_reqs) { |
404 | 0 | assert(req); |
405 | |
|
406 | 0 | if (req->status->ok()) { |
407 | 0 | CacheHandleGuard<BlobContents> blob_handle; |
408 | 0 | const CacheKey cache_key = base_cache_key.WithOffset(req->offset); |
409 | 0 | const Slice key = cache_key.AsSlice(); |
410 | 0 | s = PutBlobIntoCache(key, &blob_contents, &blob_handle); |
411 | 0 | if (!s.ok()) { |
412 | 0 | *req->status = s; |
413 | 0 | } else { |
414 | 0 | PinCachedBlob(&blob_handle, req->result); |
415 | 0 | } |
416 | 0 | } |
417 | 0 | } |
418 | 0 | } else { |
419 | 0 | for (auto& [req, blob_contents] : _blob_reqs) { |
420 | 0 | assert(req); |
421 | |
|
422 | 0 | if (req->status->ok()) { |
423 | 0 | PinOwnedBlob(&blob_contents, req->result); |
424 | 0 | } |
425 | 0 | } |
426 | 0 | } |
427 | |
|
428 | 0 | total_bytes += _bytes_read; |
429 | 0 | if (bytes_read) { |
430 | 0 | *bytes_read = total_bytes; |
431 | 0 | } |
432 | 0 | } |
433 | 0 | } |
434 | | |
435 | | bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size, |
436 | 0 | uint64_t offset, size_t* charge) const { |
437 | 0 | const CacheKey cache_key = GetCacheKey(file_number, file_size, offset); |
438 | 0 | const Slice key = cache_key.AsSlice(); |
439 | |
|
440 | 0 | CacheHandleGuard<BlobContents> blob_handle; |
441 | 0 | const Status s = GetBlobFromCache(key, &blob_handle); |
442 | |
|
443 | 0 | if (s.ok() && blob_handle.GetValue() != nullptr) { |
444 | 0 | if (charge) { |
445 | 0 | const Cache* const cache = blob_handle.GetCache(); |
446 | 0 | assert(cache); |
447 | |
|
448 | 0 | Cache::Handle* const handle = blob_handle.GetCacheHandle(); |
449 | 0 | assert(handle); |
450 | |
|
451 | 0 | *charge = cache->GetUsage(handle); |
452 | 0 | } |
453 | |
|
454 | 0 | return true; |
455 | 0 | } |
456 | | |
457 | 0 | return false; |
458 | 0 | } |
459 | | |
460 | | } // namespace ROCKSDB_NAMESPACE |