Coverage Report

Created: 2025-07-23 07:17

/src/rocksdb/db/compaction/compaction_job.h
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
#pragma once
10
11
#include <atomic>
12
#include <deque>
13
#include <functional>
14
#include <limits>
15
#include <set>
16
#include <string>
17
#include <utility>
18
#include <vector>
19
20
#include "db/blob/blob_file_completion_callback.h"
21
#include "db/column_family.h"
22
#include "db/compaction/compaction_iterator.h"
23
#include "db/compaction/compaction_outputs.h"
24
#include "db/flush_scheduler.h"
25
#include "db/internal_stats.h"
26
#include "db/job_context.h"
27
#include "db/log_writer.h"
28
#include "db/memtable_list.h"
29
#include "db/range_del_aggregator.h"
30
#include "db/seqno_to_time_mapping.h"
31
#include "db/version_edit.h"
32
#include "db/write_controller.h"
33
#include "db/write_thread.h"
34
#include "logging/event_logger.h"
35
#include "options/cf_options.h"
36
#include "options/db_options.h"
37
#include "port/port.h"
38
#include "rocksdb/compaction_filter.h"
39
#include "rocksdb/compaction_job_stats.h"
40
#include "rocksdb/db.h"
41
#include "rocksdb/env.h"
42
#include "rocksdb/memtablerep.h"
43
#include "rocksdb/transaction_log.h"
44
#include "util/autovector.h"
45
#include "util/stop_watch.h"
46
#include "util/thread_local.h"
47
48
namespace ROCKSDB_NAMESPACE {
49
50
class Arena;
51
class CompactionState;
52
class ErrorHandler;
53
class MemTable;
54
class SnapshotChecker;
55
class SystemClock;
56
class TableCache;
57
class Version;
58
class VersionEdit;
59
class VersionSet;
60
61
class SubcompactionState;
62
63
// CompactionJob is responsible for executing the compaction. Each (manual or
64
// automated) compaction corresponds to a CompactionJob object, and usually
65
// goes through the stages of `Prepare()`->`Run()`->`Install()`. CompactionJob
66
// will divide the compaction into subcompactions and execute them in parallel
67
// if needed.
68
//
69
// CompactionJob has 2 main stats:
70
// 1. CompactionJobStats job_stats_
71
//    CompactionJobStats is a public data structure which is part of Compaction
72
//    event listener that RocksDB uses to share the job stats with the user.
73
//    Internally it's an aggregation of all the compaction_job_stats from each
74
//    `SubcompactionState`:
75
//                                           +------------------------+
76
//                                           | SubcompactionState     |
77
//                                           |                        |
78
//                                +--------->|   compaction_job_stats |
79
//                                |          |                        |
80
//                                |          +------------------------+
81
// +------------------------+     |
82
// | CompactionJob          |     |          +------------------------+
83
// |                        |     |          | SubcompactionState     |
84
// |   job_stats            +-----+          |                        |
85
// |                        |     +--------->|   compaction_job_stats |
86
// |                        |     |          |                        |
87
// +------------------------+     |          +------------------------+
88
//                                |
89
//                                |          +------------------------+
90
//                                |          | SubcompactionState     |
91
//                                |          |                        |
92
//                                +--------->+   compaction_job_stats |
93
//                                |          |                        |
94
//                                |          +------------------------+
95
//                                |
96
//                                |          +------------------------+
97
//                                |          |       ...              |
98
//                                +--------->+                        |
99
//                                           +------------------------+
100
//
101
// 2. CompactionStatsFull internal_stats_
102
//    `CompactionStatsFull` is an internal stats about the compaction, which
103
//    is eventually sent to `ColumnFamilyData::internal_stats_` and used for
104
//    logging and public metrics.
105
//    Internally, it's an aggregation of stats_ from each `SubcompactionState`.
106
//    It has 2 parts, ordinary output level stats and the proximal level output
107
//    stats.
108
//                                                +---------------------------+
109
//                                                | SubcompactionState        |
110
//                                                |                           |
111
//                                                | +----------------------+  |
112
//                                                | | CompactionOutputs    |  |
113
//                                                | | (normal output)      |  |
114
//                                            +---->|   stats_             |  |
115
//                                            |   | +----------------------+  |
116
//                                            |   |                           |
117
//                                            |   | +----------------------+  |
118
// +--------------------------------+         |   | | CompactionOutputs    |  |
119
// | CompactionJob                  |         |   | | (proximal_level)     |  |
120
// |                                |    +--------->|   stats_             |  |
121
// |   internal_stats_              |    |    |   | +----------------------+  |
122
// |    +-------------------------+ |    |    |   |                           |
123
// |    |output_level_stats       |------|----+   +---------------------------+
124
// |    +-------------------------+ |    |    |
125
// |                                |    |    |
126
// |    +-------------------------+ |    |    |   +---------------------------+
127
// |    |proximal_level_stats     |------+    |   | SubcompactionState        |
128
// |    +-------------------------+ |    |    |   |                           |
129
// |                                |    |    |   | +----------------------+  |
130
// |                                |    |    |   | | CompactionOutputs    |  |
131
// +--------------------------------+    |    |   | | (normal output)      |  |
132
//                                       |    +---->|   stats_             |  |
133
//                                       |        | +----------------------+  |
134
//                                       |        |                           |
135
//                                       |        | +----------------------+  |
136
//                                       |        | | CompactionOutputs    |  |
137
//                                       |        | | (proximal_level)     |  |
138
//                                       +--------->|   stats_             |  |
139
//                                                | +----------------------+  |
140
//                                                |                           |
141
//                                                +---------------------------+
142
143
class CompactionJob {
144
 public:
145
  CompactionJob(int job_id, Compaction* compaction,
146
                const ImmutableDBOptions& db_options,
147
                const MutableDBOptions& mutable_db_options,
148
                const FileOptions& file_options, VersionSet* versions,
149
                const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
150
                FSDirectory* db_directory, FSDirectory* output_directory,
151
                FSDirectory* blob_output_directory, Statistics* stats,
152
                InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
153
                JobContext* job_context, std::shared_ptr<Cache> table_cache,
154
                EventLogger* event_logger, bool paranoid_file_checks,
155
                bool measure_io_stats, const std::string& dbname,
156
                CompactionJobStats* compaction_job_stats,
157
                Env::Priority thread_pri,
158
                const std::shared_ptr<IOTracer>& io_tracer,
159
                const std::atomic<bool>& manual_compaction_canceled,
160
                const std::string& db_id = "",
161
                const std::string& db_session_id = "",
162
                std::string full_history_ts_low = "", std::string trim_ts = "",
163
                BlobFileCompletionCallback* blob_callback = nullptr,
164
                int* bg_compaction_scheduled = nullptr,
165
                int* bg_bottom_compaction_scheduled = nullptr);
166
167
  virtual ~CompactionJob();
168
169
  // Neither copyable nor movable.
170
  CompactionJob(CompactionJob&& job) = delete;
171
  CompactionJob(const CompactionJob& job) = delete;
172
  CompactionJob& operator=(const CompactionJob& job) = delete;
173
174
  // REQUIRED: mutex held
175
  // Prepare for the compaction by setting up boundaries for each subcompaction
176
  // and organizing seqno <-> time info. `known_single_subcompact` is non-null
177
  // if we already have a known single subcompaction, with optional key bounds
178
  // (currently for executing a remote compaction).
179
  void Prepare(
180
      std::optional<std::pair<std::optional<Slice>, std::optional<Slice>>>
181
          known_single_subcompact);
182
183
  // REQUIRED: mutex not held
184
  // Launch threads for each subcompaction and wait for them to finish. After
185
  // that, verify table is usable and finally do bookkeeping to unify
186
  // subcompaction results.
187
  Status Run();
188
189
  // REQUIRED: mutex held
190
  // Add compaction input/output to the current version.
191
  // Releases compaction file through Compaction::ReleaseCompactionFiles().
192
  // Sets *compaction_released to true if compaction is released.
193
  Status Install(bool* compaction_released);
194
195
  // Return the IO status (a copy of io_status_).
196
5.33k
  IOStatus io_status() const { return io_status_; }
197
198
 protected:
199
  void UpdateCompactionJobOutputStats(
200
      const InternalStats::CompactionStatsFull& internal_stats) const;
201
202
  void LogCompaction();
203
  virtual void RecordCompactionIOStats();
204
  void CleanupCompaction();
205
206
  // Iterate through input and compact the kv-pairs.
207
  void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
208
209
  CompactionState* compact_;
210
  InternalStats::CompactionStatsFull internal_stats_;
211
  const ImmutableDBOptions& db_options_;
212
  const MutableDBOptions mutable_db_options_copy_;
213
  LogBuffer* log_buffer_;
214
  FSDirectory* output_directory_;
215
  Statistics* stats_;
216
  // Is this compaction creating a file in the bottommost level?
217
  bool bottommost_level_;
218
219
  Env::WriteLifeTimeHint write_hint_;
220
221
  IOStatus io_status_;
222
223
  CompactionJobStats* job_stats_;
224
225
 private:
226
  friend class CompactionJobTestBase;
227
228
  // Collect the following stats from Input Table Properties
229
  // - num_input_files_in_non_output_levels
230
  // - num_input_files_in_output_level
231
  // - bytes_read_non_output_levels
232
  // - bytes_read_output_level
233
  // - num_input_records
234
  // - bytes_read_blob
235
  // - num_dropped_records
236
  // and set them in internal_stats_.output_level_stats
237
  //
238
  // @param num_input_range_del if non-null, will be set to the number of range
239
  // deletion entries in this compaction input.
240
  //
241
  // Returns true iff internal_stats_.output_level_stats.num_input_records and
242
  // num_input_range_del are calculated successfully.
243
  //
244
  // This should be called only once per compaction (not per subcompaction).
245
  bool BuildStatsFromInputTableProperties(
246
      uint64_t* num_input_range_del = nullptr);
247
248
  void UpdateCompactionJobInputStats(
249
      const InternalStats::CompactionStatsFull& internal_stats,
250
      uint64_t num_input_range_del) const;
251
252
  Status VerifyInputRecordCount(uint64_t num_input_range_del) const;
253
254
  // Generates a histogram representing potential divisions of key ranges from
255
  // the input. It adds the starting and/or ending keys of certain input files
256
  // to the working set and then finds the approximate size of data in between
257
  // each consecutive pair of slices. Then it divides these ranges into
258
  // consecutive groups such that each group has a similar size.
259
  void GenSubcompactionBoundaries();
260
261
  // Get the number of planned subcompactions based on max_subcompactions and
262
  // extra reserved resources.
263
  uint64_t GetSubcompactionsLimit();
264
265
  // Additional reserved threads are reserved and the number is stored in
266
  // extra_num_subcompaction_threads_reserved_. For now, this happens only if
267
  // the compaction priority is round-robin and max_subcompactions is not
268
  // sufficient (extra resources may be needed).
269
  void AcquireSubcompactionResources(int num_extra_required_subcompactions);
270
271
  // Additional threads may be reserved during AcquireSubcompactionResources()
272
  // if num_actual_subcompactions is less than num_planned_subcompactions.
273
  // Additional threads will be released and the bg_compaction_scheduled_ or
274
  // bg_bottom_compaction_scheduled_ will be updated if they are used.
275
  // DB Mutex lock is required.
276
  void ShrinkSubcompactionResources(uint64_t num_extra_resources);
277
278
  // Release all reserved threads and update the compaction limits.
279
  void ReleaseSubcompactionResources();
280
281
  CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService(
282
      SubcompactionState* sub_compact);
283
284
  // Update the thread status for starting a compaction.
285
  void ReportStartedCompaction(Compaction* compaction);
286
287
  Status FinishCompactionOutputFile(const Status& input_status,
288
                                    SubcompactionState* sub_compact,
289
                                    CompactionOutputs& outputs,
290
                                    const Slice& next_table_min_key,
291
                                    const Slice* comp_start_user_key,
292
                                    const Slice* comp_end_user_key);
293
  Status InstallCompactionResults(bool* compaction_released);
294
  Status OpenCompactionOutputFile(SubcompactionState* sub_compact,
295
                                  CompactionOutputs& outputs);
296
297
  void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats,
298
                         CompactionJobStats* compaction_job_stats = nullptr);
299
300
  void NotifyOnSubcompactionBegin(SubcompactionState* sub_compact);
301
302
  void NotifyOnSubcompactionCompleted(SubcompactionState* sub_compact);
303
304
  uint32_t job_id_;
305
306
  // DBImpl state
307
  const std::string& dbname_;
308
  const std::string db_id_;
309
  const std::string db_session_id_;
310
  const FileOptions file_options_;
311
312
  Env* env_;
313
  std::shared_ptr<IOTracer> io_tracer_;
314
  FileSystemPtr fs_;
315
  // File options optimized for compaction table reads
316
  FileOptions file_options_for_read_;
317
  VersionSet* versions_;
318
  const std::atomic<bool>* shutting_down_;
319
  const std::atomic<bool>& manual_compaction_canceled_;
320
  FSDirectory* db_directory_;
321
  FSDirectory* blob_output_directory_;
322
  InstrumentedMutex* db_mutex_;
323
  ErrorHandler* db_error_handler_;
324
325
  SequenceNumber earliest_snapshot_;
326
  JobContext* job_context_;
327
328
  std::shared_ptr<Cache> table_cache_;
329
330
  EventLogger* event_logger_;
331
332
  bool paranoid_file_checks_;
333
  bool measure_io_stats_;
334
  // Stores the keys that designate the boundaries for each subcompaction
335
  std::vector<std::string> boundaries_;
336
  Env::Priority thread_pri_;
337
  std::string full_history_ts_low_;
338
  std::string trim_ts_;
339
  BlobFileCompletionCallback* blob_callback_;
340
341
  uint64_t GetCompactionId(SubcompactionState* sub_compact) const;
342
  // Stores the number of threads reserved in the shared env_ for the extra
343
  // subcompactions under the kRoundRobin compaction priority
344
  int extra_num_subcompaction_threads_reserved_;
345
346
  // Stores the pointer to bg_compaction_scheduled_,
347
  // bg_bottom_compaction_scheduled_ in DBImpl. Mutex is required when accessing
348
  // or updating it.
349
  int* bg_compaction_scheduled_;
350
  int* bg_bottom_compaction_scheduled_;
351
352
  // Stores the sequence number to time mapping gathered from all input files.
353
  // It also collects the smallest_seqno -> oldest_ancester_time from the SST.
354
  SeqnoToTimeMapping seqno_to_time_mapping_;
355
356
  // Max seqno that can be zeroed out in last level, including for preserving
357
  // write times.
358
  SequenceNumber preserve_seqno_after_ = kMaxSequenceNumber;
359
360
  // Minimal sequence number to preclude the data from the last level. If the
361
  // key has bigger (newer) sequence number than this, it will be precluded from
362
  // the last level (output to proximal level).
363
  SequenceNumber proximal_after_seqno_ = kMaxSequenceNumber;
364
365
  // Options File Number used for Remote Compaction
366
  // Setting this requires DBMutex.
367
  uint64_t options_file_number_ = 0;
368
369
  // Get the name of the table file being output to, which should also be in
370
  // `output_directory_`.
371
  virtual std::string GetTableFileName(uint64_t file_number);
372
  // The rate limiter priority (io_priority) is determined dynamically here.
373
  // The Compaction Read and Write priorities are the same for different
374
  // scenarios, such as write stalled.
375
  Env::IOPriority GetRateLimiterPriority();
376
};
377
378
// CompactionServiceInput is used to pass compaction information between two
379
// db instances. It contains the information needed to do a compaction. It
380
// doesn't contain the LSM tree information, which is passed through the
381
// MANIFEST file.
382
struct CompactionServiceInput {
383
  std::string cf_name;
384
385
  std::vector<SequenceNumber> snapshots;
386
387
  // SST files for compaction. It should already be expanded to include all the
388
  // files needed for this compaction, for both input level files and output
389
  // level files.
390
  std::vector<std::string> input_files;
391
  int output_level = 0;
392
393
  // db_id is used to generate unique id of sst on the remote compactor
394
  std::string db_id;
395
396
  // Information for subcompaction
397
  bool has_begin = false;
398
  std::string begin;
399
  bool has_end = false;
400
  std::string end;
401
402
  uint64_t options_file_number = 0;
403
404
  // Serialization interface to read and write the object
405
  static Status Read(const std::string& data_str, CompactionServiceInput* obj);
406
  Status Write(std::string* output);
407
408
#ifndef NDEBUG
409
  bool TEST_Equals(CompactionServiceInput* other);
410
  bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch);
411
#endif  // NDEBUG
412
};
413
414
// CompactionServiceOutputFile is the metadata for the output SST file
415
struct CompactionServiceOutputFile {
416
  std::string file_name;
417
  uint64_t file_size{};
418
  SequenceNumber smallest_seqno{};
419
  SequenceNumber largest_seqno{};
420
  std::string smallest_internal_key;
421
  std::string largest_internal_key;
422
  uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
423
  uint64_t file_creation_time = kUnknownFileCreationTime;
424
  uint64_t epoch_number = kUnknownEpochNumber;
425
  std::string file_checksum = kUnknownFileChecksum;
426
  std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
427
  uint64_t paranoid_hash{};
428
  // Explicitly initialized: the defaulted constructor below would otherwise
  // leave this flag with an indeterminate value.
  bool marked_for_compaction = false;
429
  UniqueId64x2 unique_id{};
430
  TableProperties table_properties;
431
  // Explicitly initialized for the same reason as marked_for_compaction.
  bool is_proximal_level_output = false;
432
  Temperature file_temperature = Temperature::kUnknown;
433
434
0
  // Default construction relies on the member initializers above.
  CompactionServiceOutputFile() = default;
435
  // Member-wise construction: every field is set from the matching argument.
  CompactionServiceOutputFile(
436
      const std::string& name, uint64_t size, SequenceNumber smallest,
437
      SequenceNumber largest, std::string _smallest_internal_key,
438
      std::string _largest_internal_key, uint64_t _oldest_ancester_time,
439
      uint64_t _file_creation_time, uint64_t _epoch_number,
440
      const std::string& _file_checksum,
441
      const std::string& _file_checksum_func_name, uint64_t _paranoid_hash,
442
      bool _marked_for_compaction, UniqueId64x2 _unique_id,
443
      const TableProperties& _table_properties, bool _is_proximal_level_output,
444
      Temperature _file_temperature)
445
0
      : file_name(name),
446
0
        file_size(size),
447
0
        smallest_seqno(smallest),
448
0
        largest_seqno(largest),
449
0
        smallest_internal_key(std::move(_smallest_internal_key)),
450
0
        largest_internal_key(std::move(_largest_internal_key)),
451
0
        oldest_ancester_time(_oldest_ancester_time),
452
0
        file_creation_time(_file_creation_time),
453
0
        epoch_number(_epoch_number),
454
0
        file_checksum(_file_checksum),
455
0
        file_checksum_func_name(_file_checksum_func_name),
456
0
        paranoid_hash(_paranoid_hash),
457
0
        marked_for_compaction(_marked_for_compaction),
458
0
        unique_id(std::move(_unique_id)),
459
0
        table_properties(_table_properties),
460
0
        is_proximal_level_output(_is_proximal_level_output),
461
0
        file_temperature(_file_temperature) {}
462
};
463
464
// CompactionServiceResult contains the compaction result from a different db
465
// instance. With this information, the primary db instance with write
466
// permission is able to install the result to the DB.
467
struct CompactionServiceResult {
468
  Status status;
469
  std::vector<CompactionServiceOutputFile> output_files;
470
  int output_level = 0;
471
472
  // Location of the output files
473
  std::string output_path;
474
475
  uint64_t bytes_read = 0;
476
  uint64_t bytes_written = 0;
477
478
  // Job-level Compaction Stats.
479
  //
480
  // NOTE: Job level stats cannot be rebuilt from scratch by simply aggregating
481
  // per-level stats due to some fields populated directly during compaction
482
  // (e.g. RecordDroppedKeys()). This is why we need both job-level stats and
483
  // per-level in the serialized result. If rebuilding job-level stats from
484
  // per-level stats becomes possible in the future, consider deprecating this
485
  // field.
486
  CompactionJobStats stats;
487
488
  // Per-level Compaction Stats for both output_level_stats and
489
  // proximal_level_stats
490
  InternalStats::CompactionStatsFull internal_stats;
491
492
  // Serialization interface to read and write the object
493
  static Status Read(const std::string& data_str, CompactionServiceResult* obj);
494
  Status Write(std::string* output);
495
496
#ifndef NDEBUG
497
  bool TEST_Equals(CompactionServiceResult* other);
498
  bool TEST_Equals(CompactionServiceResult* other, std::string* mismatch);
499
#endif  // NDEBUG
500
};
501
502
// CompactionServiceCompactionJob is a read-only compaction job. It takes
503
// input information from `compaction_service_input` and puts the result in
504
// `compaction_service_result`; the SST files are generated in `output_path`.
505
class CompactionServiceCompactionJob : private CompactionJob {
506
 public:
507
  CompactionServiceCompactionJob(
508
      int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
509
      const MutableDBOptions& mutable_db_options,
510
      const FileOptions& file_options, VersionSet* versions,
511
      const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
512
      FSDirectory* output_directory, Statistics* stats,
513
      InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
514
      JobContext* job_context, std::shared_ptr<Cache> table_cache,
515
      EventLogger* event_logger, const std::string& dbname,
516
      const std::shared_ptr<IOTracer>& io_tracer,
517
      const std::atomic<bool>& manual_compaction_canceled,
518
      const std::string& db_id, const std::string& db_session_id,
519
      std::string output_path,
520
      const CompactionServiceInput& compaction_service_input,
521
      CompactionServiceResult* compaction_service_result);
522
523
  // REQUIRED: mutex held
524
  // Like CompactionJob::Prepare()
525
  void Prepare();
526
527
  // Run the compaction in the current thread and return the result
528
  Status Run();
529
530
  void CleanupCompaction();
531
532
0
  // Forwards to CompactionJob::io_status(), which is otherwise inaccessible
  // to callers because of the private inheritance.
  IOStatus io_status() const { return CompactionJob::io_status(); }
533
534
 protected:
535
  void RecordCompactionIOStats() override;
536
537
 private:
538
  // Get table file name in output_path
539
  std::string GetTableFileName(uint64_t file_number) override;
540
  // Specifies the compaction output path, otherwise it uses default DB path
541
  const std::string output_path_;
542
543
  // Compaction job input
544
  const CompactionServiceInput& compaction_input_;
545
546
  // Compaction job result
547
  CompactionServiceResult* compaction_result_;
548
};
549
550
}  // namespace ROCKSDB_NAMESPACE