Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/db/job_context.h
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#pragma once
11
12
#include <string>
13
#include <vector>
14
15
#include "db/column_family.h"
16
#include "db/log_writer.h"
17
#include "db/version_set.h"
18
#include "util/autovector.h"
19
20
namespace ROCKSDB_NAMESPACE {
21
22
class MemTable;
23
struct SuperVersion;
24
25
struct SuperVersionContext {
26
  struct WriteStallNotification {
27
    WriteStallInfo write_stall_info;
28
    const ImmutableOptions* immutable_options;
29
  };
30
31
  autovector<SuperVersion*> superversions_to_free;
32
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
33
  autovector<WriteStallNotification> write_stall_notifications;
34
#endif
35
  std::unique_ptr<SuperVersion>
36
      new_superversion;  // if nullptr no new superversion
37
38
  // If not nullptr, a new seqno to time mapping is available to be installed.
39
  // Otherwise, make a shared copy of the one in the existing SuperVersion and
40
  // carry it over to the new SuperVersion. This is moved to the SuperVersion
41
  // during installation.
42
  std::shared_ptr<const SeqnoToTimeMapping> new_seqno_to_time_mapping{nullptr};
43
44
  explicit SuperVersionContext(bool create_superversion = false)
45
1.40M
      : new_superversion(create_superversion ? new SuperVersion() : nullptr) {}
46
47
  explicit SuperVersionContext(SuperVersionContext&& other) noexcept
48
      : superversions_to_free(std::move(other.superversions_to_free)),
49
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
50
        write_stall_notifications(std::move(other.write_stall_notifications)),
51
#endif
52
36.1k
        new_superversion(std::move(other.new_superversion)) {
53
36.1k
  }
54
  // No copies
55
  SuperVersionContext(const SuperVersionContext& other) = delete;
56
  void operator=(const SuperVersionContext& other) = delete;
57
58
0
  void NewSuperVersion() {
59
0
    new_superversion = std::unique_ptr<SuperVersion>(new SuperVersion());
60
0
  }
61
62
0
  inline bool HaveSomethingToDelete() const {
63
0
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
64
0
    return !superversions_to_free.empty() || !write_stall_notifications.empty();
65
#else
66
    return !superversions_to_free.empty();
67
#endif
68
0
  }
69
70
  void PushWriteStallNotification(WriteStallCondition old_cond,
71
                                  WriteStallCondition new_cond,
72
                                  const std::string& name,
73
0
                                  const ImmutableOptions* ioptions) {
74
0
#if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
75
0
    WriteStallNotification notif;
76
0
    notif.write_stall_info.cf_name = name;
77
0
    notif.write_stall_info.condition.prev = old_cond;
78
0
    notif.write_stall_info.condition.cur = new_cond;
79
0
    notif.immutable_options = ioptions;
80
0
    write_stall_notifications.push_back(notif);
81
#else
82
    (void)old_cond;
83
    (void)new_cond;
84
    (void)name;
85
    (void)ioptions;
86
#endif  // !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
87
0
  }
88
89
1.40M
  void Clean() {
90
1.40M
#if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
91
    // notify listeners on changed write stall conditions
92
1.40M
    for (auto& notif : write_stall_notifications) {
93
0
      for (auto& listener : notif.immutable_options->listeners) {
94
0
        listener->OnStallConditionsChanged(notif.write_stall_info);
95
0
      }
96
0
    }
97
1.40M
    write_stall_notifications.clear();
98
1.40M
#endif
99
    // free superversions
100
1.40M
    for (auto s : superversions_to_free) {
101
0
      delete s;
102
0
    }
103
1.40M
    superversions_to_free.clear();
104
1.40M
  }
105
106
1.44M
  ~SuperVersionContext() {
107
1.44M
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
108
1.44M
    assert(write_stall_notifications.empty());
109
1.44M
#endif
110
1.44M
    assert(superversions_to_free.empty());
111
1.44M
  }
112
};
113
114
struct JobContext {
115
54.1k
  inline bool HaveSomethingToDelete() const {
116
54.1k
    return !(full_scan_candidate_files.empty() && sst_delete_files.empty() &&
117
54.1k
             blob_delete_files.empty() && log_delete_files.empty() &&
118
54.1k
             manifest_delete_files.empty());
119
54.1k
  }
120
121
0
  inline bool HaveSomethingToClean() const {
122
0
    bool sv_have_sth = false;
123
0
    for (const auto& sv_ctx : superversion_contexts) {
124
0
      if (sv_ctx.HaveSomethingToDelete()) {
125
0
        sv_have_sth = true;
126
0
        break;
127
0
      }
128
0
    }
129
0
    return memtables_to_free.size() > 0 || logs_to_free.size() > 0 ||
130
0
           job_snapshot != nullptr || sv_have_sth;
131
0
  }
132
133
0
  SequenceNumber GetJobSnapshotSequence() const {
134
0
    if (job_snapshot) {
135
0
      assert(job_snapshot->snapshot());
136
0
      return job_snapshot->snapshot()->GetSequenceNumber();
137
0
    }
138
0
    return kMaxSequenceNumber;
139
0
  }
140
141
  // Structure to store information for candidate files to delete.
142
  struct CandidateFileInfo {
143
    std::string file_name;
144
    std::string file_path;
145
    CandidateFileInfo(std::string name, std::string path)
146
175k
        : file_name(std::move(name)), file_path(std::move(path)) {}
147
157k
    bool operator==(const CandidateFileInfo& other) const {
148
157k
      return file_name == other.file_name && file_path == other.file_path;
149
157k
    }
150
  };
151
152
  // Unique job id
153
  int job_id;
154
155
  // a list of all files that we'll consider deleting
156
  // (every once in a while this is filled up with all files
157
  // in the DB directory)
158
  // (filled only if we're doing full scan)
159
  std::vector<CandidateFileInfo> full_scan_candidate_files;
160
161
  // the list of all live sst files that cannot be deleted
162
  std::vector<uint64_t> sst_live;
163
164
  // the list of sst files that we need to delete
165
  std::vector<ObsoleteFileInfo> sst_delete_files;
166
167
  // the list of all live blob files that cannot be deleted
168
  std::vector<uint64_t> blob_live;
169
170
  // the list of blob files that we need to delete
171
  std::vector<ObsoleteBlobFileInfo> blob_delete_files;
172
173
  // a list of log files that we need to delete
174
  std::vector<uint64_t> log_delete_files;
175
176
  // a list of log files that we need to preserve during full purge since they
177
  // will be reused later
178
  std::vector<uint64_t> log_recycle_files;
179
180
  // Files quarantined from deletion. This list contains file numbers for files
181
  // that are in an ambiguous states. This includes newly generated SST files
182
  // and blob files from flush and compaction job whose VersionEdits' persist
183
  // state in Manifest are unclear. An old manifest file whose immediately
184
  // following new manifest file's CURRENT file creation is in an unclear state.
185
  // WAL logs don't have this premature deletion risk since
186
  // min_log_number_to_keep is only updated after successful manifest commits.
187
  // So this data structure doesn't track log files.
188
  autovector<uint64_t> files_to_quarantine;
189
190
  // a list of manifest files that we need to delete
191
  std::vector<std::string> manifest_delete_files;
192
193
  // a list of memtables to be free
194
  autovector<MemTable*> memtables_to_free;
195
196
  // contexts for installing superversions for multiple column families
197
  std::vector<SuperVersionContext> superversion_contexts;
198
199
  autovector<log::Writer*> logs_to_free;
200
201
  // the current manifest_file_number, log_number and prev_log_number
202
  // that corresponds to the set of files in 'live'.
203
  uint64_t manifest_file_number;
204
  uint64_t pending_manifest_file_number;
205
  uint64_t log_number;
206
  uint64_t prev_log_number;
207
208
  uint64_t min_pending_output = 0;
209
  uint64_t prev_total_log_size = 0;
210
  size_t num_alive_log_files = 0;
211
  uint64_t size_log_to_delete = 0;
212
213
  // Snapshot taken before flush/compaction job.
214
  std::unique_ptr<ManagedSnapshot> job_snapshot;
215
216
36.1k
  explicit JobContext(int _job_id, bool create_superversion = false) {
217
36.1k
    job_id = _job_id;
218
36.1k
    manifest_file_number = 0;
219
36.1k
    pending_manifest_file_number = 0;
220
36.1k
    log_number = 0;
221
36.1k
    prev_log_number = 0;
222
36.1k
    superversion_contexts.emplace_back(
223
36.1k
        SuperVersionContext(create_superversion));
224
36.1k
  }
225
226
  // For non-empty JobContext Clean() has to be called at least once before
227
  // before destruction (see asserts in ~JobContext()). Should be called with
228
  // unlocked DB mutex. Destructor doesn't call Clean() to avoid accidentally
229
  // doing potentially slow Clean() with locked DB mutex.
230
36.1k
  void Clean() {
231
    // free superversions
232
36.1k
    for (auto& sv_context : superversion_contexts) {
233
36.1k
      sv_context.Clean();
234
36.1k
    }
235
    // free pending memtables
236
36.1k
    for (auto m : memtables_to_free) {
237
0
      delete m;
238
0
    }
239
36.1k
    for (auto l : logs_to_free) {
240
0
      delete l;
241
0
    }
242
243
36.1k
    memtables_to_free.clear();
244
36.1k
    logs_to_free.clear();
245
36.1k
    job_snapshot.reset();
246
36.1k
  }
247
248
36.1k
  ~JobContext() {
249
36.1k
    assert(memtables_to_free.size() == 0);
250
36.1k
    assert(logs_to_free.size() == 0);
251
36.1k
  }
252
};
253
254
}  // namespace ROCKSDB_NAMESPACE