Coverage Report

Created: 2025-10-26 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/job_context.h
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#pragma once
11
12
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "db/column_family.h"
#include "db/log_writer.h"
#include "db/version_set.h"
#include "util/autovector.h"
19
20
namespace ROCKSDB_NAMESPACE {
21
22
class MemTable;
23
struct SuperVersion;
24
25
// The purpose of this struct is to simplify pushing work such as
26
// allocation/construction, de-allocation/destruction, and notifications to
27
// outside of holding the DB mutex.
28
struct SuperVersionContext {
29
  struct WriteStallNotification {
30
    WriteStallInfo write_stall_info;
31
    const ImmutableOptions* immutable_options;
32
  };
33
34
  autovector<SuperVersion*> superversions_to_free;
35
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
36
  autovector<WriteStallNotification> write_stall_notifications;
37
#endif
38
  std::unique_ptr<SuperVersion>
39
      new_superversion;  // if nullptr no new superversion
40
41
  explicit SuperVersionContext(bool create_superversion = false)
42
1.33M
      : new_superversion(create_superversion ? new SuperVersion() : nullptr) {}
43
44
  explicit SuperVersionContext(SuperVersionContext&& other) noexcept
45
236k
      : superversions_to_free(std::move(other.superversions_to_free)),
46
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
47
236k
        write_stall_notifications(std::move(other.write_stall_notifications)),
48
#endif
49
236k
        new_superversion(std::move(other.new_superversion)) {
50
236k
  }
51
  // No copies
52
  SuperVersionContext(const SuperVersionContext& other) = delete;
53
  void operator=(const SuperVersionContext& other) = delete;
54
55
1.74k
  void NewSuperVersion() {
56
1.74k
    new_superversion = std::unique_ptr<SuperVersion>(new SuperVersion());
57
1.74k
  }
58
59
8.46k
  inline bool HaveSomethingToDelete() const {
60
8.46k
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
61
8.46k
    return !superversions_to_free.empty() || !write_stall_notifications.empty();
62
#else
63
    return !superversions_to_free.empty();
64
#endif
65
8.46k
  }
66
67
  void PushWriteStallNotification(WriteStallCondition old_cond,
68
                                  WriteStallCondition new_cond,
69
                                  const std::string& name,
70
1
                                  const ImmutableOptions* ioptions) {
71
1
#if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
72
1
    WriteStallNotification notif;
73
1
    notif.write_stall_info.cf_name = name;
74
1
    notif.write_stall_info.condition.prev = old_cond;
75
1
    notif.write_stall_info.condition.cur = new_cond;
76
1
    notif.immutable_options = ioptions;
77
1
    write_stall_notifications.push_back(notif);
78
#else
79
    (void)old_cond;
80
    (void)new_cond;
81
    (void)name;
82
    (void)ioptions;
83
#endif  // !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
84
1
  }
85
86
1.33M
  void Clean() {
87
1.33M
#if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
88
    // notify listeners on changed write stall conditions
89
1.33M
    for (auto& notif : write_stall_notifications) {
90
1
      for (auto& listener : notif.immutable_options->listeners) {
91
0
        listener->OnStallConditionsChanged(notif.write_stall_info);
92
0
      }
93
1
    }
94
1.33M
    write_stall_notifications.clear();
95
1.33M
#endif
96
    // free superversions
97
1.33M
    for (auto s : superversions_to_free) {
98
8.81k
      delete s;
99
8.81k
    }
100
1.33M
    superversions_to_free.clear();
101
1.33M
  }
102
103
1.57M
  ~SuperVersionContext() {
104
1.57M
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
105
1.57M
    assert(write_stall_notifications.empty());
106
1.57M
#endif
107
1.57M
    assert(superversions_to_free.empty());
108
1.57M
  }
109
};
110
111
struct JobContext {
112
360k
  inline bool HaveSomethingToDelete() const {
113
360k
    return !(full_scan_candidate_files.empty() && sst_delete_files.empty() &&
114
155k
             blob_delete_files.empty() && log_delete_files.empty() &&
115
152k
             manifest_delete_files.empty());
116
360k
  }
117
118
8.46k
  inline bool HaveSomethingToClean() const {
119
8.46k
    bool sv_have_sth = false;
120
8.46k
    for (const auto& sv_ctx : superversion_contexts) {
121
8.46k
      if (sv_ctx.HaveSomethingToDelete()) {
122
7.06k
        sv_have_sth = true;
123
7.06k
        break;
124
7.06k
      }
125
8.46k
    }
126
8.46k
    return memtables_to_free.size() > 0 || wals_to_free.size() > 0 ||
127
6.71k
           job_snapshot != nullptr || sv_have_sth;
128
8.46k
  }
129
130
5.43k
  SequenceNumber GetJobSnapshotSequence() const {
131
5.43k
    if (job_snapshot) {
132
0
      assert(job_snapshot->snapshot());
133
0
      return job_snapshot->snapshot()->GetSequenceNumber();
134
0
    }
135
5.43k
    return kMaxSequenceNumber;
136
5.43k
  }
137
138
3.68k
  SequenceNumber GetLatestSnapshotSequence() const {
139
3.68k
    assert(snapshot_context_initialized);
140
3.68k
    if (snapshot_seqs.empty()) {
141
3.67k
      return 0;
142
3.67k
    }
143
9
    return snapshot_seqs.back();
144
3.68k
  }
145
146
5.44k
  SequenceNumber GetEarliestSnapshotSequence() const {
147
5.44k
    assert(snapshot_context_initialized);
148
5.44k
    if (snapshot_seqs.empty()) {
149
5.42k
      return kMaxSequenceNumber;
150
5.42k
    }
151
18
    return snapshot_seqs.front();
152
5.44k
  }
153
154
  void InitSnapshotContext(SnapshotChecker* checker,
155
                           std::unique_ptr<ManagedSnapshot> managed_snapshot,
156
                           SequenceNumber earliest_write_conflict,
157
5.43k
                           std::vector<SequenceNumber>&& snapshots) {
158
5.43k
    if (snapshot_context_initialized) {
159
0
      return;
160
0
    }
161
5.43k
    snapshot_context_initialized = true;
162
5.43k
    snapshot_checker = checker;
163
5.43k
    assert(!job_snapshot);
164
5.43k
    job_snapshot = std::move(managed_snapshot);
165
5.43k
    earliest_write_conflict_snapshot = earliest_write_conflict;
166
5.43k
    snapshot_seqs = std::move(snapshots);
167
5.43k
  }
168
169
  // Structure to store information for candidate files to delete.
170
  struct CandidateFileInfo {
171
    std::string file_name;
172
    std::string file_path;
173
    CandidateFileInfo(std::string name, std::string path)
174
1.97M
        : file_name(std::move(name)), file_path(std::move(path)) {}
175
1.87M
    bool operator==(const CandidateFileInfo& other) const {
176
1.87M
      return file_name == other.file_name && file_path == other.file_path;
177
1.87M
    }
178
  };
179
180
  // a list of all files that we'll consider deleting
181
  // (every once in a while this is filled up with all files
182
  // in the DB directory)
183
  // (filled only if we're doing full scan)
184
  std::vector<CandidateFileInfo> full_scan_candidate_files;
185
186
  // the list of all live sst files that cannot be deleted
187
  std::vector<uint64_t> sst_live;
188
189
  // the list of sst files that we need to delete
190
  std::vector<ObsoleteFileInfo> sst_delete_files;
191
192
  // the list of all live blob files that cannot be deleted
193
  std::vector<uint64_t> blob_live;
194
195
  // the list of blob files that we need to delete
196
  std::vector<ObsoleteBlobFileInfo> blob_delete_files;
197
198
  // a list of log files that we need to delete
199
  std::vector<uint64_t> log_delete_files;
200
201
  // a list of log files that we need to preserve during full purge since they
202
  // will be reused later
203
  std::vector<uint64_t> log_recycle_files;
204
205
  // Files quarantined from deletion. This list contains file numbers for files
206
  // that are in an ambiguous states. This includes newly generated SST files
207
  // and blob files from flush and compaction job whose VersionEdits' persist
208
  // state in Manifest are unclear. An old manifest file whose immediately
209
  // following new manifest file's CURRENT file creation is in an unclear state.
210
  // WAL logs don't have this premature deletion risk since
211
  // min_log_number_to_keep is only updated after successful manifest commits.
212
  // So this data structure doesn't track log files.
213
  autovector<uint64_t> files_to_quarantine;
214
215
  // a list of manifest files that we need to delete
216
  std::vector<std::string> manifest_delete_files;
217
218
  // a list of memtables to be free
219
  autovector<ReadOnlyMemTable*> memtables_to_free;
220
221
  // contexts for installing superversions for multiple column families
222
  std::vector<SuperVersionContext> superversion_contexts;
223
224
  autovector<log::Writer*> wals_to_free;
225
226
  // the current manifest_file_number, log_number and prev_log_number
227
  // that corresponds to the set of files in 'live'.
228
  uint64_t manifest_file_number = 0;
229
  uint64_t pending_manifest_file_number = 0;
230
231
  // Used for remote compaction. To prevent OPTIONS files from getting
232
  // purged by PurgeObsoleteFiles() of the primary host
233
  uint64_t min_options_file_number;
234
  uint64_t log_number = 0;
235
  uint64_t prev_log_number = 0;
236
237
  uint64_t min_pending_output = 0;
238
  uint64_t prev_wals_total_size = 0;
239
  size_t num_alive_wal_files = 0;
240
  uint64_t size_log_to_delete = 0;
241
242
  // Snapshot taken before flush/compaction job.
243
  std::unique_ptr<ManagedSnapshot> job_snapshot;
244
  SnapshotChecker* snapshot_checker = nullptr;
245
  std::vector<SequenceNumber> snapshot_seqs;
246
  // This is the earliest snapshot that could be used for write-conflict
247
  // checking by a transaction.  For any user-key newer than this snapshot, we
248
  // should make sure not to remove evidence that a write occurred.
249
  SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber;
250
251
  // Unique job id
252
  int job_id;
253
254
  bool snapshot_context_initialized = false;
255
256
236k
  explicit JobContext(int _job_id, bool create_superversion = false) {
257
236k
    job_id = _job_id;
258
236k
    superversion_contexts.emplace_back(
259
236k
        SuperVersionContext(create_superversion));
260
236k
  }
261
262
  // Delete the default constructor
263
  JobContext() = delete;
264
265
  // For non-empty JobContext Clean() has to be called at least once before
266
  // before destruction (see asserts in ~JobContext()). Should be called with
267
  // unlocked DB mutex. Destructor doesn't call Clean() to avoid accidentally
268
  // doing potentially slow Clean() with locked DB mutex.
269
236k
  void Clean() {
270
    // free superversions
271
236k
    for (auto& sv_context : superversion_contexts) {
272
236k
      sv_context.Clean();
273
236k
    }
274
    // free pending memtables
275
236k
    for (auto m : memtables_to_free) {
276
0
      delete m;
277
0
    }
278
236k
    for (auto l : wals_to_free) {
279
1.74k
      delete l;
280
1.74k
    }
281
282
236k
    memtables_to_free.clear();
283
236k
    wals_to_free.clear();
284
236k
    job_snapshot.reset();
285
236k
  }
286
287
236k
  ~JobContext() {
288
236k
    assert(memtables_to_free.size() == 0);
289
    assert(wals_to_free.size() == 0);
290
236k
  }
291
};
292
293
}  // namespace ROCKSDB_NAMESPACE