Coverage Report

Created: 2025-10-26 07:13

/src/rocksdb/db/flush_job.h
 Line|  Count|Source
    1|       |//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
    2|       |//  This source code is licensed under both the GPLv2 (found in the
    3|       |//  COPYING file in the root directory) and Apache 2.0 License
    4|       |//  (found in the LICENSE.Apache file in the root directory).
    5|       |//
    6|       |// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
    7|       |// Use of this source code is governed by a BSD-style license that can be
    8|       |// found in the LICENSE file. See the AUTHORS file for names of contributors.
    9|       |#pragma once
   10|       |
   11|       |#include <atomic>
   12|       |#include <deque>
   13|       |#include <limits>
   14|       |#include <list>
   15|       |#include <set>
   16|       |#include <string>
   17|       |#include <utility>
   18|       |#include <vector>
   19|       |
   20|       |#include "db/blob/blob_file_completion_callback.h"
   21|       |#include "db/column_family.h"
   22|       |#include "db/flush_scheduler.h"
   23|       |#include "db/internal_stats.h"
   24|       |#include "db/job_context.h"
   25|       |#include "db/log_writer.h"
   26|       |#include "db/logs_with_prep_tracker.h"
   27|       |#include "db/memtable_list.h"
   28|       |#include "db/seqno_to_time_mapping.h"
   29|       |#include "db/snapshot_impl.h"
   30|       |#include "db/version_edit.h"
   31|       |#include "db/write_controller.h"
   32|       |#include "db/write_thread.h"
   33|       |#include "logging/event_logger.h"
   34|       |#include "monitoring/instrumented_mutex.h"
   35|       |#include "options/db_options.h"
   36|       |#include "port/port.h"
   37|       |#include "rocksdb/db.h"
   38|       |#include "rocksdb/env.h"
   39|       |#include "rocksdb/listener.h"
   40|       |#include "rocksdb/memtablerep.h"
   41|       |#include "rocksdb/transaction_log.h"
   42|       |#include "util/autovector.h"
   43|       |#include "util/stop_watch.h"
   44|       |#include "util/thread_local.h"
   45|       |
   46|       |namespace ROCKSDB_NAMESPACE {
   47|       |
   48|       |class DBImpl;
   49|       |class MemTable;
   50|       |class SnapshotChecker;
   51|       |class TableCache;
   52|       |class Version;
   53|       |class VersionEdit;
   54|       |class VersionSet;
   55|       |class Arena;
   56|       |
   57|       |class FlushJob {
   58|       | public:
   59|       |  // TODO(icanadi) make effort to reduce number of parameters here
   60|       |  // IMPORTANT: mutable_cf_options needs to be alive while FlushJob is alive
   61|       |  FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
   62|       |           const ImmutableDBOptions& db_options,
   63|       |           const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
   64|       |           const FileOptions& file_options, VersionSet* versions,
   65|       |           InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
   66|       |           JobContext* job_context, FlushReason flush_reason,
   67|       |           LogBuffer* log_buffer, FSDirectory* db_directory,
   68|       |           FSDirectory* output_file_directory,
   69|       |           CompressionType output_compression, Statistics* stats,
   70|       |           EventLogger* event_logger, bool measure_io_stats,
   71|       |           const bool sync_output_directory, const bool write_manifest,
   72|       |           Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
   73|       |           std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping,
   74|       |           const std::string& db_id = "", const std::string& db_session_id = "",
   75|       |           std::string full_history_ts_low = "",
   76|       |           BlobFileCompletionCallback* blob_callback = nullptr);
   77|       |
   78|       |  ~FlushJob();
   79|       |
   80|       |  // Require db_mutex held.
   81|       |  // Once PickMemTable() is called, either Run() or Cancel() has to be called.
   82|       |  void PickMemTable();
   83|       |  // @param skipped_since_bg_error If not nullptr and if atomic_flush=false,
   84|       |  // then it is set to true if flush installation is skipped and memtable
   85|       |  // is rolled back due to existing background error.
   86|       |  Status Run(LogsWithPrepTracker* prep_tracker = nullptr,
   87|       |             FileMetaData* file_meta = nullptr,
   88|       |             bool* switched_to_mempurge = nullptr,
   89|       |             bool* skipped_since_bg_error = nullptr,
   90|       |             ErrorHandler* error_handler = nullptr);
   91|       |  void Cancel();
   92|      0|  const autovector<ReadOnlyMemTable*>& GetMemTables() const { return mems_; }
   93|       |
   94|  1.74k|  std::list<std::unique_ptr<FlushJobInfo>>* GetCommittedFlushJobsInfo() {
   95|  1.74k|    return &committed_flush_jobs_info_;
   96|  1.74k|  }
   97|       |
   98|       | private:
   99|       |  friend class FlushJobTest_GetRateLimiterPriorityForWrite_Test;
  100|       |
  101|       |  void ReportStartedFlush();
  102|       |  static void ReportFlushInputSize(const autovector<ReadOnlyMemTable*>& mems);
  103|       |  void RecordFlushIOStats();
  104|       |  Status WriteLevel0Table();
  105|       |
  106|       |  // Memtable Garbage Collection algorithm: a MemPurge takes the list
  107|       |  // of immutable memtables and filters out (or "purges") the outdated bytes
  108|       |  // out of it. The output (the filtered bytes, or "useful payload") is
  109|       |  // then transferred into a new memtable. If this memtable is filled, then
  110|       |  // the mempurge is aborted and rerouted to a regular flush process. Else,
  111|       |  // depending on the heuristics, it is placed onto the immutable memtable list.
  112|       |  // The addition to the imm list will not trigger a flush operation. The
  113|       |  // flush of the imm list will instead be triggered once the mutable memtable
  114|       |  // is added to the imm list.
  115|       |  // This process is typically intended for workloads with heavy overwrites
  116|       |  // when we want to avoid SSD writes (and reads) as much as possible.
  117|       |  // "MemPurge" is an experimental feature still at a very early stage
  118|       |  // of development. At the moment it is only compatible with the Get, Put,
  119|       |  // Delete operations as well as Iterators and CompactionFilters.
  120|       |  // For this early version, "MemPurge" is enabled by setting the
  121|       |  // options.experimental_mempurge_threshold value as >0.0. When this is
  122|       |  // the case, ALL automatic flush operations (kWriteBufferManagerFull) will
  123|       |  // first go through the MemPurge process. Therefore, we strongly
  124|       |  // recommend that users not enable this option, given that the MemPurge
  125|       |  // process has not matured yet.
  126|       |  Status MemPurge();
  127|       |  bool MemPurgeDecider(double threshold);
  128|       |  // The rate limiter priority (io_priority) is determined dynamically here.
  129|       |  Env::IOPriority GetRateLimiterPriority();
  130|       |  std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
  131|       |
  132|       |  // Require db_mutex held.
  133|       |  // Called only when the UDT feature is enabled and the
  134|       |  // `persist_user_defined_timestamps` flag is false, because we will refrain
  135|       |  // from flushing as long as there are still UDTs in a memtable that haven't
  136|       |  // expired w.r.t `full_history_ts_low`. However, flush is continued if there
  137|       |  // is risk of entering write stall mode. In that case, we need
  138|       |  // to track the effective cutoff timestamp below which all the UDTs are
  139|       |  // removed because of flush, and use it to increase `full_history_ts_low` if
  140|       |  // the effective cutoff timestamp is newer. See
  141|       |  // `MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT` for details.
  142|       |  void GetEffectiveCutoffUDTForPickedMemTables();
  143|       |
  144|       |  // If this column family enables the tiering feature, it will find the current
  145|       |  // `preclude_last_level_min_seqno_`, and the smaller one between this and
  146|       |  // the `earliest_snapshot_` will later be announced to user property
  147|       |  // collectors. It indicates to tiering use cases which data are old enough to
  148|       |  // be placed on the last level.
  149|       |  void GetPrecludeLastLevelMinSeqno();
  150|       |
  151|       |  Status MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT();
  152|       |
  153|       |  const std::string& dbname_;
  154|       |  const std::string db_id_;
  155|       |  const std::string db_session_id_;
  156|       |  ColumnFamilyData* cfd_;
  157|       |  const ImmutableDBOptions& db_options_;
  158|       |  const MutableCFOptions& mutable_cf_options_;
  159|       |  // A variable storing the largest memtable id to flush in this
  160|       |  // flush job. RocksDB uses this variable to select the memtables to flush in
  161|       |  // this job. All memtables in this column family with an ID smaller than or
  162|       |  // equal to max_memtable_id_ will be selected for flush.
  163|       |  uint64_t max_memtable_id_;
  164|       |  FileOptions file_options_;
  165|       |  VersionSet* versions_;
  166|       |  InstrumentedMutex* db_mutex_;
  167|       |  std::atomic<bool>* shutting_down_;
  168|       |  SequenceNumber earliest_snapshot_;
  169|       |  JobContext* job_context_;
  170|       |  FlushReason flush_reason_;
  171|       |  LogBuffer* log_buffer_;
  172|       |  FSDirectory* db_directory_;
  173|       |  FSDirectory* output_file_directory_;
  174|       |  CompressionType output_compression_;
  175|       |  Statistics* stats_;
  176|       |  EventLogger* event_logger_;
  177|       |  TableProperties table_properties_;
  178|       |  bool measure_io_stats_;
  179|       |  // True if this flush job should call fsync on the output directory. False
  180|       |  // otherwise.
  181|       |  // Usually sync_output_directory_ is true. A flush job needs to call sync on
  182|       |  // the output directory before committing to the MANIFEST.
  183|       |  // However, an individual flush job does not have to call sync on the output
  184|       |  // directory if it is part of an atomic flush. After all flush jobs in the
  185|       |  // atomic flush succeed, call sync once on each distinct output directory.
  186|       |  const bool sync_output_directory_;
  187|       |  // True if this flush job should write to MANIFEST after successfully
  188|       |  // flushing memtables. False otherwise.
  189|       |  // Usually write_manifest_ is true. A flush job commits to the MANIFEST after
  190|       |  // flushing the memtables.
  191|       |  // However, an individual flush job cannot rashly write to the MANIFEST
  192|       |  // immediately after it finishes the flush if it is part of an atomic flush.
  193|       |  // In this case, only after all flush jobs succeed in flush can RocksDB
  194|       |  // commit to the MANIFEST.
  195|       |  const bool write_manifest_;
  196|       |  // The current flush job can commit flush result of a concurrent flush job.
  197|       |  // We collect FlushJobInfo of all jobs committed by current job and fire
  198|       |  // OnFlushCompleted for them.
  199|       |  std::list<std::unique_ptr<FlushJobInfo>> committed_flush_jobs_info_;
  200|       |
  201|       |  // Variables below are set by PickMemTable():
  202|       |  FileMetaData meta_;
  203|       |  // Memtables to be flushed by this job.
  204|       |  // Ordered by increasing memtable id, i.e., oldest memtable first.
  205|       |  autovector<ReadOnlyMemTable*> mems_;
  206|       |  VersionEdit* edit_;
  207|       |  Version* base_;
  208|       |  bool pick_memtable_called;
  209|       |  Env::Priority thread_pri_;
  210|       |
  211|       |  const std::shared_ptr<IOTracer> io_tracer_;
  212|       |  SystemClock* clock_;
  213|       |
  214|       |  const std::string full_history_ts_low_;
  215|       |  BlobFileCompletionCallback* blob_callback_;
  216|       |
  217|       |  // Shared copy of DB's seqno to time mapping stored in SuperVersion. The
  218|       |  // ownership is shared with this FlushJob when it's created.
  219|       |  // FlushJob accesses and ref counts immutable MemTables directly via
  220|       |  // `MemTableListVersion` instead of ref `SuperVersion`, so we need to give
  221|       |  // the flush job shared ownership of the mapping.
  222|       |  // Note this is only installed when the seqno to time recording feature is
  223|       |  // enabled, so it could be nullptr.
  224|       |  std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping_;
  225|       |
  226|       |  // Keeps track of the newest user-defined timestamp for this flush job if
  227|       |  // the `persist_user_defined_timestamps` flag is false.
  228|       |  std::string cutoff_udt_;
  229|       |
  230|       |  // The current minimum seqno that compaction jobs will preclude from the
  231|       |  // last level. Data with a seqno larger than this or larger than
  232|       |  // `earliest_snapshot_` would be output to the proximal level had it gone
  233|       |  // through a compaction to the last level.
  234|       |  SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber;
  235|       |};
  236|       |
  237|       |}  // namespace ROCKSDB_NAMESPACE
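
Editor's note: the comments at source lines 80-82 above state FlushJob's calling contract: PickMemTable() requires db_mutex to be held, and once it has been called, either Run() or Cancel() must follow. The sketch below only illustrates that contract under stated assumptions; it is not how RocksDB itself drives FlushJob (DBImpl's flush path does considerably more). The function name RunOrCancelFlushJobSketch and the rollback flag are hypothetical, and the sketch assumes an already-constructed FlushJob together with the same InstrumentedMutex that was passed to its constructor.

#include <list>
#include <memory>

#include "db/flush_job.h"
#include "monitoring/instrumented_mutex.h"

namespace ROCKSDB_NAMESPACE {

// Hypothetical helper, not part of RocksDB: drives one FlushJob through the
// PickMemTable() -> Run()/Cancel() contract described in flush_job.h.
Status RunOrCancelFlushJobSketch(FlushJob& job, InstrumentedMutex* db_mutex,
                                 bool rollback) {
  // PickMemTable() requires db_mutex to be held. It selects the memtables
  // with an id <= max_memtable_id_ and sets the "Variables below are set by
  // PickMemTable()" group (meta_, mems_, edit_, base_). In this simplified
  // sketch the mutex is simply held across both calls.
  InstrumentedMutexLock guard(db_mutex);
  job.PickMemTable();

  if (rollback) {
    // Per the header comment, once PickMemTable() has been called, either
    // Run() or Cancel() must be called.
    job.Cancel();
    return Status::OK();
  }

  // All Run() parameters are optional; on success, file_meta receives the
  // metadata of the SST file produced by the flush (WriteLevel0Table()).
  FileMetaData file_meta;
  Status s = job.Run(/*prep_tracker=*/nullptr, &file_meta);

  // FlushJobInfo of every flush committed by this job; a caller would use
  // this list to fire OnFlushCompleted listener callbacks.
  std::list<std::unique_ptr<FlushJobInfo>>* committed =
      job.GetCommittedFlushJobsInfo();
  (void)committed;
  return s;
}

}  // namespace ROCKSDB_NAMESPACE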