/src/rocksdb/table/block_based/partitioned_index_reader.cc
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/partitioned_index_reader.h"

#include "block_cache.h"
#include "file/random_access_file_reader.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/partitioned_index_iterator.h"

namespace ROCKSDB_NAMESPACE {
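
// PartitionIndexReader handles the top-level block of a two-level
// (partitioned) index: each entry in that block is the handle of a
// lower-level index block (a "partition") covering a contiguous key range.
// Partitions are read lazily unless CacheDependencies() has pinned them all
// in partition_map_.
//
// Create() reads the top-level index block eagerly when prefetching is
// requested or the block cache is not in use; if the block went into the
// block cache and does not need to stay pinned, the local reference is
// released again, since the block can be looked up from the cache later.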
Status PartitionIndexReader::Create(
    const BlockBasedTable* table, const ReadOptions& ro,
    FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch,
    bool pin, BlockCacheLookupContext* lookup_context,
    std::unique_ptr<IndexReader>* index_reader) {
  assert(table != nullptr);
  assert(table->get_rep());
  assert(!pin || prefetch);
  assert(index_reader != nullptr);

  CachableEntry<Block> index_block;
  if (prefetch || !use_cache) {
    const Status s =
        ReadIndexBlock(table, prefetch_buffer, ro, use_cache,
                       /*get_context=*/nullptr, lookup_context, &index_block);
    if (!s.ok()) {
      return s;
    }

    if (use_cache && !pin) {
      index_block.Reset();
    }
  }

  index_reader->reset(new PartitionIndexReader(table, std::move(index_block)));

  return Status::OK();
}
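
// NewIterator() returns an iterator over the entire index. When all
// partitions are pinned in partition_map_, a two-level iterator serves them
// straight from memory; otherwise a PartitionedIndexIterator reads
// partitions on demand, via the block cache where possible.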
InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
    const ReadOptions& read_options, bool /* disable_prefix_seek */,
    IndexBlockIter* iter, GetContext* get_context,
    BlockCacheLookupContext* lookup_context) {
  CachableEntry<Block> index_block;
  const Status s = GetOrReadIndexBlock(get_context, lookup_context,
                                       &index_block, read_options);
  if (!s.ok()) {
    if (iter != nullptr) {
      iter->Invalidate(s);
      return iter;
    }

    return NewErrorInternalIterator<IndexValue>(s);
  }

  const BlockBasedTable::Rep* rep = table()->rep_;
  InternalIteratorBase<IndexValue>* it = nullptr;

  Statistics* kNullStats = nullptr;
  // Filters are already checked before seeking the index
  if (!partition_map_.empty()) {
    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    it = NewTwoLevelIterator(
        new BlockBasedTable::PartitionedIndexIteratorState(table(),
                                                           &partition_map_),
        index_block.GetValue()->NewIndexIterator(
            internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats,
            true, index_has_first_key(), index_key_includes_seq(),
            index_value_is_full(), false /* block_contents_pinned */,
            user_defined_timestamps_persisted()));
  } else {
    ReadOptions ro{read_options};
    // FIXME? Possible regression seen in prefetch_test if this field is
    // propagated
    ro.readahead_size = ReadOptions{}.readahead_size;

    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
        index_block.GetValue()->NewIndexIterator(
            internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats,
            true, index_has_first_key(), index_key_includes_seq(),
            index_value_is_full(), false /* block_contents_pinned */,
            user_defined_timestamps_persisted()));

    it = new PartitionedIndexIterator(
        table(), ro, *internal_comparator(), std::move(index_iter),
        lookup_context ? lookup_context->caller
                       : TableReaderCaller::kUncategorized);
  }

  assert(it != nullptr);
  index_block.TransferTo(it);

  return it;

  // TODO(myabandeh): Update TwoLevelIterator to be able to make use of
  // on-stack BlockIter while the state is on heap. Currently it assumes
  // the first level iter is always on heap and will attempt to delete it
  // in its destructor.
}
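
// CacheDependencies() warms the block cache with all index partitions. The
// partitions are laid out consecutively in the file, so it reads the offset
// of the first partition and the end of the last one, prefetches that byte
// range with a single IO, and then loads each partition into the block
// cache. With `pin`, the partitions are additionally retained in
// partition_map_, but only if every one of them was cached successfully
// ("all or nothing").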
Status PartitionIndexReader::CacheDependencies(
    const ReadOptions& ro, bool pin,
    FilePrefetchBuffer* tail_prefetch_buffer) {
  if (!partition_map_.empty()) {
    // The dependencies are already cached since `partition_map_` is filled in
    // an all-or-nothing manner.
    return Status::OK();
  }
  // Before reading the partitions, prefetch them to avoid lots of IOs
  BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
  const BlockBasedTable::Rep* rep = table()->rep_;
  IndexBlockIter biter;
  BlockHandle handle;
  Statistics* kNullStats = nullptr;

  CachableEntry<Block> index_block;
  {
    Status s = GetOrReadIndexBlock(nullptr /* get_context */, &lookup_context,
                                   &index_block, ro);
    if (!s.ok()) {
      return s;
    }
  }

  // We don't return pinned data from index blocks, so no need
  // to set `block_contents_pinned`.
  index_block.GetValue()->NewIndexIterator(
      internal_comparator()->user_comparator(),
      rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
      index_has_first_key(), index_key_includes_seq(), index_value_is_full(),
      false /* block_contents_pinned */, user_defined_timestamps_persisted());
  // Index partitions are assumed to be consecutive. Prefetch them all.
  // Read the first block offset
  biter.SeekToFirst();
  if (!biter.Valid()) {
    // Empty index.
    return biter.status();
  }
  handle = biter.value().handle;
  uint64_t prefetch_off = handle.offset();

  // Read the last block's offset
  biter.SeekToLast();
  if (!biter.Valid()) {
    // Empty index.
    return biter.status();
  }
  handle = biter.value().handle;
  uint64_t last_off =
      handle.offset() + BlockBasedTable::BlockSizeWithTrailer(handle);
  uint64_t prefetch_len = last_off - prefetch_off;
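  // Prefetch the whole byte range [prefetch_off, last_off) in one IO, unless
  // the tail prefetch buffer (left over from table open) already covers it.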
  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
  if (tail_prefetch_buffer == nullptr || !tail_prefetch_buffer->Enabled() ||
      tail_prefetch_buffer->GetPrefetchOffset() > prefetch_off) {
    rep->CreateFilePrefetchBuffer(ReadaheadParams(), &prefetch_buffer,
                                  /*readaheadsize_cb*/ nullptr,
                                  /*usage=*/FilePrefetchBufferUsage::kUnknown);
    IOOptions opts;
    {
      Status s = rep->file->PrepareIOOptions(ro, opts);
      if (s.ok()) {
        s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
                                      static_cast<size_t>(prefetch_len));
      }
      if (!s.ok()) {
        return s;
      }
    }
  }
  // For saving "all or nothing" to partition_map_
  UnorderedMap<uint64_t, CachableEntry<Block>> map_in_progress;

  // After prefetch, read the partitions one by one
  biter.SeekToFirst();
  size_t partition_count = 0;
  for (; biter.Valid(); biter.Next()) {
    handle = biter.value().handle;
    CachableEntry<Block> block;
    ++partition_count;
    // TODO: Support counter batch update for partitioned index and
    // filter blocks
    Status s = table()->MaybeReadBlockAndLoadToCache(
        prefetch_buffer ? prefetch_buffer.get() : tail_prefetch_buffer, ro,
        handle, rep->decompressor.get(),
        /*for_compaction=*/false, &block.As<Block_kIndex>(),
        /*get_context=*/nullptr, &lookup_context, /*contents=*/nullptr,
        /*async_read=*/false, /*use_block_cache_for_lookup=*/true);

    if (!s.ok()) {
      return s;
    }
    if (block.GetValue() != nullptr) {
      // Might need to "pin" some mmap-read blocks (GetOwnValue) if some
      // partitions are successfully compressed (cached) and some are not
      // compressed (mmap eligible)
      if (block.IsCached() || block.GetOwnValue()) {
        if (pin) {
          map_in_progress[handle.offset()] = std::move(block);
        }
      }
    }
  }
  Status s = biter.status();
  // Save (pin) them only if everything checks out
  if (map_in_progress.size() == partition_count && s.ok()) {
    std::swap(partition_map_, map_in_progress);
  }
  return s;
}
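
// EraseFromCacheBeforeDestruction() proactively evicts this reader's blocks
// from the block cache when the table reader is being destroyed, on the
// assumption that they are unlikely to be useful again soon. The top-level
// index block supplies the handles of the partitions to erase when they are
// not pinned in partition_map_.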
void PartitionIndexReader::EraseFromCacheBeforeDestruction(
    uint32_t uncache_aggressiveness) {
  // NOTE: essentially a copy of
  // PartitionedFilterBlockReader::EraseFromCacheBeforeDestruction
  if (uncache_aggressiveness > 0) {
    CachableEntry<Block> top_level_block;

    ReadOptions ro_no_io;
    ro_no_io.read_tier = ReadTier::kBlockCacheTier;
    GetOrReadIndexBlock(/*get_context=*/nullptr,
                        /*lookup_context=*/nullptr, &top_level_block, ro_no_io)
        .PermitUncheckedError();

    if (!partition_map_.empty()) {
      // All partitions present if any
      for (auto& e : partition_map_) {
        e.second.ResetEraseIfLastRef();
      }
    } else if (!top_level_block.IsEmpty()) {
      IndexBlockIter biter;
      const InternalKeyComparator* const comparator = internal_comparator();
      Statistics* kNullStats = nullptr;
      top_level_block.GetValue()->NewIndexIterator(
          comparator->user_comparator(),
          table()->get_rep()->get_global_seqno(BlockType::kIndex), &biter,
          kNullStats, true /* total_order_seek */, index_has_first_key(),
          index_key_includes_seq(), index_value_is_full(),
          false /* block_contents_pinned */,
          user_defined_timestamps_persisted());

      UncacheAggressivenessAdvisor advisor(uncache_aggressiveness);
      for (biter.SeekToFirst(); biter.Valid() && advisor.ShouldContinue();
           biter.Next()) {
        bool erased = table()->EraseFromCache(biter.value().handle);
        advisor.Report(erased);
      }
      biter.status().PermitUncheckedError();
    }
    top_level_block.ResetEraseIfLastRef();
  }
  // Might be needed to un-cache a pinned top-level block
  BlockBasedTable::IndexReaderCommon::EraseFromCacheBeforeDestruction(
      uncache_aggressiveness);
}
}  // namespace ROCKSDB_NAMESPACE