Coverage Report

Created: 2025-07-23 07:17

/src/rocksdb/table/block_fetcher.cc
Line | Count | Source
   1 |       | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
   2 |       | //  This source code is licensed under both the GPLv2 (found in the
   3 |       | //  COPYING file in the root directory) and Apache 2.0 License
   4 |       | //  (found in the LICENSE.Apache file in the root directory).
   5 |       | //
   6 |       | // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
   7 |       | // Use of this source code is governed by a BSD-style license that can be
   8 |       | // found in the LICENSE file. See the AUTHORS file for names of contributors.
   9 |       |
  10 |       | #include "table/block_fetcher.h"
  11 |       |
  12 |       | #include <cassert>
  13 |       | #include <cinttypes>
  14 |       | #include <string>
  15 |       |
  16 |       | #include "logging/logging.h"
  17 |       | #include "memory/memory_allocator_impl.h"
  18 |       | #include "monitoring/perf_context_imp.h"
  19 |       | #include "rocksdb/compression_type.h"
  20 |       | #include "rocksdb/env.h"
  21 |       | #include "table/block_based/block.h"
  22 |       | #include "table/block_based/block_based_table_reader.h"
  23 |       | #include "table/block_based/block_type.h"
  24 |       | #include "table/block_based/reader_common.h"
  25 |       | #include "table/format.h"
  26 |       | #include "table/persistent_cache_helper.h"
  27 |       | #include "util/compression.h"
  28 |       | #include "util/stop_watch.h"
  29 |       |
  30 |       | namespace ROCKSDB_NAMESPACE {
  31 |       |
  32 |  413k | inline void BlockFetcher::ProcessTrailerIfPresent() {
  33 |  413k |   if (footer_.GetBlockTrailerSize() > 0) {
  34 |  413k |     assert(footer_.GetBlockTrailerSize() == BlockBasedTable::kBlockTrailerSize);
  35 |  413k |     if (read_options_.verify_checksums) {
  36 |  290k |       io_status_ = status_to_io_status(
  37 |  290k |           VerifyBlockChecksum(footer_, slice_.data(), block_size_,
  38 |  290k |                               file_->file_name(), handle_.offset()));
  39 |  290k |       RecordTick(ioptions_.stats, BLOCK_CHECKSUM_COMPUTE_COUNT);
  40 |  290k |       if (!io_status_.ok()) {
  41 |     0 |         assert(io_status_.IsCorruption());
  42 |     0 |         RecordTick(ioptions_.stats, BLOCK_CHECKSUM_MISMATCH_COUNT);
  43 |     0 |       }
  44 |  290k |     }
  45 |  413k |     compression_type() =
  46 |  413k |         BlockBasedTable::GetBlockCompressionType(slice_.data(), block_size_);
  47 |  413k |   } else {
  48 |       |     // E.g. plain table or cuckoo table
  49 |    33 |     compression_type() = kNoCompression;
  50 |    33 |   }
  51 |  413k | }
  52 |       |
  53 |  414k | inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
  54 |  414k |   if (cache_options_.persistent_cache &&
  55 |  414k |       !cache_options_.persistent_cache->IsCompressed()) {
  56 |     0 |     Status status = PersistentCacheHelper::LookupUncompressed(
  57 |     0 |         cache_options_, handle_, contents_);
  58 |     0 |     if (status.ok()) {
  59 |       |       // uncompressed page is found for the block handle
  60 |     0 |       return true;
  61 |     0 |     } else {
  62 |       |       // uncompressed page is not found
  63 |     0 |       if (ioptions_.logger && !status.IsNotFound()) {
  64 |     0 |         assert(!status.ok());
  65 |     0 |         ROCKS_LOG_INFO(ioptions_.logger,
  66 |     0 |                        "Error reading from persistent cache. %s",
  67 |     0 |                        status.ToString().c_str());
  68 |     0 |       }
  69 |     0 |     }
  70 |     0 |   }
  71 |  414k |   return false;
  72 |  414k | }
  73 |       |
  74 |  414k | inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
  75 |  414k |   if (prefetch_buffer_ != nullptr) {
  76 |  377k |     IOOptions opts;
  77 |  377k |     IODebugContext dbg;
  78 |  377k |     IOStatus io_s = file_->PrepareIOOptions(read_options_, opts, &dbg);
  79 |  377k |     if (io_s.ok()) {
  80 |  375k |       bool read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache(
  81 |  375k |           opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
  82 |  375k |           &io_s, for_compaction_);
  83 |  375k |       if (read_from_prefetch_buffer) {
  84 |     0 |         ProcessTrailerIfPresent();
  85 |     0 |         if (io_status_.ok()) {
  86 |     0 |           got_from_prefetch_buffer_ = true;
  87 |     0 |           used_buf_ = const_cast<char*>(slice_.data());
  88 |     0 |         } else if (io_status_.IsCorruption()) {
  89 |       |           // Returning true apparently indicates we either got some data from
  90 |       |           // the prefetch buffer, or we tried and encountered an error.
  91 |     0 |           return true;
  92 |     0 |         }
  93 |     0 |       }
  94 |  375k |     }
  95 |  377k |     if (!io_s.ok()) {
  96 |     0 |       io_status_ = io_s;
  97 |     0 |       return true;
  98 |     0 |     }
  99 |  377k |   }
 100 |  414k |   return got_from_prefetch_buffer_;
 101 |  414k | }
 102 |       |
 103 |  413k | inline bool BlockFetcher::TryGetSerializedBlockFromPersistentCache() {
 104 |  413k |   if (cache_options_.persistent_cache &&
 105 |  413k |       cache_options_.persistent_cache->IsCompressed()) {
 106 |     0 |     std::unique_ptr<char[]> buf;
 107 |     0 |     io_status_ = status_to_io_status(PersistentCacheHelper::LookupSerialized(
 108 |     0 |         cache_options_, handle_, &buf, block_size_with_trailer_));
 109 |     0 |     if (io_status_.ok()) {
 110 |     0 |       heap_buf_ = CacheAllocationPtr(buf.release());
 111 |     0 |       used_buf_ = heap_buf_.get();
 112 |     0 |       slice_ = Slice(heap_buf_.get(), block_size_);
 113 |     0 |       ProcessTrailerIfPresent();
 114 |     0 |       return true;
 115 |     0 |     } else if (!io_status_.IsNotFound() && ioptions_.logger) {
 116 |     0 |       assert(!io_status_.ok());
 117 |     0 |       ROCKS_LOG_INFO(ioptions_.logger,
 118 |     0 |                      "Error reading from persistent cache. %s",
 119 |     0 |                      io_status_.ToString().c_str());
 120 |     0 |     }
 121 |     0 |   }
 122 |  413k |   return false;
 123 |  413k | }
 124 |       |
 125 |  413k | inline void BlockFetcher::PrepareBufferForBlockFromFile() {
 126 |       |   // cache miss read from device
 127 |  413k |   if ((do_uncompress_ || ioptions_.allow_mmap_reads) &&
 128 |  413k |       block_size_with_trailer_ < kDefaultStackBufferSize) {
 129 |       |     // If we've got a small enough chunk of data, read it in to the
 130 |       |     // trivially allocated stack buffer instead of needing a full malloc()
 131 |       |     //
 132 |       |     // `GetBlockContents()` cannot return this data as its lifetime is tied to
 133 |       |     // this `BlockFetcher`'s lifetime. That is fine because this is only used
 134 |       |     // in cases where we do not expect the `GetBlockContents()` result to be the
 135 |       |     // same buffer we are assigning here. If we guess incorrectly, there will be
 136 |       |     // a heap allocation and memcpy in `GetBlockContents()` to obtain the final
 137 |       |     // result. Considering we are eliding a heap allocation here by using the
 138 |       |     // stack buffer, the cost of guessing incorrectly here is one extra memcpy.
 139 |       |     //
 140 |       |     // When `do_uncompress_` is true, we expect the uncompression step will
 141 |       |     // allocate heap memory for the final result. However this expectation will
 142 |       |     // be wrong if the block turns out to already be uncompressed, which we
 143 |       |     // won't know for sure until after reading it.
 144 |       |     //
 145 |       |     // When `ioptions_.allow_mmap_reads` is true, we do not expect the file
 146 |       |     // reader to use the scratch buffer at all, but instead return a pointer
 147 |       |     // into the mapped memory. This expectation will be wrong when using a
 148 |       |     // file reader that does not implement mmap reads properly.
 149 |  125k |     used_buf_ = &stack_buf_[0];
 150 |  288k |   } else if (maybe_compressed_ && !do_uncompress_) {
 151 |     0 |     compressed_buf_ =
 152 |     0 |         AllocateBlock(block_size_with_trailer_, memory_allocator_compressed_);
 153 |     0 |     used_buf_ = compressed_buf_.get();
 154 |  288k |   } else {
 155 |  288k |     heap_buf_ = AllocateBlock(block_size_with_trailer_, memory_allocator_);
 156 |  288k |     used_buf_ = heap_buf_.get();
 157 |  288k |   }
 158 |  413k | }
 159 |       |
 160 |  413k | inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
 161 |  413k |   if (io_status_.ok() && read_options_.fill_cache &&
 162 |  413k |       cache_options_.persistent_cache &&
 163 |  413k |       cache_options_.persistent_cache->IsCompressed()) {
 164 |     0 |     PersistentCacheHelper::InsertSerialized(cache_options_, handle_, used_buf_,
 165 |     0 |                                             block_size_with_trailer_);
 166 |     0 |   }
 167 |  413k | }
 168 |       |
 169 |  417k | inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
 170 |  417k |   if (io_status_.ok() && !got_from_prefetch_buffer_ &&
 171 |  417k |       read_options_.fill_cache && cache_options_.persistent_cache &&
 172 |  417k |       !cache_options_.persistent_cache->IsCompressed()) {
 173 |       |     // insert to uncompressed cache
 174 |     0 |     PersistentCacheHelper::InsertUncompressed(cache_options_, handle_,
 175 |     0 |                                               *contents_);
 176 |     0 |   }
 177 |  417k | }
 178 |       |
 179 |  124k | inline void BlockFetcher::CopyBufferToHeapBuf() {
 180 |  124k |   assert(used_buf_ != heap_buf_.get());
 181 |  124k |   heap_buf_ = AllocateBlock(block_size_with_trailer_, memory_allocator_);
 182 |  124k |   memcpy(heap_buf_.get(), used_buf_, block_size_with_trailer_);
 183 |       | #ifndef NDEBUG
 184 |       |   num_heap_buf_memcpy_++;
 185 |       | #endif
 186 |  124k | }
 187 |       |
 188 |     0 | inline void BlockFetcher::CopyBufferToCompressedBuf() {
 189 |     0 |   assert(used_buf_ != compressed_buf_.get());
 190 |     0 |   compressed_buf_ =
 191 |     0 |       AllocateBlock(block_size_with_trailer_, memory_allocator_compressed_);
 192 |     0 |   memcpy(compressed_buf_.get(), used_buf_, block_size_with_trailer_);
 193 |       | #ifndef NDEBUG
 194 |       |   num_compressed_buf_memcpy_++;
 195 |       | #endif
 196 |     0 | }
 197 |       |
 198 |       | // Before - Entering this method means the block is uncompressed or do not need
 199 |       | // to be decompressed.
 200 |       | //
 201 |       | // The block can be in one of the following buffers:
 202 |       | // 1. prefetch buffer if prefetch is enabled and the block is prefetched before
 203 |       | // 2. stack_buf_ if block size is smaller than the stack_buf_ size and block
 204 |       | //    is not compressed
 205 |       | // 3. heap_buf_ if the block is not compressed
 206 |       | // 4. compressed_buf_ if the block is compressed
 207 |       | // 5. direct_io_buf_ if direct IO is enabled or
 208 |       | // 6. underlying file_system scratch is used (FSReadRequest.fs_scratch).
 209 |       | //
 210 |       | // After - After this method, if the block is compressed, it should be in
 211 |       | // compressed_buf_ and heap_buf_ points to compressed_buf_, otherwise should be
 212 |       | // in heap_buf_.
 213 |  414k | inline void BlockFetcher::GetBlockContents() {
 214 |  414k |   if (slice_.data() != used_buf_) {
 215 |       |     // the slice content is not the buffer provided
 216 |     0 |     *contents_ = BlockContents(Slice(slice_.data(), block_size_));
 217 |  414k |   } else {
 218 |       |     // page can be either uncompressed or compressed, the buffer either stack
 219 |       |     // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
 220 |  414k |     if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
 221 |  124k |       CopyBufferToHeapBuf();
 222 |  289k |     } else if (used_buf_ == compressed_buf_.get()) {
 223 |     0 |       if (compression_type() == kNoCompression &&
 224 |     0 |           memory_allocator_ != memory_allocator_compressed_) {
 225 |     0 |         CopyBufferToHeapBuf();
 226 |     0 |       } else {
 227 |     0 |         heap_buf_ = std::move(compressed_buf_);
 228 |     0 |       }
 229 |  290k |     } else if (direct_io_buf_.get() != nullptr || use_fs_scratch_) {
 230 |     0 |       if (compression_type() == kNoCompression) {
 231 |     0 |         CopyBufferToHeapBuf();
 232 |     0 |       } else {
 233 |     0 |         CopyBufferToCompressedBuf();
 234 |     0 |         heap_buf_ = std::move(compressed_buf_);
 235 |     0 |       }
 236 |     0 |     }
 237 |  414k |     *contents_ = BlockContents(std::move(heap_buf_), block_size_);
 238 |  414k |   }
 239 |       | #ifndef NDEBUG
 240 |       |   contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
 241 |       | #endif
 242 |  414k | }
 243 |       |
 244 |       | // Read a block from the file and verify its checksum. Upon return, io_status_
 245 |       | // will be updated with the status of the read, and slice_ will be
 246 |       | // updated with a pointer to the data.
 247 |  413k | void BlockFetcher::ReadBlock(bool retry) {
 248 |  413k |   FSReadRequest read_req;
 249 |  413k |   IOOptions opts;
 250 |  413k |   IODebugContext dbg;
 251 |  413k |   io_status_ = file_->PrepareIOOptions(read_options_, opts, &dbg);
 252 |  413k |   opts.verify_and_reconstruct_read = retry;
 253 |  413k |   read_req.status.PermitUncheckedError();
 254 |       |   // Actual file read
 255 |  413k |   if (io_status_.ok()) {
 256 |  412k |     if (file_->use_direct_io()) {
 257 |     0 |       PERF_TIMER_GUARD(block_read_time);
 258 |     0 |       PERF_CPU_TIMER_GUARD(
 259 |     0 |           block_read_cpu_time,
 260 |     0 |           ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
 261 |     0 |       io_status_ =
 262 |     0 |           file_->Read(opts, handle_.offset(), block_size_with_trailer_, &slice_,
 263 |     0 |                       /*scratch=*/nullptr, &direct_io_buf_, &dbg);
 264 |     0 |       PERF_COUNTER_ADD(block_read_count, 1);
 265 |     0 |       used_buf_ = const_cast<char*>(slice_.data());
 266 |  412k |     } else if (use_fs_scratch_) {
 267 |     0 |       PERF_TIMER_GUARD(block_read_time);
 268 |     0 |       PERF_CPU_TIMER_GUARD(
 269 |     0 |           block_read_cpu_time,
 270 |     0 |           ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
 271 |     0 |       read_req.offset = handle_.offset();
 272 |     0 |       read_req.len = block_size_with_trailer_;
 273 |     0 |       read_req.scratch = nullptr;
 274 |     0 |       io_status_ = file_->MultiRead(opts, &read_req, /*num_reqs=*/1,
 275 |     0 |                                     /*AlignedBuf* =*/nullptr, &dbg);
 276 |     0 |       PERF_COUNTER_ADD(block_read_count, 1);
 277 |       |
 278 |     0 |       slice_ = Slice(read_req.result.data(), read_req.result.size());
 279 |     0 |       used_buf_ = const_cast<char*>(slice_.data());
 280 |  412k |     } else {
 281 |       |       // It allocates/assign used_buf_
 282 |  412k |       PrepareBufferForBlockFromFile();
 283 |       |
 284 |  412k |       PERF_TIMER_GUARD(block_read_time);
 285 |  412k |       PERF_CPU_TIMER_GUARD(
 286 |  412k |           block_read_cpu_time,
 287 |  412k |           ioptions_.env ? ioptions_.env->GetSystemClock().get() : nullptr);
 288 |       |
 289 |  412k |       io_status_ =
 290 |  412k |           file_->Read(opts, handle_.offset(), /*size*/ block_size_with_trailer_,
 291 |  412k |                       /*result*/ &slice_, /*scratch*/ used_buf_,
 292 |  412k |                       /*aligned_buf=*/nullptr, &dbg);
 293 |  412k |       PERF_COUNTER_ADD(block_read_count, 1);
 294 |       | #ifndef NDEBUG
 295 |       |       if (slice_.data() == &stack_buf_[0]) {
 296 |       |         num_stack_buf_memcpy_++;
 297 |       |       } else if (slice_.data() == heap_buf_.get()) {
 298 |       |         num_heap_buf_memcpy_++;
 299 |       |       } else if (slice_.data() == compressed_buf_.get()) {
 300 |       |         num_compressed_buf_memcpy_++;
 301 |       |       }
 302 |       | #endif
 303 |  412k |     }
 304 |  412k |   }
 305 |       |
 306 |       |   // TODO: introduce dedicated perf counter for range tombstones
 307 |  413k |   switch (block_type_) {
 308 |     0 |     case BlockType::kFilter:
 309 |     0 |     case BlockType::kFilterPartitionIndex:
 310 |     0 |       PERF_COUNTER_ADD(filter_block_read_count, 1);
 311 |     0 |       break;
 312 |       |
 313 |     0 |     case BlockType::kCompressionDictionary:
 314 |     0 |       PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
 315 |     0 |       break;
 316 |       |
 317 |  124k |     case BlockType::kIndex:
 318 |  124k |       PERF_COUNTER_ADD(index_block_read_count, 1);
 319 |  124k |       break;
 320 |       |
 321 |       |     // Nothing to do here as we don't have counters for the other types.
 322 |  289k |     default:
 323 |  289k |       break;
 324 |  413k |   }
 325 |       |
 326 |  414k |   PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
 327 |  414k |   IGNORE_STATUS_IF_ERROR(io_status_);
 328 |  414k |   if (io_status_.ok()) {
 329 |  414k |     if (use_fs_scratch_ && !read_req.status.ok()) {
 330 |     0 |       io_status_ = read_req.status;
 331 |  414k |     } else if (slice_.size() != block_size_with_trailer_) {
 332 |     0 |       io_status_ = IOStatus::Corruption(
 333 |     0 |           "truncated block read from " + file_->file_name() + " offset " +
 334 |     0 |           std::to_string(handle_.offset()) + ", expected " +
 335 |     0 |           std::to_string(block_size_with_trailer_) + " bytes, got " +
 336 |     0 |           std::to_string(slice_.size()));
 337 |     0 |     }
 338 |  414k |   }
 339 |       |
 340 |  414k |   if (io_status_.ok()) {
 341 |  413k |     ProcessTrailerIfPresent();
 342 |  413k |   }
 343 |       |
 344 |  414k |   if (retry) {
 345 |     0 |     RecordTick(ioptions_.stats, FILE_READ_CORRUPTION_RETRY_COUNT);
 346 |     0 |   }
 347 |  414k |   if (io_status_.ok()) {
 348 |  413k |     InsertCompressedBlockToPersistentCacheIfNeeded();
 349 |  413k |     fs_buf_ = std::move(read_req.fs_scratch);
 350 |  413k |     if (retry) {
 351 |     0 |       RecordTick(ioptions_.stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
 352 |     0 |     }
 353 |  413k |   } else {
 354 | 1.29k |     ReleaseFileSystemProvidedBuffer(&read_req);
 355 | 1.29k |     direct_io_buf_.reset();
 356 | 1.29k |     compressed_buf_.reset();
 357 | 1.29k |     heap_buf_.reset();
 358 | 1.29k |     used_buf_ = nullptr;
 359 | 1.29k |   }
 360 |  414k | }
 361 |       |
 362 |  414k | IOStatus BlockFetcher::ReadBlockContents() {
 363 |  414k |   if (TryGetUncompressBlockFromPersistentCache()) {
 364 |     0 |     compression_type() = kNoCompression;
 365 |       | #ifndef NDEBUG
 366 |       |     contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
 367 |       | #endif  // NDEBUG
 368 |     0 |     return IOStatus::OK();
 369 |     0 |   }
 370 |  414k |   if (TryGetFromPrefetchBuffer()) {
 371 |     0 |     if (io_status_.IsCorruption() && retry_corrupt_read_) {
 372 |     0 |       ReadBlock(/*retry=*/true);
 373 |     0 |     }
 374 |     0 |     if (!io_status_.ok()) {
 375 |     0 |       assert(!fs_buf_);
 376 |     0 |       return io_status_;
 377 |     0 |     }
 378 |  414k |   } else if (!TryGetSerializedBlockFromPersistentCache()) {
 379 |  413k |     ReadBlock(/*retry =*/false);
 380 |       |     // If the file system supports retry after corruption, then try to
 381 |       |     // re-read the block and see if it succeeds.
 382 |  413k |     if (io_status_.IsCorruption() && retry_corrupt_read_) {
 383 |     0 |       assert(!fs_buf_);
 384 |     0 |       ReadBlock(/*retry=*/true);
 385 |     0 |     }
 386 |  413k |     if (!io_status_.ok()) {
 387 |     0 |       assert(!fs_buf_);
 388 |     0 |       return io_status_;
 389 |     0 |     }
 390 |  413k |   }
 391 |       |
 392 |  414k |   if (do_uncompress_ && compression_type() != kNoCompression) {
 393 |     0 |     PERF_TIMER_GUARD(block_decompress_time);
 394 |       |     // Process the compressed block without trailer
 395 |     0 |     slice_.size_ = block_size_;
 396 |     0 |     decomp_args_.compressed_data = slice_;
 397 |     0 |     io_status_ = status_to_io_status(DecompressSerializedBlock(
 398 |     0 |         decomp_args_, *decompressor_, contents_, ioptions_, memory_allocator_));
 399 |       | #ifndef NDEBUG
 400 |       |     num_heap_buf_memcpy_++;
 401 |       | #endif
 402 |  414k |   } else {
 403 |  414k |     GetBlockContents();
 404 |  414k |     slice_ = Slice();
 405 |  414k |   }
 406 |       |
 407 |  414k |   InsertUncompressedBlockToPersistentCacheIfNeeded();
 408 |       |
 409 |  414k |   return io_status_;
 410 |  414k | }
 411 |       |
 412 |     0 | IOStatus BlockFetcher::ReadAsyncBlockContents() {
 413 |     0 |   if (TryGetUncompressBlockFromPersistentCache()) {
 414 |     0 |     compression_type() = kNoCompression;
 415 |       | #ifndef NDEBUG
 416 |       |     contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
 417 |       | #endif  // NDEBUG
 418 |     0 |     return IOStatus::OK();
 419 |     0 |   } else if (!TryGetSerializedBlockFromPersistentCache()) {
 420 |     0 |     assert(prefetch_buffer_ != nullptr);
 421 |     0 |     if (!for_compaction_) {
 422 |     0 |       IOOptions opts;
 423 |     0 |       IODebugContext dbg;
 424 |     0 |       IOStatus io_s = file_->PrepareIOOptions(read_options_, opts, &dbg);
 425 |     0 |       if (!io_s.ok()) {
 426 |     0 |         return io_s;
 427 |     0 |       }
 428 |     0 |       io_s = status_to_io_status(prefetch_buffer_->PrefetchAsync(
 429 |     0 |           opts, file_, handle_.offset(), block_size_with_trailer_, &slice_));
 430 |     0 |       if (io_s.IsTryAgain()) {
 431 |     0 |         return io_s;
 432 |     0 |       }
 433 |     0 |       if (io_s.ok()) {
 434 |       |         // Data Block is already in prefetch.
 435 |     0 |         got_from_prefetch_buffer_ = true;
 436 |     0 |         ProcessTrailerIfPresent();
 437 |     0 |         if (io_status_.IsCorruption() && retry_corrupt_read_) {
 438 |     0 |           got_from_prefetch_buffer_ = false;
 439 |     0 |           ReadBlock(/*retry = */ true);
 440 |     0 |         }
 441 |     0 |         if (!io_status_.ok()) {
 442 |     0 |           assert(!fs_buf_);
 443 |     0 |           return io_status_;
 444 |     0 |         }
 445 |     0 |         used_buf_ = const_cast<char*>(slice_.data());
 446 |       |
 447 |     0 |         if (do_uncompress_ && compression_type() != kNoCompression) {
 448 |     0 |           PERF_TIMER_GUARD(block_decompress_time);
 449 |       |           // Process the compressed block without trailer
 450 |     0 |           slice_.size_ = block_size_;
 451 |     0 |           decomp_args_.compressed_data = slice_;
 452 |     0 |           io_status_ = status_to_io_status(
 453 |     0 |               DecompressSerializedBlock(decomp_args_, *decompressor_, contents_,
 454 |     0 |                                         ioptions_, memory_allocator_));
 455 |       | #ifndef NDEBUG
 456 |       |           num_heap_buf_memcpy_++;
 457 |       | #endif
 458 |     0 |         } else {
 459 |     0 |           GetBlockContents();
 460 |     0 |         }
 461 |     0 |         InsertUncompressedBlockToPersistentCacheIfNeeded();
 462 |     0 |         return io_status_;
 463 |     0 |       }
 464 |     0 |     }
 465 |       |     // Fallback to sequential reading of data blocks in case of io_s returns
 466 |       |     // error or for_compaction_is true.
 467 |     0 |     return ReadBlockContents();
 468 |     0 |   }
 469 |     0 |   return io_status_;
 470 |     0 | }
 471 |       |
 472 |       | }  // namespace ROCKSDB_NAMESPACE