Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/db/compaction/subcompaction_state.h
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//
3
//  This source code is licensed under both the GPLv2 (found in the
4
//  COPYING file in the root directory) and Apache 2.0 License
5
//  (found in the LICENSE.Apache file in the root directory).
6
//
7
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
8
// Use of this source code is governed by a BSD-style license that can be
9
// found in the LICENSE file. See the AUTHORS file for names of contributors.
10
11
#pragma once
12
13
#include <optional>
14
15
#include "db/blob/blob_file_addition.h"
16
#include "db/blob/blob_garbage_meter.h"
17
#include "db/compaction/compaction.h"
18
#include "db/compaction/compaction_iterator.h"
19
#include "db/compaction/compaction_outputs.h"
20
#include "db/internal_stats.h"
21
#include "db/output_validator.h"
22
#include "db/range_del_aggregator.h"
23
24
namespace ROCKSDB_NAMESPACE {
25
26
// Maintains state and outputs for each sub-compaction
27
// It contains 2 `CompactionOutputs`:
28
//  1. one for the normal output files
29
//  2. another for the penultimate level outputs
30
// A `current` pointer tracks the current output group. When `AddToOutput()`
31
// is called, it checks the output of the current compaction_iterator key
32
// and points `current` to the target output group. By default, it points to
33
// normal compaction_outputs, if the compaction_iterator key should be placed on
34
// the penultimate level, `current` is changed to point to
35
// `penultimate_level_outputs`.
36
// Later operations use `Current()` to get the target group.
37
//
38
// +----------+          +-----------------------------+      +---------+
39
// | *current |--------> | compaction_outputs          |----->| output  |
40
// +----------+          +-----------------------------+      +---------+
41
//       |                                                    | output  |
42
//       |                                                    +---------+
43
//       |                                                    |  ...    |
44
//       |
45
//       |               +-----------------------------+      +---------+
46
//       +-------------> | penultimate_level_outputs   |----->| output  |
47
//                       +-----------------------------+      +---------+
48
//                                                            |  ...    |
49
50
class SubcompactionState {
51
 public:
52
  const Compaction* compaction;
53
54
  // The boundaries of the key-range this compaction is interested in. No two
55
  // sub-compactions may have overlapping key-ranges.
56
  // 'start' is inclusive, 'end' is exclusive, and std::nullopt means unbounded
57
  const std::optional<Slice> start, end;
58
59
  // The return status of this sub-compaction
60
  Status status;
61
62
  // The return IO Status of this sub-compaction
63
  IOStatus io_status;
64
65
  // Notify on sub-compaction completion only if listener was notified on
66
  // sub-compaction begin.
67
  bool notify_on_subcompaction_completion = false;
68
69
  // compaction job stats for this sub-compaction
70
  CompactionJobStats compaction_job_stats;
71
72
  // sub-compaction job id, which is used to identify different sub-compactions
73
  // within the same compaction job.
74
  const uint32_t sub_job_id;
75
76
  Slice SmallestUserKey() const;
77
78
  Slice LargestUserKey() const;
79
80
  // Get all outputs from the subcompaction. For per_key_placement compaction,
81
  // it returns both the last level outputs and penultimate level outputs.
82
  OutputIterator GetOutputs() const;
83
84
  // Assign the range del aggregator. Each range_del can only be assigned to
84
  // one output level; for per_key_placement, that is the penultimate
85
  // level.
87
  // TODO: This does not work for per_key_placement + user-defined timestamp +
88
  //  DeleteRange() combo. If user-defined timestamp is enabled,
89
  //  it is possible for a range tombstone to belong to bottommost level (
90
  //  seqno < earliest snapshot) without being dropped (garbage collection
91
  //  for user-defined timestamp).
92
  void AssignRangeDelAggregator(
93
187
      std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
94
187
    // With per-key placement the aggregator is moved into the
    // penultimate-level output group ...
    if (compaction->SupportsPerKeyPlacement()) {
95
0
      penultimate_level_outputs_.AssignRangeDelAggregator(
96
0
          std::move(range_del_agg));
97
187
    } else {
98
187
      // ... otherwise it is moved into the normal output group.
      compaction_outputs_.AssignRangeDelAggregator(std::move(range_del_agg));
99
187
    }
100
187
  }
101
102
187
  // Forwards RemoveLastEmptyOutput() to both output groups: the normal
  // outputs and the penultimate-level outputs.
  void RemoveLastEmptyOutput() {
103
187
    compaction_outputs_.RemoveLastEmptyOutput();
104
187
    penultimate_level_outputs_.RemoveLastEmptyOutput();
105
187
  }
106
107
  // Fills `subcompaction_job_info` from this sub-compaction's state: the
  // column family id and name, the current `status`, the sub-job id, the
  // compaction's start and output levels, and `compaction_job_stats`.
  void BuildSubcompactionJobInfo(
108
0
      SubcompactionJobInfo& subcompaction_job_info) const {
109
0
    const Compaction* c = compaction;
110
0
    const ColumnFamilyData* cfd = c->column_family_data();
111
112
0
    subcompaction_job_info.cf_id = cfd->GetID();
113
0
    subcompaction_job_info.cf_name = cfd->GetName();
114
0
    subcompaction_job_info.status = status;
115
0
    // sub_job_id is stored as uint32_t; it is converted to int for the info
    // struct.
    subcompaction_job_info.subcompaction_job_id = static_cast<int>(sub_job_id);
116
0
    subcompaction_job_info.base_input_level = c->start_level();
117
0
    subcompaction_job_info.output_level = c->output_level();
118
0
    subcompaction_job_info.stats = compaction_job_stats;
119
0
  }
120
121
  SubcompactionState() = delete;
122
  SubcompactionState(const SubcompactionState&) = delete;
123
  SubcompactionState& operator=(const SubcompactionState&) = delete;
124
125
  // Creates the state for one sub-compaction of compaction `c`.
  // `_start` / `_end` are copied into the `start` / `end` members,
  // `_sub_job_id` into `sub_job_id`. Both output groups are constructed
  // against `c`; `c` must not be null (checked by assert).
  SubcompactionState(Compaction* c, const std::optional<Slice> _start,
126
                     const std::optional<Slice> _end, uint32_t _sub_job_id)
127
      : compaction(c),
128
        start(_start),
129
        end(_end),
130
        sub_job_id(_sub_job_id),
131
        compaction_outputs_(c, /*is_penultimate_level=*/false),
132
187
        penultimate_level_outputs_(c, /*is_penultimate_level=*/true) {
133
187
    assert(compaction != nullptr);
134
    // Set the output split key (used by the RoundRobin feature) only for the
135
    // normal compaction_outputs_. The output-to-penultimate-level feature does
136
    // not support RoundRobin (and may never support it: with RoundRobin the
137
    // data time is mostly naturally sorted, so there is no need for per-key
138
    // placement with output_to_penultimate_level).
139
187
    compaction_outputs_.SetOutputSlitKey(start, end);
140
187
  }
141
142
  // Move constructor. The const members (`compaction`, `start`, `end`,
  // `sub_job_id`) and the bool flags are copied from `state`; `status`,
  // `io_status`, `compaction_job_stats` and both output groups are moved.
  // `current_outputs_` is deliberately not taken from `state`: it is
  // re-pointed in the body at this object's own output group, selected by
  // `is_current_penultimate_level_`.
  SubcompactionState(SubcompactionState&& state) noexcept
143
      : compaction(state.compaction),
144
        start(state.start),
145
        end(state.end),
146
        status(std::move(state.status)),
147
        io_status(std::move(state.io_status)),
148
        notify_on_subcompaction_completion(
149
            state.notify_on_subcompaction_completion),
150
        compaction_job_stats(std::move(state.compaction_job_stats)),
151
        sub_job_id(state.sub_job_id),
152
        compaction_outputs_(std::move(state.compaction_outputs_)),
153
        penultimate_level_outputs_(std::move(state.penultimate_level_outputs_)),
154
        is_current_penultimate_level_(state.is_current_penultimate_level_),
155
0
        has_penultimate_level_outputs_(state.has_penultimate_level_outputs_) {
156
0
    // Point at a member of *this* object, never at a member of `state`.
    current_outputs_ = is_current_penultimate_level_
157
0
                           ? &penultimate_level_outputs_
158
0
                           : &compaction_outputs_;
159
0
  }
160
161
187
  // Returns true if `has_penultimate_level_outputs_` is set, or if the
  // penultimate-level output group reports HasRangeDel().
  bool HasPenultimateLevelOutputs() const {
162
187
    return has_penultimate_level_outputs_ ||
163
187
           penultimate_level_outputs_.HasRangeDel();
164
187
  }
165
166
0
  // Returns the value of the `is_current_penultimate_level_` flag.
  bool IsCurrentPenultimateLevel() const {
167
0
    return is_current_penultimate_level_;
168
0
  }
169
170
  // Add all the new files from this compaction to version_edit
171
140
  void AddOutputsEdit(VersionEdit* out_edit) const {
172
140
    // Files in the penultimate-level group are added at the compaction's
    // penultimate level.
    for (const auto& file : penultimate_level_outputs_.outputs_) {
173
0
      out_edit->AddFile(compaction->GetPenultimateLevel(), file.meta);
174
0
    }
175
140
    // Files in the normal group are added at the compaction's output level.
    for (const auto& file : compaction_outputs_.outputs_) {
176
107
      out_edit->AddFile(compaction->output_level(), file.meta);
177
107
    }
178
140
  }
179
180
  void Cleanup(Cache* cache);
181
182
  void AggregateCompactionStats(
183
      InternalStats::CompactionStatsFull& compaction_stats) const;
184
185
558
  // Returns the output group that `current_outputs_` points at. The pointer
  // must be non-null (checked by assert). Note that this const method hands
  // out a non-const reference.
  CompactionOutputs& Current() const {
186
558
    assert(current_outputs_);
187
558
    return *current_outputs_;
188
558
  }
189
190
  // Add compaction_iterator key/value to the `Current` output group.
191
  Status AddToOutput(const CompactionIterator& iter,
192
                     const CompactionFileOpenFunc& open_file_func,
193
                     const CompactionFileCloseFunc& close_file_func);
194
195
  // Close all compaction output files, both output_to_penultimate_level outputs
196
  // and normal outputs.
197
  Status CloseCompactionFiles(const Status& curr_status,
198
                              const CompactionFileOpenFunc& open_file_func,
199
187
                              const CompactionFileCloseFunc& close_file_func) {
200
    // Call FinishCompactionOutputFile() even if status is not ok: it needs to
201
    // close the output file.
202
    // CloseOutput() may open new compaction output files.
203
187
    // The penultimate-level group is closed first, with
    // is_current_penultimate_level_ set for the duration of that call.
    // NOTE(review): only the bool flag is toggled here; current_outputs_ is
    // not re-pointed. Confirm no callback relies on Current() during close.
    is_current_penultimate_level_ = true;
204
187
    Status s = penultimate_level_outputs_.CloseOutput(
205
187
        curr_status, open_file_func, close_file_func);
206
187
    is_current_penultimate_level_ = false;
207
187
    // The status from the first close is passed in as the incoming status of
    // the second; the second call's result is what gets returned.
    s = compaction_outputs_.CloseOutput(s, open_file_func, close_file_func);
208
187
    return s;
209
187
  }
210
211
 private:
212
  // State kept for output being generated
213
  CompactionOutputs compaction_outputs_;
214
  CompactionOutputs penultimate_level_outputs_;
215
  CompactionOutputs* current_outputs_ = &compaction_outputs_;
216
  bool is_current_penultimate_level_ = false;
217
  bool has_penultimate_level_outputs_ = false;
218
};
219
220
}  // namespace ROCKSDB_NAMESPACE