Coverage Report

Created: 2026-05-16 07:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/job_context.h
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#pragma once
11
12
#include <limits>
13
#include <string>
14
#include <vector>
15
16
#include "db/column_family.h"
17
#include "db/log_writer.h"
18
#include "db/version_set.h"
19
#include "util/autovector.h"
20
#include "util/hash_containers.h"
21
22
namespace ROCKSDB_NAMESPACE {
23
24
class MemTable;
25
struct SuperVersion;
26
27
// The purpose of this struct is to simplify pushing work such as
28
// allocation/construction, de-allocation/destruction, and notifications to
29
// outside of holding the DB mutex.
30
struct SuperVersionContext {
31
  struct WriteStallNotification {
32
    WriteStallInfo write_stall_info;
33
    const ImmutableOptions* immutable_options;
34
  };
35
36
  autovector<SuperVersion*> superversions_to_free;
37
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
38
  autovector<WriteStallNotification> write_stall_notifications;
39
#endif
40
  std::unique_ptr<SuperVersion>
41
      new_superversion;  // if nullptr no new superversion
42
43
  explicit SuperVersionContext(bool create_superversion = false)
44
842k
      : new_superversion(create_superversion ? new SuperVersion() : nullptr) {}
45
46
  explicit SuperVersionContext(SuperVersionContext&& other) noexcept
47
245k
      : superversions_to_free(std::move(other.superversions_to_free)),
48
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
49
245k
        write_stall_notifications(std::move(other.write_stall_notifications)),
50
#endif
51
245k
        new_superversion(std::move(other.new_superversion)) {
52
245k
  }
53
  // No copies
54
  SuperVersionContext(const SuperVersionContext& other) = delete;
55
  void operator=(const SuperVersionContext& other) = delete;
56
57
2.09k
  void NewSuperVersion() {
58
2.09k
    new_superversion = std::unique_ptr<SuperVersion>(new SuperVersion());
59
2.09k
  }
60
61
10.0k
  inline bool HaveSomethingToDelete() const {
62
10.0k
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
63
10.0k
    return !superversions_to_free.empty() || !write_stall_notifications.empty();
64
#else
65
    return !superversions_to_free.empty();
66
#endif
67
10.0k
  }
68
69
  void PushWriteStallNotification(WriteStallCondition old_cond,
70
                                  WriteStallCondition new_cond,
71
                                  const std::string& name,
72
0
                                  const ImmutableOptions* ioptions) {
73
0
#if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
74
0
    WriteStallNotification notif;
75
0
    notif.write_stall_info.cf_name = name;
76
0
    notif.write_stall_info.condition.prev = old_cond;
77
0
    notif.write_stall_info.condition.cur = new_cond;
78
0
    notif.immutable_options = ioptions;
79
0
    write_stall_notifications.push_back(notif);
80
#else
81
    (void)old_cond;
82
    (void)new_cond;
83
    (void)name;
84
    (void)ioptions;
85
#endif  // !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
86
0
  }
87
88
840k
  void Clean() {
89
840k
#if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
90
    // notify listeners on changed write stall conditions
91
840k
    for (auto& notif : write_stall_notifications) {
92
0
      for (auto& listener : notif.immutable_options->listeners) {
93
0
        listener->OnStallConditionsChanged(notif.write_stall_info);
94
0
      }
95
0
    }
96
840k
    write_stall_notifications.clear();
97
840k
#endif
98
    // free superversions
99
840k
    for (auto s : superversions_to_free) {
100
10.6k
      delete s;
101
10.6k
    }
102
840k
    superversions_to_free.clear();
103
840k
  }
104
105
1.08M
  ~SuperVersionContext() {
106
1.08M
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
107
1.08M
    assert(write_stall_notifications.empty());
108
1.08M
#endif
109
1.08M
    assert(superversions_to_free.empty());
110
1.08M
  }
111
};
112
113
struct JobContext {
114
375k
  inline bool HaveSomethingToDelete() const {
115
375k
    return !(full_scan_candidate_files.empty() && sst_delete_files.empty() &&
116
163k
             blob_delete_files.empty() && log_delete_files.empty() &&
117
159k
             manifest_delete_files.empty());
118
375k
  }
119
120
10.0k
  inline bool HaveSomethingToClean() const {
121
10.0k
    bool sv_have_sth = false;
122
10.0k
    for (const auto& sv_ctx : superversion_contexts) {
123
10.0k
      if (sv_ctx.HaveSomethingToDelete()) {
124
8.55k
        sv_have_sth = true;
125
8.55k
        break;
126
8.55k
      }
127
10.0k
    }
128
10.0k
    return memtables_to_free.size() > 0 || wals_to_free.size() > 0 ||
129
7.91k
           job_snapshot != nullptr || sv_have_sth;
130
10.0k
  }
131
132
6.34k
  SequenceNumber GetJobSnapshotSequence() const {
133
6.34k
    if (job_snapshot) {
134
0
      assert(job_snapshot->snapshot());
135
0
      return job_snapshot->snapshot()->GetSequenceNumber();
136
0
    }
137
6.34k
    return kMaxSequenceNumber;
138
6.34k
  }
139
140
4.25k
  SequenceNumber GetLatestSnapshotSequence() const {
141
4.25k
    assert(snapshot_context_initialized);
142
4.25k
    if (snapshot_seqs.empty()) {
143
4.24k
      return 0;
144
4.24k
    }
145
7
    return snapshot_seqs.back();
146
4.25k
  }
147
148
6.34k
  SequenceNumber GetEarliestSnapshotSequence() const {
149
6.34k
    assert(snapshot_context_initialized);
150
6.34k
    if (snapshot_seqs.empty()) {
151
6.33k
      return kMaxSequenceNumber;
152
6.33k
    }
153
14
    return snapshot_seqs.front();
154
6.34k
  }
155
156
  void InitSnapshotContext(SnapshotChecker* checker,
157
                           std::unique_ptr<ManagedSnapshot> managed_snapshot,
158
                           SequenceNumber earliest_write_conflict,
159
6.34k
                           std::vector<SequenceNumber>&& snapshots) {
160
6.34k
    if (snapshot_context_initialized) {
161
0
      return;
162
0
    }
163
6.34k
    snapshot_context_initialized = true;
164
6.34k
    snapshot_checker = checker;
165
6.34k
    assert(!job_snapshot);
166
6.34k
    job_snapshot = std::move(managed_snapshot);
167
6.34k
    earliest_write_conflict_snapshot = earliest_write_conflict;
168
6.34k
    snapshot_seqs = std::move(snapshots);
169
6.34k
  }
170
171
  // Structure to store information for candidate files to delete.
172
  struct CandidateFileInfo {
173
    std::string file_name;
174
    std::string file_path;
175
    CandidateFileInfo(std::string name, std::string path)
176
2.12M
        : file_name(std::move(name)), file_path(std::move(path)) {}
177
2.02M
    bool operator==(const CandidateFileInfo& other) const {
178
2.02M
      return file_name == other.file_name && file_path == other.file_path;
179
2.02M
    }
180
  };
181
182
  // a list of all files that we'll consider deleting
183
  // (every once in a while this is filled up with all files
184
  // in the DB directory)
185
  // (filled only if we're doing full scan)
186
  std::vector<CandidateFileInfo> full_scan_candidate_files;
187
188
  // the list of all live sst files that cannot be deleted
189
  std::vector<uint64_t> sst_live;
190
191
  // the list of sst files that we need to delete
192
  std::vector<ObsoleteFileInfo> sst_delete_files;
193
194
  // the list of all live blob files that cannot be deleted
195
  std::vector<uint64_t> blob_live;
196
197
  // the list of blob files that we need to delete
198
  std::vector<ObsoleteBlobFileInfo> blob_delete_files;
199
200
  // a list of log files that we need to delete
201
  std::vector<uint64_t> log_delete_files;
202
203
  // a list of log files that we need to preserve during full purge since they
204
  // will be reused later
205
  std::vector<uint64_t> log_recycle_files;
206
207
  // Files quarantined from deletion. This list contains file numbers for files
208
  // that are in an ambiguous states. This includes newly generated SST files
209
  // and blob files from flush and compaction job whose VersionEdits' persist
210
  // state in Manifest are unclear. An old manifest file whose immediately
211
  // following new manifest file's CURRENT file creation is in an unclear state.
212
  // WAL logs don't have this premature deletion risk since
213
  // min_log_number_to_keep is only updated after successful manifest commits.
214
  // So this data structure doesn't track log files.
215
  autovector<uint64_t> files_to_quarantine;
216
217
  // Blob file numbers that PurgeObsoleteFiles must keep. This includes both
218
  // actively written direct-write files and sealed direct-write files that are
219
  // still reachable through live memtables / old SuperVersions.
220
  // Collected under db_mutex_ in FindObsoleteFiles so PurgeObsoleteFiles can
221
  // safely use the snapshot without taking DB mutex.
222
  UnorderedSet<uint64_t> active_blob_direct_write_files;
223
224
  // Snapshot of VersionSet's next file number taken before collecting active
225
  // direct-write blob files. This keeps the current purge pass from racing a
226
  // concurrently created blob file that was not yet part of the active set.
227
  uint64_t min_blob_file_number_to_keep = std::numeric_limits<uint64_t>::max();
228
229
  // a list of manifest files that we need to delete
230
  std::vector<std::string> manifest_delete_files;
231
232
  // a list of memtables to be free
233
  autovector<ReadOnlyMemTable*> memtables_to_free;
234
235
  // contexts for installing superversions for multiple column families
236
  std::vector<SuperVersionContext> superversion_contexts;
237
238
  autovector<log::Writer*> wals_to_free;
239
240
  // the current manifest_file_number, log_number and prev_log_number
241
  // that corresponds to the set of files in 'live'.
242
  uint64_t manifest_file_number = 0;
243
  uint64_t pending_manifest_file_number = 0;
244
245
  // Used for remote compaction. To prevent OPTIONS files from getting
246
  // purged by PurgeObsoleteFiles() of the primary host
247
  uint64_t min_options_file_number;
248
  uint64_t log_number = 0;
249
  uint64_t prev_log_number = 0;
250
251
  uint64_t min_pending_output = 0;
252
  uint64_t prev_wals_total_size = 0;
253
  size_t num_alive_wal_files = 0;
254
  uint64_t size_log_to_delete = 0;
255
256
  // Snapshot taken before flush/compaction job.
257
  std::unique_ptr<ManagedSnapshot> job_snapshot;
258
  SnapshotChecker* snapshot_checker = nullptr;
259
  std::vector<SequenceNumber> snapshot_seqs;
260
  // This is the earliest snapshot that could be used for write-conflict
261
  // checking by a transaction.  For any user-key newer than this snapshot, we
262
  // should make sure not to remove evidence that a write occurred.
263
  SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber;
264
265
  // Unique job id
266
  int job_id;
267
268
  bool snapshot_context_initialized = false;
269
270
245k
  explicit JobContext(int _job_id, bool create_superversion = false) {
271
245k
    job_id = _job_id;
272
245k
    superversion_contexts.emplace_back(
273
245k
        SuperVersionContext(create_superversion));
274
245k
  }
275
276
  // Delete the default constructor
277
  JobContext() = delete;
278
279
  // For non-empty JobContext Clean() has to be called at least once before
280
  // before destruction (see asserts in ~JobContext()). Should be called with
281
  // unlocked DB mutex. Destructor doesn't call Clean() to avoid accidentally
282
  // doing potentially slow Clean() with locked DB mutex.
283
245k
  void Clean() {
284
    // free superversions
285
245k
    for (auto& sv_context : superversion_contexts) {
286
245k
      sv_context.Clean();
287
245k
    }
288
    // free pending memtables
289
245k
    for (auto m : memtables_to_free) {
290
0
      delete m;
291
0
    }
292
245k
    for (auto l : wals_to_free) {
293
2.09k
      delete l;
294
2.09k
    }
295
296
245k
    memtables_to_free.clear();
297
245k
    wals_to_free.clear();
298
245k
    job_snapshot.reset();
299
245k
  }
300
301
245k
  ~JobContext() {
302
245k
    assert(memtables_to_free.size() == 0);
303
    assert(wals_to_free.size() == 0);
304
245k
  }
305
};
306
307
}  // namespace ROCKSDB_NAMESPACE