/src/rocksdb/db/forward_iterator.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "db/forward_iterator.h" |
7 | | |
8 | | #include <limits> |
9 | | #include <string> |
10 | | #include <utility> |
11 | | |
12 | | #include "db/column_family.h" |
13 | | #include "db/db_impl/db_impl.h" |
14 | | #include "db/db_iter.h" |
15 | | #include "db/dbformat.h" |
16 | | #include "db/job_context.h" |
17 | | #include "db/range_del_aggregator.h" |
18 | | #include "db/range_tombstone_fragmenter.h" |
19 | | #include "rocksdb/env.h" |
20 | | #include "rocksdb/slice.h" |
21 | | #include "rocksdb/slice_transform.h" |
22 | | #include "table/merging_iterator.h" |
23 | | #include "test_util/sync_point.h" |
24 | | #include "util/string_util.h" |
25 | | |
26 | | namespace ROCKSDB_NAMESPACE { |
27 | | |
28 | | // Usage: |
29 | | // ForwardLevelIterator iter; |
30 | | // iter.SetFileIndex(file_index); |
31 | | // iter.Seek(target); // or iter.SeekToFirst(); |
32 | | // iter.Next() |
// Iterates forward over the SST files of a single level (used by
// ForwardIterator for L1+). Exactly one file iterator is open at a time
// (file_iter_); Next() transparently rolls over to the next file in files_
// when the current file is exhausted.
//
// Ownership: file_iter_ is owned here. When it is replaced or this iterator
// is destroyed, it is either deleted or handed off to pinned_iters_mgr_
// (when pinning is enabled) so that pinned key/value slices remain valid.
// cfd_, read_options_, files_ and mutable_cf_options_ are borrowed and must
// outlive this iterator.
class ForwardLevelIterator : public InternalIterator {
 public:
  ForwardLevelIterator(const ColumnFamilyData* const cfd,
                       const ReadOptions& read_options,
                       const std::vector<FileMetaData*>& files,
                       const MutableCFOptions& mutable_cf_options,
                       bool allow_unprepared_value)
      : cfd_(cfd),
        read_options_(read_options),
        files_(files),
        valid_(false),
        file_index_(std::numeric_limits<uint32_t>::max()),
        file_iter_(nullptr),
        pinned_iters_mgr_(nullptr),
        mutable_cf_options_(mutable_cf_options),
        allow_unprepared_value_(allow_unprepared_value) {
    status_.PermitUncheckedError();  // Allow uninitialized status through
  }

  ~ForwardLevelIterator() override {
    // Reset current pointer. If pinning is enabled, the manager takes
    // ownership of file_iter_ and deletes it only after pinned data is
    // released; otherwise delete it now.
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }
  }

  // Selects which file of the level to read next. Discards any pre-existing
  // error status; only rebuilds the table iterator (Reset()) when the index
  // actually changes.
  void SetFileIndex(uint32_t file_index) {
    assert(file_index < files_.size());
    status_ = Status::OK();
    if (file_index != file_index_) {
      file_index_ = file_index;
      Reset();
    }
  }
  // Replaces file_iter_ with a fresh table iterator over
  // files_[file_index_]. Leaves the iterator unpositioned (valid_ == false).
  void Reset() {
    assert(file_index_ < files_.size());

    // Reset current pointer (same pin-or-delete protocol as the destructor).
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
      pinned_iters_mgr_->PinIterator(file_iter_);
    } else {
      delete file_iter_;
    }

    ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                         kMaxSequenceNumber /* upper_bound */);
    file_iter_ = cfd_->table_cache()->NewIterator(
        read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
        *files_[file_index_],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        mutable_cf_options_, /*table_reader_ptr=*/nullptr,
        /*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
        /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
        /*max_file_size_for_l0_meta_pin=*/0,
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
    file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    valid_ = false;
    if (!range_del_agg.IsEmpty()) {
      // ForwardIterator has no support for merging range tombstones.
      status_ = Status::NotSupported(
          "Range tombstones unsupported with ForwardIterator");
    }
  }
  // Backward operations are unsupported on this forward-only iterator.
  void SeekToLast() override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekToLast()");
    valid_ = false;
  }
  void Prev() override {
    status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
    valid_ = false;
  }
  bool Valid() const override { return valid_; }
  void SeekToFirst() override {
    assert(file_iter_ != nullptr);
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }
    file_iter_->SeekToFirst();
    valid_ = file_iter_->Valid();
  }
  void Seek(const Slice& internal_key) override {
    assert(file_iter_ != nullptr);

    // This deviates from the usual convention for InternalIterator::Seek() in
    // that it doesn't discard pre-existing error status. That's because this
    // Seek() is only supposed to be called immediately after SetFileIndex()
    // (which discards pre-existing error status), and SetFileIndex() may set
    // an error status, which we shouldn't discard.
    if (!status_.ok()) {
      assert(!valid_);
      return;
    }

    file_iter_->Seek(internal_key);
    valid_ = file_iter_->Valid();
  }
  void SeekForPrev(const Slice& /*internal_key*/) override {
    status_ = Status::NotSupported("ForwardLevelIterator::SeekForPrev()");
    valid_ = false;
  }
  // Advances within the current file; when exhausted, moves to the first
  // entry of each subsequent file until a valid entry is found, an error
  // occurs, or the level runs out of files.
  void Next() override {
    assert(valid_);
    file_iter_->Next();
    for (;;) {
      valid_ = file_iter_->Valid();
      if (!file_iter_->status().ok()) {
        assert(!valid_);
        return;
      }
      if (valid_) {
        return;
      }
      if (file_index_ + 1 >= files_.size()) {
        // Last file of the level exhausted.
        valid_ = false;
        return;
      }
      SetFileIndex(file_index_ + 1);
      if (!status_.ok()) {
        assert(!valid_);
        return;
      }
      file_iter_->SeekToFirst();
    }
  }
  Slice key() const override {
    assert(valid_);
    return file_iter_->key();
  }
  Slice value() const override {
    assert(valid_);
    return file_iter_->value();
  }
  uint64_t write_unix_time() const override {
    assert(valid_);
    return file_iter_->write_unix_time();
  }
  // Own status_ (NotSupported, or an error set by SetFileIndex()/Reset())
  // takes precedence over the current file iterator's status.
  Status status() const override {
    if (!status_.ok()) {
      return status_;
    } else if (file_iter_) {
      return file_iter_->status();
    }
    return Status::OK();
  }
  // Forwards PrepareValue() to the file iterator. On failure the child has
  // invalidated itself, so this iterator becomes invalid too.
  bool PrepareValue() override {
    assert(valid_);
    if (file_iter_->PrepareValue()) {
      return true;
    }

    assert(!file_iter_->Valid());
    valid_ = false;
    return false;
  }
  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsKeyPinned();
  }
  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_->IsValuePinned();
  }
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_) {
      file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
    }
  }

 private:
  const ColumnFamilyData* const cfd_;
  const ReadOptions& read_options_;
  const std::vector<FileMetaData*>& files_;

  bool valid_;
  // Index into files_ of the currently open file; uint32_t max until the
  // first SetFileIndex() call.
  uint32_t file_index_;
  Status status_;
  InternalIterator* file_iter_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  const MutableCFOptions& mutable_cf_options_;

  const bool allow_unprepared_value_;
};
219 | | |
// Constructs a tailing forward iterator over `cfd`. If `current_sv` is
// non-null, a reference to it is adopted and the child iterators are built
// immediately; otherwise they are built lazily on the first seek.
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
                                 ColumnFamilyData* cfd,
                                 SuperVersion* current_sv,
                                 bool allow_unprepared_value)
    : db_(db),
      read_options_(read_options),
      cfd_(cfd),
      // NOTE(review): current_sv is dereferenced here even though the
      // `if (sv_)` guard below suggests it may be nullptr — confirm all
      // callers pass a valid SuperVersion.
      prefix_extractor_(current_sv->mutable_cf_options.prefix_extractor.get()),
      user_comparator_(cfd->user_comparator()),
      allow_unprepared_value_(allow_unprepared_value),
      immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
      sv_(current_sv),
      mutable_iter_(nullptr),
      current_(nullptr),
      valid_(false),
      status_(Status::OK()),
      immutable_status_(Status::OK()),
      has_iter_trimmed_for_upper_bound_(false),
      current_over_upper_bound_(false),
      is_prev_set_(false),
      is_prev_inclusive_(false),
      pinned_iters_mgr_(nullptr) {
  if (sv_) {
    RebuildIterators(false);
  }
  // read_options_ is our own copy, so async_io can be force-disabled when
  // the filesystem does not support it.
  if (!CheckFSFeatureSupport(cfd_->ioptions().env->GetFileSystem().get(),
                             FSSupportedOps::kAsyncIO)) {
    read_options_.async_io = false;
  }
  // immutable_status_ is a local aggregation of the
  // status of the immutable Iterators.
  // We have to PermitUncheckedError in case it is never
  // used, otherwise it will fail ASSERT_STATUS_CHECKED.
  immutable_status_.PermitUncheckedError();
}
255 | | |
256 | 0 | ForwardIterator::~ForwardIterator() { Cleanup(true); } |
257 | | |
// Drops one reference to `sv`. If that was the last reference, tears the
// SuperVersion down under the DB mutex, collects obsolete files, and either
// deletes the SuperVersion inline or (with background purge) hands it to the
// purge thread so this caller does not block on the deletion.
void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
                                bool background_purge_on_iterator_cleanup) {
  if (sv->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    JobContext job_context(0);
    db->mutex_.Lock();
    sv->Cleanup();
    db->FindObsoleteFiles(&job_context, false, true);
    if (background_purge_on_iterator_cleanup) {
      db->ScheduleBgLogWriterClose(&job_context);
      db->AddSuperVersionsToFreeQueue(sv);
      db->SchedulePurge();
    }
    db->mutex_.Unlock();
    // Without background purge the SuperVersion is deleted here, outside the
    // DB mutex; otherwise the purge thread owns it now.
    if (!background_purge_on_iterator_cleanup) {
      delete sv;
    }
    if (job_context.HaveSomethingToDelete()) {
      db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
    }
    job_context.Clean();
  }
}
282 | | |
namespace {
// Heap-allocated argument pack for ForwardIterator::DeferredSVCleanup(),
// which consumes and deletes it.
struct SVCleanupParams {
  DBImpl* db;
  SuperVersion* sv;
  bool background_purge_on_iterator_cleanup;
};
}  // anonymous namespace
290 | | |
// Used in PinnedIteratorsManager to release pinned SuperVersion.
// `arg` is a heap-allocated SVCleanupParams; it is deleted after use.
void ForwardIterator::DeferredSVCleanup(void* arg) {
  auto d = static_cast<SVCleanupParams*>(arg);
  ForwardIterator::SVCleanup(d->db, d->sv,
                             d->background_purge_on_iterator_cleanup);
  delete d;
}
298 | | |
// Releases this iterator's SuperVersion reference, either immediately or —
// when a PinnedIteratorsManager holds pinned slices — deferred until the
// pinned data is released.
void ForwardIterator::SVCleanup() {
  if (sv_ == nullptr) {
    return;
  }
  bool background_purge =
      read_options_.background_purge_on_iterator_cleanup ||
      db_->immutable_db_options().avoid_unnecessary_blocking_io;
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    // pinned_iters_mgr_ tells us to make sure that all visited key-value slices
    // are alive until pinned_iters_mgr_->ReleasePinnedData() is called.
    // The slices may point into some memtables owned by sv_, so we need to keep
    // sv_ referenced until pinned_iters_mgr_ unpins everything.
    auto p = new SVCleanupParams{db_, sv_, background_purge};
    pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup);
  } else {
    SVCleanup(db_, sv_, background_purge);
  }
}
317 | | |
318 | 0 | void ForwardIterator::Cleanup(bool release_sv) { |
319 | 0 | if (mutable_iter_ != nullptr) { |
320 | 0 | DeleteIterator(mutable_iter_, true /* is_arena */); |
321 | 0 | } |
322 | |
|
323 | 0 | for (auto* m : imm_iters_) { |
324 | 0 | DeleteIterator(m, true /* is_arena */); |
325 | 0 | } |
326 | 0 | imm_iters_.clear(); |
327 | |
|
328 | 0 | for (auto* f : l0_iters_) { |
329 | 0 | DeleteIterator(f); |
330 | 0 | } |
331 | 0 | l0_iters_.clear(); |
332 | |
|
333 | 0 | for (auto* l : level_iters_) { |
334 | 0 | DeleteIterator(l); |
335 | 0 | } |
336 | 0 | level_iters_.clear(); |
337 | |
|
338 | 0 | if (release_sv) { |
339 | 0 | SVCleanup(); |
340 | 0 | } |
341 | 0 | } |
342 | | |
343 | 0 | bool ForwardIterator::Valid() const { |
344 | | // See UpdateCurrent(). |
345 | 0 | return valid_ ? !current_over_upper_bound_ : false; |
346 | 0 | } |
347 | | |
void ForwardIterator::SeekToFirst() {
  if (sv_ == nullptr) {
    // No SuperVersion yet: acquire one and build all child iterators.
    RebuildIterators(true);
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
    // SuperVersion changed since the last seek: switch to the new one,
    // reusing still-live L0 iterators where possible.
    RenewIterators();
  } else if (immutable_status_.IsIncomplete()) {
    // A previous seek left some immutable iterators Incomplete (presumably a
    // no-IO read path — TODO confirm against ResetIncompleteIterators());
    // recreate them before seeking.
    ResetIncompleteIterators();
  }
  SeekInternal(Slice(), true, false);
}
358 | | |
359 | 0 | bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const { |
360 | 0 | return !(read_options_.iterate_upper_bound == nullptr || |
361 | 0 | cfd_->internal_comparator().user_comparator()->Compare( |
362 | 0 | ExtractUserKey(internal_key), |
363 | 0 | *read_options_.iterate_upper_bound) < 0); |
364 | 0 | } |
365 | | |
// Positions the iterator at the first entry >= internal_key, refreshing the
// child iterators first when the SuperVersion is missing, stale, or left
// Incomplete by an earlier seek.
void ForwardIterator::Seek(const Slice& internal_key) {
  if (sv_ == nullptr) {
    RebuildIterators(true);
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
    RenewIterators();
  } else if (immutable_status_.IsIncomplete()) {
    ResetIncompleteIterators();
  }

  SeekInternal(internal_key, false, false);
  // With async_io, the first pass may leave children in TryAgain state; the
  // second pass (seek_after_async_io == true) re-seeks only those children.
  if (read_options_.async_io) {
    SeekInternal(internal_key, false, true);
  }
}
380 | | |
// In case of async_io, SeekInternal is called twice with seek_after_async_io
// enabled in second call which only does seeking part to retrieve the blocks.
//
// Seeks every child iterator (mutable memtable, immutable memtables, L0
// files, L1+ level iterators) to `internal_key` (or to the first entry when
// seek_to_first), rebuilding the immutable min-heap from the children that
// land on a valid, in-bound entry. Children whose target lies wholly past
// the key or past iterate_upper_bound are deleted ("trimmed") and their
// slots set to nullptr.
void ForwardIterator::SeekInternal(const Slice& internal_key,
                                   bool seek_to_first,
                                   bool seek_after_async_io) {
  assert(mutable_iter_);
  // mutable
  if (!seek_after_async_io) {
    seek_to_first ? mutable_iter_->SeekToFirst()
                  : mutable_iter_->Seek(internal_key);
  }

  // immutable
  // TODO(ljin): NeedToSeekImmutable has negative impact on performance
  // if it turns to need to seek immutable often. We probably want to have
  // an option to turn it off.
  if (seek_to_first || seek_after_async_io ||
      NeedToSeekImmutable(internal_key)) {
    if (!seek_after_async_io) {
      immutable_status_ = Status::OK();
      if (has_iter_trimmed_for_upper_bound_ &&
          (
              // prev_ is not set yet
              is_prev_set_ == false ||
              // We are doing SeekToFirst() and internal_key.size() = 0
              seek_to_first ||
              // prev_key_ > internal_key
              cfd_->internal_comparator().InternalKeyComparator::Compare(
                  prev_key_.GetInternalKey(), internal_key) > 0)) {
        // Some iterators are trimmed. Need to rebuild.
        RebuildIterators(true);
        // Already seeked mutable iter, so seek again
        seek_to_first ? mutable_iter_->SeekToFirst()
                      : mutable_iter_->Seek(internal_key);
      }
      {
        // Drop the old heap contents by swapping with a fresh empty heap.
        auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
        immutable_min_heap_.swap(tmp);
      }
      for (size_t i = 0; i < imm_iters_.size(); i++) {
        auto* m = imm_iters_[i];
        seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
        if (!m->status().ok()) {
          immutable_status_ = m->status();
        } else if (m->Valid()) {
          immutable_min_heap_.push(m);
        }
      }
    }

    Slice target_user_key;
    if (!seek_to_first) {
      target_user_key = ExtractUserKey(internal_key);
    }
    const VersionStorageInfo* vstorage = sv_->current->storage_info();
    const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
    for (size_t i = 0; i < l0.size(); ++i) {
      if (!l0_iters_[i]) {
        // Previously trimmed.
        continue;
      }
      if (seek_after_async_io) {
        // Second async pass: only re-seek children left in TryAgain state.
        if (!l0_iters_[i]->status().IsTryAgain()) {
          continue;
        }
      }

      if (seek_to_first) {
        l0_iters_[i]->SeekToFirst();
      } else {
        // If the target key passes over the largest key, we are sure Next()
        // won't go over this file.
        if (seek_after_async_io == false &&
            user_comparator_->Compare(target_user_key,
                                      l0[i]->largest.user_key()) > 0) {
          if (read_options_.iterate_upper_bound != nullptr) {
            has_iter_trimmed_for_upper_bound_ = true;
            DeleteIterator(l0_iters_[i]);
            l0_iters_[i] = nullptr;
          }
          continue;
        }
        l0_iters_[i]->Seek(internal_key);
      }

      if (l0_iters_[i]->status().IsTryAgain()) {
        // Async read in flight; handled by the second SeekInternal() pass.
        assert(!seek_after_async_io);
        continue;
      } else if (!l0_iters_[i]->status().ok()) {
        immutable_status_ = l0_iters_[i]->status();
      } else if (l0_iters_[i]->Valid() &&
                 !IsOverUpperBound(l0_iters_[i]->key())) {
        immutable_min_heap_.push(l0_iters_[i]);
      } else {
        has_iter_trimmed_for_upper_bound_ = true;
        DeleteIterator(l0_iters_[i]);
        l0_iters_[i] = nullptr;
      }
    }

    for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
      const std::vector<FileMetaData*>& level_files =
          vstorage->LevelFiles(level);
      if (level_files.empty()) {
        continue;
      }
      if (level_iters_[level - 1] == nullptr) {
        // Previously trimmed.
        continue;
      }

      if (seek_after_async_io) {
        if (!level_iters_[level - 1]->status().IsTryAgain()) {
          continue;
        }
      }
      uint32_t f_idx = 0;
      if (!seek_to_first && !seek_after_async_io) {
        // Binary-search the file whose range may contain internal_key.
        f_idx = FindFileInRange(level_files, internal_key, 0,
                                static_cast<uint32_t>(level_files.size()));
      }

      // Seek
      if (seek_after_async_io || f_idx < level_files.size()) {
        if (!seek_after_async_io) {
          level_iters_[level - 1]->SetFileIndex(f_idx);
        }
        seek_to_first ? level_iters_[level - 1]->SeekToFirst()
                      : level_iters_[level - 1]->Seek(internal_key);

        if (level_iters_[level - 1]->status().IsTryAgain()) {
          assert(!seek_after_async_io);
          continue;
        } else if (!level_iters_[level - 1]->status().ok()) {
          immutable_status_ = level_iters_[level - 1]->status();
        } else if (level_iters_[level - 1]->Valid() &&
                   !IsOverUpperBound(level_iters_[level - 1]->key())) {
          immutable_min_heap_.push(level_iters_[level - 1]);
        } else {
          // Nothing in this level is interesting. Remove.
          has_iter_trimmed_for_upper_bound_ = true;
          DeleteIterator(level_iters_[level - 1]);
          level_iters_[level - 1] = nullptr;
        }
      }
    }

    // Remember the seek target so NeedToSeekImmutable() can skip redundant
    // immutable re-seeks next time.
    if (seek_to_first) {
      is_prev_set_ = false;
    } else {
      prev_key_.SetInternalKey(internal_key);
      is_prev_set_ = true;
      is_prev_inclusive_ = true;
    }

    TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Immutable", this);
  } else if (current_ && current_ != mutable_iter_) {
    // current_ is one of immutable iterators, push it back to the heap
    immutable_min_heap_.push(current_);
  }

  // For async_io, it should be updated when seek_after_async_io is true (in
  // second call).
  if (seek_to_first || !read_options_.async_io || seek_after_async_io) {
    UpdateCurrent();
  }
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
}
547 | | |
// Advances to the next entry. If the SuperVersion was released or changed,
// the child iterators are rebuilt/renewed and re-seeked to the saved current
// key first; if the saved key has disappeared, the re-seek already landed on
// the next entry and we return immediately.
void ForwardIterator::Next() {
  assert(valid_);
  bool update_prev_key = false;

  if (sv_ == nullptr || sv_->version_number != cfd_->GetSuperVersionNumber()) {
    // Copy the current key before the child iterators (and the slices they
    // back) are destroyed.
    std::string current_key = key().ToString();
    Slice old_key(current_key.data(), current_key.size());

    if (sv_ == nullptr) {
      RebuildIterators(true);
    } else {
      RenewIterators();
    }

    SeekInternal(old_key, false, false);
    if (read_options_.async_io) {
      SeekInternal(old_key, false, true);
    }

    // The saved key is gone (or nothing is left): the seek already
    // positioned us at the next entry.
    if (!valid_ || key().compare(old_key) != 0) {
      return;
    }
  } else if (current_ != mutable_iter_) {
    // It is going to advance immutable iterator

    if (is_prev_set_ && prefix_extractor_) {
      // advance prev_key_ to current_ only if they share the same prefix
      update_prev_key =
          prefix_extractor_->Transform(prev_key_.GetUserKey())
              .compare(prefix_extractor_->Transform(current_->key())) == 0;
    } else {
      update_prev_key = true;
    }

    if (update_prev_key) {
      prev_key_.SetInternalKey(current_->key());
      is_prev_set_ = true;
      is_prev_inclusive_ = false;
    }
  }

  current_->Next();
  if (current_ != mutable_iter_) {
    if (!current_->status().ok()) {
      immutable_status_ = current_->status();
    } else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
      // Still valid and in bounds: back into the min-heap for merging.
      immutable_min_heap_.push(current_);
    } else {
      if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
        // remove the current iterator
        DeleteCurrentIter();
        current_ = nullptr;
      }
      if (update_prev_key) {
        mutable_iter_->Seek(prev_key_.GetInternalKey());
      }
    }
  }
  UpdateCurrent();
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::Next:Return", this);
}
609 | | |
// Key of the current entry, forwarded from whichever child iterator is on
// top (current_). Only legal while valid_ (asserted).
Slice ForwardIterator::key() const {
  assert(valid_);
  return current_->key();
}
614 | | |
// Write time of the current entry, forwarded from the current child
// iterator. Only legal while valid_ (asserted).
uint64_t ForwardIterator::write_unix_time() const {
  assert(valid_);
  return current_->write_unix_time();
}
619 | | |
// Value of the current entry, forwarded from the current child iterator.
// Only legal while valid_ (asserted).
Slice ForwardIterator::value() const {
  assert(valid_);
  return current_->value();
}
624 | | |
// Error precedence: the iterator's own status_ first, then the mutable
// memtable iterator's status, then the aggregated immutable status.
Status ForwardIterator::status() const {
  if (!status_.ok()) {
    return status_;
  } else if (!mutable_iter_->status().ok()) {
    return mutable_iter_->status();
  }

  return immutable_status_;
}
634 | | |
// Forwards PrepareValue() to the current child iterator. On failure the
// child has invalidated itself with a non-OK status; since the memtable
// iterator cannot fail, the failing child must be an immutable one, so its
// error is recorded in immutable_status_.
bool ForwardIterator::PrepareValue() {
  assert(valid_);
  if (current_->PrepareValue()) {
    return true;
  }

  assert(!current_->Valid());
  assert(!current_->status().ok());
  assert(current_ != mutable_iter_);  // memtable iterator can't fail
  assert(immutable_status_.ok());

  valid_ = false;
  immutable_status_ = current_->status();
  return false;
}
650 | | |
651 | 0 | Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) { |
652 | 0 | assert(prop != nullptr); |
653 | 0 | if (prop_name == "rocksdb.iterator.super-version-number") { |
654 | 0 | *prop = std::to_string(sv_->version_number); |
655 | 0 | return Status::OK(); |
656 | 0 | } |
657 | 0 | return Status::InvalidArgument("Unrecognized property: " + prop_name); |
658 | 0 | } |
659 | | |
// Stores the PinnedIteratorsManager and propagates it to every live child
// iterator.
void ForwardIterator::SetPinnedItersMgr(
    PinnedIteratorsManager* pinned_iters_mgr) {
  pinned_iters_mgr_ = pinned_iters_mgr;
  UpdateChildrenPinnedItersMgr();
}
665 | | |
666 | 0 | void ForwardIterator::UpdateChildrenPinnedItersMgr() { |
667 | | // Set PinnedIteratorsManager for mutable memtable iterator. |
668 | 0 | if (mutable_iter_) { |
669 | 0 | mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_); |
670 | 0 | } |
671 | | |
672 | | // Set PinnedIteratorsManager for immutable memtable iterators. |
673 | 0 | for (InternalIterator* child_iter : imm_iters_) { |
674 | 0 | if (child_iter) { |
675 | 0 | child_iter->SetPinnedItersMgr(pinned_iters_mgr_); |
676 | 0 | } |
677 | 0 | } |
678 | | |
679 | | // Set PinnedIteratorsManager for L0 files iterators. |
680 | 0 | for (InternalIterator* child_iter : l0_iters_) { |
681 | 0 | if (child_iter) { |
682 | 0 | child_iter->SetPinnedItersMgr(pinned_iters_mgr_); |
683 | 0 | } |
684 | 0 | } |
685 | | |
686 | | // Set PinnedIteratorsManager for L1+ levels iterators. |
687 | 0 | for (ForwardLevelIterator* child_iter : level_iters_) { |
688 | 0 | if (child_iter) { |
689 | 0 | child_iter->SetPinnedItersMgr(pinned_iters_mgr_); |
690 | 0 | } |
691 | 0 | } |
692 | 0 | } |
693 | | |
694 | 0 | bool ForwardIterator::IsKeyPinned() const { |
695 | 0 | return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && |
696 | 0 | current_->IsKeyPinned(); |
697 | 0 | } |
698 | | |
699 | 0 | bool ForwardIterator::IsValuePinned() const { |
700 | 0 | return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && |
701 | 0 | current_->IsValuePinned(); |
702 | 0 | } |
703 | | |
// Destroys all child iterators and rebuilds them from scratch against sv_
// (optionally acquiring a fresh SuperVersion reference first). Sets status_
// to NotSupported if any range tombstones are present, since ForwardIterator
// cannot merge them.
void ForwardIterator::RebuildIterators(bool refresh_sv) {
  // Clean up the old children; release the old SuperVersion only when we are
  // about to grab a new one.
  Cleanup(refresh_sv);
  if (refresh_sv) {
    // New
    sv_ = cfd_->GetReferencedSuperVersion(db_);
  }
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                       kMaxSequenceNumber /* upper_bound */);
  UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping =
      sv_->GetSeqnoToTimeMapping();
  mutable_iter_ =
      sv_->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_,
                            sv_->mutable_cf_options.prefix_extractor.get(),
                            /*for_flush=*/false);
  sv_->imm->AddIterators(read_options_, seqno_to_time_mapping,
                         sv_->mutable_cf_options.prefix_extractor.get(),
                         &imm_iters_, &arena_);
  if (!read_options_.ignore_range_deletions) {
    // Collect memtable range tombstones only to detect their presence; see
    // the NotSupported check at the bottom.
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        sv_->mem->NewRangeTombstoneIterator(
            read_options_, sv_->current->version_set()->LastSequence(),
            false /* immutable_memtable */));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    // Always return Status::OK().
    Status temp_s = sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
                                                         &range_del_agg);
    assert(temp_s.ok());
  }
  has_iter_trimmed_for_upper_bound_ = false;

  const auto* vstorage = sv_->current->storage_info();
  const auto& l0_files = vstorage->LevelFiles(0);
  l0_iters_.reserve(l0_files.size());
  for (const auto* l0 : l0_files) {
    if ((read_options_.iterate_upper_bound != nullptr) &&
        cfd_->internal_comparator().user_comparator()->Compare(
            l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
      // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
      // will never be interested in files with smallest key above
      // iterate_upper_bound, since iterate_upper_bound can't be changed.
      l0_iters_.push_back(nullptr);
      continue;
    }
    l0_iters_.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        sv_->mutable_cf_options,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
  }
  BuildLevelIterators(vstorage, sv_);
  current_ = nullptr;
  is_prev_set_ = false;

  UpdateChildrenPinnedItersMgr();
  if (!range_del_agg.IsEmpty()) {
    status_ = Status::NotSupported(
        "Range tombstones unsupported with ForwardIterator");
    valid_ = false;
  }
}
770 | | |
// Rebuild all child iterators against a freshly referenced SuperVersion.
// Memtable/immutable-memtable iterators are recreated outright; L0 file
// iterators are carried over (ownership transferred) when the same file
// is still present in the new version, so their cached position survives.
// Finally the old SuperVersion is cleaned up and replaced by the new one.
void ForwardIterator::RenewIterators() {
  SuperVersion* svnew;
  assert(sv_);
  svnew = cfd_->GetReferencedSuperVersion(db_);

  // Drop the old mutable/immutable memtable iterators; they are
  // arena-allocated, so DeleteIterator must be told is_arena=true.
  if (mutable_iter_ != nullptr) {
    DeleteIterator(mutable_iter_, true /* is_arena */);
  }
  for (auto* m : imm_iters_) {
    DeleteIterator(m, true /* is_arena */);
  }
  imm_iters_.clear();

  UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping =
      svnew->GetSeqnoToTimeMapping();
  mutable_iter_ =
      svnew->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_,
                              svnew->mutable_cf_options.prefix_extractor.get(),
                              /*for_flush=*/false);
  svnew->imm->AddIterators(read_options_, seqno_to_time_mapping,
                           svnew->mutable_cf_options.prefix_extractor.get(),
                           &imm_iters_, &arena_);
  // Collect range tombstones only to detect their presence; ForwardIterator
  // does not support them (see the NotSupported status at the end).
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
                                       kMaxSequenceNumber /* upper_bound */);
  if (!read_options_.ignore_range_deletions) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        svnew->mem->NewRangeTombstoneIterator(
            read_options_, sv_->current->version_set()->LastSequence(),
            false /* immutable_memtable */));
    range_del_agg.AddTombstones(std::move(range_del_iter));
    // Always return Status::OK().
    Status temp_s = svnew->imm->AddRangeTombstoneIterators(
        read_options_, &arena_, &range_del_agg);
    assert(temp_s.ok());
  }

  // Compare the old and new L0 file lists. NOTE: old state (sv_, l0_iters_)
  // is still live here; sv_ is only swapped at the very end.
  const auto* vstorage = sv_->current->storage_info();
  const auto& l0_files = vstorage->LevelFiles(0);
  const auto* vstorage_new = svnew->current->storage_info();
  const auto& l0_files_new = vstorage_new->LevelFiles(0);
  size_t iold, inew;
  bool found;
  std::vector<InternalIterator*> l0_iters_new;
  l0_iters_new.reserve(l0_files_new.size());

  for (inew = 0; inew < l0_files_new.size(); inew++) {
    found = false;
    for (iold = 0; iold < l0_files.size(); iold++) {
      if (l0_files[iold] == l0_files_new[inew]) {
        found = true;
        break;
      }
    }
    if (found) {
      // File survived the version change: reuse (steal) the old iterator,
      // nulling the old slot so the cleanup loop below won't delete it.
      if (l0_iters_[iold] == nullptr) {
        l0_iters_new.push_back(nullptr);
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this);
      } else {
        l0_iters_new.push_back(l0_iters_[iold]);
        l0_iters_[iold] = nullptr;
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this);
      }
      continue;
    }
    // New file: open a fresh table iterator for it.
    l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
        *l0_files_new[inew],
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
        svnew->mutable_cf_options,
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
        /*skip_filters=*/false, /*level=*/-1,
        MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
        /*smallest_compaction_key=*/nullptr,
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
  }

  // Delete iterators for files that disappeared (reused slots were nulled).
  for (auto* f : l0_iters_) {
    DeleteIterator(f);
  }
  l0_iters_.clear();
  l0_iters_ = l0_iters_new;

  for (auto* l : level_iters_) {
    DeleteIterator(l);
  }
  level_iters_.clear();
  BuildLevelIterators(vstorage_new, svnew);
  current_ = nullptr;
  is_prev_set_ = false;
  // Release the old SuperVersion before installing the new one.
  SVCleanup();
  sv_ = svnew;

  UpdateChildrenPinnedItersMgr();
  if (!range_del_agg.IsEmpty()) {
    status_ = Status::NotSupported(
        "Range tombstones unsupported with ForwardIterator");
    valid_ = false;
  }
}
871 | | |
872 | | void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage, |
873 | 0 | SuperVersion* sv) { |
874 | 0 | level_iters_.reserve(vstorage->num_levels() - 1); |
875 | 0 | for (int32_t level = 1; level < vstorage->num_levels(); ++level) { |
876 | 0 | const auto& level_files = vstorage->LevelFiles(level); |
877 | 0 | if ((level_files.empty()) || |
878 | 0 | ((read_options_.iterate_upper_bound != nullptr) && |
879 | 0 | (user_comparator_->Compare(*read_options_.iterate_upper_bound, |
880 | 0 | level_files[0]->smallest.user_key()) < |
881 | 0 | 0))) { |
882 | 0 | level_iters_.push_back(nullptr); |
883 | 0 | if (!level_files.empty()) { |
884 | 0 | has_iter_trimmed_for_upper_bound_ = true; |
885 | 0 | } |
886 | 0 | } else { |
887 | 0 | level_iters_.push_back(new ForwardLevelIterator( |
888 | 0 | cfd_, read_options_, level_files, sv->mutable_cf_options, |
889 | 0 | allow_unprepared_value_)); |
890 | 0 | } |
891 | 0 | } |
892 | 0 | } |
893 | | |
894 | 0 | void ForwardIterator::ResetIncompleteIterators() { |
895 | 0 | const auto& l0_files = sv_->current->storage_info()->LevelFiles(0); |
896 | 0 | for (size_t i = 0; i < l0_iters_.size(); ++i) { |
897 | 0 | assert(i < l0_files.size()); |
898 | 0 | if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) { |
899 | 0 | continue; |
900 | 0 | } |
901 | 0 | DeleteIterator(l0_iters_[i]); |
902 | 0 | l0_iters_[i] = cfd_->table_cache()->NewIterator( |
903 | 0 | read_options_, *cfd_->soptions(), cfd_->internal_comparator(), |
904 | 0 | *l0_files[i], /*range_del_agg=*/nullptr, sv_->mutable_cf_options, |
905 | 0 | /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr, |
906 | 0 | TableReaderCaller::kUserIterator, /*arena=*/nullptr, |
907 | 0 | /*skip_filters=*/false, /*level=*/-1, |
908 | 0 | MaxFileSizeForL0MetaPin(sv_->mutable_cf_options), |
909 | 0 | /*smallest_compaction_key=*/nullptr, |
910 | 0 | /*largest_compaction_key=*/nullptr, allow_unprepared_value_); |
911 | 0 | l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_); |
912 | 0 | } |
913 | |
|
914 | 0 | for (auto* level_iter : level_iters_) { |
915 | 0 | if (level_iter && level_iter->status().IsIncomplete()) { |
916 | 0 | level_iter->Reset(); |
917 | 0 | } |
918 | 0 | } |
919 | |
|
920 | 0 | current_ = nullptr; |
921 | 0 | is_prev_set_ = false; |
922 | 0 | } |
923 | | |
924 | 0 | void ForwardIterator::UpdateCurrent() { |
925 | 0 | if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) { |
926 | 0 | current_ = nullptr; |
927 | 0 | } else if (immutable_min_heap_.empty()) { |
928 | 0 | current_ = mutable_iter_; |
929 | 0 | } else if (!mutable_iter_->Valid()) { |
930 | 0 | current_ = immutable_min_heap_.top(); |
931 | 0 | immutable_min_heap_.pop(); |
932 | 0 | } else { |
933 | 0 | current_ = immutable_min_heap_.top(); |
934 | 0 | assert(current_ != nullptr); |
935 | 0 | assert(current_->Valid()); |
936 | 0 | int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare( |
937 | 0 | mutable_iter_->key(), current_->key()); |
938 | 0 | assert(cmp != 0); |
939 | 0 | if (cmp > 0) { |
940 | 0 | immutable_min_heap_.pop(); |
941 | 0 | } else { |
942 | 0 | current_ = mutable_iter_; |
943 | 0 | } |
944 | 0 | } |
945 | 0 | valid_ = current_ != nullptr && immutable_status_.ok(); |
946 | 0 | if (!status_.ok()) { |
947 | 0 | status_ = Status::OK(); |
948 | 0 | } |
949 | | |
950 | | // Upper bound doesn't apply to the memtable iterator. We want Valid() to |
951 | | // return false when all iterators are over iterate_upper_bound, but can't |
952 | | // just set valid_ to false, as that would effectively disable the tailing |
953 | | // optimization (Seek() would be called on all immutable iterators regardless |
954 | | // of whether the target key is greater than prev_key_). |
955 | 0 | current_over_upper_bound_ = valid_ && IsOverUpperBound(current_->key()); |
956 | 0 | } |
957 | | |
958 | 0 | bool ForwardIterator::NeedToSeekImmutable(const Slice& target) { |
959 | | // We maintain the interval (prev_key_, immutable_min_heap_.top()->key()) |
960 | | // such that there are no records with keys within that range in |
961 | | // immutable_min_heap_. Since immutable structures (SST files and immutable |
962 | | // memtables) can't change in this version, we don't need to do a seek if |
963 | | // 'target' belongs to that interval (immutable_min_heap_.top() is already |
964 | | // at the correct position). |
965 | |
|
966 | 0 | if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) { |
967 | 0 | return true; |
968 | 0 | } |
969 | 0 | Slice prev_key = prev_key_.GetInternalKey(); |
970 | 0 | if (prefix_extractor_ && prefix_extractor_->Transform(target).compare( |
971 | 0 | prefix_extractor_->Transform(prev_key)) != 0) { |
972 | 0 | return true; |
973 | 0 | } |
974 | 0 | if (cfd_->internal_comparator().InternalKeyComparator::Compare( |
975 | 0 | prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) { |
976 | 0 | return true; |
977 | 0 | } |
978 | | |
979 | 0 | if (immutable_min_heap_.empty() && current_ == mutable_iter_) { |
980 | | // Nothing to seek on. |
981 | 0 | return false; |
982 | 0 | } |
983 | 0 | if (cfd_->internal_comparator().InternalKeyComparator::Compare( |
984 | 0 | target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key() |
985 | 0 | : current_->key()) > 0) { |
986 | 0 | return true; |
987 | 0 | } |
988 | 0 | return false; |
989 | 0 | } |
990 | | |
991 | 0 | void ForwardIterator::DeleteCurrentIter() { |
992 | 0 | const VersionStorageInfo* vstorage = sv_->current->storage_info(); |
993 | 0 | const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0); |
994 | 0 | for (size_t i = 0; i < l0.size(); ++i) { |
995 | 0 | if (!l0_iters_[i]) { |
996 | 0 | continue; |
997 | 0 | } |
998 | 0 | if (l0_iters_[i] == current_) { |
999 | 0 | has_iter_trimmed_for_upper_bound_ = true; |
1000 | 0 | DeleteIterator(l0_iters_[i]); |
1001 | 0 | l0_iters_[i] = nullptr; |
1002 | 0 | return; |
1003 | 0 | } |
1004 | 0 | } |
1005 | | |
1006 | 0 | for (int32_t level = 1; level < vstorage->num_levels(); ++level) { |
1007 | 0 | if (level_iters_[level - 1] == nullptr) { |
1008 | 0 | continue; |
1009 | 0 | } |
1010 | 0 | if (level_iters_[level - 1] == current_) { |
1011 | 0 | has_iter_trimmed_for_upper_bound_ = true; |
1012 | 0 | DeleteIterator(level_iters_[level - 1]); |
1013 | 0 | level_iters_[level - 1] = nullptr; |
1014 | 0 | } |
1015 | 0 | } |
1016 | 0 | } |
1017 | | |
1018 | | bool ForwardIterator::TEST_CheckDeletedIters(int* pdeleted_iters, |
1019 | 0 | int* pnum_iters) { |
1020 | 0 | bool retval = false; |
1021 | 0 | int deleted_iters = 0; |
1022 | 0 | int num_iters = 0; |
1023 | |
|
1024 | 0 | const VersionStorageInfo* vstorage = sv_->current->storage_info(); |
1025 | 0 | const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0); |
1026 | 0 | for (size_t i = 0; i < l0.size(); ++i) { |
1027 | 0 | if (!l0_iters_[i]) { |
1028 | 0 | retval = true; |
1029 | 0 | deleted_iters++; |
1030 | 0 | } else { |
1031 | 0 | num_iters++; |
1032 | 0 | } |
1033 | 0 | } |
1034 | |
|
1035 | 0 | for (int32_t level = 1; level < vstorage->num_levels(); ++level) { |
1036 | 0 | if ((level_iters_[level - 1] == nullptr) && |
1037 | 0 | (!vstorage->LevelFiles(level).empty())) { |
1038 | 0 | retval = true; |
1039 | 0 | deleted_iters++; |
1040 | 0 | } else if (!vstorage->LevelFiles(level).empty()) { |
1041 | 0 | num_iters++; |
1042 | 0 | } |
1043 | 0 | } |
1044 | 0 | if ((!retval) && num_iters <= 1) { |
1045 | 0 | retval = true; |
1046 | 0 | } |
1047 | 0 | if (pdeleted_iters) { |
1048 | 0 | *pdeleted_iters = deleted_iters; |
1049 | 0 | } |
1050 | 0 | if (pnum_iters) { |
1051 | 0 | *pnum_iters = num_iters; |
1052 | 0 | } |
1053 | 0 | return retval; |
1054 | 0 | } |
1055 | | |
1056 | | uint32_t ForwardIterator::FindFileInRange( |
1057 | | const std::vector<FileMetaData*>& files, const Slice& internal_key, |
1058 | 0 | uint32_t left, uint32_t right) { |
1059 | 0 | auto cmp = [&](const FileMetaData* f, const Slice& k) -> bool { |
1060 | 0 | return cfd_->internal_comparator().InternalKeyComparator::Compare( |
1061 | 0 | f->largest.Encode(), k) < 0; |
1062 | 0 | }; |
1063 | 0 | const auto& b = files.begin(); |
1064 | 0 | return static_cast<uint32_t>( |
1065 | 0 | std::lower_bound(b + left, b + right, internal_key, cmp) - b); |
1066 | 0 | } |
1067 | | |
1068 | 0 | void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) { |
1069 | 0 | if (iter == nullptr) { |
1070 | 0 | return; |
1071 | 0 | } |
1072 | | |
1073 | 0 | if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { |
1074 | 0 | pinned_iters_mgr_->PinIterator(iter, is_arena); |
1075 | 0 | } else { |
1076 | 0 | if (is_arena) { |
1077 | 0 | iter->~InternalIterator(); |
1078 | 0 | } else { |
1079 | 0 | delete iter; |
1080 | 0 | } |
1081 | 0 | } |
1082 | 0 | } |
1083 | | |
1084 | | } // namespace ROCKSDB_NAMESPACE |