Coverage Report

Created: 2026-02-14 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/blob/blob_source.cc
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/blob/blob_source.h"
7
8
#include <cassert>
9
#include <string>
10
11
#include "cache/cache_reservation_manager.h"
12
#include "cache/charged_cache.h"
13
#include "db/blob/blob_contents.h"
14
#include "db/blob/blob_file_reader.h"
15
#include "db/blob/blob_log_format.h"
16
#include "monitoring/statistics_impl.h"
17
#include "options/cf_options.h"
18
#include "table/get_context.h"
19
#include "table/multiget_context.h"
20
21
namespace ROCKSDB_NAMESPACE {
22
23
// Constructs a BlobSource that reads blobs through `blob_file_cache` and,
// when one is configured in `immutable_options`, through the blob cache.
//
// If the column family's table factory exposes BlockBasedTableOptions and
// those options enable charging for the kBlobCache role, the blob cache is
// replaced by a ChargedCache built from the configured blob cache and the
// table's block cache.
BlobSource::BlobSource(const ImmutableOptions& immutable_options,
                       const MutableCFOptions& mutable_cf_options,
                       const std::string& db_id,
                       const std::string& db_session_id,
                       BlobFileCache* blob_file_cache)
    : db_id_(db_id),
      db_session_id_(db_session_id),
      statistics_(immutable_options.statistics.get()),
      blob_file_cache_(blob_file_cache),
      blob_cache_(immutable_options.blob_cache),
      lowest_used_cache_tier_(immutable_options.lowest_used_cache_tier) {
  auto block_based_options =
      mutable_cf_options.table_factory->GetOptions<BlockBasedTableOptions>();
  if (!block_based_options) {
    // Not a block-based table factory: keep the plain blob cache.
    return;
  }

  const auto& blob_cache_role_options =
      block_based_options->cache_usage_options.options_overrides.at(
          CacheEntryRole::kBlobCache);
  if (blob_cache_role_options.charged !=
      CacheEntryRoleOptions::Decision::kEnabled) {
    return;
  }

  blob_cache_ = SharedCacheInterface{std::make_shared<ChargedCache>(
      immutable_options.blob_cache, block_based_options->block_cache)};
}
43
44
43.4k
// Compiler-generated destructor, defined out of line in this source file.
BlobSource::~BlobSource() = default;
45
46
// Looks up `cache_key` in the blob cache.
//
// On a hit, `*cached_blob` takes over the cache handle, the hit and
// bytes-read tickers are updated, and OK is returned. On a miss, the miss
// ticker is recorded and NotFound is returned; `*cached_blob` stays empty.
//
// Requires a configured blob cache, a non-empty key, and an empty
// `*cached_blob` (all checked by assertions).
Status BlobSource::GetBlobFromCache(
    const Slice& cache_key, CacheHandleGuard<BlobContents>* cached_blob) const {
  assert(blob_cache_);
  assert(!cache_key.empty());
  assert(cached_blob);
  assert(cached_blob->IsEmpty());

  Cache::Handle* const handle = GetEntryFromCache(cache_key);
  if (handle == nullptr) {
    RecordTick(statistics_, BLOB_DB_CACHE_MISS);

    return Status::NotFound("Blob not found in cache");
  }

  *cached_blob = CacheHandleGuard<BlobContents>(blob_cache_.get(), handle);
  assert(cached_blob->GetValue());

  PERF_COUNTER_ADD(blob_cache_hit_count, 1);
  RecordTick(statistics_, BLOB_DB_CACHE_HIT);
  RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ,
             cached_blob->GetValue()->size());

  return Status::OK();
}
73
74
// Inserts `*blob` into the blob cache under `cache_key` with BOTTOM priority.
//
// On success, ownership of the blob leaves `*blob` (the unique_ptr is
// released), `*cached_blob` receives the new cache handle, and the add and
// bytes-written tickers are updated. On failure, `*blob` keeps ownership, the
// add-failure ticker is recorded, and the insertion status is returned.
Status BlobSource::PutBlobIntoCache(
    const Slice& cache_key, std::unique_ptr<BlobContents>* blob,
    CacheHandleGuard<BlobContents>* cached_blob) const {
  assert(blob_cache_);
  assert(!cache_key.empty());
  assert(blob);
  assert(*blob);
  assert(cached_blob);
  assert(cached_blob->IsEmpty());

  TypedHandle* inserted_handle = nullptr;
  const Status s = InsertEntryIntoCache(
      cache_key, blob->get(), &inserted_handle, Cache::Priority::BOTTOM);
  if (!s.ok()) {
    RecordTick(statistics_, BLOB_DB_CACHE_ADD_FAILURES);
    return s;
  }

  // The insertion succeeded, so the caller's unique_ptr must no longer delete
  // the blob.
  blob->release();

  assert(inserted_handle != nullptr);
  *cached_blob =
      CacheHandleGuard<BlobContents>(blob_cache_.get(), inserted_handle);
  assert(cached_blob->GetValue());

  RecordTick(statistics_, BLOB_DB_CACHE_ADD);
  RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE,
             cached_blob->GetValue()->size());

  return s;
}
106
107
0
// Performs a full lookup of `key` in the blob cache with BOTTOM priority and
// no creation context, passing along this object's statistics and lowest
// used cache tier. Returns whatever handle the lookup yields.
BlobSource::TypedHandle* BlobSource::GetEntryFromCache(const Slice& key) const {
  TypedHandle* const handle = blob_cache_.LookupFull(
      key, /*context=*/nullptr, Cache::Priority::BOTTOM, statistics_,
      lowest_used_cache_tier_);
  return handle;
}
112
113
void BlobSource::PinCachedBlob(CacheHandleGuard<BlobContents>* cached_blob,
114
0
                               PinnableSlice* value) {
115
0
  assert(cached_blob);
116
0
  assert(cached_blob->GetValue());
117
0
  assert(value);
118
119
  // To avoid copying the cached blob into the buffer provided by the
120
  // application, we can simply transfer ownership of the cache handle to
121
  // the target PinnableSlice. This has the potential to save a lot of
122
  // CPU, especially with large blob values.
123
124
0
  value->Reset();
125
126
0
  constexpr Cleanable* cleanable = nullptr;
127
0
  value->PinSlice(cached_blob->GetValue()->data(), cleanable);
128
129
0
  cached_blob->TransferTo(value);
130
0
}
131
132
void BlobSource::PinOwnedBlob(std::unique_ptr<BlobContents>* owned_blob,
133
0
                              PinnableSlice* value) {
134
0
  assert(owned_blob);
135
0
  assert(*owned_blob);
136
0
  assert(value);
137
138
0
  BlobContents* const blob = owned_blob->release();
139
0
  assert(blob);
140
141
0
  value->Reset();
142
0
  value->PinSlice(
143
0
      blob->data(),
144
0
      [](void* arg1, void* /* arg2 */) {
145
0
        delete static_cast<BlobContents*>(arg1);
146
0
      },
147
0
      blob, nullptr);
148
0
}
149
150
// Inserts `value` into the blob cache under `key`, charging it with its
// approximate memory usage. The resulting handle is written to
// `*cache_handle`; the insertion status is returned unchanged.
Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value,
                                        TypedHandle** cache_handle,
                                        Cache::Priority priority) const {
  const auto charge = value->ApproximateMemoryUsage();

  return blob_cache_.InsertFull(key, value, charge, cache_handle, priority,
                                lowest_used_cache_tier_);
}
157
158
// Retrieves a single blob, consulting the blob cache before the blob file.
//
// Flow:
//   1. If a blob cache is configured, look the blob up there. On a hit the
//      value is pinned to the cached entry and `*bytes_read` (if provided) is
//      set to `value_size` plus, when checksum verification is requested, the
//      record header adjustment for `user_key`.
//   2. On a miss, Incomplete is returned if `read_options.read_tier` forbids
//      disk I/O.
//   3. Otherwise the blob is read via the blob file reader. A mismatch between
//      `compression_type` and the reader's compression type yields Corruption.
//      `*bytes_read` (if provided) is set to the size reported by the reader.
//   4. If a cache is configured and `read_options.fill_cache` is set, the blob
//      is inserted into the cache and the value is pinned to the cached entry;
//      an insertion failure is returned to the caller. Otherwise the value
//      takes ownership of the freshly read blob.
Status BlobSource::GetBlob(const ReadOptions& read_options,
                           const Slice& user_key, uint64_t file_number,
                           uint64_t offset, uint64_t file_size,
                           uint64_t value_size,
                           CompressionType compression_type,
                           FilePrefetchBuffer* prefetch_buffer,
                           PinnableSlice* value, uint64_t* bytes_read) {
  assert(value);

  Status status;

  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);

  CacheHandleGuard<BlobContents> blob_handle;

  // Step 1: cache lookup, only when a blob cache is configured.
  if (blob_cache_) {
    const Slice lookup_key = cache_key.AsSlice();
    status = GetBlobFromCache(lookup_key, &blob_handle);
    if (status.ok()) {
      PinCachedBlob(&blob_handle, value);

      // For consistency with the file read path, report the size of the
      // on-disk (possibly compressed) blob record.
      const uint64_t adjustment =
          read_options.verify_checksums
              ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
                    user_key.size())
              : 0;
      assert(offset >= adjustment);

      if (bytes_read) {
        const uint64_t record_size = value_size + adjustment;
        *bytes_read = record_size;
      }
      return status;
    }
  }

  assert(blob_handle.IsEmpty());

  // Step 2: the blob is not cached; stop here if disk I/O is not allowed.
  if (read_options.read_tier == kBlockCacheTier) {
    status = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
    return status;
  }

  const bool fill_blob_cache = blob_cache_ && read_options.fill_cache;

  // Step 3: read the blob from its file.
  std::unique_ptr<BlobContents> blob_contents;

  {
    CacheHandleGuard<BlobFileReader> blob_file_reader;
    status = blob_file_cache_->GetBlobFileReader(read_options, file_number,
                                                 &blob_file_reader);
    if (!status.ok()) {
      return status;
    }

    auto* const reader = blob_file_reader.GetValue();
    assert(reader);

    if (compression_type != reader->GetCompressionType()) {
      return Status::Corruption("Compression type mismatch when reading blob");
    }

    // Use the cache's allocator only if the blob is going into the cache.
    MemoryAllocator* const allocator =
        fill_blob_cache ? blob_cache_.get()->memory_allocator() : nullptr;

    uint64_t read_size = 0;
    status = reader->GetBlob(read_options, user_key, offset, value_size,
                             compression_type, prefetch_buffer, allocator,
                             &blob_contents, &read_size);
    if (!status.ok()) {
      return status;
    }
    if (bytes_read) {
      *bytes_read = read_size;
    }
  }

  // Step 4: hand the blob to the caller, via the cache when allowed.
  if (!fill_blob_cache) {
    PinOwnedBlob(&blob_contents, value);
  } else {
    const Slice insert_key = cache_key.AsSlice();
    status = PutBlobIntoCache(insert_key, &blob_contents, &blob_handle);
    if (!status.ok()) {
      return status;
    }

    PinCachedBlob(&blob_handle, value);
  }

  assert(status.ok());
  return status;
}
259
260
void BlobSource::MultiGetBlob(const ReadOptions& read_options,
261
                              autovector<BlobFileReadRequests>& blob_reqs,
262
0
                              uint64_t* bytes_read) {
263
0
  assert(blob_reqs.size() > 0);
264
265
0
  uint64_t total_bytes_read = 0;
266
0
  uint64_t bytes_read_in_file = 0;
267
268
0
  for (auto& [file_number, file_size, blob_reqs_in_file] : blob_reqs) {
269
    // sort blob_reqs_in_file by file offset.
270
0
    std::sort(
271
0
        blob_reqs_in_file.begin(), blob_reqs_in_file.end(),
272
0
        [](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
273
0
          return lhs.offset < rhs.offset;
274
0
        });
275
276
0
    MultiGetBlobFromOneFile(read_options, file_number, file_size,
277
0
                            blob_reqs_in_file, &bytes_read_in_file);
278
279
0
    total_bytes_read += bytes_read_in_file;
280
0
  }
281
282
0
  if (bytes_read) {
283
0
    *bytes_read = total_bytes_read;
284
0
  }
285
0
}
286
287
void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
288
                                         uint64_t file_number,
289
                                         uint64_t /*file_size*/,
290
                                         autovector<BlobReadRequest>& blob_reqs,
291
0
                                         uint64_t* bytes_read) {
292
0
  const size_t num_blobs = blob_reqs.size();
293
0
  assert(num_blobs > 0);
294
0
  assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);
295
296
#ifndef NDEBUG
297
  for (size_t i = 0; i < num_blobs - 1; ++i) {
298
    assert(blob_reqs[i].offset <= blob_reqs[i + 1].offset);
299
  }
300
#endif  // !NDEBUG
301
302
0
  using Mask = uint64_t;
303
0
  Mask cache_hit_mask = 0;
304
305
0
  uint64_t total_bytes = 0;
306
0
  const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number);
307
308
0
  if (blob_cache_) {
309
0
    size_t cached_blob_count = 0;
310
0
    for (size_t i = 0; i < num_blobs; ++i) {
311
0
      auto& req = blob_reqs[i];
312
313
0
      CacheHandleGuard<BlobContents> blob_handle;
314
0
      const CacheKey cache_key = base_cache_key.WithOffset(req.offset);
315
0
      const Slice key = cache_key.AsSlice();
316
317
0
      const Status s = GetBlobFromCache(key, &blob_handle);
318
319
0
      if (s.ok()) {
320
0
        assert(req.status);
321
0
        *req.status = s;
322
323
0
        PinCachedBlob(&blob_handle, req.result);
324
325
        // Update the counter for the number of valid blobs read from the cache.
326
0
        ++cached_blob_count;
327
328
        // For consistency, the size of each on-disk (possibly compressed) blob
329
        // record is accumulated to total_bytes.
330
0
        uint64_t adjustment =
331
0
            read_options.verify_checksums
332
0
                ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
333
0
                      req.user_key->size())
334
0
                : 0;
335
0
        assert(req.offset >= adjustment);
336
0
        total_bytes += req.len + adjustment;
337
0
        cache_hit_mask |= (Mask{1} << i);  // cache hit
338
0
      }
339
0
    }
340
341
    // All blobs were read from the cache.
342
0
    if (cached_blob_count == num_blobs) {
343
0
      if (bytes_read) {
344
0
        *bytes_read = total_bytes;
345
0
      }
346
0
      return;
347
0
    }
348
0
  }
349
350
0
  const bool no_io = read_options.read_tier == kBlockCacheTier;
351
0
  if (no_io) {
352
0
    for (size_t i = 0; i < num_blobs; ++i) {
353
0
      if (!(cache_hit_mask & (Mask{1} << i))) {
354
0
        BlobReadRequest& req = blob_reqs[i];
355
0
        assert(req.status);
356
357
0
        *req.status =
358
0
            Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
359
0
      }
360
0
    }
361
0
    return;
362
0
  }
363
364
0
  {
365
    // Find the rest of blobs from the file since I/O is allowed.
366
0
    autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
367
0
        _blob_reqs;
368
0
    uint64_t _bytes_read = 0;
369
370
0
    for (size_t i = 0; i < num_blobs; ++i) {
371
0
      if (!(cache_hit_mask & (Mask{1} << i))) {
372
0
        _blob_reqs.emplace_back(&blob_reqs[i], std::unique_ptr<BlobContents>());
373
0
      }
374
0
    }
375
376
0
    CacheHandleGuard<BlobFileReader> blob_file_reader;
377
0
    Status s = blob_file_cache_->GetBlobFileReader(read_options, file_number,
378
0
                                                   &blob_file_reader);
379
0
    if (!s.ok()) {
380
0
      for (size_t i = 0; i < _blob_reqs.size(); ++i) {
381
0
        BlobReadRequest* const req = _blob_reqs[i].first;
382
0
        assert(req);
383
0
        assert(req->status);
384
385
0
        *req->status = s;
386
0
      }
387
0
      return;
388
0
    }
389
390
0
    assert(blob_file_reader.GetValue());
391
392
0
    MemoryAllocator* const allocator =
393
0
        (blob_cache_ && read_options.fill_cache)
394
0
            ? blob_cache_.get()->memory_allocator()
395
0
            : nullptr;
396
397
0
    blob_file_reader.GetValue()->MultiGetBlob(read_options, allocator,
398
0
                                              _blob_reqs, &_bytes_read);
399
400
0
    if (blob_cache_ && read_options.fill_cache) {
401
      // If filling cache is allowed and a cache is configured, try to put
402
      // the blob(s) to the cache.
403
0
      for (auto& [req, blob_contents] : _blob_reqs) {
404
0
        assert(req);
405
406
0
        if (req->status->ok()) {
407
0
          CacheHandleGuard<BlobContents> blob_handle;
408
0
          const CacheKey cache_key = base_cache_key.WithOffset(req->offset);
409
0
          const Slice key = cache_key.AsSlice();
410
0
          s = PutBlobIntoCache(key, &blob_contents, &blob_handle);
411
0
          if (!s.ok()) {
412
0
            *req->status = s;
413
0
          } else {
414
0
            PinCachedBlob(&blob_handle, req->result);
415
0
          }
416
0
        }
417
0
      }
418
0
    } else {
419
0
      for (auto& [req, blob_contents] : _blob_reqs) {
420
0
        assert(req);
421
422
0
        if (req->status->ok()) {
423
0
          PinOwnedBlob(&blob_contents, req->result);
424
0
        }
425
0
      }
426
0
    }
427
428
0
    total_bytes += _bytes_read;
429
0
    if (bytes_read) {
430
0
      *bytes_read = total_bytes;
431
0
    }
432
0
  }
433
0
}
434
435
bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
436
0
                                  uint64_t offset, size_t* charge) const {
437
0
  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
438
0
  const Slice key = cache_key.AsSlice();
439
440
0
  CacheHandleGuard<BlobContents> blob_handle;
441
0
  const Status s = GetBlobFromCache(key, &blob_handle);
442
443
0
  if (s.ok() && blob_handle.GetValue() != nullptr) {
444
0
    if (charge) {
445
0
      const Cache* const cache = blob_handle.GetCache();
446
0
      assert(cache);
447
448
0
      Cache::Handle* const handle = blob_handle.GetCacheHandle();
449
0
      assert(handle);
450
451
0
      *charge = cache->GetUsage(handle);
452
0
    }
453
454
0
    return true;
455
0
  }
456
457
0
  return false;
458
0
}
459
460
}  // namespace ROCKSDB_NAMESPACE