Coverage Report

Created: 2025-10-26 07:13

/src/rocksdb/db/compaction/compaction_iterator.cc
Line | Count | Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/compaction/compaction_iterator.h"
7
8
#include <iterator>
9
#include <limits>
10
11
#include "db/blob/blob_fetcher.h"
12
#include "db/blob/blob_file_builder.h"
13
#include "db/blob/blob_index.h"
14
#include "db/blob/prefetch_buffer_collection.h"
15
#include "db/snapshot_checker.h"
16
#include "db/wide/wide_column_serialization.h"
17
#include "db/wide/wide_columns_helper.h"
18
#include "logging/logging.h"
19
#include "port/likely.h"
20
#include "rocksdb/listener.h"
21
#include "table/internal_iterator.h"
22
#include "test_util/sync_point.h"
23
24
namespace ROCKSDB_NAMESPACE {
25
CompactionIterator::CompactionIterator(
26
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
27
    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
28
    SequenceNumber earliest_snapshot,
29
    SequenceNumber earliest_write_conflict_snapshot,
30
    SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
31
    Env* env, bool report_detailed_time,
32
    CompactionRangeDelAggregator* range_del_agg,
33
    BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
34
    bool enforce_single_del_contracts,
35
    const std::atomic<bool>& manual_compaction_canceled,
36
    bool must_count_input_entries, const Compaction* compaction,
37
    const CompactionFilter* compaction_filter,
38
    const std::atomic<bool>* shutting_down,
39
    const std::shared_ptr<Logger> info_log,
40
    const std::string* full_history_ts_low,
41
    std::optional<SequenceNumber> preserve_seqno_min)
42
22.3k
    : CompactionIterator(
43
22.3k
          input, cmp, merge_helper, last_sequence, snapshots, earliest_snapshot,
44
22.3k
          earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
45
22.3k
          report_detailed_time, range_del_agg, blob_file_builder,
46
22.3k
          allow_data_in_errors, enforce_single_del_contracts,
47
22.3k
          manual_compaction_canceled,
48
22.3k
          compaction ? std::make_unique<RealCompaction>(compaction) : nullptr,
49
22.3k
          must_count_input_entries, compaction_filter, shutting_down, info_log,
50
22.3k
          full_history_ts_low, preserve_seqno_min) {}
51
52
CompactionIterator::CompactionIterator(
53
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
54
    SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
55
    SequenceNumber earliest_snapshot,
56
    SequenceNumber earliest_write_conflict_snapshot,
57
    SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
58
    Env* env, bool report_detailed_time,
59
    CompactionRangeDelAggregator* range_del_agg,
60
    BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
61
    bool enforce_single_del_contracts,
62
    const std::atomic<bool>& manual_compaction_canceled,
63
    std::unique_ptr<CompactionProxy> compaction, bool must_count_input_entries,
64
    const CompactionFilter* compaction_filter,
65
    const std::atomic<bool>* shutting_down,
66
    const std::shared_ptr<Logger> info_log,
67
    const std::string* full_history_ts_low,
68
    std::optional<SequenceNumber> preserve_seqno_min)
69
22.3k
    : input_(input, cmp, must_count_input_entries),
70
22.3k
      cmp_(cmp),
71
22.3k
      merge_helper_(merge_helper),
72
22.3k
      snapshots_(snapshots),
73
22.3k
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
74
22.3k
      job_snapshot_(job_snapshot),
75
22.3k
      snapshot_checker_(snapshot_checker),
76
22.3k
      env_(env),
77
22.3k
      clock_(env_->GetSystemClock().get()),
78
22.3k
      report_detailed_time_(report_detailed_time),
79
22.3k
      range_del_agg_(range_del_agg),
80
22.3k
      blob_file_builder_(blob_file_builder),
81
22.3k
      compaction_(std::move(compaction)),
82
22.3k
      compaction_filter_(compaction_filter),
83
22.3k
      shutting_down_(shutting_down),
84
22.3k
      manual_compaction_canceled_(manual_compaction_canceled),
85
22.3k
      bottommost_level_(compaction_ && compaction_->bottommost_level() &&
86
3.68k
                        !compaction_->allow_ingest_behind()),
87
      // snapshots_ cannot be nullptr, but we will assert later in the body of
88
      // the constructor.
89
22.3k
      visible_at_tip_(snapshots_ ? snapshots_->empty() : false),
90
22.3k
      earliest_snapshot_(earliest_snapshot),
91
22.3k
      info_log_(info_log),
92
22.3k
      allow_data_in_errors_(allow_data_in_errors),
93
22.3k
      enforce_single_del_contracts_(enforce_single_del_contracts),
94
22.3k
      timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0),
95
22.3k
      full_history_ts_low_(full_history_ts_low),
96
22.3k
      current_user_key_sequence_(0),
97
22.3k
      current_user_key_snapshot_(0),
98
22.3k
      merge_out_iter_(merge_helper_),
99
      blob_garbage_collection_cutoff_file_number_(
100
22.3k
          ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())),
101
22.3k
      blob_fetcher_(CreateBlobFetcherIfNeeded(compaction_.get())),
102
      prefetch_buffers_(
103
22.3k
          CreatePrefetchBufferCollectionIfNeeded(compaction_.get())),
104
22.3k
      current_key_committed_(false),
105
22.3k
      cmp_with_history_ts_low_(0),
106
22.3k
      level_(compaction_ == nullptr ? 0 : compaction_->level()),
107
22.3k
      preserve_seqno_after_(preserve_seqno_min.value_or(earliest_snapshot)) {
108
22.3k
  assert(snapshots_ != nullptr);
109
22.3k
  assert(preserve_seqno_after_ <= earliest_snapshot_);
110
111
22.3k
  if (compaction_ != nullptr) {
112
3.68k
    level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
113
3.68k
  }
114
#ifndef NDEBUG
115
  // findEarliestVisibleSnapshot assumes this ordering.
116
  for (size_t i = 1; i < snapshots_->size(); ++i) {
117
    assert(snapshots_->at(i - 1) < snapshots_->at(i));
118
  }
119
  assert(timestamp_size_ == 0 || !full_history_ts_low_ ||
120
         timestamp_size_ == full_history_ts_low_->size());
121
#endif
122
22.3k
  input_.SetPinnedItersMgr(&pinned_iters_mgr_);
123
  // The default `merge_until_status_` does not need to be checked since it is
124
  // overwritten as soon as `MergeUntil()` is called
125
22.3k
  merge_until_status_.PermitUncheckedError();
126
22.3k
  TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
127
22.3k
}
128
129
22.3k
CompactionIterator::~CompactionIterator() {
130
  // The input_ iterator's lifetime is longer than pinned_iters_mgr_'s lifetime.
131
22.3k
  input_.SetPinnedItersMgr(nullptr);
132
22.3k
}
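
For orientation while reading the counts below: callers drive this class as a simple pump, seeking to the first key and then alternating Valid()/key()/value()/Next() until exhaustion, then checking status(). A minimal driver sketch follows (EmitKV() is a placeholder for the real output path, not actual CompactionJob code):

// Hypothetical driver sketch; EmitKV() is a placeholder for whatever
// consumes the compaction output (table builder, test harness, ...).
void DrainCompactionIterator(ROCKSDB_NAMESPACE::CompactionIterator* c_iter) {
  c_iter->SeekToFirst();
  while (c_iter->Valid()) {
    EmitKV(c_iter->key(), c_iter->value());  // internal key + value or blob index
    c_iter->Next();
  }
  // status() surfaces shutdown, manual-compaction pause, or corruption
  // detected inside NextFromInput().
  ROCKSDB_NAMESPACE::Status s = c_iter->status();
  (void)s;
}
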
133
134
3.68k
void CompactionIterator::ResetRecordCounts() {
135
3.68k
  iter_stats_.num_record_drop_user = 0;
136
3.68k
  iter_stats_.num_record_drop_hidden = 0;
137
3.68k
  iter_stats_.num_record_drop_obsolete = 0;
138
3.68k
  iter_stats_.num_record_drop_range_del = 0;
139
3.68k
  iter_stats_.num_range_del_drop_obsolete = 0;
140
3.68k
  iter_stats_.num_optimized_del_drop_obsolete = 0;
141
3.68k
}
142
143
22.3k
void CompactionIterator::SeekToFirst() {
144
22.3k
  NextFromInput();
145
22.3k
  PrepareOutput();
146
22.3k
}
147
148
55.0k
void CompactionIterator::Next() {
149
  // If there is a merge output, return it before continuing to process the
150
  // input.
151
55.0k
  if (merge_out_iter_.Valid()) {
152
0
    merge_out_iter_.Next();
153
154
    // Check if we returned all records of the merge output.
155
0
    if (merge_out_iter_.Valid()) {
156
0
      key_ = merge_out_iter_.key();
157
0
      value_ = merge_out_iter_.value();
158
0
      Status s = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
159
      // MergeUntil stops when it encounters a corrupt key and does not
160
      // include it in the result, so we expect the keys here to be valid.
161
0
      if (!s.ok()) {
162
        // FIXME: should fail compaction after this fatal logging.
163
0
        ROCKS_LOG_FATAL(
164
0
            info_log_, "Invalid ikey %s in compaction. %s",
165
0
            allow_data_in_errors_ ? key_.ToString(true).c_str() : "hidden",
166
0
            s.getState());
167
0
        assert(false);
168
0
      }
169
170
      // Keep current_key_ in sync.
171
0
      if (0 == timestamp_size_) {
172
0
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
173
0
      } else {
174
0
        Slice ts = ikey_.GetTimestamp(timestamp_size_);
175
0
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type, &ts);
176
0
      }
177
0
      key_ = current_key_.GetInternalKey();
178
0
      ikey_.user_key = current_key_.GetUserKey();
179
0
      validity_info_.SetValid(ValidContext::kMerge1);
180
0
    } else {
181
0
      if (merge_until_status_.IsMergeInProgress()) {
182
        // `Status::MergeInProgress()` tells us that the previous `MergeUntil()`
183
        // produced only merge operands. Those merge operands were accessed and
184
        // written out using `merge_out_iter_`. Since `merge_out_iter_` is
185
        // exhausted at this point, all merge operands have been written out.
186
        //
187
        // Still, there may be a base value (PUT, DELETE, SINGLEDEL, etc.) that
188
        // needs to be written out. Normally, `CompactionIterator` would skip it
189
        // on the basis that it has already output something in the same
190
        // snapshot stripe. To prevent this, we reset `has_current_user_key_`
191
        // so that the next iteration does not conclude that the snapshot
192
        // stripe is unchanged.
193
0
        has_current_user_key_ = false;
194
0
      }
195
      // We consumed all pinned merge operands, release pinned iterators
196
0
      pinned_iters_mgr_.ReleasePinnedData();
197
      // MergeHelper moves the iterator to the first record after the merged
198
      // records, so even though we reached the end of the merge output, we do
199
      // not want to advance the iterator.
200
0
      NextFromInput();
201
0
    }
202
55.0k
  } else {
203
    // Only advance the input iterator if there is no merge output and the
204
    // iterator is not already at the next record.
205
55.0k
    if (!at_next_) {
206
55.0k
      AdvanceInputIter();
207
55.0k
    }
208
55.0k
    NextFromInput();
209
55.0k
  }
210
211
55.0k
  if (Valid()) {
212
    // Record that we've outputted a record for the current key.
213
35.6k
    has_outputted_key_ = true;
214
35.6k
  }
215
216
55.0k
  PrepareOutput();
217
55.0k
}
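
The merge output consumed above comes from MergeHelper::MergeUntil(), which applies the column family's MergeOperator to stacked operands. For context, a minimal associative operator could look like this sketch (it assumes the AssociativeMergeOperator interface from rocksdb/merge_operator.h; the comma-concatenation behavior is purely illustrative):

#include <string>
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"

namespace {
// Illustrative only: concatenates operands with a ',' separator.
class AppendMergeOperator : public ROCKSDB_NAMESPACE::AssociativeMergeOperator {
 public:
  const char* Name() const override { return "AppendMergeOperator"; }

  bool Merge(const ROCKSDB_NAMESPACE::Slice& /*key*/,
             const ROCKSDB_NAMESPACE::Slice* existing_value,
             const ROCKSDB_NAMESPACE::Slice& value, std::string* new_value,
             ROCKSDB_NAMESPACE::Logger* /*logger*/) const override {
    new_value->clear();
    if (existing_value != nullptr) {
      new_value->assign(existing_value->data(), existing_value->size());
      new_value->push_back(',');
    }
    new_value->append(value.data(), value.size());
    return true;
  }
};
}  // namespace
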
218
219
bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
220
113k
                                              Slice* skip_until) {
221
113k
  if (!compaction_filter_) {
222
113k
    return true;
223
113k
  }
224
225
0
  if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
226
0
      ikey_.type != kTypeWideColumnEntity) {
227
0
    return true;
228
0
  }
229
230
0
  CompactionFilter::Decision decision =
231
0
      CompactionFilter::Decision::kUndetermined;
232
0
  CompactionFilter::ValueType value_type =
233
0
      ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
234
0
      : ikey_.type == kTypeBlobIndex
235
0
          ? CompactionFilter::ValueType::kBlobIndex
236
0
          : CompactionFilter::ValueType::kWideColumnEntity;
237
238
  // Hack: pass internal key to BlobIndexCompactionFilter since it needs
239
  // to get sequence number.
240
0
  assert(compaction_filter_);
241
0
  const Slice& filter_key =
242
0
      (ikey_.type != kTypeBlobIndex ||
243
0
       !compaction_filter_->IsStackedBlobDbInternalCompactionFilter())
244
0
          ? ikey_.user_key
245
0
          : key_;
246
247
0
  compaction_filter_value_.clear();
248
0
  compaction_filter_skip_until_.Clear();
249
250
0
  std::vector<std::pair<std::string, std::string>> new_columns;
251
252
0
  {
253
0
    StopWatchNano timer(clock_, report_detailed_time_);
254
255
0
    if (ikey_.type == kTypeBlobIndex) {
256
0
      decision = compaction_filter_->FilterBlobByKey(
257
0
          level_, filter_key, &compaction_filter_value_,
258
0
          compaction_filter_skip_until_.rep());
259
0
      if (decision == CompactionFilter::Decision::kUndetermined &&
260
0
          !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
261
0
        if (!compaction_) {
262
0
          status_ =
263
0
              Status::Corruption("Unexpected blob index outside of compaction");
264
0
          validity_info_.Invalidate();
265
0
          return false;
266
0
        }
267
268
0
        TEST_SYNC_POINT_CALLBACK(
269
0
            "CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
270
0
            &value_);
271
272
        // For integrated BlobDB impl, CompactionIterator reads blob value.
273
        // For Stacked BlobDB impl, the corresponding CompactionFilter's
274
        // FilterV2 method should read the blob value.
275
0
        BlobIndex blob_index;
276
0
        Status s = blob_index.DecodeFrom(value_);
277
0
        if (!s.ok()) {
278
0
          status_ = s;
279
0
          validity_info_.Invalidate();
280
0
          return false;
281
0
        }
282
283
0
        FilePrefetchBuffer* prefetch_buffer =
284
0
            prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
285
0
                                    blob_index.file_number())
286
0
                              : nullptr;
287
288
0
        uint64_t bytes_read = 0;
289
290
0
        assert(blob_fetcher_);
291
292
0
        s = blob_fetcher_->FetchBlob(ikey_.user_key, blob_index,
293
0
                                     prefetch_buffer, &blob_value_,
294
0
                                     &bytes_read);
295
0
        if (!s.ok()) {
296
0
          status_ = s;
297
0
          validity_info_.Invalidate();
298
0
          return false;
299
0
        }
300
301
0
        ++iter_stats_.num_blobs_read;
302
0
        iter_stats_.total_blob_bytes_read += bytes_read;
303
304
0
        value_type = CompactionFilter::ValueType::kValue;
305
0
      }
306
0
    }
307
308
0
    if (decision == CompactionFilter::Decision::kUndetermined) {
309
0
      const Slice* existing_val = nullptr;
310
0
      const WideColumns* existing_col = nullptr;
311
312
0
      WideColumns existing_columns;
313
314
0
      if (ikey_.type != kTypeWideColumnEntity) {
315
0
        if (!blob_value_.empty()) {
316
0
          existing_val = &blob_value_;
317
0
        } else {
318
0
          existing_val = &value_;
319
0
        }
320
0
      } else {
321
0
        Slice value_copy = value_;
322
0
        const Status s =
323
0
            WideColumnSerialization::Deserialize(value_copy, existing_columns);
324
325
0
        if (!s.ok()) {
326
0
          status_ = s;
327
0
          validity_info_.Invalidate();
328
0
          return false;
329
0
        }
330
331
0
        existing_col = &existing_columns;
332
0
      }
333
334
0
      decision = compaction_filter_->FilterV3(
335
0
          level_, filter_key, value_type, existing_val, existing_col,
336
0
          &compaction_filter_value_, &new_columns,
337
0
          compaction_filter_skip_until_.rep());
338
0
    }
339
340
0
    iter_stats_.total_filter_time +=
341
0
        env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
342
0
  }
343
344
0
  if (decision == CompactionFilter::Decision::kUndetermined) {
345
    // Should not reach here, since FilterV2/FilterV3 should never return
346
    // kUndetermined.
347
0
    status_ = Status::NotSupported(
348
0
        "FilterV2/FilterV3 should never return kUndetermined");
349
0
    validity_info_.Invalidate();
350
0
    return false;
351
0
  }
352
353
0
  if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil &&
354
0
      cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
355
0
          0) {
356
    // Can't skip to a key smaller than the current one.
357
    // Keep the key as per FilterV2/FilterV3 documentation.
358
0
    decision = CompactionFilter::Decision::kKeep;
359
0
  }
360
361
0
  if (decision == CompactionFilter::Decision::kRemove) {
362
    // convert the current key to a delete; key_ is pointing into
363
    // current_key_ at this point, so updating current_key_ updates key()
364
0
    ikey_.type = kTypeDeletion;
365
0
    current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
366
    // no value associated with delete
367
0
    value_.clear();
368
0
    iter_stats_.num_record_drop_user++;
369
0
  } else if (decision == CompactionFilter::Decision::kPurge) {
370
    // convert the current key to a single delete; key_ is pointing into
371
    // current_key_ at this point, so updating current_key_ updates key()
372
0
    ikey_.type = kTypeSingleDeletion;
373
0
    current_key_.UpdateInternalKey(ikey_.sequence, kTypeSingleDeletion);
374
    // no value associated with single delete
375
0
    value_.clear();
376
0
    iter_stats_.num_record_drop_user++;
377
0
  } else if (decision == CompactionFilter::Decision::kChangeValue) {
378
0
    if (ikey_.type != kTypeValue) {
379
0
      ikey_.type = kTypeValue;
380
0
      current_key_.UpdateInternalKey(ikey_.sequence, kTypeValue);
381
0
    }
382
383
0
    value_ = compaction_filter_value_;
384
0
  } else if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil) {
385
0
    *need_skip = true;
386
0
    compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
387
0
                                                     kValueTypeForSeek);
388
0
    *skip_until = compaction_filter_skip_until_.Encode();
389
0
  } else if (decision == CompactionFilter::Decision::kChangeBlobIndex) {
390
    // Only the StackableDB-based BlobDB impl's compaction filter should return
391
    // kChangeBlobIndex. Decision about rewriting blob and changing blob index
392
    // in the integrated BlobDB impl is made in subsequent call to
393
    // PrepareOutput() and its callees.
394
0
    if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
395
0
      status_ = Status::NotSupported(
396
0
          "Only stacked BlobDB's internal compaction filter can return "
397
0
          "kChangeBlobIndex.");
398
0
      validity_info_.Invalidate();
399
0
      return false;
400
0
    }
401
402
0
    if (ikey_.type != kTypeBlobIndex) {
403
0
      ikey_.type = kTypeBlobIndex;
404
0
      current_key_.UpdateInternalKey(ikey_.sequence, kTypeBlobIndex);
405
0
    }
406
407
0
    value_ = compaction_filter_value_;
408
0
  } else if (decision == CompactionFilter::Decision::kIOError) {
409
0
    if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
410
0
      status_ = Status::NotSupported(
411
0
          "CompactionFilter for integrated BlobDB should not return kIOError");
412
0
      validity_info_.Invalidate();
413
0
      return false;
414
0
    }
415
416
0
    status_ = Status::IOError("Failed to access blob during compaction filter");
417
0
    validity_info_.Invalidate();
418
0
    return false;
419
0
  } else if (decision == CompactionFilter::Decision::kChangeWideColumnEntity) {
420
0
    WideColumns sorted_columns;
421
0
    sorted_columns.reserve(new_columns.size());
422
423
0
    for (const auto& column : new_columns) {
424
0
      sorted_columns.emplace_back(column.first, column.second);
425
0
    }
426
427
0
    WideColumnsHelper::SortColumns(sorted_columns);
428
429
0
    {
430
0
      const Status s = WideColumnSerialization::Serialize(
431
0
          sorted_columns, compaction_filter_value_);
432
0
      if (!s.ok()) {
433
0
        status_ = s;
434
0
        validity_info_.Invalidate();
435
0
        return false;
436
0
      }
437
0
    }
438
439
0
    if (ikey_.type != kTypeWideColumnEntity) {
440
0
      ikey_.type = kTypeWideColumnEntity;
441
0
      current_key_.UpdateInternalKey(ikey_.sequence, kTypeWideColumnEntity);
442
0
    }
443
444
0
    value_ = compaction_filter_value_;
445
0
  }
446
447
0
  return true;
448
0
}
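
The decision values handled above are produced by a user-supplied CompactionFilter. For reference, a minimal filter could look like the sketch below (it assumes the FilterV2 interface from rocksdb/compaction_filter.h; the "tmp_" prefix rule is purely illustrative):

#include "rocksdb/compaction_filter.h"

namespace {
// Illustrative only: drops every value whose user key starts with "tmp_".
class DropTmpKeysFilter : public ROCKSDB_NAMESPACE::CompactionFilter {
 public:
  const char* Name() const override { return "DropTmpKeysFilter"; }

  Decision FilterV2(int /*level*/, const ROCKSDB_NAMESPACE::Slice& key,
                    ValueType /*value_type*/,
                    const ROCKSDB_NAMESPACE::Slice& /*existing_value*/,
                    std::string* /*new_value*/,
                    std::string* /*skip_until*/) const override {
    return key.starts_with("tmp_") ? Decision::kRemove : Decision::kKeep;
  }
};
}  // namespace
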
449
450
77.4k
void CompactionIterator::NextFromInput() {
451
77.4k
  at_next_ = false;
452
77.4k
  validity_info_.Invalidate();
453
454
600k
  while (!Valid() && input_.Valid() && !IsPausingManualCompaction() &&
455
524k
         !IsShuttingDown()) {
456
523k
    key_ = input_.key();
457
523k
    value_ = input_.value();
458
523k
    blob_value_.Reset();
459
523k
    iter_stats_.num_input_records++;
460
523k
    is_range_del_ = input_.IsDeleteRangeSentinelKey();
461
462
523k
    Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
463
523k
    if (!pik_status.ok()) {
464
0
      iter_stats_.num_input_corrupt_records++;
465
466
      // Always fail compaction when encountering corrupted internal keys
467
0
      status_ = pik_status;
468
0
      return;
469
0
    }
470
523k
    TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
471
523k
    if (is_range_del_) {
472
0
      validity_info_.SetValid(kRangeDeletion);
473
0
      break;
474
0
    }
475
    // Update input statistics
476
523k
    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
477
397k
        ikey_.type == kTypeDeletionWithTimestamp) {
478
125k
      iter_stats_.num_input_deletion_records++;
479
397k
    } else if (ikey_.type == kTypeValuePreferredSeqno) {
480
0
      iter_stats_.num_input_timed_put_records++;
481
0
    }
482
523k
    iter_stats_.total_input_raw_key_bytes += key_.size();
483
523k
    iter_stats_.total_input_raw_value_bytes += value_.size();
484
485
    // If need_skip is true, we should seek the input iterator
486
    // to internal key skip_until and continue from there.
487
523k
    bool need_skip = false;
488
    // Points either into compaction_filter_skip_until_ or into
489
    // merge_helper_->compaction_filter_skip_until_.
490
523k
    Slice skip_until;
491
492
523k
    bool user_key_equal_without_ts = false;
493
523k
    int cmp_ts = 0;
494
523k
    if (has_current_user_key_) {
495
501k
      user_key_equal_without_ts =
496
501k
          cmp_->EqualWithoutTimestamp(ikey_.user_key, current_user_key_);
497
      // if timestamp_size_ > 0, then curr_ts_ has been initialized by a
498
      // previous key.
499
501k
      cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp(
500
0
                                     ExtractTimestampFromUserKey(
501
0
                                         ikey_.user_key, timestamp_size_),
502
0
                                     curr_ts_)
503
501k
                               : 0;
504
501k
    }
505
506
    // Check whether the user key changed. After this if statement current_key_
507
    // is a copy of the current input key (maybe converted to a delete by the
508
    // compaction filter). ikey_.user_key is pointing to the copy.
509
523k
    if (!has_current_user_key_ || !user_key_equal_without_ts || cmp_ts != 0) {
510
      // First occurrence of this user key
511
      // Copy key for output
512
113k
      key_ = current_key_.SetInternalKey(key_, &ikey_);
513
514
113k
      int prev_cmp_with_ts_low =
515
113k
          !full_history_ts_low_ ? 0
516
113k
          : curr_ts_.empty()
517
0
              ? 0
518
0
              : cmp_->CompareTimestamp(curr_ts_, *full_history_ts_low_);
519
520
      // If timestamp_size_ > 0, then copy from ikey_ to curr_ts_ for the use
521
      // in next iteration to compare with the timestamp of next key.
522
113k
      UpdateTimestampAndCompareWithFullHistoryLow();
523
524
      // If
525
      // (1) !has_current_user_key_, OR
526
      // (2) timestamp is disabled, OR
527
      // (3) all history will be preserved, OR
528
      // (4) user key (excluding timestamp) is different from previous key, OR
529
      // (5) timestamp is NO older than *full_history_ts_low_, OR
530
      // (6) timestamp is the largest one older than full_history_ts_low_,
531
      // then current_user_key_ must be treated as a different user key.
532
      // This means, if a user key (excluding ts) is the same as the previous
533
      // user key, and its ts is older than *full_history_ts_low_, then we
534
      // consider this key for GC, e.g. it may be dropped if certain conditions
535
      // match.
536
113k
      if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
537
0
          !user_key_equal_without_ts || cmp_with_history_ts_low_ >= 0 ||
538
113k
          prev_cmp_with_ts_low >= 0) {
539
        // Initialize for future comparison for rule (A), etc.
540
113k
        current_user_key_sequence_ = kMaxSequenceNumber;
541
113k
        current_user_key_snapshot_ = 0;
542
113k
        has_current_user_key_ = true;
543
113k
      }
544
113k
      current_user_key_ = ikey_.user_key;
545
546
113k
      has_outputted_key_ = false;
547
548
113k
      last_key_seq_zeroed_ = false;
549
550
113k
      current_key_committed_ = KeyCommitted(ikey_.sequence);
551
552
      // Apply the compaction filter to the first committed version of the user
553
      // key.
554
113k
      if (current_key_committed_ &&
555
113k
          !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
556
0
        break;
557
0
      }
558
409k
    } else {
559
      // Update the current key to reflect the new sequence number/type without
560
      // copying the user key.
561
      // TODO(rven): Compaction filter does not process keys in this path.
562
      // Need to have the compaction filter process multiple versions
563
      // if we have versions on both sides of a snapshot.
564
409k
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
565
409k
      key_ = current_key_.GetInternalKey();
566
409k
      ikey_.user_key = current_key_.GetUserKey();
567
568
      // Note that newer versions of a key are ordered before older versions. If
569
      // a newer version of a key is committed, so is the older version. No need
570
      // to query snapshot_checker_ in that case.
571
409k
      if (UNLIKELY(!current_key_committed_)) {
572
0
        assert(snapshot_checker_ != nullptr);
573
0
        current_key_committed_ = KeyCommitted(ikey_.sequence);
574
        // Apply the compaction filter to the first committed version of the
575
        // user key.
576
0
        if (current_key_committed_ &&
577
0
            !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
578
0
          break;
579
0
        }
580
0
      }
581
409k
    }
582
583
523k
    if (UNLIKELY(!current_key_committed_)) {
584
0
      assert(snapshot_checker_ != nullptr);
585
0
      validity_info_.SetValid(ValidContext::kCurrentKeyUncommitted);
586
0
      break;
587
0
    }
588
589
    // If there are no snapshots, then this kv affects visibility at tip.
590
    // Otherwise, search through all existing snapshots to find the earliest
591
    // snapshot that is affected by this kv.
592
523k
    SequenceNumber last_sequence = current_user_key_sequence_;
593
523k
    current_user_key_sequence_ = ikey_.sequence;
594
523k
    SequenceNumber last_snapshot = current_user_key_snapshot_;
595
523k
    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
596
523k
    current_user_key_snapshot_ =
597
523k
        visible_at_tip_
598
523k
            ? earliest_snapshot_
599
523k
            : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);
600
601
523k
    if (need_skip) {
602
      // This case is handled below.
603
523k
    } else if (clear_and_output_next_key_) {
604
      // In the previous iteration we encountered a single delete that we could
605
      // not compact out.  We will keep this Put, but can drop its data.
606
      // (See Optimization 3, below.)
607
0
      if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
608
0
          ikey_.type != kTypeWideColumnEntity &&
609
0
          ikey_.type != kTypeValuePreferredSeqno) {
610
0
        ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output",
611
0
                        ikey_.DebugString(allow_data_in_errors_, true).c_str());
612
0
        assert(false);
613
0
      }
614
0
      if (current_user_key_snapshot_ < last_snapshot) {
615
0
        ROCKS_LOG_FATAL(info_log_,
616
0
                        "key %s, current_user_key_snapshot_ (%" PRIu64
617
0
                        ") < last_snapshot (%" PRIu64 ")",
618
0
                        ikey_.DebugString(allow_data_in_errors_, true).c_str(),
619
0
                        current_user_key_snapshot_, last_snapshot);
620
0
        assert(false);
621
0
      }
622
623
0
      if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity ||
624
0
          ikey_.type == kTypeValuePreferredSeqno) {
625
0
        ikey_.type = kTypeValue;
626
0
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
627
0
      }
628
629
0
      value_.clear();
630
0
      validity_info_.SetValid(ValidContext::kKeepSDAndClearPut);
631
0
      clear_and_output_next_key_ = false;
632
523k
    } else if (ikey_.type == kTypeSingleDeletion) {
633
      // We can compact out a SingleDelete if:
634
      // 1) We encounter the corresponding PUT -OR- we know that this key
635
      //    doesn't appear past this output level and  we are not in
636
      //    ingest_behind mode.
637
      // =AND=
638
      // 2) We've already returned a record in this snapshot -OR-
639
      //    there is no earlier earliest_write_conflict_snapshot.
640
      //
641
      // A note about 2) above:
642
      // we try to determine whether there is any earlier write conflict
643
      // checking snapshot by calling DefinitelyInSnapshot() with seq and
644
      // earliest_write_conflict_snapshot as arguments. For write-prepared
645
      // and write-unprepared transactions, if earliest_write_conflict_snapshot
646
      // is evicted from WritePreparedTxnDB::commit_cache, then
647
      // DefinitelyInSnapshot(seq, earliest_write_conflict_snapshot) returns
648
      // false, even if the seq is actually visible within
649
      // earliest_write_conflict_snapshot. Consequently, CompactionIterator
650
      // may try to zero out its sequence number, thus hitting assertion error
651
      // in debug mode or cause incorrect DBIter return result.
652
      // We observe that earliest_write_conflict_snapshot >= earliest_snapshot,
653
      // and the seq zeroing logic depends on
654
      // DefinitelyInSnapshot(seq, earliest_snapshot). Therefore, if we cannot
655
      // determine whether seq is **definitely** in
656
      // earliest_write_conflict_snapshot, then we can additionally check if
657
      // seq is definitely in earliest_snapshot. If the latter holds, then the
658
      // former holds too.
659
      //
660
      // Rule 1 is needed for SingleDelete correctness.  Rule 2 is needed to
661
      // allow Transactions to do write-conflict checking (if we compacted away
662
      // all keys, then we wouldn't know that a write happened in this
663
      // snapshot).  If there is no earlier snapshot, then we know that there
664
      // are no active transactions that need to know about any writes.
665
      //
666
      // Optimization 3:
667
      // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
668
      // true, then we must output a SingleDelete.  In this case, we will decide
669
      // to also output the PUT.  While we are compacting less by outputting the
670
      // PUT now, hopefully this will lead to better compaction in the future
671
      // when Rule 2 is later true (i.e., we are hoping we can later compact out
672
      // both the SingleDelete and the Put, while we couldn't if we only
673
      // outputted the SingleDelete now).
674
      // In this case, we can save space by removing the PUT's value as it will
675
      // never be read.
676
      //
677
      // Deletes and Merges are not supported on the same key that has a
678
      // SingleDelete as it is not possible to correctly do any partial
679
      // compaction of such a combination of operations.  The result of mixing
680
      // those operations for a given key is documented as being undefined.  So
681
      // we can choose how to handle such combinations of operations.  We will
682
      // try to compact out as much as we can in these cases.
683
      // We will report counts on these anomalous cases.
684
      //
685
      // Note: If timestamp is enabled, then record will be eligible for
686
      // deletion, only if, along with above conditions (Rule 1 and Rule 2)
687
      // full_history_ts_low_ is specified and timestamp for that key is less
688
      // than *full_history_ts_low_. If it's not eligible for deletion, then we
689
      // will output the SingleDelete. For Optimization 3 also, if
690
      // full_history_ts_low_ is specified and timestamp for the key is less
691
      // than *full_history_ts_low_ then only optimization will be applied.
692
693
      // The easiest way to process a SingleDelete during iteration is to peek
694
      // ahead at the next key.
695
0
      const bool is_timestamp_eligible_for_gc =
696
0
          (timestamp_size_ == 0 ||
697
0
           (full_history_ts_low_ && cmp_with_history_ts_low_ < 0));
698
699
0
      ParsedInternalKey next_ikey;
700
0
      AdvanceInputIter();
701
0
      while (input_.Valid() && input_.IsDeleteRangeSentinelKey() &&
702
0
             ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
703
0
                 .ok() &&
704
0
             cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
705
        // skip range tombstone start keys with the same user key
706
        // since they are not "real" point keys.
707
0
        AdvanceInputIter();
708
0
      }
709
710
      // Check whether the next key exists, is not corrupt, and is the same key
711
      // as the single delete.
712
0
      if (input_.Valid() &&
713
0
          ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
714
0
              .ok() &&
715
0
          cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
716
0
        assert(!input_.IsDeleteRangeSentinelKey());
717
#ifndef NDEBUG
718
        const Compaction* c =
719
            compaction_ ? compaction_->real_compaction() : nullptr;
720
#endif
721
0
        TEST_SYNC_POINT_CALLBACK(
722
0
            "CompactionIterator::NextFromInput:SingleDelete:1",
723
0
            const_cast<Compaction*>(c));
724
0
        if (last_key_seq_zeroed_) {
725
          // Drop SD and the next key since they are both in the last
726
          // snapshot (since last key has seqno zeroed).
727
0
          ++iter_stats_.num_record_drop_hidden;
728
0
          ++iter_stats_.num_record_drop_obsolete;
729
0
          assert(bottommost_level_);
730
0
          AdvanceInputIter();
731
0
        } else if (prev_snapshot == 0 ||
732
0
                   DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) {
733
          // Check whether the next key belongs to the same snapshot as the
734
          // SingleDelete.
735
736
0
          TEST_SYNC_POINT_CALLBACK(
737
0
              "CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
738
0
          if (next_ikey.type == kTypeSingleDeletion) {
739
            // We encountered two SingleDeletes for same key in a row. This
740
            // could be due to unexpected user input. If write-(un)prepared
741
            // transaction is used, this could also be due to releasing an old
742
            // snapshot between a Put and its matching SingleDelete.
743
            // Skip the first SingleDelete and let the next iteration decide
744
            // how to handle the second SingleDelete.
745
746
            // First SingleDelete has been skipped since we already called
747
            // input_.Next().
748
0
            ++iter_stats_.num_record_drop_obsolete;
749
0
            ++iter_stats_.num_single_del_mismatch;
750
0
          } else if (next_ikey.type == kTypeDeletion) {
751
0
            std::ostringstream oss;
752
0
            oss << "Found SD and type: " << static_cast<int>(next_ikey.type)
753
0
                << " on the same key, violating the contract "
754
0
                   "of SingleDelete. Check your application to make sure the "
755
0
                   "application does not mix SingleDelete and Delete for "
756
0
                   "the same key. If you are using "
757
0
                   "write-prepared/write-unprepared transactions, and use "
758
0
                   "SingleDelete to delete certain keys, then make sure "
759
0
                   "TransactionDBOptions::rollback_deletion_type_callback is "
760
0
                   "configured properly. Mixing SD and DEL can lead to "
761
0
                   "undefined behaviors";
762
0
            ++iter_stats_.num_record_drop_obsolete;
763
0
            ++iter_stats_.num_single_del_mismatch;
764
0
            if (enforce_single_del_contracts_) {
765
0
              ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str());
766
0
              validity_info_.Invalidate();
767
0
              status_ = Status::Corruption(oss.str());
768
0
              return;
769
0
            }
770
0
            ROCKS_LOG_WARN(info_log_, "%s", oss.str().c_str());
771
0
          } else if (!is_timestamp_eligible_for_gc) {
772
            // We cannot drop the SingleDelete as timestamp is enabled, and
773
            // timestamp of this key is greater than or equal to
774
            // *full_history_ts_low_. We will output the SingleDelete.
775
0
            validity_info_.SetValid(ValidContext::kKeepTsHistory);
776
0
          } else if (has_outputted_key_ ||
777
0
                     DefinitelyInSnapshot(ikey_.sequence,
778
0
                                          earliest_write_conflict_snapshot_) ||
779
0
                     (earliest_snapshot_ < earliest_write_conflict_snapshot_ &&
780
0
                      DefinitelyInSnapshot(ikey_.sequence,
781
0
                                           earliest_snapshot_))) {
782
            // Found a matching value, we can drop the single delete and the
783
            // value.  It is safe to drop both records since we've already
784
            // outputted a key in this snapshot, or there is no earlier
785
            // snapshot (Rule 2 above).
786
787
            // Note: it doesn't matter whether the second key is a Put or if it
788
            // is an unexpected Merge or Delete.  We will compact it out
789
            // either way. We will maintain counts of how many mismatches
790
            // happened
791
0
            if (next_ikey.type != kTypeValue &&
792
0
                next_ikey.type != kTypeBlobIndex &&
793
0
                next_ikey.type != kTypeWideColumnEntity &&
794
0
                next_ikey.type != kTypeValuePreferredSeqno) {
795
0
              ++iter_stats_.num_single_del_mismatch;
796
0
            }
797
798
0
            ++iter_stats_.num_record_drop_hidden;
799
0
            ++iter_stats_.num_record_drop_obsolete;
800
            // Already called input_.Next() once.  Call it a second time to
801
            // skip past the second key.
802
0
            AdvanceInputIter();
803
0
          } else {
804
            // Found a matching value, but we cannot drop both keys since
805
            // there is an earlier snapshot and we need to leave behind a record
806
            // to know that a write happened in this snapshot (Rule 2 above).
807
            // Clear the value and output the SingleDelete. (The value will be
808
            // outputted on the next iteration.)
809
810
            // Setting valid_ to true will output the current SingleDelete
811
0
            validity_info_.SetValid(ValidContext::kKeepSDForConflictCheck);
812
813
            // Set up the Put to be outputted in the next iteration.
814
            // (Optimization 3).
815
0
            clear_and_output_next_key_ = true;
816
0
            TEST_SYNC_POINT_CALLBACK(
817
0
                "CompactionIterator::NextFromInput:KeepSDForWW",
818
0
                /*arg=*/nullptr);
819
0
          }
820
0
        } else {
821
          // We hit the next snapshot without hitting a put, so the iterator
822
          // returns the single delete.
823
0
          validity_info_.SetValid(ValidContext::kKeepSDForSnapshot);
824
0
          TEST_SYNC_POINT_CALLBACK(
825
0
              "CompactionIterator::NextFromInput:SingleDelete:3",
826
0
              const_cast<Compaction*>(c));
827
0
        }
828
0
      } else {
829
        // We are at the end of the input, could not parse the next key, or hit
830
        // a different key. The iterator returns the single delete if the key
831
        // possibly exists beyond the current output level.  We set
832
        // has_current_user_key to false so that if the iterator is at the next
833
        // key, we do not compare it again against the previous key at the next
834
        // iteration. If the next key is corrupt, we return before the
835
        // comparison, so the value of has_current_user_key does not matter.
836
0
        has_current_user_key_ = false;
837
0
        if (compaction_ != nullptr && !compaction_->allow_ingest_behind() &&
838
0
            DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
839
0
            compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
840
0
                                                       &level_ptrs_) &&
841
0
            is_timestamp_eligible_for_gc) {
842
          // Key doesn't exist outside of this range.
843
          // Can compact out this SingleDelete.
844
0
          ++iter_stats_.num_record_drop_obsolete;
845
0
          ++iter_stats_.num_single_del_fallthru;
846
0
          if (!bottommost_level_) {
847
0
            ++iter_stats_.num_optimized_del_drop_obsolete;
848
0
          }
849
0
        } else if (last_key_seq_zeroed_) {
850
          // Sequence number zeroing requires bottommost_level_, which is
851
          // false with ingest_behind.
852
0
          assert(!compaction_->allow_ingest_behind());
853
          // Skip.
854
0
          ++iter_stats_.num_record_drop_hidden;
855
0
          ++iter_stats_.num_record_drop_obsolete;
856
0
          assert(bottommost_level_);
857
0
        } else {
858
          // Output SingleDelete
859
0
          validity_info_.SetValid(ValidContext::kKeepSD);
860
0
        }
861
0
      }
862
863
0
      if (Valid()) {
864
0
        at_next_ = true;
865
0
      }
866
523k
    } else if (last_sequence != kMaxSequenceNumber &&
867
409k
               (last_snapshot == current_user_key_snapshot_ ||
868
409k
                last_snapshot < current_user_key_snapshot_)) {
869
      // rule (A):
870
      // If the earliest snapshot in which this key is visible
871
      // is the same as the visibility of a previous instance of the
872
      // same key, then this kv is not visible in any snapshot.
873
      // Hidden by a newer entry for the same user key
874
      //
875
      // Note: Dropping this key will not affect TransactionDB write-conflict
876
      // checking since there has already been a record returned for this key
877
      // in this snapshot.
878
      // When ingest_behind is enabled, it's ok that we drop an overwritten
879
      // Delete here. The overwriting key still covers whatever will be
880
      // ingested. Note that we will not drop SingleDelete here as SingleDelete
881
      // is handled entirely in its own if clause. This is important, see
882
      // example: from new to old: SingleDelete_1, PUT_1, SingleDelete_2, PUT_2,
883
      // where all operations are on the same key and PUT_2 is ingested with
884
      // ingest_behind=true. If SingleDelete_2 is dropped due to being compacted
885
      // together with PUT_1, and then PUT_1 is compacted away together with
886
      // SingleDelete_1, PUT_2 can incorrectly become visible.
887
409k
      if (last_sequence < current_user_key_sequence_) {
888
0
        ROCKS_LOG_FATAL(info_log_,
889
0
                        "key %s, last_sequence (%" PRIu64
890
0
                        ") < current_user_key_sequence_ (%" PRIu64 ")",
891
0
                        ikey_.DebugString(allow_data_in_errors_, true).c_str(),
892
0
                        last_sequence, current_user_key_sequence_);
893
0
        assert(false);
894
0
      }
895
896
409k
      ++iter_stats_.num_record_drop_hidden;
897
409k
      AdvanceInputIter();
898
409k
    } else if (compaction_ != nullptr &&
899
4.08k
               (ikey_.type == kTypeDeletion ||
900
2.27k
                (ikey_.type == kTypeDeletionWithTimestamp &&
901
0
                 cmp_with_history_ts_low_ < 0)) &&
902
1.80k
               !compaction_->allow_ingest_behind() &&
903
1.80k
               DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
904
1.80k
               compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
905
1.80k
                                                          &level_ptrs_)) {
906
      // TODO(noetzli): This is the only place where we use compaction_
907
      // (besides the constructor). We should probably get rid of this
908
      // dependency and find a way to do similar filtering during flushes.
909
      //
910
      // For this user key:
911
      // (1) there is no data in higher levels
912
      // (2) data in lower levels will have larger sequence numbers
913
      // (3) data in layers that are being compacted here and have
914
      //     smaller sequence numbers will be dropped in the next
915
      //     few iterations of this loop (by rule (A) above).
916
      // Therefore this deletion marker is obsolete and can be dropped.
917
      //
918
      // Note:  Dropping this Delete will not affect TransactionDB
919
      // write-conflict checking since it is earlier than any snapshot.
920
      //
921
      // It seems that we can also drop deletion later than earliest snapshot
922
      // given that:
923
      // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
924
      // (2) No value exist earlier than the deletion.
925
      //
926
      // Note also that a deletion marker of type kTypeDeletionWithTimestamp
927
      // will be treated as a different user key unless the timestamp is older
928
      // than *full_history_ts_low_.
929
1.80k
      ++iter_stats_.num_record_drop_obsolete;
930
1.80k
      if (!bottommost_level_) {
931
0
        ++iter_stats_.num_optimized_del_drop_obsolete;
932
0
      }
933
1.80k
      AdvanceInputIter();
934
111k
    } else if ((ikey_.type == kTypeDeletion ||
935
88.2k
                (ikey_.type == kTypeDeletionWithTimestamp &&
936
0
                 cmp_with_history_ts_low_ < 0)) &&
937
23.5k
               bottommost_level_) {
938
0
      assert(compaction_);
939
0
      assert(!compaction_->allow_ingest_behind());  // bottommost_level_ is true
940
      // Handle the case where we have a delete key at the bottom most level
941
      // We can skip outputting the key iff there are no subsequent puts for
942
      // this key
943
0
      assert(compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
944
0
                                                        &level_ptrs_));
945
0
      ParsedInternalKey next_ikey;
946
0
      AdvanceInputIter();
947
#ifndef NDEBUG
948
      const Compaction* c =
949
          compaction_ ? compaction_->real_compaction() : nullptr;
950
#endif
951
0
      TEST_SYNC_POINT_CALLBACK(
952
0
          "CompactionIterator::NextFromInput:BottommostDelete:1",
953
0
          const_cast<Compaction*>(c));
954
      // Skip over all versions of this key that happen to occur in the same
955
      // snapshot range as the delete.
956
      //
957
      // Note that a deletion marker of type kTypeDeletionWithTimestamp will be
958
      // considered to have a different user key unless the timestamp is older
959
      // than *full_history_ts_low_.
960
      //
961
      // Range tombstone start keys are skipped as they are not "real" keys.
962
0
      while (!IsPausingManualCompaction() && !IsShuttingDown() &&
963
0
             input_.Valid() &&
964
0
             (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
965
0
                  .ok()) &&
966
0
             cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) &&
967
0
             (prev_snapshot == 0 || input_.IsDeleteRangeSentinelKey() ||
968
0
              DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) {
969
0
        AdvanceInputIter();
970
0
      }
971
      // If we find we still need to output a row with this key, we need to
972
      // output the delete too.
973
0
      if (input_.Valid() &&
974
0
          (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
975
0
               .ok()) &&
976
0
          cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
977
0
        validity_info_.SetValid(ValidContext::kKeepDel);
978
0
        at_next_ = true;
979
0
      }
980
111k
    } else if (ikey_.type == kTypeValuePreferredSeqno &&
981
0
               DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
982
0
               (bottommost_level_ ||
983
0
                (compaction_ != nullptr &&
984
0
                 compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
985
0
                                                            &level_ptrs_)))) {
986
      // FIXME: it's possible that we are setting sequence number to 0 as
987
      // preferred sequence number here. If cf_ingest_behind is enabled, this
988
      // may fail ingestions since they expect all keys above the last level
989
      // to have non-zero sequence number. We should probably not allow seqno
990
      // zeroing here.
991
      //
992
      // This section that attempts to swap preferred sequence number will not
993
      // be invoked if this is a CompactionIterator created for flush, since
994
      // `compaction_` will be nullptr and it's not bottommost either.
995
      //
996
      // The entries with the same user key and smaller sequence numbers are
997
      // all in this earliest snapshot range to be iterated. Since those entries
998
      // will be hidden by this entry [rule A], it's safe to swap in the
999
      // preferred seqno now.
1000
      //
1001
      // It's otherwise not safe to swap in the preferred seqno since it's
1002
      // possible for entries in earlier snapshots to have sequence number that
1003
      // is smaller than this entry's sequence number but bigger than this
1004
      // entry's preferred sequence number. Swapping in the preferred sequence
1005
      // number will break the internal key ordering invariant for this key.
1006
      //
1007
      // A special case involving range deletion is handled separately below.
1008
0
      auto [unpacked_value, preferred_seqno] =
1009
0
          ParsePackedValueWithSeqno(value_);
1010
0
      assert(preferred_seqno < ikey_.sequence || ikey_.sequence == 0);
1011
0
      if (range_del_agg_->ShouldDelete(
1012
0
              key_, RangeDelPositioningMode::kForwardTraversal)) {
1013
0
        ++iter_stats_.num_record_drop_hidden;
1014
0
        ++iter_stats_.num_record_drop_range_del;
1015
0
        AdvanceInputIter();
1016
0
      } else {
1017
0
        InternalKey ikey_after_swap(ikey_.user_key,
1018
0
                                    std::min(preferred_seqno, ikey_.sequence),
1019
0
                                    kTypeValue);
1020
0
        Slice ikey_after_swap_slice(*ikey_after_swap.rep());
1021
0
        if (range_del_agg_->ShouldDelete(
1022
0
                ikey_after_swap_slice,
1023
0
                RangeDelPositioningMode::kForwardTraversal)) {
1024
          // A range tombstone that doesn't cover this kTypeValuePreferredSeqno
1025
          // entry at its current seqno would cover it after the swap, so it's not
1026
          // safe to swap in the preferred seqno. In this case, we output the entry as is.
1027
0
          validity_info_.SetValid(ValidContext::kNewUserKey);
1028
0
        } else {
1029
0
          if (ikey_.sequence != 0) {
1030
0
            iter_stats_.num_timed_put_swap_preferred_seqno++;
1031
0
            ikey_.sequence = preferred_seqno;
1032
0
          }
1033
0
          ikey_.type = kTypeValue;
1034
0
          current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
1035
0
          key_ = current_key_.GetInternalKey();
1036
0
          ikey_.user_key = current_key_.GetUserKey();
1037
0
          value_ = unpacked_value;
1038
0
          validity_info_.SetValid(ValidContext::kSwapPreferredSeqno);
1039
0
        }
1040
0
      }
1041
111k
    } else if (ikey_.type == kTypeMerge) {
1042
0
      if (!merge_helper_->HasOperator()) {
1043
0
        status_ = Status::InvalidArgument(
1044
0
            "merge_operator is not properly initialized.");
1045
0
        return;
1046
0
      }
1047
1048
0
      pinned_iters_mgr_.StartPinning();
1049
1050
      // We know the merge type entry is not hidden, otherwise we would
1051
      // have hit (A)
1052
      // We encapsulate the merge related state machine in a different
1053
      // object to minimize change to the existing flow.
1054
0
      merge_until_status_ = merge_helper_->MergeUntil(
1055
0
          &input_, range_del_agg_, prev_snapshot, bottommost_level_,
1056
0
          allow_data_in_errors_, blob_fetcher_.get(), full_history_ts_low_,
1057
0
          prefetch_buffers_.get(), &iter_stats_);
1058
0
      merge_out_iter_.SeekToFirst();
1059
1060
0
      if (!merge_until_status_.ok() &&
1061
0
          !merge_until_status_.IsMergeInProgress()) {
1062
0
        status_ = merge_until_status_;
1063
0
        return;
1064
0
      } else if (merge_out_iter_.Valid()) {
1065
        // NOTE: key, value, and ikey_ refer to old entries.
1066
        //       These will be correctly set below.
1067
0
        key_ = merge_out_iter_.key();
1068
0
        value_ = merge_out_iter_.value();
1069
0
        pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
1070
        // MergeUntil stops when it encounters a corrupt key and does not
1071
        // include it in the result, so we expect the keys here to be valid.
1072
0
        if (!pik_status.ok()) {
1073
0
          ROCKS_LOG_FATAL(
1074
0
              info_log_, "Invalid key %s in compaction. %s",
1075
0
              allow_data_in_errors_ ? key_.ToString(true).c_str() : "hidden",
1076
0
              pik_status.getState());
1077
0
          assert(false);
1078
0
        }
1079
        // Keep current_key_ in sync.
1080
0
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
1081
0
        key_ = current_key_.GetInternalKey();
1082
0
        ikey_.user_key = current_key_.GetUserKey();
1083
0
        validity_info_.SetValid(ValidContext::kMerge2);
1084
0
      } else {
1085
        // all merge operands were filtered out. reset the user key, since the
1086
        // batch consumed by the merge operator should not shadow any keys
1087
        // coming after the merges
1088
0
        has_current_user_key_ = false;
1089
0
        pinned_iters_mgr_.ReleasePinnedData();
1090
1091
0
        if (merge_helper_->FilteredUntil(&skip_until)) {
1092
0
          need_skip = true;
1093
0
        }
1094
0
      }
1095
111k
    } else {
1096
      // 1. new user key -OR-
1097
      // 2. different snapshot stripe
1098
      // If user-defined timestamp is enabled, we consider keys for GC if they
1099
      // are below history_ts_low_. CompactionRangeDelAggregator::ShouldDelete()
1100
      // only considers range deletions that are at or below history_ts_low_ and
1101
      // trim_ts_. We drop keys here that are below history_ts_low_ and are
1102
      // covered by a range tombstone that is at or below history_ts_low_ and
1103
      // trim_ts.
1104
111k
      bool should_delete = false;
1105
111k
      if (!timestamp_size_ || cmp_with_history_ts_low_ < 0) {
1106
111k
        should_delete = range_del_agg_->ShouldDelete(
1107
111k
            key_, RangeDelPositioningMode::kForwardTraversal);
1108
111k
      }
1109
111k
      if (should_delete) {
1110
56.8k
        ++iter_stats_.num_record_drop_hidden;
1111
56.8k
        ++iter_stats_.num_record_drop_range_del;
1112
56.8k
        AdvanceInputIter();
1113
56.8k
      } else {
1114
55.0k
        validity_info_.SetValid(ValidContext::kNewUserKey);
1115
55.0k
      }
1116
111k
    }
1117
1118
523k
    if (need_skip) {
1119
0
      SkipUntil(skip_until);
1120
0
    }
1121
523k
  }
1122
1123
77.4k
  if (status_.ok()) {
1124
77.4k
    if (!Valid() && IsShuttingDown()) {
1125
1.32k
      status_ = Status::ShutdownInProgress();
1126
76.0k
    } else if (IsPausingManualCompaction()) {
1127
0
      status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
1128
76.0k
    } else if (!input_.Valid() && input_.status().IsCorruption()) {
1129
      // Propagate corruption status from memtable iterator
1130
0
      status_ = input_.status();
1131
0
    }
1132
77.4k
  }
1133
77.4k
}
1134
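For orientation, the following standalone sketch (not RocksDB code; all names are illustrative) mirrors the drop decision described in the comment at lines 1096-1104: with user-defined timestamps enabled, a key is only handed to range-tombstone GC once its timestamp has fallen below the full_history_ts_low cutoff, while without timestamps every key covered by a visible range tombstone is dropped.

#include <iostream>

struct KeyInfo {
  bool has_timestamp;           // user-defined timestamps enabled for this CF
  int cmp_with_history_ts_low;  // < 0 means the key's timestamp is below the cutoff
  bool covered_by_range_del;    // a visible range tombstone covers this key
};

// Returns true when the key can be dropped in favor of the range tombstone.
bool ShouldDropForRangeTombstone(const KeyInfo& k) {
  const bool eligible = !k.has_timestamp || k.cmp_with_history_ts_low < 0;
  return eligible && k.covered_by_range_del;
}

int main() {
  // A key whose timestamp is still at or above history_ts_low is kept even if
  // a range tombstone covers it, so readers at older timestamps still see it.
  std::cout << ShouldDropForRangeTombstone({true, +1, true}) << "\n";  // 0
  std::cout << ShouldDropForRangeTombstone({true, -1, true}) << "\n";  // 1
  std::cout << ShouldDropForRangeTombstone({false, 0, true}) << "\n";  // 1
  return 0;
}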
1135
41.9k
bool CompactionIterator::ExtractLargeValueIfNeededImpl() {
1136
41.9k
  if (!blob_file_builder_) {
1137
41.9k
    return false;
1138
41.9k
  }
1139
1140
0
  blob_index_.clear();
1141
0
  const Status s = blob_file_builder_->Add(user_key(), value_, &blob_index_);
1142
1143
0
  if (!s.ok()) {
1144
0
    status_ = s;
1145
0
    validity_info_.Invalidate();
1146
1147
0
    return false;
1148
0
  }
1149
1150
0
  if (blob_index_.empty()) {
1151
0
    return false;
1152
0
  }
1153
1154
0
  value_ = blob_index_;
1155
1156
0
  return true;
1157
0
}
1158
1159
41.9k
void CompactionIterator::ExtractLargeValueIfNeeded() {
1160
41.9k
  assert(ikey_.type == kTypeValue);
1161
1162
41.9k
  if (!ExtractLargeValueIfNeededImpl()) {
1163
41.9k
    return;
1164
41.9k
  }
1165
1166
0
  ikey_.type = kTypeBlobIndex;
1167
0
  current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
1168
0
}
1169
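The blob extraction above swaps a large value for a small on-disk reference: BlobFileBuilder::Add writes the payload to a blob file and hands back a serialized index, after which the entry's value becomes that index and its type becomes kTypeBlobIndex. A minimal standalone sketch of that indirection, with entirely made-up types (ToyBlobFile, BlobRef), is shown below; it is not the RocksDB implementation.

#include <cstdint>
#include <iostream>
#include <string>

struct BlobRef {
  uint64_t file_number;
  uint64_t offset;
  uint64_t size;
};

struct ToyBlobFile {
  uint64_t file_number = 7;
  std::string data;
  // Appends the value and returns a reference to where it was written.
  BlobRef Add(const std::string& value) {
    BlobRef ref{file_number, data.size(), value.size()};
    data += value;
    return ref;
  }
};

int main() {
  ToyBlobFile blob_file;
  const std::string value(4096, 'x');  // a "large" value
  const BlobRef ref = blob_file.Add(value);
  // In the SST, the entry now stores the reference instead of the payload,
  // analogous to value_ = blob_index_ and ikey_.type = kTypeBlobIndex above.
  std::cout << "stored ref: file=" << ref.file_number
            << " offset=" << ref.offset << " size=" << ref.size << "\n";
  return 0;
}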
1170
0
void CompactionIterator::GarbageCollectBlobIfNeeded() {
1171
0
  assert(ikey_.type == kTypeBlobIndex);
1172
1173
0
  if (!compaction_) {
1174
0
    return;
1175
0
  }
1176
1177
  // GC for integrated BlobDB
1178
0
  if (compaction_->enable_blob_garbage_collection()) {
1179
0
    TEST_SYNC_POINT_CALLBACK(
1180
0
        "CompactionIterator::GarbageCollectBlobIfNeeded::TamperWithBlobIndex",
1181
0
        &value_);
1182
1183
0
    BlobIndex blob_index;
1184
1185
0
    {
1186
0
      const Status s = blob_index.DecodeFrom(value_);
1187
1188
0
      if (!s.ok()) {
1189
0
        status_ = s;
1190
0
        validity_info_.Invalidate();
1191
1192
0
        return;
1193
0
      }
1194
0
    }
1195
1196
0
    if (blob_index.file_number() >=
1197
0
        blob_garbage_collection_cutoff_file_number_) {
1198
0
      return;
1199
0
    }
1200
1201
0
    FilePrefetchBuffer* prefetch_buffer =
1202
0
        prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer(
1203
0
                                blob_index.file_number())
1204
0
                          : nullptr;
1205
1206
0
    uint64_t bytes_read = 0;
1207
1208
0
    {
1209
0
      assert(blob_fetcher_);
1210
1211
0
      const Status s = blob_fetcher_->FetchBlob(
1212
0
          user_key(), blob_index, prefetch_buffer, &blob_value_, &bytes_read);
1213
1214
0
      if (!s.ok()) {
1215
0
        status_ = s;
1216
0
        validity_info_.Invalidate();
1217
1218
0
        return;
1219
0
      }
1220
0
    }
1221
1222
0
    ++iter_stats_.num_blobs_read;
1223
0
    iter_stats_.total_blob_bytes_read += bytes_read;
1224
1225
0
    ++iter_stats_.num_blobs_relocated;
1226
0
    iter_stats_.total_blob_bytes_relocated += blob_index.size();
1227
1228
0
    value_ = blob_value_;
1229
1230
0
    if (ExtractLargeValueIfNeededImpl()) {
1231
0
      return;
1232
0
    }
1233
1234
0
    ikey_.type = kTypeValue;
1235
0
    current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
1236
1237
0
    return;
1238
0
  }
1239
1240
  // GC for stacked BlobDB
1241
0
  if (compaction_filter_ &&
1242
0
      compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
1243
0
    const auto blob_decision = compaction_filter_->PrepareBlobOutput(
1244
0
        user_key(), value_, &compaction_filter_value_);
1245
1246
0
    if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
1247
0
      status_ =
1248
0
          Status::Corruption("Corrupted blob reference encountered during GC");
1249
0
      validity_info_.Invalidate();
1250
1251
0
      return;
1252
0
    }
1253
1254
0
    if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
1255
0
      status_ = Status::IOError("Could not relocate blob during GC");
1256
0
      validity_info_.Invalidate();
1257
1258
0
      return;
1259
0
    }
1260
1261
0
    if (blob_decision == CompactionFilter::BlobDecision::kChangeValue) {
1262
0
      value_ = compaction_filter_value_;
1263
1264
0
      return;
1265
0
    }
1266
0
  }
1267
0
}
1268
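For the integrated BlobDB path above, the key decision is whether the blob lives in a file older than the GC cutoff: only those blobs are fetched and rewritten, while newer blobs keep their existing index untouched. A standalone sketch of that check (illustrative names only, not RocksDB code):

#include <cstdint>
#include <iostream>

// Blobs in files below the cutoff are read back and relocated during
// compaction; blobs at or after the cutoff are left where they are.
bool ShouldRelocateBlob(uint64_t blob_file_number, uint64_t cutoff_file_number) {
  return blob_file_number < cutoff_file_number;
}

int main() {
  const uint64_t cutoff = 40;  // e.g. the value computed by the cutoff helper below
  std::cout << ShouldRelocateBlob(12, cutoff) << "\n";  // 1: old file, rewrite the blob
  std::cout << ShouldRelocateBlob(40, cutoff) << "\n";  // 0: at/after cutoff, keep the index
  return 0;
}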
1269
77.4k
void CompactionIterator::PrepareOutput() {
1270
77.4k
  if (Valid()) {
1271
55.0k
    if (LIKELY(!is_range_del_)) {
1272
55.0k
      if (ikey_.type == kTypeValue) {
1273
41.9k
        ExtractLargeValueIfNeeded();
1274
41.9k
      } else if (ikey_.type == kTypeBlobIndex) {
1275
0
        GarbageCollectBlobIfNeeded();
1276
0
      }
1277
55.0k
    }
1278
1279
    // Zeroing out the sequence number leads to better compression.
1280
    // If this is the bottommost level (no files in lower levels)
1281
    // and the earliest snapshot is larger than this seqno
1282
    // and the userkey differs from the last userkey in compaction
1283
    // then we can squash the seqno to zero.
1284
    //
1285
    // This is safe for TransactionDB write-conflict checking since transactions
1286
    // only care about sequence numbers larger than any active snapshot.
1287
    //
1288
    // Can we do the same for levels above bottom level as long as
1289
    // KeyNotExistsBeyondOutputLevel() returns true?
1290
55.0k
    if (Valid() && bottommost_level_ &&
1291
2.27k
        DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
1292
2.27k
        ikey_.type != kTypeMerge && current_key_committed_ &&
1293
2.27k
        ikey_.sequence <= preserve_seqno_after_ && !is_range_del_) {
1294
2.27k
      assert(compaction_ != nullptr && !compaction_->allow_ingest_behind());
1295
2.27k
      if (ikey_.type == kTypeDeletion ||
1296
2.27k
          (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
1297
0
        ROCKS_LOG_FATAL(
1298
0
            info_log_,
1299
0
            "Unexpected key %s for seq-zero optimization. "
1300
0
            "earliest_snapshot %" PRIu64
1301
0
            ", earliest_write_conflict_snapshot %" PRIu64
1302
0
            " job_snapshot %" PRIu64
1303
0
            ". timestamp_size: %d full_history_ts_low_ %s. validity %x",
1304
0
            ikey_.DebugString(allow_data_in_errors_, true).c_str(),
1305
0
            earliest_snapshot_, earliest_write_conflict_snapshot_,
1306
0
            job_snapshot_, static_cast<int>(timestamp_size_),
1307
0
            full_history_ts_low_ != nullptr
1308
0
                ? Slice(*full_history_ts_low_).ToString(true).c_str()
1309
0
                : "null",
1310
0
            validity_info_.rep);
1311
0
        assert(false);
1312
0
      }
1313
2.27k
      ikey_.sequence = 0;
1314
2.27k
      last_key_seq_zeroed_ = true;
1315
2.27k
      TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput:ZeroingSeq",
1316
2.27k
                               &ikey_);
1317
2.27k
      if (!timestamp_size_) {
1318
2.27k
        current_key_.UpdateInternalKey(0, ikey_.type);
1319
2.27k
      } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) {
1320
        // We can also zero out timestamp for better compression.
1321
        // For the same user key (excluding timestamp), the timestamp-based
1322
        // history can be collapsed to save some space if the timestamp is
1323
        // older than *full_history_ts_low_.
1324
0
        const std::string kTsMin(timestamp_size_, static_cast<char>(0));
1325
0
        const Slice ts_slice = kTsMin;
1326
0
        ikey_.SetTimestamp(ts_slice);
1327
0
        current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice);
1328
0
      }
1329
2.27k
    }
1330
55.0k
  }
1331
77.4k
}
1332
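The sequence-number zeroing in PrepareOutput() boils down to: on the bottommost level, an entry whose sequence number is already visible to the earliest snapshot can safely be rewritten with seqno 0, which compresses better. The standalone sketch below illustrates that core condition with made-up names; the real check also excludes merge operands, range deletions, uncommitted keys, and keys protected by preserve_seqno settings.

#include <cstdint>
#include <iostream>

using SequenceNumber = uint64_t;

// Simplified version of the zeroing eligibility test.
bool CanZeroSeqno(SequenceNumber seq, SequenceNumber earliest_snapshot,
                  bool bottommost_level) {
  return bottommost_level && seq <= earliest_snapshot;
}

int main() {
  std::cout << CanZeroSeqno(50, 100, true) << "\n";   // 1: visible to every snapshot
  std::cout << CanZeroSeqno(150, 100, true) << "\n";  // 0: a snapshot may not see it yet
  std::cout << CanZeroSeqno(50, 100, false) << "\n";  // 0: not the bottommost level
  return 0;
}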
1333
inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
1334
13
    SequenceNumber in, SequenceNumber* prev_snapshot) {
1335
13
  assert(snapshots_->size());
1336
13
  if (snapshots_->size() == 0) {
1337
0
    ROCKS_LOG_FATAL(info_log_,
1338
0
                    "No snapshot left in findEarliestVisibleSnapshot");
1339
0
  }
1340
13
  auto snapshots_iter =
1341
13
      std::lower_bound(snapshots_->begin(), snapshots_->end(), in);
1342
13
  assert(prev_snapshot != nullptr);
1343
13
  if (snapshots_iter == snapshots_->begin()) {
1344
13
    *prev_snapshot = 0;
1345
13
  } else {
1346
0
    *prev_snapshot = *std::prev(snapshots_iter);
1347
0
    if (*prev_snapshot >= in) {
1348
0
      ROCKS_LOG_FATAL(info_log_,
1349
0
                      "*prev_snapshot (%" PRIu64 ") >= in (%" PRIu64
1350
0
                      ") in findEarliestVisibleSnapshot",
1351
0
                      *prev_snapshot, in);
1352
0
      assert(false);
1353
0
    }
1354
0
  }
1355
13
  if (snapshot_checker_ == nullptr) {
1356
13
    return snapshots_iter != snapshots_->end() ? *snapshots_iter
1357
13
                                               : kMaxSequenceNumber;
1358
13
  }
1359
0
  bool has_released_snapshot = !released_snapshots_.empty();
1360
0
  for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
1361
0
    auto cur = *snapshots_iter;
1362
0
    if (in > cur) {
1363
0
      ROCKS_LOG_FATAL(info_log_,
1364
0
                      "in (%" PRIu64 ") > cur (%" PRIu64
1365
0
                      ") in findEarliestVisibleSnapshot",
1366
0
                      in, cur);
1367
0
      assert(false);
1368
0
    }
1369
    // Skip if cur is in released_snapshots.
1370
0
    if (has_released_snapshot && released_snapshots_.count(cur) > 0) {
1371
0
      continue;
1372
0
    }
1373
0
    auto res = snapshot_checker_->CheckInSnapshot(in, cur);
1374
0
    if (res == SnapshotCheckerResult::kInSnapshot) {
1375
0
      return cur;
1376
0
    } else if (res == SnapshotCheckerResult::kSnapshotReleased) {
1377
0
      released_snapshots_.insert(cur);
1378
0
    }
1379
0
    *prev_snapshot = cur;
1380
0
  }
1381
0
  return kMaxSequenceNumber;
1382
0
}
1383
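findEarliestVisibleSnapshot() is essentially a lower_bound search over the sorted snapshot list: it returns the earliest snapshot at or above the key's sequence number and reports the snapshot just below it as prev_snapshot (0 if none). The standalone worked example below (illustrative names, no snapshot_checker handling) shows both cases:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

using SequenceNumber = uint64_t;
constexpr SequenceNumber kMaxSeq = UINT64_MAX;

SequenceNumber EarliestVisible(const std::vector<SequenceNumber>& snapshots,
                               SequenceNumber in, SequenceNumber* prev) {
  auto it = std::lower_bound(snapshots.begin(), snapshots.end(), in);
  *prev = (it == snapshots.begin()) ? 0 : *std::prev(it);
  return it != snapshots.end() ? *it : kMaxSeq;
}

int main() {
  const std::vector<SequenceNumber> snapshots = {10, 20, 30};
  SequenceNumber prev = 0;
  // A key written at seqno 15 is invisible to snapshot 10 but visible to 20:
  std::cout << EarliestVisible(snapshots, 15, &prev) << " prev=" << prev << "\n";  // 20 prev=10
  // A key newer than every snapshot maps to kMaxSequenceNumber:
  std::cout << EarliestVisible(snapshots, 35, &prev) << " prev=" << prev << "\n";  // max prev=30
  return 0;
}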
1384
uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber(
1385
22.3k
    const CompactionProxy* compaction) {
1386
22.3k
  if (!compaction) {
1387
18.7k
    return 0;
1388
18.7k
  }
1389
1390
3.68k
  if (!compaction->enable_blob_garbage_collection()) {
1391
3.68k
    return 0;
1392
3.68k
  }
1393
1394
0
  const Version* const version = compaction->input_version();
1395
0
  assert(version);
1396
1397
0
  const VersionStorageInfo* const storage_info = version->storage_info();
1398
0
  assert(storage_info);
1399
1400
0
  const auto& blob_files = storage_info->GetBlobFiles();
1401
1402
0
  const size_t cutoff_index = static_cast<size_t>(
1403
0
      compaction->blob_garbage_collection_age_cutoff() * blob_files.size());
1404
1405
0
  if (cutoff_index >= blob_files.size()) {
1406
0
    return std::numeric_limits<uint64_t>::max();
1407
0
  }
1408
1409
0
  const auto& meta = blob_files[cutoff_index];
1410
0
  assert(meta);
1411
1412
0
  return meta->GetBlobFileNumber();
1413
0
}
1414
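The cutoff computation above multiplies the GC age-cutoff fraction by the number of blob files (ordered oldest to newest) and returns the file number at that index; blobs in files with smaller numbers are eligible for relocation. A standalone worked example (illustrative helper name, not RocksDB code):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

uint64_t CutoffFileNumber(const std::vector<uint64_t>& blob_file_numbers,
                          double age_cutoff) {
  const size_t cutoff_index =
      static_cast<size_t>(age_cutoff * blob_file_numbers.size());
  if (cutoff_index >= blob_file_numbers.size()) {
    return std::numeric_limits<uint64_t>::max();  // every blob file is eligible
  }
  return blob_file_numbers[cutoff_index];
}

int main() {
  // Blob files sorted from oldest to newest, identified by file number.
  const std::vector<uint64_t> files = {11, 14, 21, 27, 33, 40, 52, 60};
  // Age cutoff 0.25 with 8 files gives cutoff_index = 2, so blobs in files
  // 11 and 14 (file_number < 21) are relocated during this compaction.
  std::cout << CutoffFileNumber(files, 0.25) << "\n";  // 21
  // A cutoff of 1.0 makes every blob file eligible.
  std::cout << CutoffFileNumber(files, 1.0) << "\n";   // 18446744073709551615
  return 0;
}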
1415
std::unique_ptr<BlobFetcher> CompactionIterator::CreateBlobFetcherIfNeeded(
1416
22.3k
    const CompactionProxy* compaction) {
1417
22.3k
  if (!compaction) {
1418
18.7k
    return nullptr;
1419
18.7k
  }
1420
1421
3.68k
  const Version* const version = compaction->input_version();
1422
3.68k
  if (!version) {
1423
0
    return nullptr;
1424
0
  }
1425
1426
3.68k
  ReadOptions read_options;
1427
3.68k
  read_options.io_activity = Env::IOActivity::kCompaction;
1428
3.68k
  read_options.fill_cache = false;
1429
1430
3.68k
  return std::unique_ptr<BlobFetcher>(new BlobFetcher(version, read_options));
1431
3.68k
}
1432
1433
std::unique_ptr<PrefetchBufferCollection>
1434
CompactionIterator::CreatePrefetchBufferCollectionIfNeeded(
1435
22.3k
    const CompactionProxy* compaction) {
1436
22.3k
  if (!compaction) {
1437
18.7k
    return nullptr;
1438
18.7k
  }
1439
1440
3.68k
  if (!compaction->input_version()) {
1441
0
    return nullptr;
1442
0
  }
1443
1444
3.68k
  if (compaction->allow_mmap_reads()) {
1445
0
    return nullptr;
1446
0
  }
1447
1448
3.68k
  const uint64_t readahead_size = compaction->blob_compaction_readahead_size();
1449
3.68k
  if (!readahead_size) {
1450
3.68k
    return nullptr;
1451
3.68k
  }
1452
1453
0
  return std::unique_ptr<PrefetchBufferCollection>(
1454
0
      new PrefetchBufferCollection(readahead_size));
1455
3.68k
}
1456
1457
}  // namespace ROCKSDB_NAMESPACE
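For context, the helpers above only come into play when blob files and blob GC are enabled on the column family. The minimal sketch below lists the public options that drive them; the option names are from the RocksDB public API, but the values shown are arbitrary examples rather than recommended settings.

#include <rocksdb/options.h>

rocksdb::ColumnFamilyOptions MakeBlobGcOptions() {
  rocksdb::ColumnFamilyOptions cf_opts;
  cf_opts.enable_blob_files = true;                   // write large values to blob files
  cf_opts.min_blob_size = 4096;                       // values >= 4 KiB go to blob files
  cf_opts.enable_blob_garbage_collection = true;      // relocate old blobs during compaction
  cf_opts.blob_garbage_collection_age_cutoff = 0.25;  // oldest 25% of blob files are eligible
  cf_opts.blob_compaction_readahead_size = 2 << 20;   // enables the prefetch buffer collection
  return cf_opts;
}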