Coverage Report

Created: 2026-04-10 07:52

/src/rocksdb/table/block_based/partitioned_index_reader.cc
Source (execution counts omitted below: every instrumented line in this file reported a count of 0, i.e. the file is entirely uncovered)
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/partitioned_index_reader.h"

#include "block_cache.h"
#include "file/random_access_file_reader.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/partitioned_index_iterator.h"

namespace ROCKSDB_NAMESPACE {
Status PartitionIndexReader::Create(
    const BlockBasedTable* table, const ReadOptions& ro,
    FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch,
    bool pin, BlockCacheLookupContext* lookup_context,
    std::unique_ptr<IndexReader>* index_reader) {
  assert(table != nullptr);
  assert(table->get_rep());
  assert(!pin || prefetch);
  assert(index_reader != nullptr);

  CachableEntry<Block> index_block;
  if (prefetch || !use_cache) {
    const Status s =
        ReadIndexBlock(table, prefetch_buffer, ro, use_cache,
                       /*get_context=*/nullptr, lookup_context, &index_block);
    if (!s.ok()) {
      return s;
    }

    if (use_cache && !pin) {
      index_block.Reset();
    }
  }

  index_reader->reset(new PartitionIndexReader(table, std::move(index_block)));

  return Status::OK();
}

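For context on the prefetch/pin interplay above: when the cache is in use but pinning was not requested, Create() warms the block cache with the top-level index block and then releases its local handle, relying on a cache lookup at first use. A minimal self-contained sketch of that pattern, with hypothetical BlockCache and WarmIndexBlock names (not RocksDB types):

#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for a block cache, not a RocksDB type.
struct BlockCache {
  std::map<std::string, std::string> blocks;
  std::optional<std::string> Lookup(const std::string& key) const {
    auto it = blocks.find(key);
    if (it == blocks.end()) return std::nullopt;
    return it->second;
  }
  void Insert(const std::string& key, std::string value) {
    blocks[key] = std::move(value);
  }
};

// Warm the cache with the index block; keep the local handle only if pinned.
std::optional<std::string> WarmIndexBlock(BlockCache& cache, bool use_cache,
                                          bool prefetch, bool pin) {
  std::optional<std::string> local;
  if (prefetch || !use_cache) {
    std::string block = "top-level index bytes";  // stands in for a file read
    if (use_cache) {
      cache.Insert("index", block);
    }
    local = std::move(block);
    if (use_cache && !pin) {
      local.reset();  // analogous to index_block.Reset(): rely on the cache
    }
  }
  return local;
}

int main() {
  BlockCache cache;
  auto local = WarmIndexBlock(cache, /*use_cache=*/true, /*prefetch=*/true,
                              /*pin=*/false);
  std::cout << "held locally: " << local.has_value()
            << ", present in cache: " << cache.Lookup("index").has_value()
            << "\n";  // prints: held locally: 0, present in cache: 1
}
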
InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
    const ReadOptions& read_options, bool /* disable_prefix_seek */,
    IndexBlockIter* iter, GetContext* get_context,
    BlockCacheLookupContext* lookup_context) {
  CachableEntry<Block> index_block;
  const Status s = GetOrReadIndexBlock(get_context, lookup_context,
                                       &index_block, read_options);
  if (!s.ok()) {
    if (iter != nullptr) {
      iter->Invalidate(s);
      return iter;
    }

    return NewErrorInternalIterator<IndexValue>(s);
  }

  const BlockBasedTable::Rep* rep = table()->rep_;
  InternalIteratorBase<IndexValue>* it = nullptr;

  Statistics* kNullStats = nullptr;
  // Filters are already checked before seeking the index
  if (!partition_map_.empty()) {
    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    it = NewTwoLevelIterator(
        new BlockBasedTable::PartitionedIndexIteratorState(table(),
                                                           &partition_map_),
        index_block.GetValue()->NewIndexIterator(
            internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
            index_has_first_key(), index_key_includes_seq(),
            index_value_is_full(), false /* block_contents_pinned */,
            user_defined_timestamps_persisted()));
  } else {
    ReadOptions ro{read_options};
    // FIXME? Possible regression seen in prefetch_test if this field is
    // propagated
    ro.readahead_size = ReadOptions{}.readahead_size;

    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
        index_block.GetValue()->NewIndexIterator(
            internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
            index_has_first_key(), index_key_includes_seq(),
            index_value_is_full(), false /* block_contents_pinned */,
            user_defined_timestamps_persisted()));

    it = new PartitionedIndexIterator(
        table(), ro, *internal_comparator(), std::move(index_iter),
        lookup_context ? lookup_context->caller
                       : TableReaderCaller::kUncategorized);
  }

  assert(it != nullptr);
  index_block.TransferTo(it);

  return it;

  // TODO(myabandeh): Update TwoLevelIterator to be able to make use of
  // on-stack BlockIter while the state is on heap. Currently it assumes
  // the first level iter is always on heap and will attempt to delete it
  // in its destructor.
}
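The !partition_map_.empty() branch above builds a two-level iterator: a first-level iterator over the top-level index selects a partition, and a second-level iterator scans within it. A minimal sketch of that traversal shape, with plain standard-library containers standing in for index blocks:

#include <iostream>
#include <string>
#include <vector>

// The outer vector plays the role of the top-level index (one entry per
// partition); each inner vector is a partition of sorted keys.
int main() {
  std::vector<std::vector<std::string>> partitions = {
      {"a", "b"}, {"c", "d", "e"}, {"f"}};
  // First level: advance across partitions. Second level: scan within the
  // currently selected partition, mirroring the two-level iterator split.
  for (const auto& partition : partitions) {
    for (const auto& key : partition) {
      std::cout << key << ' ';
    }
  }
  std::cout << '\n';  // prints: a b c d e f
}
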
Status PartitionIndexReader::CacheDependencies(
    const ReadOptions& ro, bool pin, FilePrefetchBuffer* tail_prefetch_buffer) {
  if (!partition_map_.empty()) {
    // The dependencies are already cached since `partition_map_` is filled in
    // an all-or-nothing manner.
    return Status::OK();
  }
  // Before reading the partitions, prefetch them to avoid lots of IOs
  BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
  const BlockBasedTable::Rep* rep = table()->rep_;
  IndexBlockIter biter;
  BlockHandle handle;
  Statistics* kNullStats = nullptr;

  CachableEntry<Block> index_block;
  {
    Status s = GetOrReadIndexBlock(nullptr /* get_context */, &lookup_context,
                                   &index_block, ro);
    if (!s.ok()) {
      return s;
    }
  }

  // We don't return pinned data from index blocks, so no need
  // to set `block_contents_pinned`.
  index_block.GetValue()->NewIndexIterator(
      internal_comparator()->user_comparator(),
      rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
      index_has_first_key(), index_key_includes_seq(), index_value_is_full(),
      false /* block_contents_pinned */, user_defined_timestamps_persisted());
  // Index partitions are assumed to be consecutive. Prefetch them all.
  // Read the first block offset
  biter.SeekToFirst();
  if (!biter.Valid()) {
    // Empty index.
    return biter.status();
  }
  handle = biter.value().handle;
  uint64_t prefetch_off = handle.offset();

  // Read the last block's offset
  biter.SeekToLast();
  if (!biter.Valid()) {
    // Empty index.
    return biter.status();
  }
  handle = biter.value().handle;
  uint64_t last_off =
      handle.offset() + BlockBasedTable::BlockSizeWithTrailer(handle);
  uint64_t prefetch_len = last_off - prefetch_off;
  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
  if (tail_prefetch_buffer == nullptr || !tail_prefetch_buffer->Enabled() ||
      tail_prefetch_buffer->GetPrefetchOffset() > prefetch_off) {
    rep->CreateFilePrefetchBuffer(ReadaheadParams(), &prefetch_buffer,
                                  /*readaheadsize_cb*/ nullptr,
                                  /*usage=*/FilePrefetchBufferUsage::kUnknown);
    IOOptions opts;
    {
      Status s = rep->file->PrepareIOOptions(ro, opts);
      if (s.ok()) {
        s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
                                      static_cast<size_t>(prefetch_len));
      }
      if (!s.ok()) {
        return s;
      }
    }
  }
  // For saving "all or nothing" to partition_map_
  UnorderedMap<uint64_t, CachableEntry<Block>> map_in_progress;

  // After prefetch, read the partitions one by one
  biter.SeekToFirst();
  size_t partition_count = 0;
  for (; biter.Valid(); biter.Next()) {
    handle = biter.value().handle;
    CachableEntry<Block> block;
    ++partition_count;
    // TODO: Support counter batch update for partitioned index and
    // filter blocks
    Status s = table()->MaybeReadBlockAndLoadToCache(
        prefetch_buffer ? prefetch_buffer.get() : tail_prefetch_buffer, ro,
        handle, rep->decompressor.get(),
        /*for_compaction=*/false, &block.As<Block_kIndex>(),
        /*get_context=*/nullptr, &lookup_context, /*contents=*/nullptr,
        /*async_read=*/false, /*use_block_cache_for_lookup=*/true);

    if (!s.ok()) {
      return s;
    }
    if (block.GetValue() != nullptr) {
      // Might need to "pin" some mmap-read blocks (GetOwnValue) if some
      // partitions are successfully compressed (cached) and some are not
      // compressed (mmap eligible)
      if (block.IsCached() || block.GetOwnValue()) {
        if (pin) {
          map_in_progress[handle.offset()] = std::move(block);
        }
      }
    }
  }
  Status s = biter.status();
  // Save (pin) them only if everything checks out
  if (map_in_progress.size() == partition_count && s.ok()) {
    std::swap(partition_map_, map_in_progress);
  }
  return s;
}

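A worked example of the contiguous prefetch-range arithmetic above, with made-up offsets; the 5-byte trailer (1 compression-type byte + 4 checksum bytes) is what BlockSizeWithTrailer adds per block in the block-based table format:

#include <cstdint>
#include <iostream>

// 1 compression-type byte + 4 checksum bytes per block.
constexpr uint64_t kBlockTrailerSize = 5;

int main() {
  // Made-up handles for the first and last index partitions.
  uint64_t first_offset = 1000;  // first partition's handle.offset()
  uint64_t last_offset = 9000;   // last partition's handle.offset()
  uint64_t last_size = 450;      // last partition's handle.size()

  uint64_t prefetch_off = first_offset;
  uint64_t last_off = last_offset + last_size + kBlockTrailerSize;  // 9455
  uint64_t prefetch_len = last_off - prefetch_off;                  // 8455

  // One contiguous read covers every partition because they are laid out
  // consecutively in the file.
  std::cout << "prefetch [" << prefetch_off << ", " << last_off
            << "), len=" << prefetch_len << "\n";
}
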
void PartitionIndexReader::EraseFromCacheBeforeDestruction(
    uint32_t uncache_aggressiveness) {
  // NOTE: essentially a copy of
  // PartitionedFilterBlockReader::EraseFromCacheBeforeDestruction
  if (uncache_aggressiveness > 0) {
    CachableEntry<Block> top_level_block;

    ReadOptions ro_no_io;
    ro_no_io.read_tier = ReadTier::kBlockCacheTier;
    GetOrReadIndexBlock(/*get_context=*/nullptr,
                        /*lookup_context=*/nullptr, &top_level_block, ro_no_io)
        .PermitUncheckedError();

    if (!partition_map_.empty()) {
      // All partitions present if any
      for (auto& e : partition_map_) {
        e.second.ResetEraseIfLastRef();
      }
    } else if (!top_level_block.IsEmpty()) {
      IndexBlockIter biter;
      const InternalKeyComparator* const comparator = internal_comparator();
      Statistics* kNullStats = nullptr;
      top_level_block.GetValue()->NewIndexIterator(
          comparator->user_comparator(),
          table()->get_rep()->get_global_seqno(BlockType::kIndex), &biter,
          kNullStats, true /* total_order_seek */, index_has_first_key(),
          index_key_includes_seq(), index_value_is_full(),
          false /* block_contents_pinned */,
          user_defined_timestamps_persisted());

      UncacheAggressivenessAdvisor advisor(uncache_aggressiveness);
      for (biter.SeekToFirst(); biter.Valid() && advisor.ShouldContinue();
           biter.Next()) {
        bool erased = table()->EraseFromCache(biter.value().handle);
        advisor.Report(erased);
      }
      biter.status().PermitUncheckedError();
    }
    top_level_block.ResetEraseIfLastRef();
  }
  // Might be needed to un-cache a pinned top-level block
  BlockBasedTable::IndexReaderCommon::EraseFromCacheBeforeDestruction(
      uncache_aggressiveness);
}
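The advisor loop above stops erasing once it no longer seems worthwhile. A hypothetical sketch of that shape follows; this is not RocksDB's actual UncacheAggressivenessAdvisor policy, only an illustration of the ShouldContinue()/Report() contract:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Illustrative policy (assumed, not RocksDB's): keep going while erases
// succeed, give up once too many lookups miss.
class ToyUncacheAdvisor {
 public:
  explicit ToyUncacheAdvisor(uint32_t aggressiveness)
      : budget_(aggressiveness) {}
  bool ShouldContinue() const { return budget_ > 0; }
  void Report(bool erased) {
    if (erased) {
      ++budget_;  // a hit suggests more partitions are still cached
    } else {
      budget_ = budget_ > 2 ? budget_ - 2 : 0;  // misses drain the budget
    }
  }

 private:
  uint64_t budget_;
};

int main() {
  // Pretend outcomes of table()->EraseFromCache() per partition handle.
  std::vector<bool> erase_results = {true, true, false, false, false, false};
  ToyUncacheAdvisor advisor(4);
  std::size_t erased = 0;
  for (std::size_t i = 0;
       i < erase_results.size() && advisor.ShouldContinue(); ++i) {
    erased += erase_results[i] ? 1 : 0;
    advisor.Report(erase_results[i]);
  }
  std::cout << "erased " << erased << " blocks before giving up\n";
}
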
}  // namespace ROCKSDB_NAMESPACE
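
Finally, the all-or-nothing save at the end of CacheDependencies builds map_in_progress on the side and swaps it in only when every partition was captured, so partition_map_ is never left partially filled. A minimal sketch of that pattern:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

int main() {
  // The published map (plays the role of partition_map_) and the side map.
  std::unordered_map<uint64_t, std::string> partition_map;
  std::unordered_map<uint64_t, std::string> map_in_progress;

  std::size_t partition_count = 0;
  for (uint64_t offset : {100, 200, 300}) {
    ++partition_count;
    bool captured = true;  // pretend each partition load was pinned/cached
    if (captured) {
      map_in_progress[offset] = "block@" + std::to_string(offset);
    }
  }

  // Publish only if every partition was captured; otherwise the side map is
  // simply discarded and the published map stays empty.
  if (map_in_progress.size() == partition_count) {
    std::swap(partition_map, map_in_progress);
  }
  std::cout << "pinned partitions: " << partition_map.size() << "\n";
}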