Coverage Report

Created: 2026-04-10 07:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/blob/blob_source.cc
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/blob/blob_source.h"
7
8
#include <cassert>
9
#include <string>
10
11
#include "cache/cache_reservation_manager.h"
12
#include "cache/charged_cache.h"
13
#include "db/blob/blob_contents.h"
14
#include "db/blob/blob_file_reader.h"
15
#include "db/blob/blob_log_format.h"
16
#include "monitoring/statistics_impl.h"
17
#include "options/cf_options.h"
18
#include "table/get_context.h"
19
#include "table/multiget_context.h"
20
21
namespace ROCKSDB_NAMESPACE {
22
23
namespace {
24
25
// Builds the status reported after a read failed with `stale_status` (a
// Corruption observed through the cached reader) and the retry through a
// freshly opened reader failed as well, with `retry_status`.
//
// A retry that itself reports Corruption is returned unchanged. Any other
// retry failure is appended to the message of the original Corruption, so
// that both causes remain visible to the caller.
Status AppendBlobRefreshRetryFailure(const Status& stale_status,
                                     const Status& retry_status) {
  assert(stale_status.IsCorruption());
  assert(!retry_status.ok());

  const bool retry_was_corruption = retry_status.IsCorruption();
  return retry_was_corruption
             ? retry_status
             : Status::CopyAppendMessage(stale_status,
                                         "; refresh retry failed: ",
                                         retry_status.ToString());
}
35
36
}  // namespace
37
38
BlobSource::BlobSource(const ImmutableOptions& immutable_options,
                       const MutableCFOptions& mutable_cf_options,
                       const std::string& db_id,
                       const std::string& db_session_id,
                       BlobFileCache* blob_file_cache)
    : db_id_(db_id),
      db_session_id_(db_session_id),
      statistics_(immutable_options.statistics.get()),
      blob_file_cache_(blob_file_cache),
      blob_cache_(immutable_options.blob_cache),
      lowest_used_cache_tier_(immutable_options.lowest_used_cache_tier) {
  // The blob cache is used as-is unless the column family's table factory is
  // block-based AND its cache usage options mark CacheEntryRole::kBlobCache
  // as charged (kEnabled). In that case the blob cache is wrapped in a
  // ChargedCache together with the table's block_cache (presumably so blob
  // cache usage is charged against the block cache — see ChargedCache).
  const auto table_options =
      mutable_cf_options.table_factory->GetOptions<BlockBasedTableOptions>();
  if (table_options == nullptr) {
    return;
  }

  // NOTE(review): `.at()` assumes options_overrides always holds an entry for
  // kBlobCache (it throws otherwise); confirm the table factory populates it.
  const auto& blob_cache_role_options =
      table_options->cache_usage_options.options_overrides.at(
          CacheEntryRole::kBlobCache);
  if (blob_cache_role_options.charged !=
      CacheEntryRoleOptions::Decision::kEnabled) {
    return;
  }

  blob_cache_ = SharedCacheInterface{std::make_shared<ChargedCache>(
      immutable_options.blob_cache, table_options->block_cache)};
}
58
59
83.7k
// Out-of-line defaulted destructor.
// NOTE(review): presumably defined in the .cc so that member types that are
// only forward-declared in blob_source.h are complete here; confirm against
// the header.
BlobSource::~BlobSource() = default;
60
61
// Looks up `cache_key` in the blob cache.
//
// On a hit, `cached_blob` takes over the cache handle, the hit and the number
// of bytes served are recorded, and OK is returned. On a miss, a miss tick is
// recorded and NotFound is returned. Requires a configured blob cache and an
// empty `cached_blob`.
Status BlobSource::GetBlobFromCache(
    const Slice& cache_key, CacheHandleGuard<BlobContents>* cached_blob) const {
  assert(blob_cache_);
  assert(!cache_key.empty());
  assert(cached_blob);
  assert(cached_blob->IsEmpty());

  Cache::Handle* const handle = GetEntryFromCache(cache_key);
  if (handle == nullptr) {
    RecordTick(statistics_, BLOB_DB_CACHE_MISS);
    return Status::NotFound("Blob not found in cache");
  }

  *cached_blob = CacheHandleGuard<BlobContents>(blob_cache_.get(), handle);
  assert(cached_blob->GetValue());

  PERF_COUNTER_ADD(blob_cache_hit_count, 1);
  RecordTick(statistics_, BLOB_DB_CACHE_HIT);
  RecordTick(statistics_, BLOB_DB_CACHE_BYTES_READ,
             cached_blob->GetValue()->size());

  return Status::OK();
}
88
89
// Inserts `*blob` into the blob cache under `cache_key` with BOTTOM priority.
//
// On success, the cache takes ownership of the contents (`*blob` is
// released), `cached_blob` receives a handle to the new entry, and add and
// bytes-written ticks are recorded. On failure, `*blob` keeps ownership of
// the contents, an add-failure tick is recorded, and the insertion status is
// returned.
Status BlobSource::PutBlobIntoCache(
    const Slice& cache_key, std::unique_ptr<BlobContents>* blob,
    CacheHandleGuard<BlobContents>* cached_blob) const {
  assert(blob_cache_);
  assert(!cache_key.empty());
  assert(blob);
  assert(*blob);
  assert(cached_blob);
  assert(cached_blob->IsEmpty());

  TypedHandle* handle = nullptr;
  const Status s = InsertEntryIntoCache(cache_key, blob->get(), &handle,
                                        Cache::Priority::BOTTOM);
  if (!s.ok()) {
    RecordTick(statistics_, BLOB_DB_CACHE_ADD_FAILURES);
    return s;
  }

  // The cache owns the contents from here on.
  blob->release();

  assert(handle != nullptr);
  *cached_blob = CacheHandleGuard<BlobContents>(blob_cache_.get(), handle);
  assert(cached_blob->GetValue());

  RecordTick(statistics_, BLOB_DB_CACHE_ADD);
  RecordTick(statistics_, BLOB_DB_CACHE_BYTES_WRITE,
             cached_blob->GetValue()->size());

  return s;
}
121
122
0
// Looks up `key` in the blob cache with BOTTOM priority and no creation
// context, reporting to `statistics_` and honoring `lowest_used_cache_tier_`.
// Callers treat a nullptr result as a cache miss.
BlobSource::TypedHandle* BlobSource::GetEntryFromCache(const Slice& key) const {
  return blob_cache_.LookupFull(
      key, /*context=*/nullptr, Cache::Priority::BOTTOM, statistics_,
      lowest_used_cache_tier_);
}
127
128
void BlobSource::PinCachedBlob(CacheHandleGuard<BlobContents>* cached_blob,
129
0
                               PinnableSlice* value) {
130
0
  assert(cached_blob);
131
0
  assert(cached_blob->GetValue());
132
0
  assert(value);
133
134
  // To avoid copying the cached blob into the buffer provided by the
135
  // application, we can simply transfer ownership of the cache handle to
136
  // the target PinnableSlice. This has the potential to save a lot of
137
  // CPU, especially with large blob values.
138
139
0
  value->Reset();
140
141
0
  constexpr Cleanable* cleanable = nullptr;
142
0
  value->PinSlice(cached_blob->GetValue()->data(), cleanable);
143
144
0
  cached_blob->TransferTo(value);
145
0
}
146
147
void BlobSource::PinOwnedBlob(std::unique_ptr<BlobContents>* owned_blob,
148
0
                              PinnableSlice* value) {
149
0
  assert(owned_blob);
150
0
  assert(*owned_blob);
151
0
  assert(value);
152
153
0
  BlobContents* const blob = owned_blob->release();
154
0
  assert(blob);
155
156
0
  value->Reset();
157
0
  value->PinSlice(
158
0
      blob->data(),
159
0
      [](void* arg1, void* /* arg2 */) {
160
0
        delete static_cast<BlobContents*>(arg1);
161
0
      },
162
0
      blob, nullptr);
163
0
}
164
165
// Inserts `value` into the blob cache under `key`, using the blob's
// approximate memory usage as the charge and honoring
// `lowest_used_cache_tier_`. Callers rely on `*cache_handle` being set when OK
// is returned.
Status BlobSource::InsertEntryIntoCache(const Slice& key, BlobContents* value,
                                        TypedHandle** cache_handle,
                                        Cache::Priority priority) const {
  const size_t charge = value->ApproximateMemoryUsage();
  return blob_cache_.InsertFull(key, value, charge, cache_handle, priority,
                                lowest_used_cache_tier_);
}
172
173
// Reads the blob stored at `offset` (with value size `value_size`) in blob
// file `file_number` and pins it into `value`.
//
// Lookup order:
//  1. The blob cache, if one is configured.
//  2. When `read_options.read_tier == kBlockCacheTier`, a cache miss returns
//     Incomplete without touching the file.
//  3. Otherwise the blob is read through the cached BlobFileReader. If that
//     read reports Corruption, the cached reader is evicted and the read is
//     retried once through a freshly opened reader.
//     - If the retry succeeds, the fresh reader is installed in the blob file
//       cache.
//     - If it fails, the retry failure is folded into the returned status
//       (see AppendBlobRefreshRetryFailure).
//
// A blob read from the file is inserted into the blob cache when a cache is
// configured and `read_options.fill_cache` is set. A failed insertion is
// returned as the error. Otherwise the blob is pinned as an owned buffer.
//
// If `bytes_read` is non-null, it receives the following on success:
//  - on a cache hit: `value_size` plus the record-header adjustment when
//    `verify_checksums` is set;
//  - otherwise: the byte count reported by the reader that produced the blob.
Status BlobSource::GetBlob(const ReadOptions& read_options,
                           const Slice& user_key, uint64_t file_number,
                           uint64_t offset, uint64_t file_size,
                           uint64_t value_size,
                           CompressionType compression_type,
                           FilePrefetchBuffer* prefetch_buffer,
                           PinnableSlice* value, uint64_t* bytes_read) {
  assert(value);

  Status s;

  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);

  CacheHandleGuard<BlobContents> blob_handle;

  // First, try to get the blob from the cache (if one is configured).
  if (blob_cache_) {
    Slice key = cache_key.AsSlice();
    s = GetBlobFromCache(key, &blob_handle);
    if (s.ok()) {
      PinCachedBlob(&blob_handle, value);

      // For consistency with the file-read path, report the size of the
      // on-disk (possibly compressed) blob record.
      uint64_t adjustment =
          read_options.verify_checksums
              ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
                    user_key.size())
              : 0;
      assert(offset >= adjustment);

      uint64_t record_size = value_size + adjustment;
      if (bytes_read) {
        *bytes_read = record_size;
      }
      return s;
    }
  }

  assert(blob_handle.IsEmpty());

  const bool no_io = read_options.read_tier == kBlockCacheTier;
  if (no_io) {
    s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
    return s;
  }

  // Can't find the blob in the cache. Since I/O is allowed, read it from the
  // file.
  std::unique_ptr<BlobContents> blob_contents;

  {
    CacheHandleGuard<BlobFileReader> blob_file_reader;
    s = blob_file_cache_->GetBlobFileReader(read_options, file_number,
                                            &blob_file_reader);
    if (!s.ok()) {
      return s;
    }

    assert(blob_file_reader.GetValue());

    if (compression_type != blob_file_reader.GetValue()->GetCompressionType()) {
      return Status::Corruption("Compression type mismatch when reading blob");
    }

    // Use the blob cache's allocator only when the blob may be inserted into
    // the cache afterwards.
    MemoryAllocator* const allocator =
        (blob_cache_ && read_options.fill_cache)
            ? blob_cache_.get()->memory_allocator()
            : nullptr;

    uint64_t read_size = 0;
    s = blob_file_reader.GetValue()->GetBlob(
        read_options, user_key, offset, value_size, compression_type,
        prefetch_buffer, allocator, &blob_contents, &read_size);
    if (s.IsCorruption()) {
      // The cached reader may be stale: drop our handle, evict it, and retry
      // once through a freshly opened (uncached) reader.
      const Status stale_status = s;
      blob_file_reader.Reset();
      blob_file_cache_->Evict(file_number);

      std::unique_ptr<BlobFileReader> fresh_reader;
      s = blob_file_cache_->OpenBlobFileReaderUncached(
          read_options, file_number, &fresh_reader,
          /*allow_footer_skip_retry=*/false);
      if (!s.ok()) {
        return AppendBlobRefreshRetryFailure(stale_status, s);
      }

      if (compression_type != fresh_reader->GetCompressionType()) {
        return Status::Corruption(
            "Compression type mismatch when reading blob");
      }

      // Bypass `prefetch_buffer` for the retry. Any bytes it already holds
      // for this range may have been read through the stale reader. Serving
      // the retry from them would deterministically reproduce the same
      // corruption instead of re-reading the file.
      FilePrefetchBuffer* const retry_prefetch_buffer = nullptr;

      blob_contents.reset();
      read_size = 0;
      s = fresh_reader->GetBlob(read_options, user_key, offset, value_size,
                                compression_type, retry_prefetch_buffer,
                                allocator, &blob_contents, &read_size);
      if (!s.ok()) {
        s = AppendBlobRefreshRetryFailure(stale_status, s);
      } else {
        // The fresh reader worked; make it the cached reader for this file.
        // Installing it is best-effort.
        CacheHandleGuard<BlobFileReader> ignored_reader;
        blob_file_cache_
            ->RefreshBlobFileReader(file_number, &fresh_reader, &ignored_reader)
            .PermitUncheckedError();
      }
    }
    if (!s.ok()) {
      return s;
    }
    if (bytes_read) {
      *bytes_read = read_size;
    }
  }

  if (blob_cache_ && read_options.fill_cache) {
    // If filling the cache is allowed and a cache is configured, try to put
    // the blob into the cache.
    Slice key = cache_key.AsSlice();
    s = PutBlobIntoCache(key, &blob_contents, &blob_handle);
    if (!s.ok()) {
      return s;
    }

    PinCachedBlob(&blob_handle, value);
  } else {
    PinOwnedBlob(&blob_contents, value);
  }

  assert(s.ok());
  return s;
}
306
307
void BlobSource::MultiGetBlob(const ReadOptions& read_options,
308
                              autovector<BlobFileReadRequests>& blob_reqs,
309
0
                              uint64_t* bytes_read) {
310
0
  assert(blob_reqs.size() > 0);
311
312
0
  uint64_t total_bytes_read = 0;
313
0
  uint64_t bytes_read_in_file = 0;
314
315
0
  for (auto& [file_number, file_size, blob_reqs_in_file] : blob_reqs) {
316
    // sort blob_reqs_in_file by file offset.
317
0
    std::sort(
318
0
        blob_reqs_in_file.begin(), blob_reqs_in_file.end(),
319
0
        [](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
320
0
          return lhs.offset < rhs.offset;
321
0
        });
322
323
0
    MultiGetBlobFromOneFile(read_options, file_number, file_size,
324
0
                            blob_reqs_in_file, &bytes_read_in_file);
325
326
0
    total_bytes_read += bytes_read_in_file;
327
0
  }
328
329
0
  if (bytes_read) {
330
0
    *bytes_read = total_bytes_read;
331
0
  }
332
0
}
333
334
void BlobSource::MultiGetBlobFromOneFile(const ReadOptions& read_options,
335
                                         uint64_t file_number,
336
                                         uint64_t /*file_size*/,
337
                                         autovector<BlobReadRequest>& blob_reqs,
338
0
                                         uint64_t* bytes_read) {
339
0
  const size_t num_blobs = blob_reqs.size();
340
0
  assert(num_blobs > 0);
341
0
  assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);
342
343
#ifndef NDEBUG
344
  for (size_t i = 0; i < num_blobs - 1; ++i) {
345
    assert(blob_reqs[i].offset <= blob_reqs[i + 1].offset);
346
  }
347
#endif  // !NDEBUG
348
349
0
  using Mask = uint64_t;
350
0
  Mask cache_hit_mask = 0;
351
352
0
  uint64_t total_bytes = 0;
353
0
  const OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number);
354
355
0
  if (blob_cache_) {
356
0
    size_t cached_blob_count = 0;
357
0
    for (size_t i = 0; i < num_blobs; ++i) {
358
0
      auto& req = blob_reqs[i];
359
360
0
      CacheHandleGuard<BlobContents> blob_handle;
361
0
      const CacheKey cache_key = base_cache_key.WithOffset(req.offset);
362
0
      const Slice key = cache_key.AsSlice();
363
364
0
      const Status s = GetBlobFromCache(key, &blob_handle);
365
366
0
      if (s.ok()) {
367
0
        assert(req.status);
368
0
        *req.status = s;
369
370
0
        PinCachedBlob(&blob_handle, req.result);
371
372
        // Update the counter for the number of valid blobs read from the cache.
373
0
        ++cached_blob_count;
374
375
        // For consistency, the size of each on-disk (possibly compressed) blob
376
        // record is accumulated to total_bytes.
377
0
        uint64_t adjustment =
378
0
            read_options.verify_checksums
379
0
                ? BlobLogRecord::CalculateAdjustmentForRecordHeader(
380
0
                      req.user_key->size())
381
0
                : 0;
382
0
        assert(req.offset >= adjustment);
383
0
        total_bytes += req.len + adjustment;
384
0
        cache_hit_mask |= (Mask{1} << i);  // cache hit
385
0
      }
386
0
    }
387
388
    // All blobs were read from the cache.
389
0
    if (cached_blob_count == num_blobs) {
390
0
      if (bytes_read) {
391
0
        *bytes_read = total_bytes;
392
0
      }
393
0
      return;
394
0
    }
395
0
  }
396
397
0
  const bool no_io = read_options.read_tier == kBlockCacheTier;
398
0
  if (no_io) {
399
0
    for (size_t i = 0; i < num_blobs; ++i) {
400
0
      if (!(cache_hit_mask & (Mask{1} << i))) {
401
0
        BlobReadRequest& req = blob_reqs[i];
402
0
        assert(req.status);
403
404
0
        *req.status =
405
0
            Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
406
0
      }
407
0
    }
408
0
    return;
409
0
  }
410
411
0
  {
412
    // Find the rest of blobs from the file since I/O is allowed.
413
0
    autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
414
0
        _blob_reqs;
415
0
    uint64_t _bytes_read = 0;
416
417
0
    for (size_t i = 0; i < num_blobs; ++i) {
418
0
      if (!(cache_hit_mask & (Mask{1} << i))) {
419
0
        _blob_reqs.emplace_back(&blob_reqs[i], std::unique_ptr<BlobContents>());
420
0
      }
421
0
    }
422
423
0
    CacheHandleGuard<BlobFileReader> blob_file_reader;
424
0
    Status s = blob_file_cache_->GetBlobFileReader(read_options, file_number,
425
0
                                                   &blob_file_reader);
426
0
    if (!s.ok()) {
427
0
      for (size_t i = 0; i < _blob_reqs.size(); ++i) {
428
0
        BlobReadRequest* const req = _blob_reqs[i].first;
429
0
        assert(req);
430
0
        assert(req->status);
431
432
0
        *req->status = s;
433
0
      }
434
0
      return;
435
0
    }
436
437
0
    assert(blob_file_reader.GetValue());
438
439
0
    MemoryAllocator* const allocator =
440
0
        (blob_cache_ && read_options.fill_cache)
441
0
            ? blob_cache_.get()->memory_allocator()
442
0
            : nullptr;
443
444
0
    blob_file_reader.GetValue()->MultiGetBlob(read_options, allocator,
445
0
                                              _blob_reqs, &_bytes_read);
446
447
0
    bool needs_reader_refresh = false;
448
0
    for (const auto& blob_req : _blob_reqs) {
449
0
      BlobReadRequest* const req = blob_req.first;
450
0
      assert(req != nullptr);
451
0
      assert(req->status != nullptr);
452
0
      if (req->status->IsCorruption()) {
453
0
        needs_reader_refresh = true;
454
0
        break;
455
0
      }
456
0
    }
457
458
0
    if (needs_reader_refresh) {
459
0
      blob_file_reader.Reset();
460
0
      blob_file_cache_->Evict(file_number);
461
462
0
      std::unique_ptr<BlobFileReader> fresh_reader;
463
0
      s = blob_file_cache_->OpenBlobFileReaderUncached(
464
0
          read_options, file_number, &fresh_reader,
465
0
          /*allow_footer_skip_retry=*/false);
466
0
      if (!s.ok()) {
467
0
        for (const auto& blob_req : _blob_reqs) {
468
0
          BlobReadRequest* const req = blob_req.first;
469
0
          assert(req != nullptr);
470
0
          assert(req->status != nullptr);
471
0
          if (req->status->IsCorruption()) {
472
0
            *req->status = AppendBlobRefreshRetryFailure(*req->status, s);
473
0
          }
474
0
        }
475
0
        return;
476
0
      }
477
478
0
      autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
479
0
          retry_blob_reqs;
480
0
      autovector<Status> stale_statuses;
481
0
      for (auto& blob_req : _blob_reqs) {
482
0
        BlobReadRequest* const req = blob_req.first;
483
0
        assert(req != nullptr);
484
0
        assert(req->status != nullptr);
485
0
        if (!req->status->IsCorruption()) {
486
0
          continue;
487
0
        }
488
489
0
        stale_statuses.emplace_back(*req->status);
490
0
        *req->status = Status::OK();
491
0
        blob_req.second.reset();
492
0
        retry_blob_reqs.emplace_back(req, std::unique_ptr<BlobContents>());
493
0
      }
494
495
0
      uint64_t refreshed_bytes_read = 0;
496
0
      fresh_reader->MultiGetBlob(read_options, allocator, retry_blob_reqs,
497
0
                                 &refreshed_bytes_read);
498
0
      _bytes_read += refreshed_bytes_read;
499
500
0
      bool install_fresh_reader = false;
501
0
      for (size_t i = 0; i < retry_blob_reqs.size(); ++i) {
502
0
        auto& retried_blob_req = retry_blob_reqs[i];
503
0
        BlobReadRequest* const retried_req = retried_blob_req.first;
504
0
        assert(retried_req != nullptr);
505
0
        if (retried_req->status->ok()) {
506
0
          install_fresh_reader = true;
507
0
        } else {
508
0
          *retried_req->status = AppendBlobRefreshRetryFailure(
509
0
              stale_statuses[i], *retried_req->status);
510
0
        }
511
512
0
        for (auto& blob_req : _blob_reqs) {
513
0
          if (blob_req.first != retried_req) {
514
0
            continue;
515
0
          }
516
517
0
          blob_req.second = std::move(retried_blob_req.second);
518
0
          break;
519
0
        }
520
0
      }
521
522
0
      if (install_fresh_reader) {
523
0
        CacheHandleGuard<BlobFileReader> ignored_reader;
524
0
        blob_file_cache_
525
0
            ->RefreshBlobFileReader(file_number, &fresh_reader, &ignored_reader)
526
0
            .PermitUncheckedError();
527
0
      }
528
0
    }
529
530
0
    if (blob_cache_ && read_options.fill_cache) {
531
      // If filling cache is allowed and a cache is configured, try to put
532
      // the blob(s) to the cache.
533
0
      for (auto& [req, blob_contents] : _blob_reqs) {
534
0
        assert(req);
535
536
0
        if (req->status->ok()) {
537
0
          CacheHandleGuard<BlobContents> blob_handle;
538
0
          const CacheKey cache_key = base_cache_key.WithOffset(req->offset);
539
0
          const Slice key = cache_key.AsSlice();
540
0
          s = PutBlobIntoCache(key, &blob_contents, &blob_handle);
541
0
          if (!s.ok()) {
542
0
            *req->status = s;
543
0
          } else {
544
0
            PinCachedBlob(&blob_handle, req->result);
545
0
          }
546
0
        }
547
0
      }
548
0
    } else {
549
0
      for (auto& [req, blob_contents] : _blob_reqs) {
550
0
        assert(req);
551
552
0
        if (req->status->ok()) {
553
0
          PinOwnedBlob(&blob_contents, req->result);
554
0
        }
555
0
      }
556
0
    }
557
558
0
    total_bytes += _bytes_read;
559
0
    if (bytes_read) {
560
0
      *bytes_read = total_bytes;
561
0
    }
562
0
  }
563
0
}
564
565
bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
566
0
                                  uint64_t offset, size_t* charge) const {
567
0
  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);
568
0
  const Slice key = cache_key.AsSlice();
569
570
0
  CacheHandleGuard<BlobContents> blob_handle;
571
0
  const Status s = GetBlobFromCache(key, &blob_handle);
572
573
0
  if (s.ok() && blob_handle.GetValue() != nullptr) {
574
0
    if (charge) {
575
0
      const Cache* const cache = blob_handle.GetCache();
576
0
      assert(cache);
577
578
0
      Cache::Handle* const handle = blob_handle.GetCacheHandle();
579
0
      assert(handle);
580
581
0
      *charge = cache->GetUsage(handle);
582
0
    }
583
584
0
    return true;
585
0
  }
586
587
0
  return false;
588
0
}
589
590
}  // namespace ROCKSDB_NAMESPACE