/src/rocksdb/db/job_context.h
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #pragma once |
11 | | |
12 | | #include <string> |
13 | | #include <vector> |
14 | | |
15 | | #include "db/column_family.h" |
16 | | #include "db/log_writer.h" |
17 | | #include "db/version_set.h" |
18 | | #include "util/autovector.h" |
19 | | |
20 | | namespace ROCKSDB_NAMESPACE { |
21 | | |
22 | | class MemTable; |
23 | | struct SuperVersion; |
24 | | |
25 | | // The purpose of this struct is to simplify pushing work such as |
26 | | // allocation/construction, de-allocation/destruction, and notifications to |
27 | | // outside of holding the DB mutex. |
28 | | struct SuperVersionContext { |
29 | | struct WriteStallNotification { |
30 | | WriteStallInfo write_stall_info; |
31 | | const ImmutableOptions* immutable_options; |
32 | | }; |
33 | | |
34 | | autovector<SuperVersion*> superversions_to_free; |
35 | | #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION |
36 | | autovector<WriteStallNotification> write_stall_notifications; |
37 | | #endif |
38 | | std::unique_ptr<SuperVersion> |
39 | | new_superversion; // if nullptr no new superversion |
40 | | |
41 | | explicit SuperVersionContext(bool create_superversion = false) |
42 | 1.33M | : new_superversion(create_superversion ? new SuperVersion() : nullptr) {} |
43 | | |
44 | | explicit SuperVersionContext(SuperVersionContext&& other) noexcept |
45 | 236k | : superversions_to_free(std::move(other.superversions_to_free)), |
46 | | #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION |
47 | 236k | write_stall_notifications(std::move(other.write_stall_notifications)), |
48 | | #endif |
49 | 236k | new_superversion(std::move(other.new_superversion)) { |
50 | 236k | } |
51 | | // No copies |
52 | | SuperVersionContext(const SuperVersionContext& other) = delete; |
53 | | void operator=(const SuperVersionContext& other) = delete; |
54 | | |
55 | 1.74k | void NewSuperVersion() { |
56 | 1.74k | new_superversion = std::unique_ptr<SuperVersion>(new SuperVersion()); |
57 | 1.74k | } |
58 | | |
59 | 8.46k | inline bool HaveSomethingToDelete() const { |
60 | 8.46k | #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION |
61 | 8.46k | return !superversions_to_free.empty() || !write_stall_notifications.empty(); |
62 | | #else |
63 | | return !superversions_to_free.empty(); |
64 | | #endif |
65 | 8.46k | } |
66 | | |
67 | | void PushWriteStallNotification(WriteStallCondition old_cond, |
68 | | WriteStallCondition new_cond, |
69 | | const std::string& name, |
70 | 1 | const ImmutableOptions* ioptions) { |
71 | 1 | #if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION) |
72 | 1 | WriteStallNotification notif; |
73 | 1 | notif.write_stall_info.cf_name = name; |
74 | 1 | notif.write_stall_info.condition.prev = old_cond; |
75 | 1 | notif.write_stall_info.condition.cur = new_cond; |
76 | 1 | notif.immutable_options = ioptions; |
77 | 1 | write_stall_notifications.push_back(notif); |
78 | | #else |
79 | | (void)old_cond; |
80 | | (void)new_cond; |
81 | | (void)name; |
82 | | (void)ioptions; |
83 | | #endif // !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION) |
84 | 1 | } |
85 | | |
86 | 1.33M | void Clean() { |
87 | 1.33M | #if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION) |
88 | | // notify listeners on changed write stall conditions |
89 | 1.33M | for (auto& notif : write_stall_notifications) { |
90 | 1 | for (auto& listener : notif.immutable_options->listeners) { |
91 | 0 | listener->OnStallConditionsChanged(notif.write_stall_info); |
92 | 0 | } |
93 | 1 | } |
94 | 1.33M | write_stall_notifications.clear(); |
95 | 1.33M | #endif |
96 | | // free superversions |
97 | 1.33M | for (auto s : superversions_to_free) { |
98 | 8.81k | delete s; |
99 | 8.81k | } |
100 | 1.33M | superversions_to_free.clear(); |
101 | 1.33M | } |
102 | | |
103 | 1.57M | ~SuperVersionContext() { |
104 | 1.57M | #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION |
105 | 1.57M | assert(write_stall_notifications.empty()); |
106 | 1.57M | #endif |
107 | 1.57M | assert(superversions_to_free.empty()); |
108 | 1.57M | } |
109 | | }; |
110 | | |
111 | | struct JobContext { |
112 | 360k | inline bool HaveSomethingToDelete() const { |
113 | 360k | return !(full_scan_candidate_files.empty() && sst_delete_files.empty() && |
114 | 155k | blob_delete_files.empty() && log_delete_files.empty() && |
115 | 152k | manifest_delete_files.empty()); |
116 | 360k | } |
117 | | |
118 | 8.46k | inline bool HaveSomethingToClean() const { |
119 | 8.46k | bool sv_have_sth = false; |
120 | 8.46k | for (const auto& sv_ctx : superversion_contexts) { |
121 | 8.46k | if (sv_ctx.HaveSomethingToDelete()) { |
122 | 7.06k | sv_have_sth = true; |
123 | 7.06k | break; |
124 | 7.06k | } |
125 | 8.46k | } |
126 | 8.46k | return memtables_to_free.size() > 0 || wals_to_free.size() > 0 || |
127 | 6.71k | job_snapshot != nullptr || sv_have_sth; |
128 | 8.46k | } |
129 | | |
130 | 5.43k | SequenceNumber GetJobSnapshotSequence() const { |
131 | 5.43k | if (job_snapshot) { |
132 | 0 | assert(job_snapshot->snapshot()); |
133 | 0 | return job_snapshot->snapshot()->GetSequenceNumber(); |
134 | 0 | } |
135 | 5.43k | return kMaxSequenceNumber; |
136 | 5.43k | } |
137 | | |
138 | 3.68k | SequenceNumber GetLatestSnapshotSequence() const { |
139 | 3.68k | assert(snapshot_context_initialized); |
140 | 3.68k | if (snapshot_seqs.empty()) { |
141 | 3.67k | return 0; |
142 | 3.67k | } |
143 | 9 | return snapshot_seqs.back(); |
144 | 3.68k | } |
145 | | |
146 | 5.44k | SequenceNumber GetEarliestSnapshotSequence() const { |
147 | 5.44k | assert(snapshot_context_initialized); |
148 | 5.44k | if (snapshot_seqs.empty()) { |
149 | 5.42k | return kMaxSequenceNumber; |
150 | 5.42k | } |
151 | 18 | return snapshot_seqs.front(); |
152 | 5.44k | } |
153 | | |
154 | | void InitSnapshotContext(SnapshotChecker* checker, |
155 | | std::unique_ptr<ManagedSnapshot> managed_snapshot, |
156 | | SequenceNumber earliest_write_conflict, |
157 | 5.43k | std::vector<SequenceNumber>&& snapshots) { |
158 | 5.43k | if (snapshot_context_initialized) { |
159 | 0 | return; |
160 | 0 | } |
161 | 5.43k | snapshot_context_initialized = true; |
162 | 5.43k | snapshot_checker = checker; |
163 | 5.43k | assert(!job_snapshot); |
164 | 5.43k | job_snapshot = std::move(managed_snapshot); |
165 | 5.43k | earliest_write_conflict_snapshot = earliest_write_conflict; |
166 | 5.43k | snapshot_seqs = std::move(snapshots); |
167 | 5.43k | } |
168 | | |
169 | | // Structure to store information for candidate files to delete. |
170 | | struct CandidateFileInfo { |
171 | | std::string file_name; |
172 | | std::string file_path; |
173 | | CandidateFileInfo(std::string name, std::string path) |
174 | 1.97M | : file_name(std::move(name)), file_path(std::move(path)) {} |
175 | 1.87M | bool operator==(const CandidateFileInfo& other) const { |
176 | 1.87M | return file_name == other.file_name && file_path == other.file_path; |
177 | 1.87M | } |
178 | | }; |
179 | | |
180 | | // a list of all files that we'll consider deleting |
181 | | // (every once in a while this is filled up with all files |
182 | | // in the DB directory) |
183 | | // (filled only if we're doing full scan) |
184 | | std::vector<CandidateFileInfo> full_scan_candidate_files; |
185 | | |
186 | | // the list of all live sst files that cannot be deleted |
187 | | std::vector<uint64_t> sst_live; |
188 | | |
189 | | // the list of sst files that we need to delete |
190 | | std::vector<ObsoleteFileInfo> sst_delete_files; |
191 | | |
192 | | // the list of all live blob files that cannot be deleted |
193 | | std::vector<uint64_t> blob_live; |
194 | | |
195 | | // the list of blob files that we need to delete |
196 | | std::vector<ObsoleteBlobFileInfo> blob_delete_files; |
197 | | |
198 | | // a list of log files that we need to delete |
199 | | std::vector<uint64_t> log_delete_files; |
200 | | |
201 | | // a list of log files that we need to preserve during full purge since they |
202 | | // will be reused later |
203 | | std::vector<uint64_t> log_recycle_files; |
204 | | |
205 | | // Files quarantined from deletion. This list contains file numbers for files |
206 | | // that are in an ambiguous states. This includes newly generated SST files |
207 | | // and blob files from flush and compaction job whose VersionEdits' persist |
208 | | // state in Manifest are unclear. An old manifest file whose immediately |
209 | | // following new manifest file's CURRENT file creation is in an unclear state. |
210 | | // WAL logs don't have this premature deletion risk since |
211 | | // min_log_number_to_keep is only updated after successful manifest commits. |
212 | | // So this data structure doesn't track log files. |
213 | | autovector<uint64_t> files_to_quarantine; |
214 | | |
215 | | // a list of manifest files that we need to delete |
216 | | std::vector<std::string> manifest_delete_files; |
217 | | |
218 | | // a list of memtables to be free |
219 | | autovector<ReadOnlyMemTable*> memtables_to_free; |
220 | | |
221 | | // contexts for installing superversions for multiple column families |
222 | | std::vector<SuperVersionContext> superversion_contexts; |
223 | | |
224 | | autovector<log::Writer*> wals_to_free; |
225 | | |
226 | | // the current manifest_file_number, log_number and prev_log_number |
227 | | // that corresponds to the set of files in 'live'. |
228 | | uint64_t manifest_file_number = 0; |
229 | | uint64_t pending_manifest_file_number = 0; |
230 | | |
231 | | // Used for remote compaction. To prevent OPTIONS files from getting |
232 | | // purged by PurgeObsoleteFiles() of the primary host |
233 | | uint64_t min_options_file_number; |
234 | | uint64_t log_number = 0; |
235 | | uint64_t prev_log_number = 0; |
236 | | |
237 | | uint64_t min_pending_output = 0; |
238 | | uint64_t prev_wals_total_size = 0; |
239 | | size_t num_alive_wal_files = 0; |
240 | | uint64_t size_log_to_delete = 0; |
241 | | |
242 | | // Snapshot taken before flush/compaction job. |
243 | | std::unique_ptr<ManagedSnapshot> job_snapshot; |
244 | | SnapshotChecker* snapshot_checker = nullptr; |
245 | | std::vector<SequenceNumber> snapshot_seqs; |
246 | | // This is the earliest snapshot that could be used for write-conflict |
247 | | // checking by a transaction. For any user-key newer than this snapshot, we |
248 | | // should make sure not to remove evidence that a write occurred. |
249 | | SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber; |
250 | | |
251 | | // Unique job id |
252 | | int job_id; |
253 | | |
254 | | bool snapshot_context_initialized = false; |
255 | | |
256 | 236k | explicit JobContext(int _job_id, bool create_superversion = false) { |
257 | 236k | job_id = _job_id; |
258 | 236k | superversion_contexts.emplace_back( |
259 | 236k | SuperVersionContext(create_superversion)); |
260 | 236k | } |
261 | | |
262 | | // Delete the default constructor |
263 | | JobContext() = delete; |
264 | | |
265 | | // For non-empty JobContext Clean() has to be called at least once before |
266 | | // before destruction (see asserts in ~JobContext()). Should be called with |
267 | | // unlocked DB mutex. Destructor doesn't call Clean() to avoid accidentally |
268 | | // doing potentially slow Clean() with locked DB mutex. |
269 | 236k | void Clean() { |
270 | | // free superversions |
271 | 236k | for (auto& sv_context : superversion_contexts) { |
272 | 236k | sv_context.Clean(); |
273 | 236k | } |
274 | | // free pending memtables |
275 | 236k | for (auto m : memtables_to_free) { |
276 | 0 | delete m; |
277 | 0 | } |
278 | 236k | for (auto l : wals_to_free) { |
279 | 1.74k | delete l; |
280 | 1.74k | } |
281 | | |
282 | 236k | memtables_to_free.clear(); |
283 | 236k | wals_to_free.clear(); |
284 | 236k | job_snapshot.reset(); |
285 | 236k | } |
286 | | |
287 | 236k | ~JobContext() { |
288 | 236k | assert(memtables_to_free.size() == 0); |
289 | | assert(wals_to_free.size() == 0); |
290 | 236k | } |
291 | | }; |
292 | | |
293 | | } // namespace ROCKSDB_NAMESPACE |