Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/db/merge_helper.h
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
#pragma once
7
8
#include <atomic>
#include <deque>
#include <string>
#include <vector>

#include "db/merge_context.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "db/wide/wide_column_serialization.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
#include "rocksdb/wide_columns.h"
#include "util/stop_watch.h"
22
23
namespace ROCKSDB_NAMESPACE {
24
25
// Forward declarations of types this header names, mostly through pointers.
// Declaring them here avoids pulling in their full definitions.
// NOTE(review): `Iterator` does not appear to be used anywhere in this header;
// MergeOperator must also be fully defined by an included header, since nested
// members such as MergeOperator::OpFailureScope are used below.
class BlobFetcher;
class Comparator;
class Iterator;
class Logger;
class MergeOperator;
class PrefetchBufferCollection;
class Statistics;
class SystemClock;

struct CompactionIterationStats;
34
35
class MergeHelper {
36
 public:
37
  MergeHelper(Env* env, const Comparator* user_comparator,
38
              const MergeOperator* user_merge_operator,
39
              const CompactionFilter* compaction_filter, Logger* logger,
40
              bool assert_valid_internal_key, SequenceNumber latest_snapshot,
41
              const SnapshotChecker* snapshot_checker = nullptr, int level = 0,
42
              Statistics* stats = nullptr,
43
              const std::atomic<bool>* shutting_down = nullptr);
44
45
  // Wrappers around MergeOperator::FullMergeV3() that record perf statistics.
46
  // Set `update_num_ops_stats` to true if it is from a user read so that
47
  // the corresponding statistics are updated.
48
  // Returns one of the following statuses:
49
  // - OK: Entries were successfully merged.
50
  // - Corruption: Merge operator reported unsuccessful merge. The scope of the
51
  //   damage will be stored in `*op_failure_scope` when `op_failure_scope` is
52
  //   not nullptr
53
54
  // Empty tag types to disambiguate overloads
55
  struct NoBaseValueTag {};
56
  static constexpr NoBaseValueTag kNoBaseValue{};
57
58
  struct PlainBaseValueTag {};
59
  static constexpr PlainBaseValueTag kPlainBaseValue{};
60
61
  struct WideBaseValueTag {};
62
  static constexpr WideBaseValueTag kWideBaseValue{};
63
64
  template <typename... ResultTs>
65
  static Status TimedFullMerge(const MergeOperator* merge_operator,
66
                               const Slice& key, NoBaseValueTag,
67
                               const std::vector<Slice>& operands,
68
                               Logger* logger, Statistics* statistics,
69
                               SystemClock* clock, bool update_num_ops_stats,
70
                               MergeOperator::OpFailureScope* op_failure_scope,
71
0
                               ResultTs... results) {
72
0
    MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
73
74
0
    return TimedFullMergeImpl(
75
0
        merge_operator, key, std::move(existing_value), operands, logger,
76
0
        statistics, clock, update_num_ops_stats, op_failure_scope, results...);
77
0
  }
Unexecuted instantiation: rocksdb::Status rocksdb::MergeHelper::TimedFullMerge<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::Slice*, rocksdb::ValueType*>(rocksdb::MergeOperator const*, rocksdb::Slice const&, rocksdb::MergeHelper::NoBaseValueTag, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::Slice*, rocksdb::ValueType*)
Unexecuted instantiation: rocksdb::Status rocksdb::MergeHelper::TimedFullMerge<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*>(rocksdb::MergeOperator const*, rocksdb::Slice const&, rocksdb::MergeHelper::NoBaseValueTag, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*)
Unexecuted instantiation: rocksdb::Status rocksdb::MergeHelper::TimedFullMerge<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, decltype(nullptr), rocksdb::ValueType*>(rocksdb::MergeOperator const*, rocksdb::Slice const&, rocksdb::MergeHelper::NoBaseValueTag, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, decltype(nullptr), rocksdb::ValueType*)
78
79
  template <typename... ResultTs>
80
  static Status TimedFullMerge(
81
      const MergeOperator* merge_operator, const Slice& key, PlainBaseValueTag,
82
      const Slice& value, const std::vector<Slice>& operands, Logger* logger,
83
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
84
0
      MergeOperator::OpFailureScope* op_failure_scope, ResultTs... results) {
85
0
    MergeOperator::MergeOperationInputV3::ExistingValue existing_value(value);
86
87
0
    return TimedFullMergeImpl(
88
0
        merge_operator, key, std::move(existing_value), operands, logger,
89
0
        statistics, clock, update_num_ops_stats, op_failure_scope, results...);
90
0
  }
Unexecuted instantiation: rocksdb::Status rocksdb::MergeHelper::TimedFullMerge<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::Slice*, rocksdb::ValueType*>(rocksdb::MergeOperator const*, rocksdb::Slice const&, rocksdb::MergeHelper::PlainBaseValueTag, rocksdb::Slice const&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::Slice*, rocksdb::ValueType*)
Unexecuted instantiation: rocksdb::Status rocksdb::MergeHelper::TimedFullMerge<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*>(rocksdb::MergeOperator const*, rocksdb::Slice const&, rocksdb::MergeHelper::PlainBaseValueTag, rocksdb::Slice const&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*)
Unexecuted instantiation: rocksdb::Status rocksdb::MergeHelper::TimedFullMerge<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, decltype(nullptr), rocksdb::ValueType*>(rocksdb::MergeOperator const*, rocksdb::Slice const&, rocksdb::MergeHelper::PlainBaseValueTag, rocksdb::Slice const&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, decltype(nullptr), rocksdb::ValueType*)
91
92
  template <typename... ResultTs>
93
  static Status TimedFullMerge(
94
      const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag,
95
      const Slice& entity, const std::vector<Slice>& operands, Logger* logger,
96
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
97
0
      MergeOperator::OpFailureScope* op_failure_scope, ResultTs... results) {
98
0
    MergeOperator::MergeOperationInputV3::ExistingValue existing_value;
99
100
0
    Slice entity_copy(entity);
101
0
    WideColumns existing_columns;
102
103
0
    const Status s =
104
0
        WideColumnSerialization::Deserialize(entity_copy, existing_columns);
105
0
    if (!s.ok()) {
106
0
      return s;
107
0
    }
108
109
0
    existing_value = std::move(existing_columns);
110
111
0
    return TimedFullMergeImpl(
112
0
        merge_operator, key, std::move(existing_value), operands, logger,
113
0
        statistics, clock, update_num_ops_stats, op_failure_scope, results...);
114
0
  }
Unexecuted instantiation: rocksdb::Status rocksdb::MergeHelper::TimedFullMerge<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::Slice*, rocksdb::ValueType*>(rocksdb::MergeOperator const*, rocksdb::Slice const&, rocksdb::MergeHelper::WideBaseValueTag, rocksdb::Slice const&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::Slice*, rocksdb::ValueType*)
Unexecuted instantiation: rocksdb::Status rocksdb::MergeHelper::TimedFullMerge<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*>(rocksdb::MergeOperator const*, rocksdb::Slice const&, rocksdb::MergeHelper::WideBaseValueTag, rocksdb::Slice const&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*)
Unexecuted instantiation: rocksdb::Status rocksdb::MergeHelper::TimedFullMerge<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, decltype(nullptr), rocksdb::ValueType*>(rocksdb::MergeOperator const*, rocksdb::Slice const&, rocksdb::MergeHelper::WideBaseValueTag, rocksdb::Slice const&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, decltype(nullptr), rocksdb::ValueType*)
115
116
  template <typename... ResultTs>
117
  static Status TimedFullMerge(const MergeOperator* merge_operator,
118
                               const Slice& key, WideBaseValueTag,
119
                               const WideColumns& columns,
120
                               const std::vector<Slice>& operands,
121
                               Logger* logger, Statistics* statistics,
122
                               SystemClock* clock, bool update_num_ops_stats,
123
                               MergeOperator::OpFailureScope* op_failure_scope,
124
0
                               ResultTs... results) {
125
0
    MergeOperator::MergeOperationInputV3::ExistingValue existing_value(columns);
126
127
0
    return TimedFullMergeImpl(
128
0
        merge_operator, key, std::move(existing_value), operands, logger,
129
0
        statistics, clock, update_num_ops_stats, op_failure_scope, results...);
130
0
  }
Unexecuted instantiation: rocksdb::Status rocksdb::MergeHelper::TimedFullMerge<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, decltype(nullptr), rocksdb::ValueType*>(rocksdb::MergeOperator const*, rocksdb::Slice const&, rocksdb::MergeHelper::WideBaseValueTag, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > const&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, decltype(nullptr), rocksdb::ValueType*)
Unexecuted instantiation: rocksdb::Status rocksdb::MergeHelper::TimedFullMerge<std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*>(rocksdb::MergeOperator const*, rocksdb::Slice const&, rocksdb::MergeHelper::WideBaseValueTag, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > const&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*)
131
132
  // During compaction, merge entries until we hit
133
  //     - a corrupted key
134
  //     - a Put/Delete,
135
  //     - a different user key,
136
  //     - a specific sequence number (snapshot boundary),
137
  //     - REMOVE_AND_SKIP_UNTIL returned from compaction filter,
138
  //  or - the end of iteration
139
  //
140
  // The result(s) of the merge can be accessed in `MergeHelper::keys()` and
141
  // `MergeHelper::values()`, which are invalidated the next time `MergeUntil()`
142
  // is called. `MergeOutputIterator` is specially designed to iterate the
143
  // results of a `MergeHelper`'s most recent `MergeUntil()`.
144
  //
145
  // iter: (IN)  points to the first merge type entry
146
  //       (OUT) points to the first entry not included in the merge process
147
  // range_del_agg: (IN) filters merge operands covered by range tombstones.
148
  // stop_before: (IN) a sequence number that merge should not cross.
149
  //                   0 means no restriction
150
  // at_bottom:   (IN) true if the iterator covers the bottem level, which means
151
  //                   we could reach the start of the history of this user key.
152
  // allow_data_in_errors: (IN) if true, data details will be displayed in
153
  //                   error/log messages.
154
  // blob_fetcher: (IN) blob fetcher object for the compaction's input version.
155
  // prefetch_buffers: (IN/OUT) a collection of blob file prefetch buffers
156
  //                            used for compaction readahead.
157
  // c_iter_stats: (OUT) compaction iteration statistics.
158
  //
159
  // Returns one of the following statuses:
160
  // - OK: Entries were successfully merged.
161
  // - MergeInProgress: Output consists of merge operands only.
162
  // - Corruption: Merge operator reported unsuccessful merge or a corrupted
163
  //   key has been encountered and not expected (applies only when compiling
164
  //   with asserts removed).
165
  // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true).
166
  //
167
  // REQUIRED: The first key in the input is not corrupted.
168
  Status MergeUntil(InternalIterator* iter,
169
                    CompactionRangeDelAggregator* range_del_agg,
170
                    const SequenceNumber stop_before, const bool at_bottom,
171
                    const bool allow_data_in_errors,
172
                    const BlobFetcher* blob_fetcher,
173
                    const std::string* const full_history_ts_low,
174
                    PrefetchBufferCollection* prefetch_buffers,
175
                    CompactionIterationStats* c_iter_stats);
176
177
  // Filters a merge operand using the compaction filter specified
178
  // in the constructor. Returns the decision that the filter made.
179
  // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the
180
  // optional outputs of compaction filter.
181
  // user_key includes timestamp if user-defined timestamp is enabled.
182
  CompactionFilter::Decision FilterMerge(const Slice& user_key,
183
                                         const Slice& value_slice);
184
185
  // Query the merge result
186
  // These are valid until the next MergeUntil call
187
  // If the merging was successful:
188
  //   - keys() contains a single element with the latest sequence number of
189
  //     the merges. The type will be Put or Merge. See IMPORTANT 1 note, below.
190
  //   - values() contains a single element with the result of merging all the
191
  //     operands together
192
  //
193
  //   IMPORTANT 1: the key type could change after the MergeUntil call.
194
  //        Put/Delete + Merge + ... + Merge => Put
195
  //        Merge + ... + Merge => Merge
196
  //
197
  // If the merge operator is not associative, and if a Put/Delete is not found
198
  // then the merging will be unsuccessful. In this case:
199
  //   - keys() contains the list of internal keys seen in order of iteration.
200
  //   - values() contains the list of values (merges) seen in the same order.
201
  //              values() is parallel to keys() so that the first entry in
202
  //              keys() is the key associated with the first entry in values()
203
  //              and so on. These lists will be the same length.
204
  //              All of these pairs will be merges over the same user key.
205
  //              See IMPORTANT 2 note below.
206
  //
207
  //   IMPORTANT 2: The entries were traversed in order from BACK to FRONT.
208
  //                So keys().back() was the first key seen by iterator.
209
  // TODO: Re-style this comment to be like the first one
210
34.6k
  const std::deque<std::string>& keys() const { return keys_; }
211
5.50k
  const std::vector<Slice>& values() const {
212
5.50k
    return merge_context_.GetOperands();
213
5.50k
  }
214
0
  uint64_t TotalFilterTime() const { return total_filter_time_; }
215
0
  bool HasOperator() const { return user_merge_operator_ != nullptr; }
216
217
  // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will
218
  // return true and fill *until with the key to which we should skip.
219
  // If true, keys() and values() are empty.
220
0
  bool FilteredUntil(Slice* skip_until) const {
221
0
    if (!has_compaction_filter_skip_until_) {
222
0
      return false;
223
0
    }
224
0
    assert(compaction_filter_ != nullptr);
225
0
    assert(skip_until != nullptr);
226
0
    assert(compaction_filter_skip_until_.Valid());
227
0
    *skip_until = compaction_filter_skip_until_.Encode();
228
0
    return true;
229
0
  }
230
231
 private:
232
  Env* env_;
233
  SystemClock* clock_;
234
  const Comparator* user_comparator_;
235
  const MergeOperator* user_merge_operator_;
236
  const CompactionFilter* compaction_filter_;
237
  const std::atomic<bool>* shutting_down_;
238
  Logger* logger_;
239
  bool assert_valid_internal_key_;  // enforce no internal key corruption?
240
  bool allow_single_operand_;
241
  SequenceNumber latest_snapshot_;
242
  const SnapshotChecker* const snapshot_checker_;
243
  int level_;
244
245
  // the scratch area that holds the result of MergeUntil
246
  // valid up to the next MergeUntil call
247
248
  // Keeps track of the sequence of keys seen
249
  std::deque<std::string> keys_;
250
  // Parallel with keys_; stores the operands
251
  mutable MergeContext merge_context_;
252
253
  StopWatchNano filter_timer_;
254
  uint64_t total_filter_time_;
255
  Statistics* stats_;
256
257
  bool has_compaction_filter_skip_until_ = false;
258
  std::string compaction_filter_value_;
259
  InternalKey compaction_filter_skip_until_;
260
261
0
  bool IsShuttingDown() {
262
    // This is a best-effort facility, so memory_order_relaxed is sufficient.
263
0
    return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
264
0
  }
265
266
  template <typename Visitor>
267
  static Status TimedFullMergeCommonImpl(
268
      const MergeOperator* merge_operator, const Slice& key,
269
      MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
270
      const std::vector<Slice>& operands, Logger* logger,
271
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
272
      MergeOperator::OpFailureScope* op_failure_scope, Visitor&& visitor);
273
274
  // Variant that exposes the merge result directly (in serialized form for wide
275
  // columns) as well as its value type. Used by iterator and compaction.
276
  static Status TimedFullMergeImpl(
277
      const MergeOperator* merge_operator, const Slice& key,
278
      MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
279
      const std::vector<Slice>& operands, Logger* logger,
280
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
281
      MergeOperator::OpFailureScope* op_failure_scope, std::string* result,
282
      Slice* result_operand, ValueType* result_type);
283
284
  // Variant that exposes the merge result translated into the form requested by
285
  // the client. (For example, if the result is a wide-column structure but the
286
  // client requested the results in plain-value form, the value of the default
287
  // column is returned.) Used by point lookups.
288
  static Status TimedFullMergeImpl(
289
      const MergeOperator* merge_operator, const Slice& key,
290
      MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
291
      const std::vector<Slice>& operands, Logger* logger,
292
      Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
293
      MergeOperator::OpFailureScope* op_failure_scope,
294
      std::string* result_value, PinnableWideColumns* result_entity);
295
};
296
297
// MergeOutputIterator can be used to iterate over the result of a merge.
298
class MergeOutputIterator {
299
 public:
300
  // The MergeOutputIterator is bound to a MergeHelper instance.
301
  explicit MergeOutputIterator(const MergeHelper* merge_helper);
302
303
  // Seeks to the first record in the output.
304
  void SeekToFirst();
305
  // Advances to the next record in the output.
306
  void Next();
307
308
0
  Slice key() { return Slice(*it_keys_); }
309
0
  Slice value() { return Slice(*it_values_); }
310
29.1k
  bool Valid() { return it_keys_ != merge_helper_->keys().rend(); }
311
312
 private:
313
  const MergeHelper* merge_helper_;
314
  std::deque<std::string>::const_reverse_iterator it_keys_;
315
  std::vector<Slice>::const_reverse_iterator it_values_;
316
};
317
318
}  // namespace ROCKSDB_NAMESPACE