Coverage Report

Created: 2026-04-10 07:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/forward_iterator.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/forward_iterator.h"
7
8
#include <limits>
9
#include <string>
10
#include <utility>
11
12
#include "db/column_family.h"
13
#include "db/db_impl/db_impl.h"
14
#include "db/db_iter.h"
15
#include "db/dbformat.h"
16
#include "db/job_context.h"
17
#include "db/range_del_aggregator.h"
18
#include "db/range_tombstone_fragmenter.h"
19
#include "rocksdb/env.h"
20
#include "rocksdb/slice.h"
21
#include "rocksdb/slice_transform.h"
22
#include "table/merging_iterator.h"
23
#include "test_util/sync_point.h"
24
#include "util/string_util.h"
25
26
namespace ROCKSDB_NAMESPACE {
27
28
// Usage:
29
//     ForwardLevelIterator iter;
30
//     iter.SetFileIndex(file_index);
31
//     iter.Seek(target); // or iter.SeekToFirst();
32
//     iter.Next()
33
class ForwardLevelIterator : public InternalIterator {
34
 public:
35
  ForwardLevelIterator(const ColumnFamilyData* const cfd,
36
                       const ReadOptions& read_options,
37
                       const std::vector<FileMetaData*>& files,
38
                       const MutableCFOptions& mutable_cf_options,
39
                       bool allow_unprepared_value)
40
0
      : cfd_(cfd),
41
0
        read_options_(read_options),
42
0
        files_(files),
43
0
        valid_(false),
44
0
        file_index_(std::numeric_limits<uint32_t>::max()),
45
0
        file_iter_(nullptr),
46
0
        pinned_iters_mgr_(nullptr),
47
0
        mutable_cf_options_(mutable_cf_options),
48
0
        allow_unprepared_value_(allow_unprepared_value) {
49
0
    status_.PermitUncheckedError();  // Allow uninitialized status through
50
0
  }
51
52
0
  ~ForwardLevelIterator() override {
53
    // Reset current pointer
54
0
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
55
0
      pinned_iters_mgr_->PinIterator(file_iter_);
56
0
    } else {
57
0
      delete file_iter_;
58
0
    }
59
0
  }
60
61
0
  void SetFileIndex(uint32_t file_index) {
62
0
    assert(file_index < files_.size());
63
0
    status_ = Status::OK();
64
0
    if (file_index != file_index_) {
65
0
      file_index_ = file_index;
66
0
      Reset();
67
0
    }
68
0
  }
69
0
  void Reset() {
70
0
    assert(file_index_ < files_.size());
71
72
    // Reset current pointer
73
0
    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
74
0
      pinned_iters_mgr_->PinIterator(file_iter_);
75
0
    } else {
76
0
      delete file_iter_;
77
0
    }
78
79
0
    ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
80
0
                                         kMaxSequenceNumber /* upper_bound */);
81
0
    file_iter_ = cfd_->table_cache()->NewIterator(
82
0
        read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
83
0
        *files_[file_index_],
84
0
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
85
0
        mutable_cf_options_, /*table_reader_ptr=*/nullptr,
86
0
        /*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
87
0
        /*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
88
0
        /*max_file_size_for_l0_meta_pin=*/0,
89
0
        /*smallest_compaction_key=*/nullptr,
90
0
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
91
0
    file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
92
0
    valid_ = false;
93
0
    if (!range_del_agg.IsEmpty()) {
94
0
      status_ = Status::NotSupported(
95
0
          "Range tombstones unsupported with ForwardIterator");
96
0
    }
97
0
  }
98
0
  void SeekToLast() override {
99
0
    status_ = Status::NotSupported("ForwardLevelIterator::SeekToLast()");
100
0
    valid_ = false;
101
0
  }
102
0
  void Prev() override {
103
0
    status_ = Status::NotSupported("ForwardLevelIterator::Prev()");
104
0
    valid_ = false;
105
0
  }
106
0
  bool Valid() const override { return valid_; }
107
0
  void SeekToFirst() override {
108
0
    assert(file_iter_ != nullptr);
109
0
    if (!status_.ok()) {
110
0
      assert(!valid_);
111
0
      return;
112
0
    }
113
0
    file_iter_->SeekToFirst();
114
0
    valid_ = file_iter_->Valid();
115
0
  }
116
0
  void Seek(const Slice& internal_key) override {
117
0
    assert(file_iter_ != nullptr);
118
119
    // This deviates from the usual convention for InternalIterator::Seek() in
120
    // that it doesn't discard pre-existing error status. That's because this
121
    // Seek() is only supposed to be called immediately after SetFileIndex()
122
    // (which discards pre-existing error status), and SetFileIndex() may set
123
    // an error status, which we shouldn't discard.
124
0
    if (!status_.ok()) {
125
0
      assert(!valid_);
126
0
      return;
127
0
    }
128
129
0
    file_iter_->Seek(internal_key);
130
0
    valid_ = file_iter_->Valid();
131
0
  }
132
0
  void SeekForPrev(const Slice& /*internal_key*/) override {
133
0
    status_ = Status::NotSupported("ForwardLevelIterator::SeekForPrev()");
134
0
    valid_ = false;
135
0
  }
136
0
  void Next() override {
137
0
    assert(valid_);
138
0
    file_iter_->Next();
139
0
    for (;;) {
140
0
      valid_ = file_iter_->Valid();
141
0
      if (!file_iter_->status().ok()) {
142
0
        assert(!valid_);
143
0
        return;
144
0
      }
145
0
      if (valid_) {
146
0
        return;
147
0
      }
148
0
      if (file_index_ + 1 >= files_.size()) {
149
0
        valid_ = false;
150
0
        return;
151
0
      }
152
0
      SetFileIndex(file_index_ + 1);
153
0
      if (!status_.ok()) {
154
0
        assert(!valid_);
155
0
        return;
156
0
      }
157
0
      file_iter_->SeekToFirst();
158
0
    }
159
0
  }
160
0
  Slice key() const override {
161
0
    assert(valid_);
162
0
    return file_iter_->key();
163
0
  }
164
0
  Slice value() const override {
165
0
    assert(valid_);
166
0
    return file_iter_->value();
167
0
  }
168
0
  uint64_t write_unix_time() const override {
169
0
    assert(valid_);
170
0
    return file_iter_->write_unix_time();
171
0
  }
172
0
  Status status() const override {
173
0
    if (!status_.ok()) {
174
0
      return status_;
175
0
    } else if (file_iter_) {
176
0
      return file_iter_->status();
177
0
    }
178
0
    return Status::OK();
179
0
  }
180
0
  bool PrepareValue() override {
181
0
    assert(valid_);
182
0
    if (file_iter_->PrepareValue()) {
183
0
      return true;
184
0
    }
185
186
0
    assert(!file_iter_->Valid());
187
0
    valid_ = false;
188
0
    return false;
189
0
  }
190
0
  bool IsKeyPinned() const override {
191
0
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
192
0
           file_iter_->IsKeyPinned();
193
0
  }
194
0
  bool IsValuePinned() const override {
195
0
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
196
0
           file_iter_->IsValuePinned();
197
0
  }
198
0
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
199
0
    pinned_iters_mgr_ = pinned_iters_mgr;
200
0
    if (file_iter_) {
201
0
      file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
202
0
    }
203
0
  }
204
205
 private:
206
  const ColumnFamilyData* const cfd_;
207
  const ReadOptions& read_options_;
208
  const std::vector<FileMetaData*>& files_;
209
210
  bool valid_;
211
  uint32_t file_index_;
212
  Status status_;
213
  InternalIterator* file_iter_;
214
  PinnedIteratorsManager* pinned_iters_mgr_;
215
  const MutableCFOptions& mutable_cf_options_;
216
217
  const bool allow_unprepared_value_;
218
};
219
220
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
221
                                 ColumnFamilyData* cfd,
222
                                 SuperVersion* current_sv,
223
                                 bool allow_unprepared_value)
224
0
    : db_(db),
225
0
      read_options_(read_options),
226
0
      cfd_(cfd),
227
0
      prefix_extractor_(current_sv->mutable_cf_options.prefix_extractor.get()),
228
0
      user_comparator_(cfd->user_comparator()),
229
0
      allow_unprepared_value_(allow_unprepared_value),
230
0
      immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
231
0
      sv_(current_sv),
232
0
      mutable_iter_(nullptr),
233
0
      current_(nullptr),
234
0
      valid_(false),
235
0
      status_(Status::OK()),
236
0
      immutable_status_(Status::OK()),
237
0
      has_iter_trimmed_for_upper_bound_(false),
238
0
      current_over_upper_bound_(false),
239
0
      is_prev_set_(false),
240
0
      is_prev_inclusive_(false),
241
0
      pinned_iters_mgr_(nullptr) {
242
0
  if (sv_) {
243
0
    RebuildIterators(false);
244
0
  }
245
0
  if (!CheckFSFeatureSupport(cfd_->ioptions().env->GetFileSystem().get(),
246
0
                             FSSupportedOps::kAsyncIO)) {
247
0
    read_options_.async_io = false;
248
0
  }
249
  // immutable_status_ is a local aggregation of the
250
  // status of the immutable Iterators.
251
  // We have to PermitUncheckedError in case it is never
252
  // used, otherwise it will fail ASSERT_STATUS_CHECKED.
253
0
  immutable_status_.PermitUncheckedError();
254
0
}
255
256
0
ForwardIterator::~ForwardIterator() { Cleanup(true); }
257
258
void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
259
0
                                bool background_purge_on_iterator_cleanup) {
260
0
  if (sv->Unref()) {
261
    // Job id == 0 means that this is not our background process, but rather
262
    // user thread
263
0
    JobContext job_context(0);
264
0
    db->mutex_.Lock();
265
0
    sv->Cleanup();
266
0
    db->FindObsoleteFiles(&job_context, false, true);
267
0
    if (background_purge_on_iterator_cleanup) {
268
0
      db->ScheduleBgLogWriterClose(&job_context);
269
0
      db->AddSuperVersionsToFreeQueue(sv);
270
0
      db->SchedulePurge();
271
0
    }
272
0
    db->mutex_.Unlock();
273
0
    if (!background_purge_on_iterator_cleanup) {
274
0
      delete sv;
275
0
    }
276
0
    if (job_context.HaveSomethingToDelete()) {
277
0
      db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
278
0
    }
279
0
    job_context.Clean();
280
0
  }
281
0
}
282
283
namespace {
284
struct SVCleanupParams {
285
  DBImpl* db;
286
  SuperVersion* sv;
287
  bool background_purge_on_iterator_cleanup;
288
};
289
}  // anonymous namespace
290
291
// Used in PinnedIteratorsManager to release pinned SuperVersion
292
0
void ForwardIterator::DeferredSVCleanup(void* arg) {
293
0
  auto d = static_cast<SVCleanupParams*>(arg);
294
0
  ForwardIterator::SVCleanup(d->db, d->sv,
295
0
                             d->background_purge_on_iterator_cleanup);
296
0
  delete d;
297
0
}
298
299
0
void ForwardIterator::SVCleanup() {
300
0
  if (sv_ == nullptr) {
301
0
    return;
302
0
  }
303
0
  bool background_purge =
304
0
      read_options_.background_purge_on_iterator_cleanup ||
305
0
      db_->immutable_db_options().avoid_unnecessary_blocking_io;
306
0
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
307
    // pinned_iters_mgr_ tells us to make sure that all visited key-value slices
308
    // are alive until pinned_iters_mgr_->ReleasePinnedData() is called.
309
    // The slices may point into some memtables owned by sv_, so we need to keep
310
    // sv_ referenced until pinned_iters_mgr_ unpins everything.
311
0
    auto p = new SVCleanupParams{db_, sv_, background_purge};
312
0
    pinned_iters_mgr_->PinPtr(p, &ForwardIterator::DeferredSVCleanup);
313
0
  } else {
314
0
    SVCleanup(db_, sv_, background_purge);
315
0
  }
316
0
}
317
318
0
void ForwardIterator::Cleanup(bool release_sv) {
319
0
  if (mutable_iter_ != nullptr) {
320
0
    DeleteIterator(mutable_iter_, true /* is_arena */);
321
0
  }
322
323
0
  for (auto* m : imm_iters_) {
324
0
    DeleteIterator(m, true /* is_arena */);
325
0
  }
326
0
  imm_iters_.clear();
327
328
0
  for (auto* f : l0_iters_) {
329
0
    DeleteIterator(f);
330
0
  }
331
0
  l0_iters_.clear();
332
333
0
  for (auto* l : level_iters_) {
334
0
    DeleteIterator(l);
335
0
  }
336
0
  level_iters_.clear();
337
338
0
  if (release_sv) {
339
0
    SVCleanup();
340
0
  }
341
0
}
342
343
0
bool ForwardIterator::Valid() const {
344
  // See UpdateCurrent().
345
0
  return valid_ ? !current_over_upper_bound_ : false;
346
0
}
347
348
0
void ForwardIterator::SeekToFirst() {
349
0
  if (sv_ == nullptr) {
350
0
    RebuildIterators(true);
351
0
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
352
0
    RenewIterators();
353
0
  } else if (immutable_status_.IsIncomplete()) {
354
0
    ResetIncompleteIterators();
355
0
  }
356
0
  SeekInternal(Slice(), true, false);
357
0
}
358
359
0
bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
360
0
  return !(read_options_.iterate_upper_bound == nullptr ||
361
0
           cfd_->internal_comparator().user_comparator()->Compare(
362
0
               ExtractUserKey(internal_key),
363
0
               *read_options_.iterate_upper_bound) < 0);
364
0
}
365
366
0
void ForwardIterator::Seek(const Slice& internal_key) {
367
0
  if (sv_ == nullptr) {
368
0
    RebuildIterators(true);
369
0
  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
370
0
    RenewIterators();
371
0
  } else if (immutable_status_.IsIncomplete()) {
372
0
    ResetIncompleteIterators();
373
0
  }
374
375
0
  SeekInternal(internal_key, false, false);
376
0
  if (read_options_.async_io) {
377
0
    SeekInternal(internal_key, false, true);
378
0
  }
379
0
}
380
381
// In case of async_io, SeekInternal is called twice with seek_after_async_io
382
// enabled in second call which only does seeking part to retrieve the blocks.
383
void ForwardIterator::SeekInternal(const Slice& internal_key,
384
                                   bool seek_to_first,
385
0
                                   bool seek_after_async_io) {
386
0
  assert(mutable_iter_);
387
  // mutable
388
0
  if (!seek_after_async_io) {
389
0
    seek_to_first ? mutable_iter_->SeekToFirst()
390
0
                  : mutable_iter_->Seek(internal_key);
391
0
  }
392
393
  // immutable
394
  // TODO(ljin): NeedToSeekImmutable has negative impact on performance
395
  // if it turns to need to seek immutable often. We probably want to have
396
  // an option to turn it off.
397
0
  if (seek_to_first || seek_after_async_io ||
398
0
      NeedToSeekImmutable(internal_key)) {
399
0
    if (!seek_after_async_io) {
400
0
      immutable_status_ = Status::OK();
401
0
      if (has_iter_trimmed_for_upper_bound_ &&
402
0
          (
403
              // prev_ is not set yet
404
0
              is_prev_set_ == false ||
405
              // We are doing SeekToFirst() and internal_key.size() = 0
406
0
              seek_to_first ||
407
              // prev_key_ > internal_key
408
0
              cfd_->internal_comparator().InternalKeyComparator::Compare(
409
0
                  prev_key_.GetInternalKey(), internal_key) > 0)) {
410
        // Some iterators are trimmed. Need to rebuild.
411
0
        RebuildIterators(true);
412
        // Already seeked mutable iter, so seek again
413
0
        seek_to_first ? mutable_iter_->SeekToFirst()
414
0
                      : mutable_iter_->Seek(internal_key);
415
0
      }
416
0
      {
417
0
        auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
418
0
        immutable_min_heap_.swap(tmp);
419
0
      }
420
0
      for (size_t i = 0; i < imm_iters_.size(); i++) {
421
0
        auto* m = imm_iters_[i];
422
0
        seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
423
0
        if (!m->status().ok()) {
424
0
          immutable_status_ = m->status();
425
0
        } else if (m->Valid()) {
426
0
          immutable_min_heap_.push(m);
427
0
        }
428
0
      }
429
0
    }
430
431
0
    Slice target_user_key;
432
0
    if (!seek_to_first) {
433
0
      target_user_key = ExtractUserKey(internal_key);
434
0
    }
435
0
    const VersionStorageInfo* vstorage = sv_->current->storage_info();
436
0
    const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
437
0
    for (size_t i = 0; i < l0.size(); ++i) {
438
0
      if (!l0_iters_[i]) {
439
0
        continue;
440
0
      }
441
0
      if (seek_after_async_io) {
442
0
        if (!l0_iters_[i]->status().IsTryAgain()) {
443
0
          continue;
444
0
        }
445
0
      }
446
447
0
      if (seek_to_first) {
448
0
        l0_iters_[i]->SeekToFirst();
449
0
      } else {
450
        // If the target key passes over the largest key, we are sure Next()
451
        // won't go over this file.
452
0
        if (seek_after_async_io == false &&
453
0
            user_comparator_->Compare(target_user_key,
454
0
                                      l0[i]->largest.user_key()) > 0) {
455
0
          if (read_options_.iterate_upper_bound != nullptr) {
456
0
            has_iter_trimmed_for_upper_bound_ = true;
457
0
            DeleteIterator(l0_iters_[i]);
458
0
            l0_iters_[i] = nullptr;
459
0
          }
460
0
          continue;
461
0
        }
462
0
        l0_iters_[i]->Seek(internal_key);
463
0
      }
464
465
0
      if (l0_iters_[i]->status().IsTryAgain()) {
466
0
        assert(!seek_after_async_io);
467
0
        continue;
468
0
      } else if (!l0_iters_[i]->status().ok()) {
469
0
        immutable_status_ = l0_iters_[i]->status();
470
0
      } else if (l0_iters_[i]->Valid() &&
471
0
                 !IsOverUpperBound(l0_iters_[i]->key())) {
472
0
        immutable_min_heap_.push(l0_iters_[i]);
473
0
      } else {
474
0
        has_iter_trimmed_for_upper_bound_ = true;
475
0
        DeleteIterator(l0_iters_[i]);
476
0
        l0_iters_[i] = nullptr;
477
0
      }
478
0
    }
479
480
0
    for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
481
0
      const std::vector<FileMetaData*>& level_files =
482
0
          vstorage->LevelFiles(level);
483
0
      if (level_files.empty()) {
484
0
        continue;
485
0
      }
486
0
      if (level_iters_[level - 1] == nullptr) {
487
0
        continue;
488
0
      }
489
490
0
      if (seek_after_async_io) {
491
0
        if (!level_iters_[level - 1]->status().IsTryAgain()) {
492
0
          continue;
493
0
        }
494
0
      }
495
0
      uint32_t f_idx = 0;
496
0
      if (!seek_to_first && !seek_after_async_io) {
497
0
        f_idx = FindFileInRange(level_files, internal_key, 0,
498
0
                                static_cast<uint32_t>(level_files.size()));
499
0
      }
500
501
      // Seek
502
0
      if (seek_after_async_io || f_idx < level_files.size()) {
503
0
        if (!seek_after_async_io) {
504
0
          level_iters_[level - 1]->SetFileIndex(f_idx);
505
0
        }
506
0
        seek_to_first ? level_iters_[level - 1]->SeekToFirst()
507
0
                      : level_iters_[level - 1]->Seek(internal_key);
508
509
0
        if (level_iters_[level - 1]->status().IsTryAgain()) {
510
0
          assert(!seek_after_async_io);
511
0
          continue;
512
0
        } else if (!level_iters_[level - 1]->status().ok()) {
513
0
          immutable_status_ = level_iters_[level - 1]->status();
514
0
        } else if (level_iters_[level - 1]->Valid() &&
515
0
                   !IsOverUpperBound(level_iters_[level - 1]->key())) {
516
0
          immutable_min_heap_.push(level_iters_[level - 1]);
517
0
        } else {
518
          // Nothing in this level is interesting. Remove.
519
0
          has_iter_trimmed_for_upper_bound_ = true;
520
0
          DeleteIterator(level_iters_[level - 1]);
521
0
          level_iters_[level - 1] = nullptr;
522
0
        }
523
0
      }
524
0
    }
525
526
0
    if (seek_to_first) {
527
0
      is_prev_set_ = false;
528
0
    } else {
529
0
      prev_key_.SetInternalKey(internal_key);
530
0
      is_prev_set_ = true;
531
0
      is_prev_inclusive_ = true;
532
0
    }
533
534
0
    TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Immutable", this);
535
0
  } else if (current_ && current_ != mutable_iter_) {
536
    // current_ is one of immutable iterators, push it back to the heap
537
0
    immutable_min_heap_.push(current_);
538
0
  }
539
540
  // For async_io, it should be updated when seek_after_async_io is true (in
541
  // second call).
542
0
  if (seek_to_first || !read_options_.async_io || seek_after_async_io) {
543
0
    UpdateCurrent();
544
0
  }
545
0
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
546
0
}
547
548
0
void ForwardIterator::Next() {
549
0
  assert(valid_);
550
0
  bool update_prev_key = false;
551
552
0
  if (sv_ == nullptr || sv_->version_number != cfd_->GetSuperVersionNumber()) {
553
0
    std::string current_key = key().ToString();
554
0
    Slice old_key(current_key.data(), current_key.size());
555
556
0
    if (sv_ == nullptr) {
557
0
      RebuildIterators(true);
558
0
    } else {
559
0
      RenewIterators();
560
0
    }
561
562
0
    SeekInternal(old_key, false, false);
563
0
    if (read_options_.async_io) {
564
0
      SeekInternal(old_key, false, true);
565
0
    }
566
567
0
    if (!valid_ || key().compare(old_key) != 0) {
568
0
      return;
569
0
    }
570
0
  } else if (current_ != mutable_iter_) {
571
    // It is going to advance immutable iterator
572
573
0
    if (is_prev_set_ && prefix_extractor_) {
574
      // advance prev_key_ to current_ only if they share the same prefix
575
0
      update_prev_key =
576
0
          prefix_extractor_->Transform(prev_key_.GetUserKey())
577
0
              .compare(prefix_extractor_->Transform(current_->key())) == 0;
578
0
    } else {
579
0
      update_prev_key = true;
580
0
    }
581
582
0
    if (update_prev_key) {
583
0
      prev_key_.SetInternalKey(current_->key());
584
0
      is_prev_set_ = true;
585
0
      is_prev_inclusive_ = false;
586
0
    }
587
0
  }
588
589
0
  current_->Next();
590
0
  if (current_ != mutable_iter_) {
591
0
    if (!current_->status().ok()) {
592
0
      immutable_status_ = current_->status();
593
0
    } else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
594
0
      immutable_min_heap_.push(current_);
595
0
    } else {
596
0
      if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
597
        // remove the current iterator
598
0
        DeleteCurrentIter();
599
0
        current_ = nullptr;
600
0
      }
601
0
      if (update_prev_key) {
602
0
        mutable_iter_->Seek(prev_key_.GetInternalKey());
603
0
      }
604
0
    }
605
0
  }
606
0
  UpdateCurrent();
607
0
  TEST_SYNC_POINT_CALLBACK("ForwardIterator::Next:Return", this);
608
0
}
609
610
0
Slice ForwardIterator::key() const {
611
0
  assert(valid_);
612
0
  return current_->key();
613
0
}
614
615
0
uint64_t ForwardIterator::write_unix_time() const {
616
0
  assert(valid_);
617
0
  return current_->write_unix_time();
618
0
}
619
620
0
Slice ForwardIterator::value() const {
621
0
  assert(valid_);
622
0
  return current_->value();
623
0
}
624
625
0
Status ForwardIterator::status() const {
626
0
  if (!status_.ok()) {
627
0
    return status_;
628
0
  } else if (!mutable_iter_->status().ok()) {
629
0
    return mutable_iter_->status();
630
0
  }
631
632
0
  return immutable_status_;
633
0
}
634
635
0
bool ForwardIterator::PrepareValue() {
636
0
  assert(valid_);
637
0
  if (current_->PrepareValue()) {
638
0
    return true;
639
0
  }
640
641
0
  assert(!current_->Valid());
642
0
  assert(!current_->status().ok());
643
0
  assert(current_ != mutable_iter_);  // memtable iterator can't fail
644
0
  assert(immutable_status_.ok());
645
646
0
  valid_ = false;
647
0
  immutable_status_ = current_->status();
648
0
  return false;
649
0
}
650
651
0
Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
652
0
  assert(prop != nullptr);
653
0
  if (prop_name == "rocksdb.iterator.super-version-number") {
654
0
    *prop = std::to_string(sv_->version_number);
655
0
    return Status::OK();
656
0
  }
657
0
  return Status::InvalidArgument("Unrecognized property: " + prop_name);
658
0
}
659
660
void ForwardIterator::SetPinnedItersMgr(
661
0
    PinnedIteratorsManager* pinned_iters_mgr) {
662
0
  pinned_iters_mgr_ = pinned_iters_mgr;
663
0
  UpdateChildrenPinnedItersMgr();
664
0
}
665
666
0
void ForwardIterator::UpdateChildrenPinnedItersMgr() {
667
  // Set PinnedIteratorsManager for mutable memtable iterator.
668
0
  if (mutable_iter_) {
669
0
    mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
670
0
  }
671
672
  // Set PinnedIteratorsManager for immutable memtable iterators.
673
0
  for (InternalIterator* child_iter : imm_iters_) {
674
0
    if (child_iter) {
675
0
      child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
676
0
    }
677
0
  }
678
679
  // Set PinnedIteratorsManager for L0 files iterators.
680
0
  for (InternalIterator* child_iter : l0_iters_) {
681
0
    if (child_iter) {
682
0
      child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
683
0
    }
684
0
  }
685
686
  // Set PinnedIteratorsManager for L1+ levels iterators.
687
0
  for (ForwardLevelIterator* child_iter : level_iters_) {
688
0
    if (child_iter) {
689
0
      child_iter->SetPinnedItersMgr(pinned_iters_mgr_);
690
0
    }
691
0
  }
692
0
}
693
694
0
bool ForwardIterator::IsKeyPinned() const {
695
0
  return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
696
0
         current_->IsKeyPinned();
697
0
}
698
699
0
bool ForwardIterator::IsValuePinned() const {
700
0
  return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
701
0
         current_->IsValuePinned();
702
0
}
703
704
0
void ForwardIterator::RebuildIterators(bool refresh_sv) {
705
  // Clean up
706
0
  Cleanup(refresh_sv);
707
0
  if (refresh_sv) {
708
    // New
709
0
    sv_ = cfd_->GetReferencedSuperVersion(db_);
710
0
  }
711
0
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
712
0
                                       kMaxSequenceNumber /* upper_bound */);
713
0
  UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping =
714
0
      sv_->GetSeqnoToTimeMapping();
715
0
  mutable_iter_ =
716
0
      sv_->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_,
717
0
                            sv_->mutable_cf_options.prefix_extractor.get(),
718
0
                            /*for_flush=*/false);
719
0
  sv_->imm->AddIterators(read_options_, seqno_to_time_mapping,
720
0
                         sv_->mutable_cf_options.prefix_extractor.get(),
721
0
                         &imm_iters_, &arena_);
722
0
  if (!read_options_.ignore_range_deletions) {
723
0
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
724
0
        sv_->mem->NewRangeTombstoneIterator(
725
0
            read_options_, sv_->current->version_set()->LastSequence(),
726
0
            false /* immutable_memtable */));
727
0
    range_del_agg.AddTombstones(std::move(range_del_iter));
728
    // Always return Status::OK().
729
0
    Status temp_s = sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
730
0
                                                         &range_del_agg);
731
0
    assert(temp_s.ok());
732
0
  }
733
0
  has_iter_trimmed_for_upper_bound_ = false;
734
735
0
  const auto* vstorage = sv_->current->storage_info();
736
0
  const auto& l0_files = vstorage->LevelFiles(0);
737
0
  l0_iters_.reserve(l0_files.size());
738
0
  for (const auto* l0 : l0_files) {
739
0
    if ((read_options_.iterate_upper_bound != nullptr) &&
740
0
        cfd_->internal_comparator().user_comparator()->Compare(
741
0
            l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
742
      // No need to set has_iter_trimmed_for_upper_bound_: this ForwardIterator
743
      // will never be interested in files with smallest key above
744
      // iterate_upper_bound, since iterate_upper_bound can't be changed.
745
0
      l0_iters_.push_back(nullptr);
746
0
      continue;
747
0
    }
748
0
    l0_iters_.push_back(cfd_->table_cache()->NewIterator(
749
0
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
750
0
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
751
0
        sv_->mutable_cf_options,
752
0
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
753
0
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
754
0
        /*skip_filters=*/false, /*level=*/-1,
755
0
        MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
756
0
        /*smallest_compaction_key=*/nullptr,
757
0
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
758
0
  }
759
0
  BuildLevelIterators(vstorage, sv_);
760
0
  current_ = nullptr;
761
0
  is_prev_set_ = false;
762
763
0
  UpdateChildrenPinnedItersMgr();
764
0
  if (!range_del_agg.IsEmpty()) {
765
0
    status_ = Status::NotSupported(
766
0
        "Range tombstones unsupported with ForwardIterator");
767
0
    valid_ = false;
768
0
  }
769
0
}
770
771
0
void ForwardIterator::RenewIterators() {
772
0
  SuperVersion* svnew;
773
0
  assert(sv_);
774
0
  svnew = cfd_->GetReferencedSuperVersion(db_);
775
776
0
  if (mutable_iter_ != nullptr) {
777
0
    DeleteIterator(mutable_iter_, true /* is_arena */);
778
0
  }
779
0
  for (auto* m : imm_iters_) {
780
0
    DeleteIterator(m, true /* is_arena */);
781
0
  }
782
0
  imm_iters_.clear();
783
784
0
  UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping =
785
0
      svnew->GetSeqnoToTimeMapping();
786
0
  mutable_iter_ =
787
0
      svnew->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_,
788
0
                              svnew->mutable_cf_options.prefix_extractor.get(),
789
0
                              /*for_flush=*/false);
790
0
  svnew->imm->AddIterators(read_options_, seqno_to_time_mapping,
791
0
                           svnew->mutable_cf_options.prefix_extractor.get(),
792
0
                           &imm_iters_, &arena_);
793
0
  ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
794
0
                                       kMaxSequenceNumber /* upper_bound */);
795
0
  if (!read_options_.ignore_range_deletions) {
796
0
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
797
0
        svnew->mem->NewRangeTombstoneIterator(
798
0
            read_options_, sv_->current->version_set()->LastSequence(),
799
0
            false /* immutable_memtable */));
800
0
    range_del_agg.AddTombstones(std::move(range_del_iter));
801
    // Always return Status::OK().
802
0
    Status temp_s = svnew->imm->AddRangeTombstoneIterators(
803
0
        read_options_, &arena_, &range_del_agg);
804
0
    assert(temp_s.ok());
805
0
  }
806
807
0
  const auto* vstorage = sv_->current->storage_info();
808
0
  const auto& l0_files = vstorage->LevelFiles(0);
809
0
  const auto* vstorage_new = svnew->current->storage_info();
810
0
  const auto& l0_files_new = vstorage_new->LevelFiles(0);
811
0
  size_t iold, inew;
812
0
  bool found;
813
0
  std::vector<InternalIterator*> l0_iters_new;
814
0
  l0_iters_new.reserve(l0_files_new.size());
815
816
0
  for (inew = 0; inew < l0_files_new.size(); inew++) {
817
0
    found = false;
818
0
    for (iold = 0; iold < l0_files.size(); iold++) {
819
0
      if (l0_files[iold] == l0_files_new[inew]) {
820
0
        found = true;
821
0
        break;
822
0
      }
823
0
    }
824
0
    if (found) {
825
0
      if (l0_iters_[iold] == nullptr) {
826
0
        l0_iters_new.push_back(nullptr);
827
0
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this);
828
0
      } else {
829
0
        l0_iters_new.push_back(l0_iters_[iold]);
830
0
        l0_iters_[iold] = nullptr;
831
0
        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this);
832
0
      }
833
0
      continue;
834
0
    }
835
0
    l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
836
0
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
837
0
        *l0_files_new[inew],
838
0
        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
839
0
        svnew->mutable_cf_options,
840
0
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
841
0
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
842
0
        /*skip_filters=*/false, /*level=*/-1,
843
0
        MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
844
0
        /*smallest_compaction_key=*/nullptr,
845
0
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_));
846
0
  }
847
848
0
  for (auto* f : l0_iters_) {
849
0
    DeleteIterator(f);
850
0
  }
851
0
  l0_iters_.clear();
852
0
  l0_iters_ = l0_iters_new;
853
854
0
  for (auto* l : level_iters_) {
855
0
    DeleteIterator(l);
856
0
  }
857
0
  level_iters_.clear();
858
0
  BuildLevelIterators(vstorage_new, svnew);
859
0
  current_ = nullptr;
860
0
  is_prev_set_ = false;
861
0
  SVCleanup();
862
0
  sv_ = svnew;
863
864
0
  UpdateChildrenPinnedItersMgr();
865
0
  if (!range_del_agg.IsEmpty()) {
866
0
    status_ = Status::NotSupported(
867
0
        "Range tombstones unsupported with ForwardIterator");
868
0
    valid_ = false;
869
0
  }
870
0
}
871
872
void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage,
873
0
                                          SuperVersion* sv) {
874
0
  level_iters_.reserve(vstorage->num_levels() - 1);
875
0
  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
876
0
    const auto& level_files = vstorage->LevelFiles(level);
877
0
    if ((level_files.empty()) ||
878
0
        ((read_options_.iterate_upper_bound != nullptr) &&
879
0
         (user_comparator_->Compare(*read_options_.iterate_upper_bound,
880
0
                                    level_files[0]->smallest.user_key()) <
881
0
          0))) {
882
0
      level_iters_.push_back(nullptr);
883
0
      if (!level_files.empty()) {
884
0
        has_iter_trimmed_for_upper_bound_ = true;
885
0
      }
886
0
    } else {
887
0
      level_iters_.push_back(new ForwardLevelIterator(
888
0
          cfd_, read_options_, level_files, sv->mutable_cf_options,
889
0
          allow_unprepared_value_));
890
0
    }
891
0
  }
892
0
}
893
894
0
void ForwardIterator::ResetIncompleteIterators() {
895
0
  const auto& l0_files = sv_->current->storage_info()->LevelFiles(0);
896
0
  for (size_t i = 0; i < l0_iters_.size(); ++i) {
897
0
    assert(i < l0_files.size());
898
0
    if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) {
899
0
      continue;
900
0
    }
901
0
    DeleteIterator(l0_iters_[i]);
902
0
    l0_iters_[i] = cfd_->table_cache()->NewIterator(
903
0
        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
904
0
        *l0_files[i], /*range_del_agg=*/nullptr, sv_->mutable_cf_options,
905
0
        /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
906
0
        TableReaderCaller::kUserIterator, /*arena=*/nullptr,
907
0
        /*skip_filters=*/false, /*level=*/-1,
908
0
        MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
909
0
        /*smallest_compaction_key=*/nullptr,
910
0
        /*largest_compaction_key=*/nullptr, allow_unprepared_value_);
911
0
    l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
912
0
  }
913
914
0
  for (auto* level_iter : level_iters_) {
915
0
    if (level_iter && level_iter->status().IsIncomplete()) {
916
0
      level_iter->Reset();
917
0
    }
918
0
  }
919
920
0
  current_ = nullptr;
921
0
  is_prev_set_ = false;
922
0
}
923
924
0
void ForwardIterator::UpdateCurrent() {
925
0
  if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
926
0
    current_ = nullptr;
927
0
  } else if (immutable_min_heap_.empty()) {
928
0
    current_ = mutable_iter_;
929
0
  } else if (!mutable_iter_->Valid()) {
930
0
    current_ = immutable_min_heap_.top();
931
0
    immutable_min_heap_.pop();
932
0
  } else {
933
0
    current_ = immutable_min_heap_.top();
934
0
    assert(current_ != nullptr);
935
0
    assert(current_->Valid());
936
0
    int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
937
0
        mutable_iter_->key(), current_->key());
938
0
    assert(cmp != 0);
939
0
    if (cmp > 0) {
940
0
      immutable_min_heap_.pop();
941
0
    } else {
942
0
      current_ = mutable_iter_;
943
0
    }
944
0
  }
945
0
  valid_ = current_ != nullptr && immutable_status_.ok();
946
0
  if (!status_.ok()) {
947
0
    status_ = Status::OK();
948
0
  }
949
950
  // Upper bound doesn't apply to the memtable iterator. We want Valid() to
951
  // return false when all iterators are over iterate_upper_bound, but can't
952
  // just set valid_ to false, as that would effectively disable the tailing
953
  // optimization (Seek() would be called on all immutable iterators regardless
954
  // of whether the target key is greater than prev_key_).
955
0
  current_over_upper_bound_ = valid_ && IsOverUpperBound(current_->key());
956
0
}
957
958
0
// Returns true when Seek(target) must re-seek the immutable iterators (SST
// files and immutable memtables), and false when the tailing optimization
// applies: the immutable side is already positioned at or past 'target'.
bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
  // We maintain the interval (prev_key_, immutable_min_heap_.top()->key())
  // such that there are no records with keys within that range in
  // immutable_min_heap_. Since immutable structures (SST files and immutable
  // memtables) can't change in this version, we don't need to do a seek if
  // 'target' belongs to that interval (immutable_min_heap_.top() is already
  // at the correct position).

  // No maintained interval (first seek, invalid state, or a prior immutable
  // error): always seek.
  if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) {
    return true;
  }
  Slice prev_key = prev_key_.GetInternalKey();
  // Crossing a prefix boundary invalidates the interval, since prefix seeks
  // may have skipped data outside prev_key's prefix.
  if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
                               prefix_extractor_->Transform(prev_key)) != 0) {
    return true;
  }
  // target must lie strictly after prev_key (or at prev_key when the interval
  // is exclusive on the left); otherwise the interval gives no guarantee.
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
          prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
    return true;
  }

  if (immutable_min_heap_.empty() && current_ == mutable_iter_) {
    // Nothing to seek on.
    return false;
  }
  // The right end of the interval is the smallest immutable key: the heap top
  // if the mutable iterator is current, else current_ itself. A target beyond
  // it falls outside the known-empty range, so a real seek is needed.
  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
          target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
                                            : current_->key()) > 0) {
    return true;
  }
  return false;
}
990
991
0
void ForwardIterator::DeleteCurrentIter() {
992
0
  const VersionStorageInfo* vstorage = sv_->current->storage_info();
993
0
  const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
994
0
  for (size_t i = 0; i < l0.size(); ++i) {
995
0
    if (!l0_iters_[i]) {
996
0
      continue;
997
0
    }
998
0
    if (l0_iters_[i] == current_) {
999
0
      has_iter_trimmed_for_upper_bound_ = true;
1000
0
      DeleteIterator(l0_iters_[i]);
1001
0
      l0_iters_[i] = nullptr;
1002
0
      return;
1003
0
    }
1004
0
  }
1005
1006
0
  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
1007
0
    if (level_iters_[level - 1] == nullptr) {
1008
0
      continue;
1009
0
    }
1010
0
    if (level_iters_[level - 1] == current_) {
1011
0
      has_iter_trimmed_for_upper_bound_ = true;
1012
0
      DeleteIterator(level_iters_[level - 1]);
1013
0
      level_iters_[level - 1] = nullptr;
1014
0
    }
1015
0
  }
1016
0
}
1017
1018
bool ForwardIterator::TEST_CheckDeletedIters(int* pdeleted_iters,
1019
0
                                             int* pnum_iters) {
1020
0
  bool retval = false;
1021
0
  int deleted_iters = 0;
1022
0
  int num_iters = 0;
1023
1024
0
  const VersionStorageInfo* vstorage = sv_->current->storage_info();
1025
0
  const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
1026
0
  for (size_t i = 0; i < l0.size(); ++i) {
1027
0
    if (!l0_iters_[i]) {
1028
0
      retval = true;
1029
0
      deleted_iters++;
1030
0
    } else {
1031
0
      num_iters++;
1032
0
    }
1033
0
  }
1034
1035
0
  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
1036
0
    if ((level_iters_[level - 1] == nullptr) &&
1037
0
        (!vstorage->LevelFiles(level).empty())) {
1038
0
      retval = true;
1039
0
      deleted_iters++;
1040
0
    } else if (!vstorage->LevelFiles(level).empty()) {
1041
0
      num_iters++;
1042
0
    }
1043
0
  }
1044
0
  if ((!retval) && num_iters <= 1) {
1045
0
    retval = true;
1046
0
  }
1047
0
  if (pdeleted_iters) {
1048
0
    *pdeleted_iters = deleted_iters;
1049
0
  }
1050
0
  if (pnum_iters) {
1051
0
    *pnum_iters = num_iters;
1052
0
  }
1053
0
  return retval;
1054
0
}
1055
1056
uint32_t ForwardIterator::FindFileInRange(
1057
    const std::vector<FileMetaData*>& files, const Slice& internal_key,
1058
0
    uint32_t left, uint32_t right) {
1059
0
  auto cmp = [&](const FileMetaData* f, const Slice& k) -> bool {
1060
0
    return cfd_->internal_comparator().InternalKeyComparator::Compare(
1061
0
               f->largest.Encode(), k) < 0;
1062
0
  };
1063
0
  const auto& b = files.begin();
1064
0
  return static_cast<uint32_t>(
1065
0
      std::lower_bound(b + left, b + right, internal_key, cmp) - b);
1066
0
}
1067
1068
0
// Disposes of a child iterator. If pinning is active the iterator is handed
// to the pinned-iterators manager (deferred destruction); otherwise it is
// destroyed immediately — destructor-only for arena-allocated iterators,
// full delete for heap-allocated ones.
void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
  if (iter == nullptr) {
    return;
  }

  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    // Keep the iterator alive until the pinning manager releases it.
    pinned_iters_mgr_->PinIterator(iter, is_arena);
    return;
  }

  if (is_arena) {
    // Arena-allocated: storage is owned by the arena, only run the dtor.
    iter->~InternalIterator();
  } else {
    delete iter;
  }
}
1083
1084
}  // namespace ROCKSDB_NAMESPACE