/src/rocksdb/db/job_context.h
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #pragma once |
11 | | |
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "db/column_family.h"
#include "db/log_writer.h"
#include "db/version_set.h"
#include "util/autovector.h"
#include "util/hash_containers.h"
21 | | |
22 | | namespace ROCKSDB_NAMESPACE { |
23 | | |
24 | | class MemTable; |
25 | | struct SuperVersion; |
26 | | |
27 | | // The purpose of this struct is to simplify pushing work such as |
28 | | // allocation/construction, de-allocation/destruction, and notifications to |
29 | | // outside of holding the DB mutex. |
30 | | struct SuperVersionContext { |
31 | | struct WriteStallNotification { |
32 | | WriteStallInfo write_stall_info; |
33 | | const ImmutableOptions* immutable_options; |
34 | | }; |
35 | | |
36 | | autovector<SuperVersion*> superversions_to_free; |
37 | | #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION |
38 | | autovector<WriteStallNotification> write_stall_notifications; |
39 | | #endif |
40 | | std::unique_ptr<SuperVersion> |
41 | | new_superversion; // if nullptr no new superversion |
42 | | |
43 | | explicit SuperVersionContext(bool create_superversion = false) |
44 | 842k | : new_superversion(create_superversion ? new SuperVersion() : nullptr) {} |
45 | | |
46 | | explicit SuperVersionContext(SuperVersionContext&& other) noexcept |
47 | 245k | : superversions_to_free(std::move(other.superversions_to_free)), |
48 | | #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION |
49 | 245k | write_stall_notifications(std::move(other.write_stall_notifications)), |
50 | | #endif |
51 | 245k | new_superversion(std::move(other.new_superversion)) { |
52 | 245k | } |
53 | | // No copies |
54 | | SuperVersionContext(const SuperVersionContext& other) = delete; |
55 | | void operator=(const SuperVersionContext& other) = delete; |
56 | | |
57 | 2.09k | void NewSuperVersion() { |
58 | 2.09k | new_superversion = std::unique_ptr<SuperVersion>(new SuperVersion()); |
59 | 2.09k | } |
60 | | |
61 | 10.0k | inline bool HaveSomethingToDelete() const { |
62 | 10.0k | #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION |
63 | 10.0k | return !superversions_to_free.empty() || !write_stall_notifications.empty(); |
64 | | #else |
65 | | return !superversions_to_free.empty(); |
66 | | #endif |
67 | 10.0k | } |
68 | | |
69 | | void PushWriteStallNotification(WriteStallCondition old_cond, |
70 | | WriteStallCondition new_cond, |
71 | | const std::string& name, |
72 | 0 | const ImmutableOptions* ioptions) { |
73 | 0 | #if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION) |
74 | 0 | WriteStallNotification notif; |
75 | 0 | notif.write_stall_info.cf_name = name; |
76 | 0 | notif.write_stall_info.condition.prev = old_cond; |
77 | 0 | notif.write_stall_info.condition.cur = new_cond; |
78 | 0 | notif.immutable_options = ioptions; |
79 | 0 | write_stall_notifications.push_back(notif); |
80 | | #else |
81 | | (void)old_cond; |
82 | | (void)new_cond; |
83 | | (void)name; |
84 | | (void)ioptions; |
85 | | #endif // !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION) |
86 | 0 | } |
87 | | |
88 | 840k | void Clean() { |
89 | 840k | #if !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION) |
90 | | // notify listeners on changed write stall conditions |
91 | 840k | for (auto& notif : write_stall_notifications) { |
92 | 0 | for (auto& listener : notif.immutable_options->listeners) { |
93 | 0 | listener->OnStallConditionsChanged(notif.write_stall_info); |
94 | 0 | } |
95 | 0 | } |
96 | 840k | write_stall_notifications.clear(); |
97 | 840k | #endif |
98 | | // free superversions |
99 | 840k | for (auto s : superversions_to_free) { |
100 | 10.6k | delete s; |
101 | 10.6k | } |
102 | 840k | superversions_to_free.clear(); |
103 | 840k | } |
104 | | |
105 | 1.08M | ~SuperVersionContext() { |
106 | 1.08M | #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION |
107 | 1.08M | assert(write_stall_notifications.empty()); |
108 | 1.08M | #endif |
109 | 1.08M | assert(superversions_to_free.empty()); |
110 | 1.08M | } |
111 | | }; |
112 | | |
113 | | struct JobContext { |
114 | 375k | inline bool HaveSomethingToDelete() const { |
115 | 375k | return !(full_scan_candidate_files.empty() && sst_delete_files.empty() && |
116 | 163k | blob_delete_files.empty() && log_delete_files.empty() && |
117 | 159k | manifest_delete_files.empty()); |
118 | 375k | } |
119 | | |
120 | 10.0k | inline bool HaveSomethingToClean() const { |
121 | 10.0k | bool sv_have_sth = false; |
122 | 10.0k | for (const auto& sv_ctx : superversion_contexts) { |
123 | 10.0k | if (sv_ctx.HaveSomethingToDelete()) { |
124 | 8.55k | sv_have_sth = true; |
125 | 8.55k | break; |
126 | 8.55k | } |
127 | 10.0k | } |
128 | 10.0k | return memtables_to_free.size() > 0 || wals_to_free.size() > 0 || |
129 | 7.91k | job_snapshot != nullptr || sv_have_sth; |
130 | 10.0k | } |
131 | | |
132 | 6.34k | SequenceNumber GetJobSnapshotSequence() const { |
133 | 6.34k | if (job_snapshot) { |
134 | 0 | assert(job_snapshot->snapshot()); |
135 | 0 | return job_snapshot->snapshot()->GetSequenceNumber(); |
136 | 0 | } |
137 | 6.34k | return kMaxSequenceNumber; |
138 | 6.34k | } |
139 | | |
140 | 4.25k | SequenceNumber GetLatestSnapshotSequence() const { |
141 | 4.25k | assert(snapshot_context_initialized); |
142 | 4.25k | if (snapshot_seqs.empty()) { |
143 | 4.24k | return 0; |
144 | 4.24k | } |
145 | 7 | return snapshot_seqs.back(); |
146 | 4.25k | } |
147 | | |
148 | 6.34k | SequenceNumber GetEarliestSnapshotSequence() const { |
149 | 6.34k | assert(snapshot_context_initialized); |
150 | 6.34k | if (snapshot_seqs.empty()) { |
151 | 6.33k | return kMaxSequenceNumber; |
152 | 6.33k | } |
153 | 14 | return snapshot_seqs.front(); |
154 | 6.34k | } |
155 | | |
156 | | void InitSnapshotContext(SnapshotChecker* checker, |
157 | | std::unique_ptr<ManagedSnapshot> managed_snapshot, |
158 | | SequenceNumber earliest_write_conflict, |
159 | 6.34k | std::vector<SequenceNumber>&& snapshots) { |
160 | 6.34k | if (snapshot_context_initialized) { |
161 | 0 | return; |
162 | 0 | } |
163 | 6.34k | snapshot_context_initialized = true; |
164 | 6.34k | snapshot_checker = checker; |
165 | 6.34k | assert(!job_snapshot); |
166 | 6.34k | job_snapshot = std::move(managed_snapshot); |
167 | 6.34k | earliest_write_conflict_snapshot = earliest_write_conflict; |
168 | 6.34k | snapshot_seqs = std::move(snapshots); |
169 | 6.34k | } |
170 | | |
171 | | // Structure to store information for candidate files to delete. |
172 | | struct CandidateFileInfo { |
173 | | std::string file_name; |
174 | | std::string file_path; |
175 | | CandidateFileInfo(std::string name, std::string path) |
176 | 2.12M | : file_name(std::move(name)), file_path(std::move(path)) {} |
177 | 2.02M | bool operator==(const CandidateFileInfo& other) const { |
178 | 2.02M | return file_name == other.file_name && file_path == other.file_path; |
179 | 2.02M | } |
180 | | }; |
181 | | |
182 | | // a list of all files that we'll consider deleting |
183 | | // (every once in a while this is filled up with all files |
184 | | // in the DB directory) |
185 | | // (filled only if we're doing full scan) |
186 | | std::vector<CandidateFileInfo> full_scan_candidate_files; |
187 | | |
188 | | // the list of all live sst files that cannot be deleted |
189 | | std::vector<uint64_t> sst_live; |
190 | | |
191 | | // the list of sst files that we need to delete |
192 | | std::vector<ObsoleteFileInfo> sst_delete_files; |
193 | | |
194 | | // the list of all live blob files that cannot be deleted |
195 | | std::vector<uint64_t> blob_live; |
196 | | |
197 | | // the list of blob files that we need to delete |
198 | | std::vector<ObsoleteBlobFileInfo> blob_delete_files; |
199 | | |
200 | | // a list of log files that we need to delete |
201 | | std::vector<uint64_t> log_delete_files; |
202 | | |
203 | | // a list of log files that we need to preserve during full purge since they |
204 | | // will be reused later |
205 | | std::vector<uint64_t> log_recycle_files; |
206 | | |
207 | | // Files quarantined from deletion. This list contains file numbers for files |
208 | | // that are in an ambiguous states. This includes newly generated SST files |
209 | | // and blob files from flush and compaction job whose VersionEdits' persist |
210 | | // state in Manifest are unclear. An old manifest file whose immediately |
211 | | // following new manifest file's CURRENT file creation is in an unclear state. |
212 | | // WAL logs don't have this premature deletion risk since |
213 | | // min_log_number_to_keep is only updated after successful manifest commits. |
214 | | // So this data structure doesn't track log files. |
215 | | autovector<uint64_t> files_to_quarantine; |
216 | | |
217 | | // Blob file numbers that PurgeObsoleteFiles must keep. This includes both |
218 | | // actively written direct-write files and sealed direct-write files that are |
219 | | // still reachable through live memtables / old SuperVersions. |
220 | | // Collected under db_mutex_ in FindObsoleteFiles so PurgeObsoleteFiles can |
221 | | // safely use the snapshot without taking DB mutex. |
222 | | UnorderedSet<uint64_t> active_blob_direct_write_files; |
223 | | |
224 | | // Snapshot of VersionSet's next file number taken before collecting active |
225 | | // direct-write blob files. This keeps the current purge pass from racing a |
226 | | // concurrently created blob file that was not yet part of the active set. |
227 | | uint64_t min_blob_file_number_to_keep = std::numeric_limits<uint64_t>::max(); |
228 | | |
229 | | // a list of manifest files that we need to delete |
230 | | std::vector<std::string> manifest_delete_files; |
231 | | |
232 | | // a list of memtables to be free |
233 | | autovector<ReadOnlyMemTable*> memtables_to_free; |
234 | | |
235 | | // contexts for installing superversions for multiple column families |
236 | | std::vector<SuperVersionContext> superversion_contexts; |
237 | | |
238 | | autovector<log::Writer*> wals_to_free; |
239 | | |
240 | | // the current manifest_file_number, log_number and prev_log_number |
241 | | // that corresponds to the set of files in 'live'. |
242 | | uint64_t manifest_file_number = 0; |
243 | | uint64_t pending_manifest_file_number = 0; |
244 | | |
245 | | // Used for remote compaction. To prevent OPTIONS files from getting |
246 | | // purged by PurgeObsoleteFiles() of the primary host |
247 | | uint64_t min_options_file_number; |
248 | | uint64_t log_number = 0; |
249 | | uint64_t prev_log_number = 0; |
250 | | |
251 | | uint64_t min_pending_output = 0; |
252 | | uint64_t prev_wals_total_size = 0; |
253 | | size_t num_alive_wal_files = 0; |
254 | | uint64_t size_log_to_delete = 0; |
255 | | |
256 | | // Snapshot taken before flush/compaction job. |
257 | | std::unique_ptr<ManagedSnapshot> job_snapshot; |
258 | | SnapshotChecker* snapshot_checker = nullptr; |
259 | | std::vector<SequenceNumber> snapshot_seqs; |
260 | | // This is the earliest snapshot that could be used for write-conflict |
261 | | // checking by a transaction. For any user-key newer than this snapshot, we |
262 | | // should make sure not to remove evidence that a write occurred. |
263 | | SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber; |
264 | | |
265 | | // Unique job id |
266 | | int job_id; |
267 | | |
268 | | bool snapshot_context_initialized = false; |
269 | | |
270 | 245k | explicit JobContext(int _job_id, bool create_superversion = false) { |
271 | 245k | job_id = _job_id; |
272 | 245k | superversion_contexts.emplace_back( |
273 | 245k | SuperVersionContext(create_superversion)); |
274 | 245k | } |
275 | | |
276 | | // Delete the default constructor |
277 | | JobContext() = delete; |
278 | | |
279 | | // For non-empty JobContext Clean() has to be called at least once before |
280 | | // before destruction (see asserts in ~JobContext()). Should be called with |
281 | | // unlocked DB mutex. Destructor doesn't call Clean() to avoid accidentally |
282 | | // doing potentially slow Clean() with locked DB mutex. |
283 | 245k | void Clean() { |
284 | | // free superversions |
285 | 245k | for (auto& sv_context : superversion_contexts) { |
286 | 245k | sv_context.Clean(); |
287 | 245k | } |
288 | | // free pending memtables |
289 | 245k | for (auto m : memtables_to_free) { |
290 | 0 | delete m; |
291 | 0 | } |
292 | 245k | for (auto l : wals_to_free) { |
293 | 2.09k | delete l; |
294 | 2.09k | } |
295 | | |
296 | 245k | memtables_to_free.clear(); |
297 | 245k | wals_to_free.clear(); |
298 | 245k | job_snapshot.reset(); |
299 | 245k | } |
300 | | |
301 | 245k | ~JobContext() { |
302 | 245k | assert(memtables_to_free.size() == 0); |
303 | | assert(wals_to_free.size() == 0); |
304 | 245k | } |
305 | | }; |
306 | | |
307 | | } // namespace ROCKSDB_NAMESPACE |